lux/dist/client_manager.h

00001 #ifndef LUX_DIST_CLIENTMANAGER_H
00002 #define LUX_DIST_CLIENTMANAGER_H
00003 
00004 #include "common.h"
00005 #include "protocol.h"
00006 #include "message_queue.h"
00007 #include "lux/util.h"
00008 
00009 namespace Lux {
00010 namespace Dist {
00011 
00012   template<class T, class Message>
00013   class ClientManager {
00014   public:
00015     ClientManager(std::vector<server_inf_t> servers)
00016     : servers_(servers), idx_(0)
00017     {}
00018 
00019     ~ClientManager()
00020     {}
00021 
00022     bool start_clients(void)
00023     {
00024       queues_ = new MessageQueue<Message>[servers_.size()];
00025       for (int i = 0; i < servers_.size(); ++i) {
00026         servers_[i].id = i;
00027         if (!create_thread(&servers_[i])) {
00028           error_log("create_thread() failed");
00029           return false;
00030         }
00031       }
00032       return true;
00033     }
00034 
00035     bool finish_clients(void)
00036     {
00037       std::cout << "finishing servers ..." << std::endl;
00038       for (int i = 0; i < threads_.size(); ++i) {
00039         threads_[i]->is_terminating = true;
00040       }
00041 
00042       for (int i = 0; i < threads_.size(); ++i) {
00043         if (pthread_join(threads_[i]->tid, NULL) != 0) {
00044           perror("pthread_join");
00045         }
00046         delete threads_[i];
00047       }
00048       delete [] queues_;
00049       return true;
00050     }
00051 
00052     bool enqueue_message(Message &message)
00053     {
00054       std::cout << "enqueued: " << (int) idx_ << std::endl;
00055       // put message into the queue 
00056       queues_[idx_].enqueue_message(message);
00057       // round robin
00058       if (++idx_ == servers_.size()) {
00059         idx_ = 0;
00060       }
00061     }
00062 
00063   private:
00064     uint32_t idx_;
00065     std::vector< client_thread_inf_t * > threads_;
00066     std::vector< server_inf_t > servers_;
00067     MessageQueue<Message> *queues_;
00068 
00069     bool create_thread(server_inf_t *server)
00070     {
00071       client_thread_inf_t *thread_inf = new client_thread_inf_t;
00072       thread_inf->server = server;
00073       thread_inf->is_terminating = false;
00074       thread_inf->queue = (void *) &queues_[server->id];
00075       if (pthread_create(&(thread_inf->tid), NULL,
00076                          launch, (void *) thread_inf) != 0) {
00077         perror("pthread_create");
00078         return false;
00079       }
00080       threads_.push_back(thread_inf);
00081       return true;
00082     }
00083 
00084     static void *launch(void *p)
00085     {
00086       T ins(p);
00087       ins.process();
00088       return NULL;
00089     }
00090   };
00091 
00092 }
00093 }
00094 
00095 #endif

Generated on Fri Feb 5 15:50:30 2010 for Lux by  doxygen 1.4.7