00001 #ifndef LUX_DIST_CLUSTERSEARCHER_H
00002 #define LUX_DIST_CLUSTERSEARCHER_H
00003
00004 #include "common.h"
00005 #include "protocol.h"
00006 #include "result.h"
00007 #include "lux/search.h"
00008 #include "lux/document/document_serializer.h"
00009 #include "server-config.pb.h"
00010 #include <string>
00011 #include <vector>
00012 #include <deque>
00013 #include <event.h>
00014
00015 namespace Lux {
00016 namespace Dist {
00017
00018 class ClusterSearcher;
00019
00020 typedef enum {
00021 INITIALIZED,
00022 CONNECTING,
00023 READING,
00024 DONE,
00025 ERROR
00026 } status_t;
00027
00028 typedef struct {
00029 response_header_t *h;
00030 uint32_t h_received;
00031 char *b;
00032 uint32_t b_received;
00033 uint32_t b_length;
00034 } response_t;
00035
00036 typedef struct {
00037 response_t *resp;
00038 status_t status;
00039 } client_t;
00040
00041 typedef struct {
00042 ClusterSearcher *ins;
00043 server_inf_t *serv;
00044 client_t *cli;
00045 command_t command;
00046 struct event e;
00047
00048 } arg_t;
00049
00050 typedef std::vector<server_inf_t> servers_inf;
00051 typedef servers_inf::iterator servers_inf_itr;
00052 typedef std::vector< std::deque<doc_id_t> > DocLocations;
00053
00054 class ClusterSearcher {
00055 public:
00056 ClusterSearcher(Lux::Config::Cluster *cluster_config);
00057 ~ClusterSearcher(void);
00058
00059 ResultSet search(const char *query, Condition &cond);
00060 ResultSet search(std::string query, Condition &cond);
00061
00062 MergeResultSet search_servers_and_merge(const char *query,
00063 Condition &cond);
00064 MergeResultSet search_servers_and_merge(const std::string &query,
00065 Condition &cond);
00066 TmpResultSet get_docs_from_servers(MergeResultSet &mrs);
00067
00068 private:
00069 str_vec server_name_list_;
00070 server_inf_t *servers_;
00071 client_t *clients_;
00072 arg_t *args_;
00073 uint32_t num_servers_;
00074 std::string query_;
00075 Condition cond_;
00076 DocLocations dloc_;
00077 static DocumentSerializer doc_serializer_;
00078 struct event_base *ebase_;
00079
00080 void release_all(void);
00081 void empty_clients(void);
00082 bool nonblocking_connect(uint32_t sid, command_t command);
00083 bool search_indices(void);
00084 bool search_docs(void);
00085 bool merge_results(MergeResultSet &mrs);
00086 bool collect_doc_by_server(MergeResultSet &mrs);
00087 static void run(int fd, short event, void *arg);
00088 void handler(int fd, short event, void *arg);
00089 bool send_request(server_inf_t *serv, command_t command);
00090 bool send_search_request(server_inf_t *serv);
00091 bool send_getdocs_request(server_inf_t *serv);
00092 int recv_search_response(int fd, response_t *resp);
00093 void set_nonblocking(int &socket);
00094 void init_sockaddr(struct sockaddr_in &addr, in_addr_t ip, uint16_t port);
00095 ResultSet get_resultset(MergeResultSet &mrs);
00096 TmpResultSet get_tmp_resultset(MergeResultSet &mrs);
00097 };
00098
00099 }
00100 }
00101
00102 #endif