MilvusServer.cpp 4.2 KB
Newer Older
G
groot 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
G
groot 已提交
6 7
#include "MilvusServer.h"
#include "RequestHandler.h"
G
groot 已提交
8
#include "ServerConfig.h"
G
groot 已提交
9
#include "ThreadPoolServer.h"
G
groot 已提交
10
#include "DBWrapper.h"
G
groot 已提交
11 12 13

#include "milvus_types.h"
#include "milvus_constants.h"
G
groot 已提交
14 15 16 17 18 19 20 21 22 23 24 25

#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/protocol/TJSONProtocol.h>
#include <thrift/protocol/TDebugProtocol.h>
#include <thrift/protocol/TCompactProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/server/TThreadPoolServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/concurrency/PosixThreadFactory.h>

#include <thread>
G
groot 已提交
26
#include <iostream>
G
groot 已提交
27 28

namespace zilliz {
J
jinhai 已提交
29
namespace milvus {
G
groot 已提交
30 31
namespace server {

G
groot 已提交
32
using namespace ::milvus::thrift;
G
groot 已提交
33 34 35 36 37 38 39 40
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using namespace ::apache::thrift::concurrency;

static stdcxx::shared_ptr<TServer> s_server;

G
groot 已提交
41
void
G
groot 已提交
42
MilvusServer::StartService() {
G
groot 已提交
43 44 45 46 47 48 49 50
    if(s_server != nullptr){
        StopService();
    }

    ServerConfig &config = ServerConfig::GetInstance();
    ConfigNode server_config = config.GetConfig(CONFIG_SERVER);

    std::string address = server_config.GetValue(CONFIG_SERVER_ADDRESS, "127.0.0.1");
G
groot 已提交
51
    int32_t port = server_config.GetInt32Value(CONFIG_SERVER_PORT, 19530);
G
groot 已提交
52 53 54 55
    std::string protocol = server_config.GetValue(CONFIG_SERVER_PROTOCOL, "binary");
    std::string mode = server_config.GetValue(CONFIG_SERVER_MODE, "thread_pool");

    try {
G
groot 已提交
56 57
        DBWrapper::DB();//initialize db

G
groot 已提交
58 59
        stdcxx::shared_ptr<RequestHandler> handler(new RequestHandler());
        stdcxx::shared_ptr<TProcessor> processor(new MilvusServiceProcessor(handler));
G
groot 已提交
60 61 62 63 64 65 66 67 68 69 70
        stdcxx::shared_ptr<TServerTransport> server_transport(new TServerSocket(address, port));
        stdcxx::shared_ptr<TTransportFactory> transport_factory(new TBufferedTransportFactory());

        stdcxx::shared_ptr<TProtocolFactory> protocol_factory;
        if (protocol == "binary") {
            protocol_factory.reset(new TBinaryProtocolFactory());
        } else if (protocol == "json") {
            protocol_factory.reset(new TJSONProtocolFactory());
        } else if (protocol == "compact") {
            protocol_factory.reset(new TCompactProtocolFactory());
        } else {
G
groot 已提交
71
            //SERVER_LOG_INFO << "Service protocol: " << protocol << " is not supported currently";
G
groot 已提交
72 73 74
            return;
        }

G
groot 已提交
75
        std::string mode = "thread_pool";
G
groot 已提交
76 77 78 79 80 81 82 83 84
        if (mode == "simple") {
            s_server.reset(new TSimpleServer(processor, server_transport, transport_factory, protocol_factory));
            s_server->serve();
        } else if (mode == "thread_pool") {
            stdcxx::shared_ptr<ThreadManager> threadManager(ThreadManager::newSimpleThreadManager());
            stdcxx::shared_ptr<PosixThreadFactory> threadFactory(new PosixThreadFactory());
            threadManager->threadFactory(threadFactory);
            threadManager->start();

G
groot 已提交
85
            s_server.reset(new ThreadPoolServer(processor,
G
groot 已提交
86 87 88 89 90 91
                                                 server_transport,
                                                 transport_factory,
                                                 protocol_factory,
                                                 threadManager));
            s_server->serve();
        } else {
G
groot 已提交
92
            //SERVER_LOG_INFO << "Service mode: " << mode << " is not supported currently";
G
groot 已提交
93 94 95
            return;
        }
    } catch (apache::thrift::TException& ex) {
G
groot 已提交
96 97
        std::cout << "ERROR! " << ex.what() << std::endl;
        kill(0, SIGUSR1);
G
groot 已提交
98 99 100
    }
}

G
groot 已提交
101
void
G
groot 已提交
102
MilvusServer::StopService() {
G
groot 已提交
103 104 105 106 107 108 109 110 111 112 113 114 115
    auto stop_server_worker = [&]{
        if(s_server != nullptr) {
            s_server->stop();
        }
    };

    std::shared_ptr<std::thread> stop_thread = std::make_shared<std::thread>(stop_server_worker);
    stop_thread->join();
}

}
}
}