lux/dist/daemon_manager.h

00001 #ifndef LUX_DIST_DAEMONMANAGER_H
00002 #define LUX_DIST_DAEMONMANAGER_H
00003 
00004 #include "common.h"
00005 #include "protocol.h"
00006 #include "lux/util.h"
00007 #include <sys/types.h>
00008 #include <sys/socket.h>
00009 #include <arpa/inet.h>
00010 #include <netinet/in.h>
00011 #include <netdb.h>
00012 #include <signal.h>
00013 #include <vector>
00014 
00015 #define ARR_SIZE(arr) (sizeof(arr)/sizeof(arr[0]))
00016 
00017 namespace Lux {
00018 namespace Dist {
00019 
00020   typedef struct {
00021     pthread_t tid; // thread id
00022     void *ins;
00023   } at_arg_t; // at: admin thread
00024 
00025   int signals[] = {SIGINT, SIGQUIT, SIGTERM};
00026 
00027   template<class T>
00028   class DaemonManager {
00029   public:
00030     DaemonManager(int port, uint16_t default_num_threads,
00031             uint16_t max_num_threads)
00032     : port_(port),
00033       default_num_threads_(default_num_threads),
00034       max_num_threads_(max_num_threads),
00035       is_terminating_(false),
00036       supp_(NULL)
00037     {
00038       sigset_t ss;
00039       sigemptyset(&ss);
00040       sigfillset(&ss);
00041       sigprocmask(SIG_BLOCK, &ss, 0);
00042     }
00043 
00044     ~DaemonManager()
00045     {}
00046 
00047     bool listen(void)
00048     {
00049       if ((sock_ = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
00050         error_log("socket() failed.");
00051         return false;
00052       }
00053       int on = 1;
00054       setsockopt(sock_, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
00055       setsockopt(sock_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on));
00056 
00057       struct sockaddr_in saddr;
00058       memset((char *) &saddr, 0, sizeof(saddr));
00059       saddr.sin_family = AF_INET;
00060       saddr.sin_addr.s_addr = htonl(INADDR_ANY);
00061       saddr.sin_port = htons(port_);
00062 
00063       if (bind(sock_, (struct sockaddr *) &saddr, sizeof(saddr)) < 0) {
00064         error_log("bind() failed.");
00065         return false;
00066       }
00067 
00068       ::listen(sock_, max_num_threads_); // temporary hardcoding
00069 
00070       return true;
00071     }
00072 
00073     void add_managee_arg(void *supp)
00074     {
00075       supp_ = supp;
00076     }
00077 
00078     bool start_servers(void)
00079     {
00080       stats_ = new global_thread_stats_t;
00081       stats_->num_threads = 0;
00082       stats_->num_active_threads = 0;
00083       stats_->num_processed = 0;
00084       stats_->mutex = new pthread_mutex_t;
00085       pthread_mutex_init(stats_->mutex, NULL);
00086       stats_->cond = new pthread_cond_t;
00087       pthread_cond_init(stats_->cond, NULL);
00088 
00089       for (int i = 0; i < default_num_threads_; ++i) {
00090         if (!create_thread()) {
00091           error_log("create_thread() failed");
00092         }
00093       }
00094 
00095       if (!create_signal_handler()) {
00096         return false;
00097       }
00098 
00099       // manager loop
00100       pthread_mutex_lock(stats_->mutex);
00101       while (!is_terminating_) {
00102         pthread_cond_wait(stats_->cond, stats_->mutex);
00103         if (is_terminating_) { break; }
00104         std::cout << "[manager] num_active_threads: " << stats_->num_active_threads << "\n"
00105                   << "[manager] num_threads: " << stats_->num_threads << std::endl;
00106         if (stats_->num_active_threads >= stats_->num_threads) {
00107           std::cout << "better to add more threads" << std::endl;
00108           if (stats_->num_threads < max_num_threads_) {
00109             if (!create_thread()) {
00110               error_log("create_thread() failed");
00111             }
00112           }
00113         } else {
00114           uint16_t dec_threshold = default_num_threads_
00115                                   + (stats_->num_threads - default_num_threads_) / 2;
00116           if (stats_->num_active_threads < dec_threshold) {
00117             if (stats_->num_threads > default_num_threads_) {
00118               if (!send_internal_close_request()) {
00119                 error_log("send_internal_close_request() failed.");
00120               }
00121             } 
00122           }
00123         }
00124       }
00125       pthread_mutex_unlock(stats_->mutex);
00126 
00127       finish_servers();
00128 
00129       return true;
00130     }
00131 
00132     bool finish_servers(void)
00133     {
00134       std::cout << "finishing servers ..." << std::endl;
00135       // send internal dummy connection
00136       pthread_mutex_lock(stats_->mutex);
00137       for (int i = 0; i < stats_->num_threads; ++i) {
00138         if (!send_internal_close_request()) {
00139           error_log("send_internal_close_request() failed.");
00140         }
00141       }
00142       pthread_mutex_unlock(stats_->mutex);
00143 
00144       int retry = 30;
00145       while (--retry) {
00146         sleep(1);
00147         pthread_mutex_lock(stats_->mutex);
00148         if (stats_->num_threads == 0) { break; }
00149         pthread_mutex_unlock(stats_->mutex);
00150       }
00151       close(sock_);
00152 
00153       delete stats_->mutex;
00154       delete stats_->cond;
00155       delete stats_;
00156 
00157       return true;
00158     }
00159 
00160   private:
00161     int port_;
00162     sock_t sock_;
00163     uint16_t default_num_threads_;
00164     uint16_t max_num_threads_;
00165     bool is_terminating_;
00166     global_thread_stats_t *stats_;
00167     void *supp_;
00168 
00169     bool create_thread(void)
00170     {
00171       thread_inf_t *thread_inf = new thread_inf_t;
00172       thread_inf->sock = sock_;
00173       thread_inf->stats = stats_;
00174       thread_inf->supp_arg = supp_;
00175       if (pthread_create(&thread_inf->tid, NULL,
00176                          launch, (void *) thread_inf) != 0) {
00177         perror("pthread_create");
00178         return false;
00179       }
00180       if (pthread_detach(thread_inf->tid) != 0) {
00181         error_log("pthread_detach failed");
00182         return false;
00183       }
00184       return true;
00185     }
00186 
00187     static void *launch(void *p)
00188     {
00189       T ins(p);
00190       ins.process();
00191       return NULL;
00192     }
00193 
00194     bool send_internal_close_request(void)
00195     {
00196       at_arg_t *arg = new at_arg_t;
00197       arg->ins = this;
00198       if (pthread_create(&arg->tid, NULL,
00199                          DaemonManager::run_icr, (void *) arg) != 0) {
00200         perror("pthread_create");
00201         return false;
00202       }
00203       if (pthread_detach(arg->tid) != 0) {
00204         error_log("pthread_detach failed");
00205         return false;
00206       }
00207       return true;
00208     }
00209 
00210     static void *run_icr(void *arg)
00211     {
00212       if (!((DaemonManager *) ((at_arg_t *) arg)->ins)->send_quit()) {
00213         error_log("send_quit failed."); 
00214       }
00215       delete (at_arg_t *) arg;
00216       return NULL;
00217     }
00218 
00219     bool send_quit(void)
00220     {
00221       struct hostent *he;
00222       if ((he = gethostbyname("localhost")) == NULL) {
00223         error_log("gethostbyname failed");
00224         return false;
00225       }
00226       uint32_t dest_ip;
00227       memcpy((char *) &dest_ip, (char *) he->h_addr, he->h_length);
00228 
00229       int s;
00230       struct sockaddr_in server;
00231       if ((s = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
00232         perror("socket");
00233         return false;
00234       }
00235 
00236       memset((char *) &server, 0, sizeof(server));
00237       server.sin_family = AF_INET;
00238       server.sin_addr.s_addr = dest_ip;
00239       server.sin_port = htons(port_);
00240 
00241       if (lut_connect(s, (struct sockaddr *) &server, sizeof(server)) < 0) {
00242         perror("connect");
00243         return false;
00244       }
00245       request_header_t header = {0, 0, COM_QUIT};
00246       if (lut_send(s, &header, sizeof(header), 0) < 0) {
00247         perror("send");
00248         return false;
00249       }
00250 
00251       response_header_t res;
00252       if (lut_recv(s, (char *) &res, sizeof(res), 0) <= 0) {
00253         return false;
00254       }
00255       close(s);
00256 
00257       if (res.status != STATUS_OK) {
00258         return false;
00259       }
00260       return true;
00261     }
00262 
00263     bool create_signal_handler(void)
00264     {
00265       at_arg_t *arg = new at_arg_t;
00266       arg->ins = this;
00267       pthread_create(&arg->tid, NULL, DaemonManager::run_signal_handler, (void *) arg);
00268       if (pthread_detach(arg->tid) != 0) {
00269         error_log("pthread_detach failed");
00270         return false;
00271       }
00272       return true;
00273     }
00274 
00275     static void *run_signal_handler(void *arg)
00276     {
00277       if (!((DaemonManager *) ((at_arg_t *) arg)->ins)->handle_signals()) {
00278         error_log("send_quit failed."); 
00279       }
00280       delete (at_arg_t *) arg;
00281       return NULL;
00282     }
00283 
00284     bool handle_signals(void)
00285     {
00286       sigset_t ss;
00287       int sig;
00288 
00289       sigemptyset(&ss);
00290       for(uint32_t i = 0; i < ARR_SIZE(signals); ++i) {
00291         sigaddset(&ss, signals[i]);
00292       }
00293       pthread_sigmask(SIG_BLOCK, &ss, 0);
00294 
00295       while (!is_terminating_) {
00296         if (sigwait(&ss, &sig)) {
00297           error_log("signal error.");
00298           continue;
00299         }
00300         for (uint32_t i = 0; i < ARR_SIZE(signals); ++i) {
00301           if (signals[i] != sig) {
00302             continue; // ignored
00303           }
00304           std::cout << "signal received: " << sig << std::endl;
00305           is_terminating_ = true;
00306           pthread_mutex_lock(stats_->mutex);
00307           pthread_cond_signal(stats_->cond);
00308           pthread_mutex_unlock(stats_->mutex);
00309           break;
00310         }
00311       }
00312       return true;
00313     }
00314   };
00315 
00316 }
00317 }
00318 
00319 #endif

Generated on Fri Feb 5 15:50:30 2010 for Lux by  doxygen 1.4.7