00001 #ifndef LUX_DIST_CLIENTMANAGER_H
00002 #define LUX_DIST_CLIENTMANAGER_H
00003
00004 #include "common.h"
00005 #include "protocol.h"
00006 #include "message_queue.h"
00007 #include "lux/util.h"
00008
00009 namespace Lux {
00010 namespace Dist {
00011
00012 template<class T, class Message>
00013 class ClientManager {
00014 public:
00015 ClientManager(std::vector<server_inf_t> servers)
00016 : servers_(servers), idx_(0)
00017 {}
00018
00019 ~ClientManager()
00020 {}
00021
00022 bool start_clients(void)
00023 {
00024 queues_ = new MessageQueue<Message>[servers_.size()];
00025 for (int i = 0; i < servers_.size(); ++i) {
00026 servers_[i].id = i;
00027 if (!create_thread(&servers_[i])) {
00028 error_log("create_thread() failed");
00029 return false;
00030 }
00031 }
00032 return true;
00033 }
00034
00035 bool finish_clients(void)
00036 {
00037 std::cout << "finishing servers ..." << std::endl;
00038 for (int i = 0; i < threads_.size(); ++i) {
00039 threads_[i]->is_terminating = true;
00040 }
00041
00042 for (int i = 0; i < threads_.size(); ++i) {
00043 if (pthread_join(threads_[i]->tid, NULL) != 0) {
00044 perror("pthread_join");
00045 }
00046 delete threads_[i];
00047 }
00048 delete [] queues_;
00049 return true;
00050 }
00051
00052 bool enqueue_message(Message &message)
00053 {
00054 std::cout << "enqueued: " << (int) idx_ << std::endl;
00055
00056 queues_[idx_].enqueue_message(message);
00057
00058 if (++idx_ == servers_.size()) {
00059 idx_ = 0;
00060 }
00061 }
00062
00063 private:
00064 uint32_t idx_;
00065 std::vector< client_thread_inf_t * > threads_;
00066 std::vector< server_inf_t > servers_;
00067 MessageQueue<Message> *queues_;
00068
00069 bool create_thread(server_inf_t *server)
00070 {
00071 client_thread_inf_t *thread_inf = new client_thread_inf_t;
00072 thread_inf->server = server;
00073 thread_inf->is_terminating = false;
00074 thread_inf->queue = (void *) &queues_[server->id];
00075 if (pthread_create(&(thread_inf->tid), NULL,
00076 launch, (void *) thread_inf) != 0) {
00077 perror("pthread_create");
00078 return false;
00079 }
00080 threads_.push_back(thread_inf);
00081 return true;
00082 }
00083
00084 static void *launch(void *p)
00085 {
00086 T ins(p);
00087 ins.process();
00088 return NULL;
00089 }
00090 };
00091
00092 }
00093 }
00094
00095 #endif