00001 #ifndef LUX_DIST_DAEMONMANAGER_H
00002 #define LUX_DIST_DAEMONMANAGER_H
00003
00004 #include "common.h"
00005 #include "protocol.h"
00006 #include "lux/util.h"
00007 #include <sys/types.h>
00008 #include <sys/socket.h>
00009 #include <arpa/inet.h>
00010 #include <netinet/in.h>
00011 #include <netdb.h>
00012 #include <signal.h>
00013 #include <vector>
00014
// Number of elements in a built-in array. The argument must be a real array
// (not a pointer). The parameter is parenthesised in the element operand so
// that an argument such as a cast-expression is indexed as a whole.
#define ARR_SIZE(arr) (sizeof(arr) / sizeof((arr)[0]))
00016
00017 namespace Lux {
00018 namespace Dist {
00019
/**
 * Small argument record: a thread id slot paired with an untyped pointer to
 * an object instance.
 */
struct at_arg_t {
  pthread_t tid;  // thread id slot
  void *ins;      // opaque pointer to the object instance
};
00024
// Signals that DaemonManager::handle_signals() waits for and treats as a
// termination request. Declared static const so that this header-level
// definition has internal linkage and does not produce duplicate-symbol link
// errors when the header is included from more than one translation unit.
static const int signals[] = {SIGINT, SIGQUIT, SIGTERM};
00026
/**
 * Manages a pool of detached worker threads that share one listening TCP
 * socket.
 *
 * Each worker is an instance of T constructed from a thread_inf_t* (passed as
 * void*) on which process() is called. The manager grows the pool up to
 * max_num_threads when all workers are busy and shrinks it back towards
 * default_num_threads when load drops. A worker is asked to exit by
 * connecting to the daemon's own port and sending a COM_QUIT request.
 *
 * NOTE(review): the worker type is expected to maintain the counters in
 * global_thread_stats_t and to signal stats_->cond when they change; that
 * code is not visible here -- confirm against T.
 */
template<class T>
class DaemonManager {
public:
  /**
   * Stores the configuration and blocks every signal in the calling thread,
   * so that termination signals can later be consumed with sigwait() by the
   * signal-handling thread.
   *
   * @param port                 TCP port to listen on
   * @param default_num_threads  number of workers created at start-up
   * @param max_num_threads      upper bound for the pool; also used as the
   *                             listen() backlog
   */
  DaemonManager(int port, uint16_t default_num_threads,
                uint16_t max_num_threads)
    : port_(port),
      sock_(-1),
      default_num_threads_(default_num_threads),
      max_num_threads_(max_num_threads),
      is_terminating_(false),
      stats_(NULL),
      supp_(NULL)
  {
    sigset_t ss;
    sigfillset(&ss);
    sigprocmask(SIG_BLOCK, &ss, 0);
  }

  ~DaemonManager()
  {}

  /**
   * Creates the listening socket, binds it to INADDR_ANY:port and starts
   * listening.
   *
   * @return true on success; false if socket(), bind() or listen() failed
   *         (the socket is closed again in that case).
   */
  bool listen(void)
  {
    if ((sock_ = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
      error_log("socket() failed.");
      return false;
    }
    // Socket options are best effort; a failure here is not fatal.
    int on = 1;
    setsockopt(sock_, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
    setsockopt(sock_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on));

    struct sockaddr_in saddr;
    memset((char *) &saddr, 0, sizeof(saddr));
    saddr.sin_family = AF_INET;
    saddr.sin_addr.s_addr = htonl(INADDR_ANY);
    saddr.sin_port = htons(port_);

    if (bind(sock_, (struct sockaddr *) &saddr, sizeof(saddr)) < 0) {
      error_log("bind() failed.");
      close(sock_);
      sock_ = -1;
      return false;
    }

    if (::listen(sock_, max_num_threads_) < 0) {
      error_log("listen() failed.");
      close(sock_);
      sock_ = -1;
      return false;
    }

    return true;
  }

  /**
   * Sets the opaque pointer that is handed to every worker created afterwards
   * (as thread_inf_t::supp_arg).
   */
  void add_managee_arg(void *supp)
  {
    supp_ = supp;
  }

  /**
   * Starts the worker pool and the signal-handling thread, then runs the
   * pool-resizing loop until a termination signal is received, and finally
   * shuts the pool down via finish_servers().
   *
   * @return true after an orderly shutdown; false if the signal-handling
   *         thread could not be started (the pool is torn down first).
   */
  bool start_servers(void)
  {
    stats_ = new global_thread_stats_t;
    stats_->num_threads = 0;
    stats_->num_active_threads = 0;
    stats_->num_processed = 0;
    stats_->mutex = new pthread_mutex_t;
    pthread_mutex_init(stats_->mutex, NULL);
    stats_->cond = new pthread_cond_t;
    pthread_cond_init(stats_->cond, NULL);

    for (int i = 0; i < default_num_threads_; ++i) {
      if (!create_thread()) {
        error_log("create_thread() failed");
      }
    }

    if (!create_signal_handler()) {
      // Without a signal thread the daemon could never be stopped cleanly;
      // do not leave workers and the stats block behind.
      finish_servers();
      return false;
    }

    pthread_mutex_lock(stats_->mutex);
    while (!is_terminating_) {
      pthread_cond_wait(stats_->cond, stats_->mutex);
      if (is_terminating_) { break; }
      std::cout << "[manager] num_active_threads: " << stats_->num_active_threads << "\n"
                << "[manager] num_threads: " << stats_->num_threads << std::endl;
      if (stats_->num_active_threads >= stats_->num_threads) {
        // Every worker is busy: grow the pool if the limit allows it.
        std::cout << "better to add more threads" << std::endl;
        if (stats_->num_threads < max_num_threads_) {
          if (!create_thread()) {
            error_log("create_thread() failed");
          }
        }
      } else {
        // Shrink by one worker when fewer than half of the extra workers
        // (those above the default pool size) are in use.
        uint16_t dec_threshold = default_num_threads_
          + (stats_->num_threads - default_num_threads_) / 2;
        if (stats_->num_active_threads < dec_threshold) {
          if (stats_->num_threads > default_num_threads_) {
            if (!send_internal_close_request()) {
              error_log("send_internal_close_request() failed.");
            }
          }
        }
      }
    }
    pthread_mutex_unlock(stats_->mutex);

    finish_servers();

    return true;
  }

  /**
   * Asks every worker to quit, waits (up to 29 one-second polls) for the
   * thread count to reach zero, closes the listening socket and releases the
   * shared statistics block.
   *
   * If workers are still registered after the wait, the statistics block is
   * intentionally left allocated, because those threads may still use it.
   *
   * @return always true.
   */
  bool finish_servers(void)
  {
    std::cout << "finishing servers ..." << std::endl;

    bool drained = true;
    if (stats_ != NULL) {
      pthread_mutex_lock(stats_->mutex);
      for (int i = 0; i < stats_->num_threads; ++i) {
        if (!send_internal_close_request()) {
          error_log("send_internal_close_request() failed.");
        }
      }
      pthread_mutex_unlock(stats_->mutex);

      drained = false;
      for (int retry = 29; retry > 0 && !drained; --retry) {
        sleep(1);
        pthread_mutex_lock(stats_->mutex);
        drained = (stats_->num_threads == 0);
        // Always release the lock, also when the pool has drained.
        pthread_mutex_unlock(stats_->mutex);
      }
    }

    if (sock_ != -1) {
      close(sock_);
      sock_ = -1;
    }

    if (stats_ != NULL) {
      if (drained) {
        pthread_mutex_destroy(stats_->mutex);
        pthread_cond_destroy(stats_->cond);
        delete stats_->mutex;
        delete stats_->cond;
        delete stats_;
        stats_ = NULL;
      } else {
        error_log("worker threads still running; thread stats not released.");
      }
    }

    return true;
  }

private:
  int port_;                      // TCP port the daemon listens on
  sock_t sock_;                   // listening socket, -1 when not open
  uint16_t default_num_threads_;  // pool size created at start-up
  uint16_t max_num_threads_;      // upper bound for the pool size
  bool is_terminating_;           // set (under stats_->mutex) on a termination signal
  global_thread_stats_t *stats_;  // counters shared with the workers
  void *supp_;                    // opaque argument forwarded to workers

  /**
   * Creates one detached worker thread running launch().
   *
   * @return false if the thread could not be created or detached.
   */
  bool create_thread(void)
  {
    // NOTE(review): after a successful pthread_create the worker presumably
    // owns thread_inf; its release is not visible here -- confirm in T.
    thread_inf_t *thread_inf = new thread_inf_t;
    thread_inf->sock = sock_;
    thread_inf->stats = stats_;
    thread_inf->supp_arg = supp_;
    if (pthread_create(&thread_inf->tid, NULL,
                       launch, (void *) thread_inf) != 0) {
      perror("pthread_create");
      delete thread_inf;  // no thread was started, nobody else holds it
      return false;
    }
    if (pthread_detach(thread_inf->tid) != 0) {
      error_log("pthread_detach failed");
      return false;
    }
    return true;
  }

  /** Worker thread entry point: builds a T from the argument and runs it. */
  static void *launch(void *p)
  {
    T ins(p);
    ins.process();
    return NULL;
  }

  /**
   * Starts a detached helper thread that sends one COM_QUIT request to the
   * daemon's own port (see send_quit()).
   *
   * @return false if the helper thread could not be created or detached.
   */
  bool send_internal_close_request(void)
  {
    // The thread id lives in a local and `this` is passed directly, so there
    // is nothing to leak on failure and nothing the helper could free while
    // it is still being read here.
    pthread_t tid;
    if (pthread_create(&tid, NULL,
                       DaemonManager::run_icr, (void *) this) != 0) {
      perror("pthread_create");
      return false;
    }
    if (pthread_detach(tid) != 0) {
      error_log("pthread_detach failed");
      return false;
    }
    return true;
  }

  /** Helper thread entry point: arg is the DaemonManager instance. */
  static void *run_icr(void *arg)
  {
    if (!static_cast<DaemonManager *>(arg)->send_quit()) {
      error_log("send_quit failed.");
    }
    return NULL;
  }

  /**
   * Connects to "localhost" on the daemon's port, sends a COM_QUIT request
   * header and reads the response header.
   *
   * @return true only if a response was received and its status is
   *         STATUS_OK. The client socket is closed on every path.
   */
  bool send_quit(void)
  {
    // getaddrinfo() is used because several helper threads may run this
    // concurrently and gethostbyname() is not required to be thread-safe.
    struct addrinfo hints;
    memset((char *) &hints, 0, sizeof(hints));
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;

    struct addrinfo *found = NULL;
    if (getaddrinfo("localhost", NULL, &hints, &found) != 0 || found == NULL) {
      error_log("getaddrinfo failed");
      return false;
    }

    struct sockaddr_in server;
    memset((char *) &server, 0, sizeof(server));
    server.sin_family = AF_INET;
    server.sin_addr = ((struct sockaddr_in *) found->ai_addr)->sin_addr;
    server.sin_port = htons(port_);
    freeaddrinfo(found);

    int s;
    if ((s = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
      perror("socket");
      return false;
    }

    bool ok = false;
    request_header_t header = {0, 0, COM_QUIT};
    if (lut_connect(s, (struct sockaddr *) &server, sizeof(server)) < 0) {
      perror("connect");
    } else if (lut_send(s, &header, sizeof(header), 0) < 0) {
      perror("send");
    } else {
      response_header_t res;
      if (lut_recv(s, (char *) &res, sizeof(res), 0) > 0) {
        ok = (res.status == STATUS_OK);
      }
    }
    close(s);

    return ok;
  }

  /**
   * Starts the detached thread that waits for termination signals.
   *
   * @return false if the thread could not be created or detached.
   */
  bool create_signal_handler(void)
  {
    pthread_t tid;
    if (pthread_create(&tid, NULL,
                       DaemonManager::run_signal_handler, (void *) this) != 0) {
      perror("pthread_create");
      return false;
    }
    if (pthread_detach(tid) != 0) {
      error_log("pthread_detach failed");
      return false;
    }
    return true;
  }

  /** Signal thread entry point: arg is the DaemonManager instance. */
  static void *run_signal_handler(void *arg)
  {
    if (!static_cast<DaemonManager *>(arg)->handle_signals()) {
      error_log("handle_signals failed.");
    }
    return NULL;
  }

  /**
   * Waits with sigwait() for one of the signals listed in `signals`. On
   * receipt, marks the manager as terminating and wakes the loop in
   * start_servers().
   *
   * @return always true.
   */
  bool handle_signals(void)
  {
    sigset_t ss;
    int sig;

    sigemptyset(&ss);
    for (uint32_t i = 0; i < ARR_SIZE(signals); ++i) {
      sigaddset(&ss, signals[i]);
    }
    pthread_sigmask(SIG_BLOCK, &ss, 0);

    bool terminating = false;
    while (!terminating) {
      if (sigwait(&ss, &sig)) {
        error_log("signal error.");
        continue;
      }
      for (uint32_t i = 0; i < ARR_SIZE(signals); ++i) {
        if (signals[i] != sig) {
          continue;
        }
        std::cout << "signal received: " << sig << std::endl;
        // The flag is read by the manager loop under this mutex, so it is
        // also written under it.
        pthread_mutex_lock(stats_->mutex);
        is_terminating_ = true;
        pthread_cond_signal(stats_->cond);
        pthread_mutex_unlock(stats_->mutex);
        terminating = true;
        break;
      }
    }
    return true;
  }
};
00315
00316 }
00317 }
00318
00319 #endif