diff --git a/cpp/conf/server_config.yaml b/cpp/conf/server_config.yaml index 9b56ac7e85b7f20bc28cb32f82a1593801fccc04..a80a6f510890745fde862704b91717a0bc0275f6 100644 --- a/cpp/conf/server_config.yaml +++ b/cpp/conf/server_config.yaml @@ -2,7 +2,7 @@ server_config: address: 127.0.0.1 port: 33001 transfer_protocol: json #optional: binary, compact, json, simple_json, debug - server_mode: thread_pool #optional: simple, non_blocking, hsha, thread_pool, thread_selector + server_mode: simple #optional: simple, non_blocking, hsha, thread_pool, thread_selector log_config: global: diff --git a/cpp/src/server/ServiceWrapper.cpp b/cpp/src/server/ServiceWrapper.cpp index 8d630ba11efc5a99695a7251ba06d12268e8eabe..c0c4293ec38cc152877945755e21cd5e17fe65aa 100644 --- a/cpp/src/server/ServiceWrapper.cpp +++ b/cpp/src/server/ServiceWrapper.cpp @@ -38,6 +38,11 @@ public: // Your initialization goes here } + void dummy() { + // Your implementation goes here + printf("dummy\n"); + } + /** * group interfaces * @@ -121,6 +126,9 @@ void ServiceWrapper::StartService() { protocolFactory.reset(new TBinaryProtocolFactory()); } else if(protocol == "json") { protocolFactory.reset(new TJSONProtocolFactory()); + } else { + CommonUtil::PrintError("Service protocol: " + protocol + " is not supported currently"); + return; } if(mode == "simple") { @@ -135,7 +143,8 @@ void ServiceWrapper::StartService() { s_server.reset(new TThreadPoolServer(processor, serverTransport, transportFactory, protocolFactory, threadManager)); s_server->serve(); } else { - CommonUtil::PrintError("Server mode: " + mode + " is not supported currently"); + CommonUtil::PrintError("Service mode: " + mode + " is not supported currently"); + return; } } diff --git a/cpp/src/thrift/VectorService.thrift b/cpp/src/thrift/VectorService.thrift index 7008ebd54c2ee46e26964acf8e701939dc3c7b96..583dd1a8f6fb5dcd66e73a2918a0fb00551846c4 100644 --- a/cpp/src/thrift/VectorService.thrift +++ b/cpp/src/thrift/VectorService.thrift @@ -66,6 +66,7 @@ struct VecTimeRangeList { } service VecService { + void dummy(); /** * group interfaces */ diff --git a/cpp/src/thrift/gen-cpp/VecService.cpp b/cpp/src/thrift/gen-cpp/VecService.cpp index 6374f947eee49aa150ddab7fbd55773813393a38..e9a098453f7e3da59c2120e9f4a7559916b028ff 100644 --- a/cpp/src/thrift/gen-cpp/VecService.cpp +++ b/cpp/src/thrift/gen-cpp/VecService.cpp @@ -9,6 +9,141 @@ +VecService_dummy_args::~VecService_dummy_args() throw() { +} + + +uint32_t VecService_dummy_args::read(::apache::thrift::protocol::TProtocol* iprot) { + + ::apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + xfer += iprot->skip(ftype); + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t VecService_dummy_args::write(::apache::thrift::protocol::TProtocol* oprot) const { + uint32_t xfer = 0; + ::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); + xfer += oprot->writeStructBegin("VecService_dummy_args"); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + + +VecService_dummy_pargs::~VecService_dummy_pargs() throw() { +} + + +uint32_t VecService_dummy_pargs::write(::apache::thrift::protocol::TProtocol* oprot) const { + uint32_t xfer = 0; + ::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); + xfer += oprot->writeStructBegin("VecService_dummy_pargs"); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + + +VecService_dummy_result::~VecService_dummy_result() throw() { +} + + +uint32_t VecService_dummy_result::read(::apache::thrift::protocol::TProtocol* iprot) { + + ::apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + xfer += iprot->skip(ftype); + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t VecService_dummy_result::write(::apache::thrift::protocol::TProtocol* oprot) const { + + uint32_t xfer = 0; + + xfer += oprot->writeStructBegin("VecService_dummy_result"); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + + +VecService_dummy_presult::~VecService_dummy_presult() throw() { +} + + +uint32_t VecService_dummy_presult::read(::apache::thrift::protocol::TProtocol* iprot) { + + ::apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + xfer += iprot->skip(ftype); + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + + VecService_add_group_args::~VecService_add_group_args() throw() { } @@ -1545,6 +1680,58 @@ uint32_t VecService_search_vector_batch_presult::read(::apache::thrift::protocol return xfer; } +void VecServiceClient::dummy() +{ + send_dummy(); + recv_dummy(); +} + +void VecServiceClient::send_dummy() +{ + int32_t cseqid = 0; + oprot_->writeMessageBegin("dummy", ::apache::thrift::protocol::T_CALL, cseqid); + + VecService_dummy_pargs args; + args.write(oprot_); + + oprot_->writeMessageEnd(); + oprot_->getTransport()->writeEnd(); + oprot_->getTransport()->flush(); +} + +void VecServiceClient::recv_dummy() +{ + + int32_t rseqid = 0; + std::string fname; + ::apache::thrift::protocol::TMessageType mtype; + + iprot_->readMessageBegin(fname, mtype, rseqid); + if (mtype == ::apache::thrift::protocol::T_EXCEPTION) { + ::apache::thrift::TApplicationException x; + x.read(iprot_); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + throw x; + } + if (mtype != ::apache::thrift::protocol::T_REPLY) { + iprot_->skip(::apache::thrift::protocol::T_STRUCT); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + } + if (fname.compare("dummy") != 0) { + iprot_->skip(::apache::thrift::protocol::T_STRUCT); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + } + VecService_dummy_presult result; + result.read(iprot_); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + + return; +} + void VecServiceClient::add_group(const VecGroup& group) { send_add_group(group); @@ -1989,6 +2176,59 @@ bool VecServiceProcessor::dispatchCall(::apache::thrift::protocol::TProtocol* ip return true; } +void VecServiceProcessor::process_dummy(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext) +{ + void* ctx = NULL; + if (this->eventHandler_.get() != NULL) { + ctx = this->eventHandler_->getContext("VecService.dummy", callContext); + } + ::apache::thrift::TProcessorContextFreer freer(this->eventHandler_.get(), ctx, "VecService.dummy"); + + if (this->eventHandler_.get() != NULL) { + this->eventHandler_->preRead(ctx, "VecService.dummy"); + } + + VecService_dummy_args args; + args.read(iprot); + iprot->readMessageEnd(); + uint32_t bytes = iprot->getTransport()->readEnd(); + + if (this->eventHandler_.get() != NULL) { + this->eventHandler_->postRead(ctx, "VecService.dummy", bytes); + } + + VecService_dummy_result result; + try { + iface_->dummy(); + } catch (const std::exception& e) { + if (this->eventHandler_.get() != NULL) { + this->eventHandler_->handlerError(ctx, "VecService.dummy"); + } + + ::apache::thrift::TApplicationException x(e.what()); + oprot->writeMessageBegin("dummy", ::apache::thrift::protocol::T_EXCEPTION, seqid); + x.write(oprot); + oprot->writeMessageEnd(); + oprot->getTransport()->writeEnd(); + oprot->getTransport()->flush(); + return; + } + + if (this->eventHandler_.get() != NULL) { + this->eventHandler_->preWrite(ctx, "VecService.dummy"); + } + + oprot->writeMessageBegin("dummy", ::apache::thrift::protocol::T_REPLY, seqid); + result.write(oprot); + oprot->writeMessageEnd(); + bytes = oprot->getTransport()->writeEnd(); + oprot->getTransport()->flush(); + + if (this->eventHandler_.get() != NULL) { + this->eventHandler_->postWrite(ctx, "VecService.dummy", bytes); + } +} + void VecServiceProcessor::process_add_group(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext) { void* ctx = NULL; @@ -2393,6 +2633,83 @@ void VecServiceProcessor::process_search_vector_batch(int32_t seqid, ::apache::t return processor; } +void VecServiceConcurrentClient::dummy() +{ + int32_t seqid = send_dummy(); + recv_dummy(seqid); +} + +int32_t VecServiceConcurrentClient::send_dummy() +{ + int32_t cseqid = this->sync_.generateSeqId(); + ::apache::thrift::async::TConcurrentSendSentry sentry(&this->sync_); + oprot_->writeMessageBegin("dummy", ::apache::thrift::protocol::T_CALL, cseqid); + + VecService_dummy_pargs args; + args.write(oprot_); + + oprot_->writeMessageEnd(); + oprot_->getTransport()->writeEnd(); + oprot_->getTransport()->flush(); + + sentry.commit(); + return cseqid; +} + +void VecServiceConcurrentClient::recv_dummy(const int32_t seqid) +{ + + int32_t rseqid = 0; + std::string fname; + ::apache::thrift::protocol::TMessageType mtype; + + // the read mutex gets dropped and reacquired as part of waitForWork() + // The destructor of this sentry wakes up other clients + ::apache::thrift::async::TConcurrentRecvSentry sentry(&this->sync_, seqid); + + while(true) { + if(!this->sync_.getPending(fname, mtype, rseqid)) { + iprot_->readMessageBegin(fname, mtype, rseqid); + } + if(seqid == rseqid) { + if (mtype == ::apache::thrift::protocol::T_EXCEPTION) { + ::apache::thrift::TApplicationException x; + x.read(iprot_); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + sentry.commit(); + throw x; + } + if (mtype != ::apache::thrift::protocol::T_REPLY) { + iprot_->skip(::apache::thrift::protocol::T_STRUCT); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + } + if (fname.compare("dummy") != 0) { + iprot_->skip(::apache::thrift::protocol::T_STRUCT); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + + // in a bad state, don't commit + using ::apache::thrift::protocol::TProtocolException; + throw TProtocolException(TProtocolException::INVALID_DATA); + } + VecService_dummy_presult result; + result.read(iprot_); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + + sentry.commit(); + return; + } + // seqid != rseqid + this->sync_.updatePending(fname, mtype, rseqid); + + // this will temporarily unlock the readMutex, and let other clients get work done + this->sync_.waitForWork(seqid); + } // end while(true) +} + void VecServiceConcurrentClient::add_group(const VecGroup& group) { int32_t seqid = send_add_group(group); diff --git a/cpp/src/thrift/gen-cpp/VecService.h b/cpp/src/thrift/gen-cpp/VecService.h index 5cccdfa03b7bd1e9424d378f09ccc98ff1192867..83e99957af6fa1e2c1366c92e34e56c2f1970b5c 100644 --- a/cpp/src/thrift/gen-cpp/VecService.h +++ b/cpp/src/thrift/gen-cpp/VecService.h @@ -21,6 +21,7 @@ class VecServiceIf { public: virtual ~VecServiceIf() {} + virtual void dummy() = 0; /** * group interfaces @@ -81,6 +82,9 @@ class VecServiceIfSingletonFactory : virtual public VecServiceIfFactory { class VecServiceNull : virtual public VecServiceIf { public: virtual ~VecServiceNull() {} + void dummy() { + return; + } void add_group(const VecGroup& /* group */) { return; } @@ -105,6 +109,80 @@ class VecServiceNull : virtual public VecServiceIf { } }; + +class VecService_dummy_args { + public: + + VecService_dummy_args(const VecService_dummy_args&); + VecService_dummy_args& operator=(const VecService_dummy_args&); + VecService_dummy_args() { + } + + virtual ~VecService_dummy_args() throw(); + + bool operator == (const VecService_dummy_args & /* rhs */) const + { + return true; + } + bool operator != (const VecService_dummy_args &rhs) const { + return !(*this == rhs); + } + + bool operator < (const VecService_dummy_args & ) const; + + uint32_t read(::apache::thrift::protocol::TProtocol* iprot); + uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const; + +}; + + +class VecService_dummy_pargs { + public: + + + virtual ~VecService_dummy_pargs() throw(); + + uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const; + +}; + + +class VecService_dummy_result { + public: + + VecService_dummy_result(const VecService_dummy_result&); + VecService_dummy_result& operator=(const VecService_dummy_result&); + VecService_dummy_result() { + } + + virtual ~VecService_dummy_result() throw(); + + bool operator == (const VecService_dummy_result & /* rhs */) const + { + return true; + } + bool operator != (const VecService_dummy_result &rhs) const { + return !(*this == rhs); + } + + bool operator < (const VecService_dummy_result & ) const; + + uint32_t read(::apache::thrift::protocol::TProtocol* iprot); + uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const; + +}; + + +class VecService_dummy_presult { + public: + + + virtual ~VecService_dummy_presult() throw(); + + uint32_t read(::apache::thrift::protocol::TProtocol* iprot); + +}; + typedef struct _VecService_add_group_args__isset { _VecService_add_group_args__isset() : group(false) {} bool group :1; @@ -954,6 +1032,9 @@ class VecServiceClient : virtual public VecServiceIf { apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> getOutputProtocol() { return poprot_; } + void dummy(); + void send_dummy(); + void recv_dummy(); void add_group(const VecGroup& group); void send_add_group(const VecGroup& group); void recv_add_group(); @@ -990,6 +1071,7 @@ class VecServiceProcessor : public ::apache::thrift::TDispatchProcessor { typedef void (VecServiceProcessor::*ProcessFunction)(int32_t, ::apache::thrift::protocol::TProtocol*, ::apache::thrift::protocol::TProtocol*, void*); typedef std::map ProcessMap; ProcessMap processMap_; + void process_dummy(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext); void process_add_group(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext); void process_get_group(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext); void process_del_group(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext); @@ -1000,6 +1082,7 @@ class VecServiceProcessor : public ::apache::thrift::TDispatchProcessor { public: VecServiceProcessor(::apache::thrift::stdcxx::shared_ptr iface) : iface_(iface) { + processMap_["dummy"] = &VecServiceProcessor::process_dummy; processMap_["add_group"] = &VecServiceProcessor::process_add_group; processMap_["get_group"] = &VecServiceProcessor::process_get_group; processMap_["del_group"] = &VecServiceProcessor::process_del_group; @@ -1035,6 +1118,15 @@ class VecServiceMultiface : virtual public VecServiceIf { ifaces_.push_back(iface); } public: + void dummy() { + size_t sz = ifaces_.size(); + size_t i = 0; + for (; i < (sz - 1); ++i) { + ifaces_[i]->dummy(); + } + ifaces_[i]->dummy(); + } + void add_group(const VecGroup& group) { size_t sz = ifaces_.size(); size_t i = 0; @@ -1132,6 +1224,9 @@ class VecServiceConcurrentClient : virtual public VecServiceIf { apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> getOutputProtocol() { return poprot_; } + void dummy(); + int32_t send_dummy(); + void recv_dummy(const int32_t seqid); void add_group(const VecGroup& group); int32_t send_add_group(const VecGroup& group); void recv_add_group(const int32_t seqid); diff --git a/cpp/src/thrift/gen-cpp/VecService_server.skeleton.cpp b/cpp/src/thrift/gen-cpp/VecService_server.skeleton.cpp index 81726ee8fe5eadc5607af8732d5e235deb957367..2309c8c3846fbffc631b659641ad6a5667a88683 100644 --- a/cpp/src/thrift/gen-cpp/VecService_server.skeleton.cpp +++ b/cpp/src/thrift/gen-cpp/VecService_server.skeleton.cpp @@ -18,6 +18,11 @@ class VecServiceHandler : virtual public VecServiceIf { // Your initialization goes here } + void dummy() { + // Your implementation goes here + printf("dummy\n"); + } + /** * group interfaces * @@ -80,7 +85,7 @@ int main(int argc, char **argv) { int port = 9090; ::apache::thrift::stdcxx::shared_ptr handler(new VecServiceHandler()); ::apache::thrift::stdcxx::shared_ptr processor(new VecServiceProcessor(handler)); - ::apache::thrift::stdcxx::shared_ptr serverTransport(new TServerSocket(port)); + ::apache::thrift::stdcxx::shared_ptr serverTransport(new TServerSocket("localhost", port)); ::apache::thrift::stdcxx::shared_ptr transportFactory(new TBufferedTransportFactory()); ::apache::thrift::stdcxx::shared_ptr protocolFactory(new TBinaryProtocolFactory()); diff --git a/cpp/test_client/CMakeLists.txt b/cpp/test_client/CMakeLists.txt index 8cc517d3616b87e169813054c2d1c6bebea2e12f..cd74f0c605686dbd072a5779168e94e5c7b60331 100644 --- a/cpp/test_client/CMakeLists.txt +++ b/cpp/test_client/CMakeLists.txt @@ -41,3 +41,11 @@ set(client_libs pthread) target_link_libraries(test_client ${client_libs}) + +add_executable(skeleton_server + ../src/thrift/gen-cpp/VecService_server.skeleton.cpp + ../src/thrift/gen-cpp/VecService.cpp + ../src/thrift/gen-cpp/VectorService_constants.cpp + ../src/thrift/gen-cpp/VectorService_types.cpp) + +target_link_libraries(skeleton_server thrift) diff --git a/cpp/test_client/src/ClientApp.cpp b/cpp/test_client/src/ClientApp.cpp index 8ba918ac7ef2044bf16dfd57238f1065eaf391b1..78d5f1c999c33e6d7cce32c1bdeaf6f7c61c245d 100644 --- a/cpp/test_client/src/ClientApp.cpp +++ b/cpp/test_client/src/ClientApp.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -41,20 +42,34 @@ void ClientApp::Run(const std::string &config_file) { std::string protocol = server_config.GetValue(server::CONFIG_SERVER_PROTOCOL, "binary"); std::string mode = server_config.GetValue(server::CONFIG_SERVER_MODE, "thread_pool"); - ::apache::thrift::stdcxx::shared_ptr protocolFactory; + + ::apache::thrift::stdcxx::shared_ptr socket_ptr(new ::apache::thrift::transport::TSocket("localhost", 9090)); + ::apache::thrift::stdcxx::shared_ptr transport_ptr(new TBufferedTransport(socket_ptr)); + ::apache::thrift::stdcxx::shared_ptr protocol_ptr; if(protocol == "binary") { - protocolFactory.reset(new TBinaryProtocolFactory()); + protocol_ptr.reset(new TBinaryProtocol(transport_ptr)); } else if(protocol == "json") { - protocolFactory.reset(new TJSONProtocolFactory()); + protocol_ptr.reset(new TJSONProtocol(transport_ptr)); + } else { + server::CommonUtil::PrintError("Service protocol: " + protocol + " is not supported currently"); + return; } - if(mode == "simple") { - - } else if(mode == "thread_pool") { - - } else { - server::CommonUtil::PrintError("Server mode: " + mode + " is not supported currently"); + transport_ptr->open(); + VecServiceClient client(protocol_ptr); + try { + client.dummy(); +// VecGroup group; +// group.id = "test_group"; +// group.dimension = 256; +// group.index_type = 0; +// client.add_group(group); + } catch (apache::thrift::TException& ex) { + printf("%s", ex.what()); } + + transport_ptr->close(); + server::CommonUtil::PrintInfo("test_client exit..."); } }