diff --git a/cpp/src/server/Server.cpp b/cpp/src/server/Server.cpp index 2dca04ad72caf42f9e3e530c613d45a6114d3dc6..368f8018d2f89a342fbc08e894b1448466ea193a 100644 --- a/cpp/src/server/Server.cpp +++ b/cpp/src/server/Server.cpp @@ -5,7 +5,7 @@ //////////////////////////////////////////////////////////////////////////////// #include "Server.h" #include "ServerConfig.h" -#include "ServiceWrapper.h" +#include "VecServiceWrapper.h" #include "utils/Log.h" #include "utils/SignalUtil.h" #include "utils/TimeRecorder.h" @@ -215,12 +215,12 @@ Server::LoadConfig() { void Server::StartService() { - ServiceWrapper::StartService(); + VecServiceWrapper::StartService(); } void Server::StopService() { - ServiceWrapper::StopService(); + VecServiceWrapper::StopService(); } } diff --git a/cpp/src/server/ServiceWrapper.cpp b/cpp/src/server/ServiceWrapper.cpp deleted file mode 100644 index 33528f6136ec2d1193496b3f30e4b1eeb433bbc1..0000000000000000000000000000000000000000 --- a/cpp/src/server/ServiceWrapper.cpp +++ /dev/null @@ -1,186 +0,0 @@ -/******************************************************************************* - * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved - * Unauthorized copying of this file, via any medium is strictly prohibited. - * Proprietary and confidential. - ******************************************************************************/ -#include "ServiceWrapper.h" -#include "ServerConfig.h" - -#include "utils/Log.h" - -#include "thrift/gen-cpp/VecService.h" -#include "thrift/gen-cpp/VectorService_types.h" -#include "thrift/gen-cpp/VectorService_constants.h" - -#include -#include -#include -#include -#include -//#include -#include -#include -#include -#include - -#include - -namespace zilliz { -namespace vecwise { -namespace server { - -using namespace ::apache::thrift; -using namespace ::apache::thrift::protocol; -using namespace ::apache::thrift::transport; -using namespace ::apache::thrift::server; -using namespace ::apache::thrift::concurrency; - -class VecServiceHandler : virtual public VecServiceIf { -public: - VecServiceHandler() { - // Your initialization goes here - } - - void dummy() { - // Your implementation goes here - SERVER_LOG_INFO << "dummy() called"; - } - - /** - * group interfaces - * - * @param group - */ - void add_group(const VecGroup& group) { - // Your implementation goes here - SERVER_LOG_INFO << "add_group() called"; - } - - void get_group(VecGroup& _return, const std::string& group_id) { - // Your implementation goes here - SERVER_LOG_INFO << "get_group() called"; - } - - void del_group(const std::string& group_id) { - // Your implementation goes here - SERVER_LOG_INFO << "del_group() called"; - } - - /** - * vector interfaces - * - * - * @param group_id - * @param tensor - */ - int64_t add_vector(const std::string& group_id, const VecTensor& tensor) { - // Your implementation goes here - SERVER_LOG_INFO << "add_vector() called"; - } - - void add_vector_batch(VecTensorIdList& _return, const std::string& group_id, const VecTensorList& tensor_list) { - // Your implementation goes here - SERVER_LOG_INFO << "add_vector_batch() called"; - } - - /** - * search interfaces - * if time_range_list is empty, engine will search without time limit - * - * @param group_id - * @param top_k - * @param tensor - * @param time_range_list - */ - void search_vector(VecSearchResult& _return, const std::string& group_id, const int64_t top_k, const VecTensor& tensor, const VecTimeRangeList& time_range_list) { - // Your implementation goes here - SERVER_LOG_INFO << "search_vector() called"; - } - - void search_vector_batch(VecSearchResultList& _return, const std::string& group_id, const int64_t top_k, const VecTensorList& tensor_list, const VecTimeRangeList& time_range_list) { - // Your implementation goes here - SERVER_LOG_INFO << "search_vector_batch() called"; - } - -}; - -static ::apache::thrift::stdcxx::shared_ptr s_server; - -void ServiceWrapper::StartService() { - if(s_server != nullptr){ - StopService(); - } - - ServerConfig &config = ServerConfig::GetInstance(); - ConfigNode server_config = config.GetConfig(CONFIG_SERVER); - - std::string address = server_config.GetValue(CONFIG_SERVER_ADDRESS, "127.0.0.1"); - int32_t port = server_config.GetInt32Value(CONFIG_SERVER_PORT, 33001); - std::string protocol = server_config.GetValue(CONFIG_SERVER_PROTOCOL, "binary"); - std::string mode = server_config.GetValue(CONFIG_SERVER_MODE, "thread_pool"); - - ::apache::thrift::stdcxx::shared_ptr handler(new VecServiceHandler()); - ::apache::thrift::stdcxx::shared_ptr processor(new VecServiceProcessor(handler)); - ::apache::thrift::stdcxx::shared_ptr server_transport(new TServerSocket(address, port)); - ::apache::thrift::stdcxx::shared_ptr transport_factory(new TBufferedTransportFactory()); - - ::apache::thrift::stdcxx::shared_ptr protocol_factory; - if(protocol == "binary") { - protocol_factory.reset(new TBinaryProtocolFactory()); - } else if(protocol == "json") { - protocol_factory.reset(new TJSONProtocolFactory()); - } else if(protocol == "compact") { - protocol_factory.reset(new TCompactProtocolFactory()); - } else if(protocol == "debug") { - protocol_factory.reset(new TDebugProtocolFactory()); - } else { - SERVER_LOG_INFO << "Service protocol: " << protocol << " is not supported currently"; - return; - } - - if(mode == "simple") { - s_server.reset(new TSimpleServer(processor, server_transport, transport_factory, protocol_factory)); - s_server->serve(); -// } else if(mode == "non_blocking") { -// ::apache::thrift::stdcxx::shared_ptr nb_server_transport(new TServerSocket(address, port)); -// ::apache::thrift::stdcxx::shared_ptr threadManager(ThreadManager::newSimpleThreadManager()); -// ::apache::thrift::stdcxx::shared_ptr threadFactory(new PosixThreadFactory()); -// threadManager->threadFactory(threadFactory); -// threadManager->start(); -// -// s_server.reset(new TNonblockingServer(processor, -// protocol_factory, -// nb_server_transport, -// threadManager)); - } else if(mode == "thread_pool") { - ::apache::thrift::stdcxx::shared_ptr threadManager(ThreadManager::newSimpleThreadManager()); - ::apache::thrift::stdcxx::shared_ptr threadFactory(new PosixThreadFactory()); - threadManager->threadFactory(threadFactory); - threadManager->start(); - - s_server.reset(new TThreadPoolServer(processor, - server_transport, - transport_factory, - protocol_factory, - threadManager)); - s_server->serve(); - } else { - SERVER_LOG_INFO << "Service mode: " << mode << " is not supported currently"; - return; - } -} - -void ServiceWrapper::StopService() { - auto stop_server_worker = [&]{ - if(s_server != nullptr) { - s_server->stop(); - } - }; - - std::shared_ptr stop_thread = std::make_shared(stop_server_worker); - stop_thread->join(); -} - -} -} -} \ No newline at end of file diff --git a/cpp/src/server/VecServiceHandler.cpp b/cpp/src/server/VecServiceHandler.cpp new file mode 100644 index 0000000000000000000000000000000000000000..2b757da3482ba3d3c31e117ad4c729f79dac08a0 --- /dev/null +++ b/cpp/src/server/VecServiceHandler.cpp @@ -0,0 +1,119 @@ +// +// Created by yhmo on 19-4-16. +// + +#include "VecServiceHandler.h" +#include "utils/Log.h" + +namespace zilliz { +namespace vecwise { +namespace server { + +VecServiceHandler::VecServiceHandler() { +} + +void +VecServiceHandler::add_group(const VecGroup &group) { + SERVER_LOG_INFO << "add_group() called"; + SERVER_LOG_TRACE << "group.id = " << group.id << ", group.dimension = " << group.dimension + << ", group.index_type = " << group.index_type; + + try { + + } catch (std::exception& ex) { + SERVER_LOG_ERROR << ex.what(); + } +} + +void +VecServiceHandler::get_group(VecGroup &_return, const std::string &group_id) { + SERVER_LOG_INFO << "get_group() called"; + SERVER_LOG_TRACE << "group_id = " << group_id; + + try { + + } catch (std::exception& ex) { + SERVER_LOG_ERROR << ex.what(); + } +} + +void +VecServiceHandler::del_group(const std::string &group_id) { + SERVER_LOG_INFO << "del_group() called"; + SERVER_LOG_TRACE << "group_id = " << group_id; + + try { + + } catch (std::exception& ex) { + SERVER_LOG_ERROR << ex.what(); + } +} + + +int64_t +VecServiceHandler::add_vector(const std::string &group_id, const VecTensor &tensor) { + SERVER_LOG_INFO << "add_vector() called"; + SERVER_LOG_TRACE << "group_id = " << group_id << ", vector size = " << tensor.tensor.size(); + + try { + + } catch (std::exception& ex) { + SERVER_LOG_ERROR << ex.what(); + } +} + +void +VecServiceHandler::add_vector_batch(VecTensorIdList &_return, + const std::string &group_id, + const VecTensorList &tensor_list) { + SERVER_LOG_INFO << "add_vector_batch() called"; + SERVER_LOG_TRACE << "group_id = " << group_id << ", vector list size = " + << tensor_list.tensor_list.size(); + + try { + + } catch (std::exception& ex) { + SERVER_LOG_ERROR << ex.what(); + } +} + + +void +VecServiceHandler::search_vector(VecSearchResult &_return, + const std::string &group_id, + const int64_t top_k, + const VecTensor &tensor, + const VecTimeRangeList &time_range_list) { + SERVER_LOG_INFO << "search_vector() called"; + SERVER_LOG_TRACE << "group_id = " << group_id << ", top_k = " << top_k + << ", vector size = " << tensor.tensor.size() + << ", time range list size = " << time_range_list.range_list.size(); + + try { + + } catch (std::exception& ex) { + SERVER_LOG_ERROR << ex.what(); + } +} + +void +VecServiceHandler::search_vector_batch(VecSearchResultList &_return, + const std::string &group_id, + const int64_t top_k, + const VecTensorList &tensor_list, + const VecTimeRangeList &time_range_list) { + SERVER_LOG_INFO << "search_vector_batch() called"; + SERVER_LOG_TRACE << "group_id = " << group_id << ", top_k = " << top_k + << ", vector list size = " << tensor_list.tensor_list.size() + << ", time range list size = " << time_range_list.range_list.size(); + + try { + + } catch (std::exception& ex) { + SERVER_LOG_ERROR << ex.what(); + } +} + +} +} +} \ No newline at end of file diff --git a/cpp/src/server/VecServiceHandler.h b/cpp/src/server/VecServiceHandler.h new file mode 100644 index 0000000000000000000000000000000000000000..74b684709ce3a56ebab3e01ff98b8607b25616c1 --- /dev/null +++ b/cpp/src/server/VecServiceHandler.h @@ -0,0 +1,70 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include "utils/Error.h" +#include "thrift/gen-cpp/VecService.h" + +#include +#include + +namespace zilliz { +namespace vecwise { +namespace server { + +class VecServiceHandler : virtual public VecServiceIf { +public: + VecServiceHandler(); + + /** + * group interfaces + * + * @param group + */ + void add_group(const VecGroup& group); + + void get_group(VecGroup& _return, const std::string& group_id); + + void del_group(const std::string& group_id); + + /** + * vector interfaces + * + * + * @param group_id + * @param tensor + */ + int64_t add_vector(const std::string& group_id, const VecTensor& tensor); + + void add_vector_batch(VecTensorIdList& _return, const std::string& group_id, const VecTensorList& tensor_list); + + /** + * search interfaces + * if time_range_list is empty, engine will search without time limit + * + * @param group_id + * @param top_k + * @param tensor + * @param time_range_list + */ + void search_vector(VecSearchResult& _return, + const std::string& group_id, + const int64_t top_k, + const VecTensor& tensor, + const VecTimeRangeList& time_range_list); + + void search_vector_batch(VecSearchResultList& _return, + const std::string& group_id, + const int64_t top_k, + const VecTensorList& tensor_list, + const VecTimeRangeList& time_range_list); + +}; + + +} +} +} diff --git a/cpp/src/server/VecServiceWrapper.cpp b/cpp/src/server/VecServiceWrapper.cpp new file mode 100644 index 0000000000000000000000000000000000000000..682e8482c8cf85cf4556d2fbdf932b3610b5321e --- /dev/null +++ b/cpp/src/server/VecServiceWrapper.cpp @@ -0,0 +1,121 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#include "VecServiceWrapper.h" +#include "VecServiceHandler.h" +#include "ServerConfig.h" + +#include "utils/Log.h" + +#include "thrift/gen-cpp/VectorService_types.h" +#include "thrift/gen-cpp/VectorService_constants.h" + +#include +#include +#include +#include +#include +//#include +#include +#include +#include +#include + +#include + +namespace zilliz { +namespace vecwise { +namespace server { + +using namespace ::apache::thrift; +using namespace ::apache::thrift::protocol; +using namespace ::apache::thrift::transport; +using namespace ::apache::thrift::server; +using namespace ::apache::thrift::concurrency; + +static stdcxx::shared_ptr s_server; + +void VecServiceWrapper::StartService() { + if(s_server != nullptr){ + StopService(); + } + + ServerConfig &config = ServerConfig::GetInstance(); + ConfigNode server_config = config.GetConfig(CONFIG_SERVER); + + std::string address = server_config.GetValue(CONFIG_SERVER_ADDRESS, "127.0.0.1"); + int32_t port = server_config.GetInt32Value(CONFIG_SERVER_PORT, 33001); + std::string protocol = server_config.GetValue(CONFIG_SERVER_PROTOCOL, "binary"); + std::string mode = server_config.GetValue(CONFIG_SERVER_MODE, "thread_pool"); + + try { + stdcxx::shared_ptr handler(new VecServiceHandler()); + stdcxx::shared_ptr processor(new VecServiceProcessor(handler)); + stdcxx::shared_ptr server_transport(new TServerSocket(address, port)); + stdcxx::shared_ptr transport_factory(new TBufferedTransportFactory()); + + stdcxx::shared_ptr protocol_factory; + if (protocol == "binary") { + protocol_factory.reset(new TBinaryProtocolFactory()); + } else if (protocol == "json") { + protocol_factory.reset(new TJSONProtocolFactory()); + } else if (protocol == "compact") { + protocol_factory.reset(new TCompactProtocolFactory()); + } else if (protocol == "debug") { + protocol_factory.reset(new TDebugProtocolFactory()); + } else { + SERVER_LOG_INFO << "Service protocol: " << protocol << " is not supported currently"; + return; + } + + if (mode == "simple") { + s_server.reset(new TSimpleServer(processor, server_transport, transport_factory, protocol_factory)); + s_server->serve(); +// } else if(mode == "non_blocking") { +// ::apache::thrift::stdcxx::shared_ptr nb_server_transport(new TServerSocket(address, port)); +// ::apache::thrift::stdcxx::shared_ptr threadManager(ThreadManager::newSimpleThreadManager()); +// ::apache::thrift::stdcxx::shared_ptr threadFactory(new PosixThreadFactory()); +// threadManager->threadFactory(threadFactory); +// threadManager->start(); +// +// s_server.reset(new TNonblockingServer(processor, +// protocol_factory, +// nb_server_transport, +// threadManager)); + } else if (mode == "thread_pool") { + stdcxx::shared_ptr threadManager(ThreadManager::newSimpleThreadManager()); + stdcxx::shared_ptr threadFactory(new PosixThreadFactory()); + threadManager->threadFactory(threadFactory); + threadManager->start(); + + s_server.reset(new TThreadPoolServer(processor, + server_transport, + transport_factory, + protocol_factory, + threadManager)); + s_server->serve(); + } else { + SERVER_LOG_INFO << "Service mode: " << mode << " is not supported currently"; + return; + } + } catch (apache::thrift::TException& ex) { + SERVER_LOG_ERROR << "Server encounter exception: " << ex.what(); + } +} + +void VecServiceWrapper::StopService() { + auto stop_server_worker = [&]{ + if(s_server != nullptr) { + s_server->stop(); + } + }; + + std::shared_ptr stop_thread = std::make_shared(stop_server_worker); + stop_thread->join(); +} + +} +} +} \ No newline at end of file diff --git a/cpp/src/server/ServiceWrapper.h b/cpp/src/server/VecServiceWrapper.h similarity index 95% rename from cpp/src/server/ServiceWrapper.h rename to cpp/src/server/VecServiceWrapper.h index 00b01f2c19231501cc2b2e79896de3d4093fe322..7af9cb5e563f46072f86d1f7ed225575865f7d79 100644 --- a/cpp/src/server/ServiceWrapper.h +++ b/cpp/src/server/VecServiceWrapper.h @@ -14,7 +14,7 @@ namespace zilliz { namespace vecwise { namespace server { -class ServiceWrapper { +class VecServiceWrapper { public: static void StartService(); static void StopService(); diff --git a/cpp/src/thrift/VectorService.thrift b/cpp/src/thrift/VectorService.thrift index 583dd1a8f6fb5dcd66e73a2918a0fb00551846c4..7008ebd54c2ee46e26964acf8e701939dc3c7b96 100644 --- a/cpp/src/thrift/VectorService.thrift +++ b/cpp/src/thrift/VectorService.thrift @@ -66,7 +66,6 @@ struct VecTimeRangeList { } service VecService { - void dummy(); /** * group interfaces */ diff --git a/cpp/src/thrift/gen-cpp/VecService.cpp b/cpp/src/thrift/gen-cpp/VecService.cpp index e9a098453f7e3da59c2120e9f4a7559916b028ff..6374f947eee49aa150ddab7fbd55773813393a38 100644 --- a/cpp/src/thrift/gen-cpp/VecService.cpp +++ b/cpp/src/thrift/gen-cpp/VecService.cpp @@ -9,141 +9,6 @@ -VecService_dummy_args::~VecService_dummy_args() throw() { -} - - -uint32_t VecService_dummy_args::read(::apache::thrift::protocol::TProtocol* iprot) { - - ::apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); - uint32_t xfer = 0; - std::string fname; - ::apache::thrift::protocol::TType ftype; - int16_t fid; - - xfer += iprot->readStructBegin(fname); - - using ::apache::thrift::protocol::TProtocolException; - - - while (true) - { - xfer += iprot->readFieldBegin(fname, ftype, fid); - if (ftype == ::apache::thrift::protocol::T_STOP) { - break; - } - xfer += iprot->skip(ftype); - xfer += iprot->readFieldEnd(); - } - - xfer += iprot->readStructEnd(); - - return xfer; -} - -uint32_t VecService_dummy_args::write(::apache::thrift::protocol::TProtocol* oprot) const { - uint32_t xfer = 0; - ::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); - xfer += oprot->writeStructBegin("VecService_dummy_args"); - - xfer += oprot->writeFieldStop(); - xfer += oprot->writeStructEnd(); - return xfer; -} - - -VecService_dummy_pargs::~VecService_dummy_pargs() throw() { -} - - -uint32_t VecService_dummy_pargs::write(::apache::thrift::protocol::TProtocol* oprot) const { - uint32_t xfer = 0; - ::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); - xfer += oprot->writeStructBegin("VecService_dummy_pargs"); - - xfer += oprot->writeFieldStop(); - xfer += oprot->writeStructEnd(); - return xfer; -} - - -VecService_dummy_result::~VecService_dummy_result() throw() { -} - - -uint32_t VecService_dummy_result::read(::apache::thrift::protocol::TProtocol* iprot) { - - ::apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); - uint32_t xfer = 0; - std::string fname; - ::apache::thrift::protocol::TType ftype; - int16_t fid; - - xfer += iprot->readStructBegin(fname); - - using ::apache::thrift::protocol::TProtocolException; - - - while (true) - { - xfer += iprot->readFieldBegin(fname, ftype, fid); - if (ftype == ::apache::thrift::protocol::T_STOP) { - break; - } - xfer += iprot->skip(ftype); - xfer += iprot->readFieldEnd(); - } - - xfer += iprot->readStructEnd(); - - return xfer; -} - -uint32_t VecService_dummy_result::write(::apache::thrift::protocol::TProtocol* oprot) const { - - uint32_t xfer = 0; - - xfer += oprot->writeStructBegin("VecService_dummy_result"); - - xfer += oprot->writeFieldStop(); - xfer += oprot->writeStructEnd(); - return xfer; -} - - -VecService_dummy_presult::~VecService_dummy_presult() throw() { -} - - -uint32_t VecService_dummy_presult::read(::apache::thrift::protocol::TProtocol* iprot) { - - ::apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); - uint32_t xfer = 0; - std::string fname; - ::apache::thrift::protocol::TType ftype; - int16_t fid; - - xfer += iprot->readStructBegin(fname); - - using ::apache::thrift::protocol::TProtocolException; - - - while (true) - { - xfer += iprot->readFieldBegin(fname, ftype, fid); - if (ftype == ::apache::thrift::protocol::T_STOP) { - break; - } - xfer += iprot->skip(ftype); - xfer += iprot->readFieldEnd(); - } - - xfer += iprot->readStructEnd(); - - return xfer; -} - - VecService_add_group_args::~VecService_add_group_args() throw() { } @@ -1680,58 +1545,6 @@ uint32_t VecService_search_vector_batch_presult::read(::apache::thrift::protocol return xfer; } -void VecServiceClient::dummy() -{ - send_dummy(); - recv_dummy(); -} - -void VecServiceClient::send_dummy() -{ - int32_t cseqid = 0; - oprot_->writeMessageBegin("dummy", ::apache::thrift::protocol::T_CALL, cseqid); - - VecService_dummy_pargs args; - args.write(oprot_); - - oprot_->writeMessageEnd(); - oprot_->getTransport()->writeEnd(); - oprot_->getTransport()->flush(); -} - -void VecServiceClient::recv_dummy() -{ - - int32_t rseqid = 0; - std::string fname; - ::apache::thrift::protocol::TMessageType mtype; - - iprot_->readMessageBegin(fname, mtype, rseqid); - if (mtype == ::apache::thrift::protocol::T_EXCEPTION) { - ::apache::thrift::TApplicationException x; - x.read(iprot_); - iprot_->readMessageEnd(); - iprot_->getTransport()->readEnd(); - throw x; - } - if (mtype != ::apache::thrift::protocol::T_REPLY) { - iprot_->skip(::apache::thrift::protocol::T_STRUCT); - iprot_->readMessageEnd(); - iprot_->getTransport()->readEnd(); - } - if (fname.compare("dummy") != 0) { - iprot_->skip(::apache::thrift::protocol::T_STRUCT); - iprot_->readMessageEnd(); - iprot_->getTransport()->readEnd(); - } - VecService_dummy_presult result; - result.read(iprot_); - iprot_->readMessageEnd(); - iprot_->getTransport()->readEnd(); - - return; -} - void VecServiceClient::add_group(const VecGroup& group) { send_add_group(group); @@ -2176,59 +1989,6 @@ bool VecServiceProcessor::dispatchCall(::apache::thrift::protocol::TProtocol* ip return true; } -void VecServiceProcessor::process_dummy(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext) -{ - void* ctx = NULL; - if (this->eventHandler_.get() != NULL) { - ctx = this->eventHandler_->getContext("VecService.dummy", callContext); - } - ::apache::thrift::TProcessorContextFreer freer(this->eventHandler_.get(), ctx, "VecService.dummy"); - - if (this->eventHandler_.get() != NULL) { - this->eventHandler_->preRead(ctx, "VecService.dummy"); - } - - VecService_dummy_args args; - args.read(iprot); - iprot->readMessageEnd(); - uint32_t bytes = iprot->getTransport()->readEnd(); - - if (this->eventHandler_.get() != NULL) { - this->eventHandler_->postRead(ctx, "VecService.dummy", bytes); - } - - VecService_dummy_result result; - try { - iface_->dummy(); - } catch (const std::exception& e) { - if (this->eventHandler_.get() != NULL) { - this->eventHandler_->handlerError(ctx, "VecService.dummy"); - } - - ::apache::thrift::TApplicationException x(e.what()); - oprot->writeMessageBegin("dummy", ::apache::thrift::protocol::T_EXCEPTION, seqid); - x.write(oprot); - oprot->writeMessageEnd(); - oprot->getTransport()->writeEnd(); - oprot->getTransport()->flush(); - return; - } - - if (this->eventHandler_.get() != NULL) { - this->eventHandler_->preWrite(ctx, "VecService.dummy"); - } - - oprot->writeMessageBegin("dummy", ::apache::thrift::protocol::T_REPLY, seqid); - result.write(oprot); - oprot->writeMessageEnd(); - bytes = oprot->getTransport()->writeEnd(); - oprot->getTransport()->flush(); - - if (this->eventHandler_.get() != NULL) { - this->eventHandler_->postWrite(ctx, "VecService.dummy", bytes); - } -} - void VecServiceProcessor::process_add_group(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext) { void* ctx = NULL; @@ -2633,83 +2393,6 @@ void VecServiceProcessor::process_search_vector_batch(int32_t seqid, ::apache::t return processor; } -void VecServiceConcurrentClient::dummy() -{ - int32_t seqid = send_dummy(); - recv_dummy(seqid); -} - -int32_t VecServiceConcurrentClient::send_dummy() -{ - int32_t cseqid = this->sync_.generateSeqId(); - ::apache::thrift::async::TConcurrentSendSentry sentry(&this->sync_); - oprot_->writeMessageBegin("dummy", ::apache::thrift::protocol::T_CALL, cseqid); - - VecService_dummy_pargs args; - args.write(oprot_); - - oprot_->writeMessageEnd(); - oprot_->getTransport()->writeEnd(); - oprot_->getTransport()->flush(); - - sentry.commit(); - return cseqid; -} - -void VecServiceConcurrentClient::recv_dummy(const int32_t seqid) -{ - - int32_t rseqid = 0; - std::string fname; - ::apache::thrift::protocol::TMessageType mtype; - - // the read mutex gets dropped and reacquired as part of waitForWork() - // The destructor of this sentry wakes up other clients - ::apache::thrift::async::TConcurrentRecvSentry sentry(&this->sync_, seqid); - - while(true) { - if(!this->sync_.getPending(fname, mtype, rseqid)) { - iprot_->readMessageBegin(fname, mtype, rseqid); - } - if(seqid == rseqid) { - if (mtype == ::apache::thrift::protocol::T_EXCEPTION) { - ::apache::thrift::TApplicationException x; - x.read(iprot_); - iprot_->readMessageEnd(); - iprot_->getTransport()->readEnd(); - sentry.commit(); - throw x; - } - if (mtype != ::apache::thrift::protocol::T_REPLY) { - iprot_->skip(::apache::thrift::protocol::T_STRUCT); - iprot_->readMessageEnd(); - iprot_->getTransport()->readEnd(); - } - if (fname.compare("dummy") != 0) { - iprot_->skip(::apache::thrift::protocol::T_STRUCT); - iprot_->readMessageEnd(); - iprot_->getTransport()->readEnd(); - - // in a bad state, don't commit - using ::apache::thrift::protocol::TProtocolException; - throw TProtocolException(TProtocolException::INVALID_DATA); - } - VecService_dummy_presult result; - result.read(iprot_); - iprot_->readMessageEnd(); - iprot_->getTransport()->readEnd(); - - sentry.commit(); - return; - } - // seqid != rseqid - this->sync_.updatePending(fname, mtype, rseqid); - - // this will temporarily unlock the readMutex, and let other clients get work done - this->sync_.waitForWork(seqid); - } // end while(true) -} - void VecServiceConcurrentClient::add_group(const VecGroup& group) { int32_t seqid = send_add_group(group); diff --git a/cpp/src/thrift/gen-cpp/VecService.h b/cpp/src/thrift/gen-cpp/VecService.h index 83e99957af6fa1e2c1366c92e34e56c2f1970b5c..5cccdfa03b7bd1e9424d378f09ccc98ff1192867 100644 --- a/cpp/src/thrift/gen-cpp/VecService.h +++ b/cpp/src/thrift/gen-cpp/VecService.h @@ -21,7 +21,6 @@ class VecServiceIf { public: virtual ~VecServiceIf() {} - virtual void dummy() = 0; /** * group interfaces @@ -82,9 +81,6 @@ class VecServiceIfSingletonFactory : virtual public VecServiceIfFactory { class VecServiceNull : virtual public VecServiceIf { public: virtual ~VecServiceNull() {} - void dummy() { - return; - } void add_group(const VecGroup& /* group */) { return; } @@ -109,80 +105,6 @@ class VecServiceNull : virtual public VecServiceIf { } }; - -class VecService_dummy_args { - public: - - VecService_dummy_args(const VecService_dummy_args&); - VecService_dummy_args& operator=(const VecService_dummy_args&); - VecService_dummy_args() { - } - - virtual ~VecService_dummy_args() throw(); - - bool operator == (const VecService_dummy_args & /* rhs */) const - { - return true; - } - bool operator != (const VecService_dummy_args &rhs) const { - return !(*this == rhs); - } - - bool operator < (const VecService_dummy_args & ) const; - - uint32_t read(::apache::thrift::protocol::TProtocol* iprot); - uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const; - -}; - - -class VecService_dummy_pargs { - public: - - - virtual ~VecService_dummy_pargs() throw(); - - uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const; - -}; - - -class VecService_dummy_result { - public: - - VecService_dummy_result(const VecService_dummy_result&); - VecService_dummy_result& operator=(const VecService_dummy_result&); - VecService_dummy_result() { - } - - virtual ~VecService_dummy_result() throw(); - - bool operator == (const VecService_dummy_result & /* rhs */) const - { - return true; - } - bool operator != (const VecService_dummy_result &rhs) const { - return !(*this == rhs); - } - - bool operator < (const VecService_dummy_result & ) const; - - uint32_t read(::apache::thrift::protocol::TProtocol* iprot); - uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const; - -}; - - -class VecService_dummy_presult { - public: - - - virtual ~VecService_dummy_presult() throw(); - - uint32_t read(::apache::thrift::protocol::TProtocol* iprot); - -}; - typedef struct _VecService_add_group_args__isset { _VecService_add_group_args__isset() : group(false) {} bool group :1; @@ -1032,9 +954,6 @@ class VecServiceClient : virtual public VecServiceIf { apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> getOutputProtocol() { return poprot_; } - void dummy(); - void send_dummy(); - void recv_dummy(); void add_group(const VecGroup& group); void send_add_group(const VecGroup& group); void recv_add_group(); @@ -1071,7 +990,6 @@ class VecServiceProcessor : public ::apache::thrift::TDispatchProcessor { typedef void (VecServiceProcessor::*ProcessFunction)(int32_t, ::apache::thrift::protocol::TProtocol*, ::apache::thrift::protocol::TProtocol*, void*); typedef std::map ProcessMap; ProcessMap processMap_; - void process_dummy(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext); void process_add_group(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext); void process_get_group(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext); void process_del_group(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext); @@ -1082,7 +1000,6 @@ class VecServiceProcessor : public ::apache::thrift::TDispatchProcessor { public: VecServiceProcessor(::apache::thrift::stdcxx::shared_ptr iface) : iface_(iface) { - processMap_["dummy"] = &VecServiceProcessor::process_dummy; processMap_["add_group"] = &VecServiceProcessor::process_add_group; processMap_["get_group"] = &VecServiceProcessor::process_get_group; processMap_["del_group"] = &VecServiceProcessor::process_del_group; @@ -1118,15 +1035,6 @@ class VecServiceMultiface : virtual public VecServiceIf { ifaces_.push_back(iface); } public: - void dummy() { - size_t sz = ifaces_.size(); - size_t i = 0; - for (; i < (sz - 1); ++i) { - ifaces_[i]->dummy(); - } - ifaces_[i]->dummy(); - } - void add_group(const VecGroup& group) { size_t sz = ifaces_.size(); size_t i = 0; @@ -1224,9 +1132,6 @@ class VecServiceConcurrentClient : virtual public VecServiceIf { apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> getOutputProtocol() { return poprot_; } - void dummy(); - int32_t send_dummy(); - void recv_dummy(const int32_t seqid); void add_group(const VecGroup& group); int32_t send_add_group(const VecGroup& group); void recv_add_group(const int32_t seqid); diff --git a/cpp/src/thrift/gen-cpp/VecService_server.skeleton.cpp b/cpp/src/thrift/gen-cpp/VecService_server.skeleton.cpp index 2309c8c3846fbffc631b659641ad6a5667a88683..81726ee8fe5eadc5607af8732d5e235deb957367 100644 --- a/cpp/src/thrift/gen-cpp/VecService_server.skeleton.cpp +++ b/cpp/src/thrift/gen-cpp/VecService_server.skeleton.cpp @@ -18,11 +18,6 @@ class VecServiceHandler : virtual public VecServiceIf { // Your initialization goes here } - void dummy() { - // Your implementation goes here - printf("dummy\n"); - } - /** * group interfaces * @@ -85,7 +80,7 @@ int main(int argc, char **argv) { int port = 9090; ::apache::thrift::stdcxx::shared_ptr handler(new VecServiceHandler()); ::apache::thrift::stdcxx::shared_ptr processor(new VecServiceProcessor(handler)); - ::apache::thrift::stdcxx::shared_ptr serverTransport(new TServerSocket("localhost", port)); + ::apache::thrift::stdcxx::shared_ptr serverTransport(new TServerSocket(port)); ::apache::thrift::stdcxx::shared_ptr transportFactory(new TBufferedTransportFactory()); ::apache::thrift::stdcxx::shared_ptr protocolFactory(new TBinaryProtocolFactory()); diff --git a/cpp/test_client/src/ClientApp.cpp b/cpp/test_client/src/ClientApp.cpp index 82a06de1990b6e03a34798e992d4251b7680e070..e6d1518ff7ce76952dde87f91a840176a05d024f 100644 --- a/cpp/test_client/src/ClientApp.cpp +++ b/cpp/test_client/src/ClientApp.cpp @@ -47,38 +47,44 @@ void ClientApp::Run(const std::string &config_file) { std::string mode = server_config.GetValue(server::CONFIG_SERVER_MODE, "thread_pool"); CLIENT_LOG_INFO << "Connect to server: " << address << ":" << std::to_string(port); - ::apache::thrift::stdcxx::shared_ptr socket_ptr(new ::apache::thrift::transport::TSocket(address, port)); - ::apache::thrift::stdcxx::shared_ptr transport_ptr(new TBufferedTransport(socket_ptr)); - ::apache::thrift::stdcxx::shared_ptr protocol_ptr; - if(protocol == "binary") { - protocol_ptr.reset(new TBinaryProtocol(transport_ptr)); - } else if(protocol == "json") { - protocol_ptr.reset(new TJSONProtocol(transport_ptr)); - } else if(protocol == "compact") { - protocol_ptr.reset(new TCompactProtocol(transport_ptr)); - } else if(protocol == "debug") { - protocol_ptr.reset(new TDebugProtocol(transport_ptr)); - } else { - CLIENT_LOG_ERROR << "Service protocol: " << protocol << " is not supported currently"; - return; - } - transport_ptr->open(); - VecServiceClient client(protocol_ptr); try { - client.dummy(); + stdcxx::shared_ptr socket_ptr(new transport::TSocket(address, port)); + stdcxx::shared_ptr transport_ptr(new TBufferedTransport(socket_ptr)); + stdcxx::shared_ptr protocol_ptr; + if(protocol == "binary") { + protocol_ptr.reset(new TBinaryProtocol(transport_ptr)); + } else if(protocol == "json") { + protocol_ptr.reset(new TJSONProtocol(transport_ptr)); + } else if(protocol == "compact") { + protocol_ptr.reset(new TCompactProtocol(transport_ptr)); + } else if(protocol == "debug") { + protocol_ptr.reset(new TDebugProtocol(transport_ptr)); + } else { + CLIENT_LOG_ERROR << "Service protocol: " << protocol << " is not supported currently"; + return; + } + + transport_ptr->open(); + VecServiceClient client(protocol_ptr); + try { + VecGroup group; + group.id = "test_group"; + group.dimension = 256; + group.index_type = 0; + client.add_group(group); + - VecGroup group; - group.id = "test_group"; - group.dimension = 256; - group.index_type = 0; - client.add_group(group); + + } catch (apache::thrift::TException& ex) { + printf("%s", ex.what()); + } + + transport_ptr->close(); } catch (apache::thrift::TException& ex) { - printf("%s", ex.what()); + CLIENT_LOG_ERROR << "Server encounter exception: " << ex.what(); } - transport_ptr->close(); - CLIENT_LOG_INFO << "Test finished"; }