MegasearchServer.cpp 4.2 KB
Newer Older
G
groot 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
G
groot 已提交
6 7 8 9
#include "MegasearchServer.h"
#include "MegasearchHandler.h"
#include "megasearch_types.h"
#include "megasearch_constants.h"
G
groot 已提交
10
#include "ServerConfig.h"
Y
yu yunfeng 已提交
11
#include "MegasearchThreadPoolServer.h"
G
groot 已提交
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28

#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/protocol/TJSONProtocol.h>
#include <thrift/protocol/TDebugProtocol.h>
#include <thrift/protocol/TCompactProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/server/TThreadPoolServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/concurrency/PosixThreadFactory.h>

#include <thread>

namespace zilliz {
namespace vecwise {
namespace server {

G
groot 已提交
29
using namespace megasearch::thrift;
G
groot 已提交
30 31 32 33 34 35 36 37
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using namespace ::apache::thrift::concurrency;

// File-local handle to the Thrift server instance. StartService() assigns it
// when it creates a server, and StopService() calls stop() on it if it is set.
static stdcxx::shared_ptr<TServer> s_server;

G
groot 已提交
38 39
void
MegasearchServer::StartService() {
G
groot 已提交
40 41 42 43 44 45 46 47 48 49 50 51 52
    if(s_server != nullptr){
        StopService();
    }

    ServerConfig &config = ServerConfig::GetInstance();
    ConfigNode server_config = config.GetConfig(CONFIG_SERVER);

    std::string address = server_config.GetValue(CONFIG_SERVER_ADDRESS, "127.0.0.1");
    int32_t port = server_config.GetInt32Value(CONFIG_SERVER_PORT, 33001);
    std::string protocol = server_config.GetValue(CONFIG_SERVER_PROTOCOL, "binary");
    std::string mode = server_config.GetValue(CONFIG_SERVER_MODE, "thread_pool");

    try {
G
groot 已提交
53 54
        stdcxx::shared_ptr<MegasearchServiceHandler> handler(new MegasearchServiceHandler());
        stdcxx::shared_ptr<TProcessor> processor(new MegasearchServiceProcessor(handler));
G
groot 已提交
55 56 57 58 59 60 61 62 63 64 65
        stdcxx::shared_ptr<TServerTransport> server_transport(new TServerSocket(address, port));
        stdcxx::shared_ptr<TTransportFactory> transport_factory(new TBufferedTransportFactory());

        stdcxx::shared_ptr<TProtocolFactory> protocol_factory;
        if (protocol == "binary") {
            protocol_factory.reset(new TBinaryProtocolFactory());
        } else if (protocol == "json") {
            protocol_factory.reset(new TJSONProtocolFactory());
        } else if (protocol == "compact") {
            protocol_factory.reset(new TCompactProtocolFactory());
        } else {
G
groot 已提交
66
            //SERVER_LOG_INFO << "Service protocol: " << protocol << " is not supported currently";
G
groot 已提交
67 68 69
            return;
        }

G
groot 已提交
70
        std::string mode = "thread_pool";
G
groot 已提交
71 72 73 74 75 76 77 78 79
        if (mode == "simple") {
            s_server.reset(new TSimpleServer(processor, server_transport, transport_factory, protocol_factory));
            s_server->serve();
        } else if (mode == "thread_pool") {
            stdcxx::shared_ptr<ThreadManager> threadManager(ThreadManager::newSimpleThreadManager());
            stdcxx::shared_ptr<PosixThreadFactory> threadFactory(new PosixThreadFactory());
            threadManager->threadFactory(threadFactory);
            threadManager->start();

Y
yu yunfeng 已提交
80
            s_server.reset(new MegasearchThreadPoolServer(processor,
G
groot 已提交
81 82 83 84 85 86
                                                 server_transport,
                                                 transport_factory,
                                                 protocol_factory,
                                                 threadManager));
            s_server->serve();
        } else {
G
groot 已提交
87
            //SERVER_LOG_INFO << "Service mode: " << mode << " is not supported currently";
G
groot 已提交
88 89 90
            return;
        }
    } catch (apache::thrift::TException& ex) {
G
groot 已提交
91
        //SERVER_LOG_ERROR << "Server encounter exception: " << ex.what();
G
groot 已提交
92 93 94
    }
}

G
groot 已提交
95 96
void
MegasearchServer::StopService() {
G
groot 已提交
97 98 99 100 101 102 103 104 105 106 107 108 109
    auto stop_server_worker = [&]{
        if(s_server != nullptr) {
            s_server->stop();
        }
    };

    std::shared_ptr<std::thread> stop_thread = std::make_shared<std::thread>(stop_server_worker);
    stop_thread->join();
}

}
}
}