提交 885b95e2 编写于 作者: G groot

refine code


Former-commit-id: 5916e0c3afaa752bcc3ff3cef69d30244be0b6c7
上级 fb5c43f1
...@@ -4,36 +4,14 @@ ...@@ -4,36 +4,14 @@
* Proprietary and confidential. * Proprietary and confidential.
******************************************************************************/ ******************************************************************************/
#include "ClientApp.h" #include "ClientApp.h"
#include "ClientSession.h"
#include "server/ServerConfig.h" #include "server/ServerConfig.h"
#include "Log.h" #include "Log.h"
#include <iostream>
#include <yaml-cpp/yaml.h>
#include "thrift/gen-cpp/VecService.h"
#include "thrift/gen-cpp/VectorService_types.h"
#include "thrift/gen-cpp/VectorService_constants.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/protocol/TJSONProtocol.h>
#include <thrift/protocol/TCompactProtocol.h>
#include <thrift/protocol/TDebugProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/server/TThreadPoolServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/concurrency/PosixThreadFactory.h>
namespace zilliz { namespace zilliz {
namespace vecwise { namespace vecwise {
namespace client { namespace client {
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::concurrency;
void ClientApp::Run(const std::string &config_file) { void ClientApp::Run(const std::string &config_file) {
server::ServerConfig& config = server::ServerConfig::GetInstance(); server::ServerConfig& config = server::ServerConfig::GetInstance();
config.LoadConfigFile(config_file); config.LoadConfigFile(config_file);
...@@ -44,54 +22,34 @@ void ClientApp::Run(const std::string &config_file) { ...@@ -44,54 +22,34 @@ void ClientApp::Run(const std::string &config_file) {
std::string address = server_config.GetValue(server::CONFIG_SERVER_ADDRESS, "127.0.0.1"); std::string address = server_config.GetValue(server::CONFIG_SERVER_ADDRESS, "127.0.0.1");
int32_t port = server_config.GetInt32Value(server::CONFIG_SERVER_PORT, 33001); int32_t port = server_config.GetInt32Value(server::CONFIG_SERVER_PORT, 33001);
std::string protocol = server_config.GetValue(server::CONFIG_SERVER_PROTOCOL, "binary"); std::string protocol = server_config.GetValue(server::CONFIG_SERVER_PROTOCOL, "binary");
std::string mode = server_config.GetValue(server::CONFIG_SERVER_MODE, "thread_pool"); //std::string mode = server_config.GetValue(server::CONFIG_SERVER_MODE, "thread_pool");
CLIENT_LOG_INFO << "Connect to server: " << address << ":" << std::to_string(port); CLIENT_LOG_INFO << "Connect to server: " << address << ":" << std::to_string(port);
try { try {
stdcxx::shared_ptr<TSocket> socket_ptr(new transport::TSocket(address, port)); ClientSession session(address, port, protocol);
stdcxx::shared_ptr<TTransport> transport_ptr(new TBufferedTransport(socket_ptr));
stdcxx::shared_ptr<TProtocol> protocol_ptr; const int32_t dim = 256;
if(protocol == "binary") { VecGroup group;
protocol_ptr.reset(new TBinaryProtocol(transport_ptr)); group.id = "test_group";
} else if(protocol == "json") { group.dimension = dim;
protocol_ptr.reset(new TJSONProtocol(transport_ptr)); group.index_type = 0;
} else if(protocol == "compact") { session.interface()->add_group(group);
protocol_ptr.reset(new TCompactProtocol(transport_ptr));
} else if(protocol == "debug") { for(int64_t k = 0; k < 10000; k++) {
protocol_ptr.reset(new TDebugProtocol(transport_ptr)); VecTensor tensor;
} else { for(int32_t i = 0; i < dim; i++) {
CLIENT_LOG_ERROR << "Service protocol: " << protocol << " is not supported currently"; tensor.tensor.push_back((double)(i + k));
return;
}
transport_ptr->open();
VecServiceClient client(protocol_ptr);
try {
const int32_t dim = 256;
VecGroup group;
group.id = "test_group";
group.dimension = dim;
group.index_type = 0;
client.add_group(group);
for(int64_t k = 0; k < 10000; k++) {
VecTensor tensor;
for(int32_t i = 0; i < dim; i++) {
tensor.tensor.push_back((double)(i + k));
}
VecTensorIdList result;
client.add_vector(result, group.id, tensor);
} }
} catch (apache::thrift::TException& ex) { VecTensorIdList result;
printf("%s", ex.what()); session.interface()->add_vector(result, group.id, tensor);
CLIENT_LOG_INFO << "add vector no." << k;
} }
transport_ptr->close(); } catch (std::exception& ex) {
} catch (apache::thrift::TException& ex) { CLIENT_LOG_ERROR << "request encounter exception: " << ex.what();
CLIENT_LOG_ERROR << "Server encounter exception: " << ex.what();
} }
CLIENT_LOG_INFO << "Test finished"; CLIENT_LOG_INFO << "Test finished";
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "ClientSession.h"
#include "Log.h"
#include "thrift/gen-cpp/VectorService_types.h"
#include "thrift/gen-cpp/VectorService_constants.h"
#include <exception>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/protocol/TJSONProtocol.h>
#include <thrift/protocol/TCompactProtocol.h>
#include <thrift/protocol/TDebugProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/server/TThreadPoolServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/concurrency/PosixThreadFactory.h>
namespace zilliz {
namespace vecwise {
namespace client {
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::concurrency;
ClientSession::ClientSession(const std::string &address, int32_t port, const std::string &protocol)
: client_(nullptr) {
try {
stdcxx::shared_ptr<TSocket> socket_ptr(new transport::TSocket(address, port));
stdcxx::shared_ptr<TTransport> transport_ptr(new TBufferedTransport(socket_ptr));
stdcxx::shared_ptr<TProtocol> protocol_ptr;
if(protocol == "binary") {
protocol_ptr.reset(new TBinaryProtocol(transport_ptr));
} else if(protocol == "json") {
protocol_ptr.reset(new TJSONProtocol(transport_ptr));
} else if(protocol == "compact") {
protocol_ptr.reset(new TCompactProtocol(transport_ptr));
} else if(protocol == "debug") {
protocol_ptr.reset(new TDebugProtocol(transport_ptr));
} else {
CLIENT_LOG_ERROR << "Service protocol: " << protocol << " is not supported currently";
return;
}
transport_ptr->open();
client_ = std::make_shared<VecServiceClient>(protocol_ptr);
} catch ( std::exception& ex) {
CLIENT_LOG_ERROR << "connect encounter exception: " << ex.what();
}
}
ClientSession::~ClientSession() {
try {
if(client_ != nullptr) {
auto protocol = client_->getInputProtocol();
if(protocol != nullptr) {
auto transport = protocol->getTransport();
if(transport != nullptr) {
transport->close();
}
}
}
} catch ( std::exception& ex) {
CLIENT_LOG_ERROR << "disconnect encounter exception: " << ex.what();
}
}
VecServiceClientPtr ClientSession::interface() {
if(client_ == nullptr) {
throw std::exception();
}
return client_;
}
}
}
}
\ No newline at end of file
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include "thrift/gen-cpp/VecService.h"
#include <memory>
namespace zilliz {
namespace vecwise {
namespace client {
using VecServiceClientPtr = std::shared_ptr<VecServiceClient>;
class ClientSession {
public:
ClientSession(const std::string& address, int32_t port, const std::string& protocol);
~ClientSession();
VecServiceClientPtr interface();
VecServiceClientPtr client_;
};
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册