Commit 765e5daa authored by xulongteng

Merge remote-tracking branch 'refs/remotes/origin/ctr_model_serving' into ctr_model_serving

merge cube
@@ -105,6 +105,7 @@ if (NOT CLIENT_ONLY)
list(APPEND EXTERNAL_LIBS opencv)
endif()
add_subdirectory(cube)
add_subdirectory(configure)
add_subdirectory(pdcodegen)
add_subdirectory(sdk-cpp)
# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
add_subdirectory(cube-server)
add_subdirectory(cube-api)
# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
cmake_minimum_required(VERSION 2.8.10)
project(CubeAPI C CXX)
option(CUBE_API_LINK_SO "Whether cube-api is linked dynamically" OFF)
execute_process(
COMMAND bash -c "find ${PROJECT_SOURCE_DIR}/../.. -type d -regex \".*output/include$\" | head -n1 | xargs dirname | tr -d '\n'"
OUTPUT_VARIABLE OUTPUT_PATH
)
set(CMAKE_PREFIX_PATH ${OUTPUT_PATH})
include(FindThreads)
include(FindProtobuf)
protobuf_generate_cpp(PROTO_SRC PROTO_HEADER idl/cube.proto idl/control.proto)
# include PROTO_HEADER
include_directories(${CMAKE_CURRENT_BINARY_DIR})
# Search for libthrift* by best effort. If it is not found and brpc is
# compiled with thrift protocol enabled, a link error would be reported.
find_library(THRIFT_LIB NAMES thrift)
if (NOT THRIFT_LIB)
set(THRIFT_LIB "")
endif()
find_library(THRIFTNB_LIB NAMES thriftnb)
if (NOT THRIFTNB_LIB)
set(THRIFTNB_LIB "")
endif()
find_path(GFLAGS_INCLUDE_PATH gflags/gflags.h)
find_library(GFLAGS_LIBRARY NAMES gflags libgflags)
if((NOT GFLAGS_INCLUDE_PATH) OR (NOT GFLAGS_LIBRARY))
message(FATAL_ERROR "Failed to find gflags")
endif()
include_directories(${GFLAGS_INCLUDE_PATH})
if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
include(CheckFunctionExists)
CHECK_FUNCTION_EXISTS(clock_gettime HAVE_CLOCK_GETTIME)
if(NOT HAVE_CLOCK_GETTIME)
set(DEFINE_CLOCK_GETTIME "-DNO_CLOCK_GETTIME_IN_MAC")
endif()
endif()
set(CMAKE_CXX_FLAGS "${CMAKE_CPP_FLAGS} -DNDEBUG -O2 -D__const__= -pipe -W -Wall -Wno-unused-parameter -fPIC -fno-omit-frame-pointer")
if(CMAKE_VERSION VERSION_LESS "3.1.3")
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
endif()
if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
endif()
else()
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
endif()
find_path(LEVELDB_INCLUDE_PATH NAMES leveldb/db.h)
find_library(LEVELDB_LIB NAMES leveldb)
if ((NOT LEVELDB_INCLUDE_PATH) OR (NOT LEVELDB_LIB))
message(FATAL_ERROR "Failed to find leveldb")
endif()
include_directories(${LEVELDB_INCLUDE_PATH})
find_library(SSL_LIB NAMES ssl)
if (NOT SSL_LIB)
message(FATAL_ERROR "Failed to find ssl")
endif()
find_library(CRYPTO_LIB NAMES crypto)
if (NOT CRYPTO_LIB)
message(FATAL_ERROR "Failed to find crypto")
endif()
add_executable(cube-cli src/cube_cli.cpp src/cube_api.cpp src/meta.cpp
${PROTO_SRC} ${PROTO_HEADER})
add_library(cube-api STATIC src/cube_api.cpp src/meta.cpp
${PROTO_SRC} ${PROTO_HEADER})
set(DYNAMIC_LIB
${CMAKE_THREAD_LIBS_INIT}
${GFLAGS_LIBRARY}
${PROTOBUF_LIBRARIES}
${LEVELDB_LIB}
${SSL_LIB}
${CRYPTO_LIB}
${THRIFT_LIB}
${THRIFTNB_LIB}
)
if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
set(DYNAMIC_LIB ${DYNAMIC_LIB}
"-framework CoreFoundation"
"-framework CoreGraphics"
"-framework CoreData"
"-framework CoreText"
"-framework Security"
"-framework Foundation"
"-Wl,-U,_MallocExtension_ReleaseFreeMemory"
"-Wl,-U,_ProfilerStart"
"-Wl,-U,_ProfilerStop")
endif()
target_link_libraries(cube-cli ${DYNAMIC_LIB} brpc -lpthread -ldl -lz)
target_link_libraries(cube-api ${DYNAMIC_LIB} brpc -lpthread -ldl -lz)
# install
install(TARGETS cube-api
ARCHIVE DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/lib
)
install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/include
DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/include/cube-api)
FILE(GLOB inc ${CMAKE_CURRENT_BINARY_DIR}/*.pb.h)
install(FILES ${inc}
DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/include/cube-api)
[{
"dict_name": "table_test_1",
"shard": 5,
"dup": 1,
"timeout": 200,
"retry": 3,
"backup_request": 10,
"type": "ipport",
"load_balancer": "la",
"nodes": [{
"ip": "127.0.0.1",
"port": 8950
},{
"ip": "127.0.0.1",
"port": 8951
},{
"ip": "127.0.0.1",
"port": 8952
},{
"ip": "127.0.0.1",
"port": 8953
},{
"ip": "127.0.0.1",
"port": 8954
}]
}]
[{
"dict_name": "test_dict",
"shard": 1,
"dup": 1,
"timeout": 200,
"retry": 3,
"backup_request": 100,
"type": "ipport_list",
"load_balancer": "rr",
"nodes": [{
"ipport_list": "list://127.0.0.1:8000,127.0.0.1:8001"
}]
}]
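The two cube.conf samples above correspond to the "ipport" and "ipport_list" connection types handled by meta.cpp further down: "shard" is the number of key shards, while "timeout", "retry" and "backup_request" feed brpc ChannelOptions. The client routes each key to a shard by simple modulo, roughly as in this standalone sketch (illustrative only; the key values are arbitrary):
// shard_routing_sketch.cpp -- illustrative only; mirrors the shard selection
// used by CubeAPI::seek(), assuming a 5-shard dict as in "table_test_1" above.
#include <cstdint>
#include <cstdio>
#include <vector>
int main() {
  const int shard_num = 5;  // matches "shard": 5 in the sample conf
  std::vector<uint64_t> keys = {17, 42, 1000003};
  for (uint64_t key : keys) {
    // Same rule as the client: the owning shard is key % shard_num, and the
    // request for that key goes to the channel configured for that shard.
    int shard_id = static_cast<int>(key % shard_num);
    std::printf("key %llu -> shard %d\n",
                static_cast<unsigned long long>(key), shard_id);
  }
  return 0;
}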
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto2";
package rec.mcube;
option cc_generic_services = true;
message HttpRequest {};
message HttpResponse {};
service ControlService { rpc cmd(HttpRequest) returns (HttpResponse); };
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto2";
package rec.mcube;
option cc_generic_services = true;
message DictRequest {
repeated uint64 keys = 1;
optional bool version_required = 2 [ default = false ];
};
message DictValue {
required uint32 status = 1;
required bytes value = 2;
};
message DictResponse {
repeated DictValue values = 1;
optional string version = 2;
};
message ParallelDictRequest { repeated DictRequest requests = 2; };
message ParallelDictResponse { repeated DictResponse responses = 1; };
service DictService {
rpc seek(DictRequest) returns (DictResponse);
rpc parallel_seek(ParallelDictRequest) returns (ParallelDictResponse);
};
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <stdint.h>
#include <string>
#include <vector>
#include "brpc/server.h"
#include "cube/cube-api/cube.pb.h"
#include "cube/cube-api/include/meta.h"
namespace rec {
namespace mcube {
struct CubeValue {
int error;
std::string buff;
};
class CubeAPI {
public:
static CubeAPI* instance();
static void cleanup();
public:
CubeAPI();
~CubeAPI();
/**
* @brief: init api, not thread safe, should be called before seek.
* @param [in] conf_file: filepath of config file.
* @return: error code. 0 for succ.
*/
int init(const char* conf_file);
/**
* @brief: destroy api, should be called before program exit.
* @return: error code. 0 for succ.
*/
int destroy();
/**
 * @brief: get data from cube server, thread safe.
* @param [in] dict_name: dict name.
* @param [in] key: key to seek.
* @param [out] val: value of key.
* @return: error code. 0 for succ.
*/
int seek(const std::string& dict_name, const uint64_t& key, CubeValue* val);
/**
* @brief: get data from cube server, thread safe.
* @param [in] dict_name: dict name.
* @param [in] keys: keys to seek.
* @param [out] vals: value of keys.
* @return: error code. 0 for succ, negative value on failure.
*/
int seek(const std::string& dict_name,
const std::vector<uint64_t>& keys,
std::vector<CubeValue>* vals);
int opt_seek(const std::string& dict_name,
const std::vector<uint64_t>& keys,
std::function<void(DictValue*, size_t)> parse);
/**
* @brief: get data from cube server, thread safe.
* @param [in] dict_name: dict name.
* @param [in] keys: keys to seek.
* @param [out] vals: value of keys.
* @param [out] version: data version.
* @return: error code. 0 for succ, negative value on failure.
*/
int seek(const std::string& dict_name,
const std::vector<uint64_t>& keys,
std::vector<CubeValue>* vals,
std::string* version);
int opt_seek(const std::string& dict_name,
const std::vector<uint64_t>& keys,
std::function<void(DictValue*, size_t)> parse,
std::string* version);
public:
static const char* error_msg(int error_code);
private:
CubeAPI(const CubeAPI&) {}
private:
Meta* _meta;
bthread_key_t _tls_key;
// void split(const std::vector<uint64_t>& keys);
};
} // namespace mcube
} // namespace rec
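A minimal usage sketch of the client API declared above; the conf path and dict name are placeholders (the dict name is taken from the sample cube.conf earlier) and error handling is trimmed:
// cube_api_usage_sketch.cpp -- illustrative use of CubeAPI, not part of the build.
#include <cstdint>
#include <cstdio>
#include <vector>
#include "cube/cube-api/include/cube_api.h"
int run_once() {
  rec::mcube::CubeAPI* cube = rec::mcube::CubeAPI::instance();
  // init() is not thread safe; call it once before any seek().
  if (cube->init("./cube.conf") != 0) {  // placeholder conf path
    return -1;
  }
  std::vector<uint64_t> keys = {123, 456};
  std::vector<rec::mcube::CubeValue> vals;
  // Batch seek: vals[i].error carries a CubeError code, vals[i].buff the raw bytes.
  int ret = cube->seek("table_test_1", keys, &vals);
  for (size_t i = 0; i < vals.size(); ++i) {
    std::printf("key %llu err %d len %zu\n",
                static_cast<unsigned long long>(keys[i]),
                vals[i].error,
                vals[i].buff.size());
  }
  // cleanup() invokes destroy() and releases the singleton.
  rec::mcube::CubeAPI::cleanup();
  return ret;
}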
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "bvar/bvar.h"
#include "bvar/recorder.h"
#include "bvar/window.h"
namespace rec {
namespace mcube {
bvar::Adder<uint64_t> g_cube_keys_num("cube_keys_num");
bvar::Window<bvar::Adder<uint64_t>> g_cube_keys_num_minute(
"cube_keys_num_minute", &g_cube_keys_num, 60);
bvar::Adder<uint64_t> g_cube_keys_miss_num("cube_keys_miss_num");
bvar::Window<bvar::Adder<uint64_t>> g_cube_keys_miss_num_minute(
"cube_keys_miss_num_minute", &g_cube_keys_miss_num, 60);
bvar::IntRecorder g_cube_value_size("cube_value_size");
bvar::Window<bvar::IntRecorder> g_cube_value_size_win(
"cube_value_size_win", &g_cube_value_size, bvar::FLAGS_bvar_dump_interval);
} // namespace mcube
} // namespace rec
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
namespace rec {
namespace mcube {
struct CubeError {
enum Code {
E_OK = 0,
E_NO_SUCH_KEY = -1,
E_SEEK_FAILED = -2,
E_ALL_SEEK_FAILED = -3,
}; // enum Code
static const char* error_msg(Code code);
}; // struct CubeError
} // namespace mcube
} // namespace rec
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <atomic>
#include <string>
#include <unordered_map>
#include <vector>
#include "brpc/channel.h"
#include "brpc/parallel_channel.h"
#include "butil/third_party/rapidjson/document.h"
#include "bvar/bvar.h"
namespace rec {
namespace mcube {
struct MetaInfo {
MetaInfo() : dict_name(""), shard_num(0), dup_num(0) {}
~MetaInfo() {
for (size_t i = 0; i != cube_conn.size(); ++i) {
if (cube_conn[i] != NULL) {
delete cube_conn[i];
cube_conn[i] = NULL;
}
}
cube_conn.clear();
}
std::string dict_name;
int shard_num;
int dup_num;
std::vector<::brpc::Channel*> cube_conn;
bvar::Adder<uint64_t> cube_request_num;
bvar::Adder<uint64_t> cube_rpcfail_num;
};
class Meta {
public:
static Meta* instance();
public:
~Meta();
int init(const char* conf_file);
int destroy();
MetaInfo* get_meta(const std::string& dict_name);
const std::vector<const MetaInfo*> metas();
void reset_bvar(const std::string& dict_name);
private:
MetaInfo* create_meta(const BUTIL_RAPIDJSON_NAMESPACE::Value& meta_config);
int create_meta_from_ipport(MetaInfo* meta,
const BUTIL_RAPIDJSON_NAMESPACE::Value& nodes,
int timeout,
int retry,
int backup_request);
int create_meta_from_ipport_list(
MetaInfo* meta,
const BUTIL_RAPIDJSON_NAMESPACE::Value& nodes,
int timeout,
int retry,
int backup_request,
const std::string& load_balancer);
int create_meta_from_ipport_parallel(
MetaInfo* meta,
const BUTIL_RAPIDJSON_NAMESPACE::Value& nodes,
int timeout,
int retry,
int backup_request);
private:
Meta() {}
Meta(const Meta&) {}
private:
std::unordered_map<std::string, MetaInfo*> _metas;
}; // class Meta
} // namespace mcube
} // namespace rec
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "cube/cube-api/include/cube_api.h"
#include <brpc/channel.h>
#include <brpc/parallel_channel.h>
#include <google/protobuf/descriptor.h>
#include "cube/cube-api/include/cube_api_bvar.h"
#include "cube/cube-api/include/error.h"
#include "cube/cube-api/include/meta.h"
namespace {
static ::rec::mcube::CubeAPI* g_ins = NULL;
}
namespace rec {
namespace mcube {
struct DictRpcData {
std::vector<DictRequest> sub_reqs;
std::vector<DictResponse> sub_res;
};
static void dict_rpc_deleter(void* d) { delete static_cast<DictRpcData*>(d); }
static void sub_seek_done(DictResponse* response,
brpc::Controller* cntl,
std::vector<int>* offset,
std::function<void(DictValue*, size_t)> parse) {
// std::unique_ptr<DictResponse> response_guard(response);
// std::unique_ptr<brpc::Controller> cntl_guard(cntl);
if (cntl->Failed()) {
for (int i = 0; i < response->values().size(); ++i) {
DictValue* val = response->mutable_values(i);
val->set_status(CubeError::E_SEEK_FAILED);
*val->mutable_value() = "";
parse(val, (*offset)[i]);
}
} else {
for (int i = 0; i < response->values().size(); ++i) {
DictValue* val = response->mutable_values(i);
parse(val, (*offset)[i]);
}
}
}
struct DoNothing : public google::protobuf::Closure {
void Run() {}
};
CubeAPI* CubeAPI::instance() {
if (g_ins == NULL) {
g_ins = new CubeAPI();
}
return g_ins;
}
void CubeAPI::cleanup() {
if (g_ins != NULL) {
g_ins->destroy();
delete g_ins;
g_ins = NULL;
}
}
CubeAPI::CubeAPI() : _meta(NULL) {}
CubeAPI::~CubeAPI() { CHECK_EQ(0, bthread_key_delete(_tls_key)); }
int CubeAPI::init(const char* conf_file) {
// init meta
_meta = Meta::instance();
int ret = _meta->init(conf_file);
if (ret != 0) {
LOG(ERROR) << "Init cube meta failed";
return ret;
}
CHECK_EQ(0, bthread_key_create(&_tls_key, dict_rpc_deleter));
return 0;
}
int CubeAPI::destroy() {
// Meta* meta = Meta::instance();
if (_meta == NULL) {
LOG(WARNING) << "Destroy, cube meta is null";
return 0;
}
int ret = _meta->destroy();
if (ret != 0) {
LOG(WARNING) << "Destroy cube meta failed";
}
return 0;
}
int CubeAPI::seek(const std::string& dict_name,
const uint64_t& key,
CubeValue* val) {
if (_meta == NULL) {
LOG(ERROR) << "seek, meta is null";
return -1;
}
MetaInfo* info = _meta->get_meta(dict_name);
if (info == NULL) {
LOG(ERROR) << "get meta [" << dict_name << "] failed";
return -1;
}
int shard_id = key % info->shard_num;
DictRequest req;
DictResponse res;
req.add_keys(key);
::brpc::Channel* chan = info->cube_conn[shard_id];
DictService_Stub* stub = new DictService_Stub(chan);
brpc::Controller* cntl = new ::brpc::Controller();
stub->seek(cntl, &req, &res, NULL);
int ret = CubeError::E_OK;
if (cntl->Failed()) {
info->cube_rpcfail_num << 1;
val->error = CubeError::E_SEEK_FAILED;
val->buff.assign("");
ret = CubeError::E_ALL_SEEK_FAILED;
LOG(WARNING) << "cube seek from shard [" << shard_id << "] failed ["
<< cntl->ErrorText() << "]";
} else if (res.values().size() > 0) {
DictValue* res_val = res.mutable_values(0);
if (static_cast<int>(res_val->status()) == CubeError::E_NO_SUCH_KEY) {
g_cube_keys_miss_num << 1;
}
val->error = res_val->status();
val->buff.swap(*res_val->mutable_value());
} else {
val->error = CubeError::E_SEEK_FAILED;
val->buff.assign("");
ret = CubeError::E_ALL_SEEK_FAILED;
}
info->cube_request_num << 1;
g_cube_keys_num << 1;
// cleanup
delete stub;
stub = NULL;
delete cntl;
cntl = NULL;
return ret;
}
int CubeAPI::seek(const std::string& dict_name,
const std::vector<uint64_t>& keys,
std::vector<CubeValue>* vals) {
// Meta* meta = Meta::instance();
if (_meta == NULL) {
LOG(ERROR) << "seek, meta is null";
return -1;
}
MetaInfo* info = _meta->get_meta(dict_name);
if (info == NULL) {
LOG(ERROR) << "get meta [" << dict_name << "] failed";
return -1;
}
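// Scatter/gather: keys are split into per-shard sub-requests (key % shard_num),
// one RPC is issued per shard, the calls are joined, and the per-shard
// responses are merged back into `vals` in the caller's key order via the
// offset[] index built below.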
int shard_num = info->shard_num;
DictRpcData* rpc_data =
static_cast<DictRpcData*>(bthread_getspecific(_tls_key));
if (rpc_data == NULL) {
rpc_data = new DictRpcData;
CHECK_EQ(0, bthread_setspecific(_tls_key, rpc_data));
}
rpc_data->sub_reqs.resize(shard_num);
rpc_data->sub_res.resize(shard_num);
std::vector<std::vector<int>> offset;
offset.resize(shard_num);
int init_cnt = keys.size() * 2 / shard_num;
for (int i = 0; i < shard_num; ++i) {
offset[i].reserve(init_cnt);
}
for (size_t i = 0; i < keys.size(); ++i) {
uint64_t shard_id = keys[i] % shard_num;
rpc_data->sub_reqs[shard_id].add_keys(keys[i]);
offset[shard_id].push_back(i);
}
std::vector<DictService_Stub*> stubs(shard_num);
std::vector<brpc::Controller*> cntls(shard_num);
for (int i = 0; i < shard_num; ++i) {
::brpc::Channel* chan = info->cube_conn[i];
stubs[i] = new DictService_Stub(chan);
cntls[i] = new ::brpc::Controller();
}
DoNothing do_nothing;
for (int i = 0; i < shard_num; ++i) {
stubs[i]->seek(
cntls[i], &rpc_data->sub_reqs[i], &rpc_data->sub_res[i], &do_nothing);
}
int cntls_failed_cnt = 0;
for (int i = 0; i < shard_num; ++i) {
brpc::Join(cntls[i]->call_id());
if (cntls[i]->Failed()) {
++cntls_failed_cnt;
LOG(WARNING) << "cube seek from shard [" << i << "] failed ["
<< cntls[i]->ErrorText() << "]";
}
}
int ret = CubeError::E_OK;
info->cube_request_num << 1;
if (cntls_failed_cnt > 0) {
info->cube_rpcfail_num << 1;
if (cntls_failed_cnt == shard_num) {
ret = CubeError::E_ALL_SEEK_FAILED;
} else {
ret = CubeError::E_SEEK_FAILED;
}
}
vals->resize(keys.size());
// merge
size_t miss_cnt = 0;
for (int i = 0; i < shard_num; ++i) {
if (cntls[i]->Failed()) {
for (int j = 0; j < rpc_data->sub_res[i].values().size(); ++j) {
(*vals)[offset[i][j]].error = CubeError::E_SEEK_FAILED;
(*vals)[offset[i][j]].buff.assign("");
}
} else {
for (int j = 0; j < rpc_data->sub_res[i].values().size(); ++j) {
DictValue* val = rpc_data->sub_res[i].mutable_values(j);
if (static_cast<int>(val->status()) == CubeError::E_NO_SUCH_KEY) {
miss_cnt += 1;
}
(*vals)[offset[i][j]].error = val->status();
(*vals)[offset[i][j]].buff.swap(*val->mutable_value());
}
}
}
// bvar stats
g_cube_keys_num << keys.size();
if (keys.size() > 0) {
g_cube_keys_miss_num << miss_cnt;
g_cube_value_size << (*vals)[0].buff.size();
}
// cleanup
for (int i = 0; i < shard_num; ++i) {
delete stubs[i];
stubs[i] = NULL;
delete cntls[i];
cntls[i] = NULL;
rpc_data->sub_reqs[i].Clear();
rpc_data->sub_res[i].Clear();
}
return ret;
}
int CubeAPI::opt_seek(const std::string& dict_name,
const std::vector<uint64_t>& keys,
std::function<void(DictValue*, size_t)> parse) {
if (_meta == NULL) {
LOG(ERROR) << "seek, meta is null";
return -1;
}
MetaInfo* info = _meta->get_meta(dict_name);
if (info == NULL) {
LOG(ERROR) << "get meta [" << dict_name << "] failed";
return -1;
}
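// opt_seek uses the same per-shard scatter as seek(), but instead of copying
// values into CubeValue it registers sub_seek_done as each RPC's done-callback,
// which hands every DictValue directly to the caller-supplied parse(value,
// key_index) function, with key_index taken from offset[].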
int shard_num = info->shard_num;
DictRpcData* rpc_data =
static_cast<DictRpcData*>(bthread_getspecific(_tls_key));
if (rpc_data == NULL) {
rpc_data = new DictRpcData;
CHECK_EQ(0, bthread_setspecific(_tls_key, rpc_data));
}
rpc_data->sub_reqs.resize(shard_num);
rpc_data->sub_res.resize(shard_num);
std::vector<std::vector<int>> offset;
offset.resize(shard_num);
int init_cnt = keys.size() * 2 / shard_num;
for (int i = 0; i < shard_num; ++i) {
offset[i].reserve(init_cnt);
}
for (size_t i = 0; i < keys.size(); ++i) {
uint64_t shard_id = keys[i] % shard_num;
rpc_data->sub_reqs[shard_id].add_keys(keys[i]);
offset[shard_id].push_back(i);
}
std::vector<DictService_Stub*> stubs(shard_num);
std::vector<brpc::Controller*> cntls(shard_num);
for (int i = 0; i < shard_num; ++i) {
::brpc::Channel* chan = info->cube_conn[i];
stubs[i] = new DictService_Stub(chan);
cntls[i] = new ::brpc::Controller();
}
for (int i = 0; i < shard_num; ++i) {
stubs[i]->seek(cntls[i],
&rpc_data->sub_reqs[i],
&rpc_data->sub_res[i],
brpc::NewCallback(sub_seek_done,
&rpc_data->sub_res[i],
cntls[i],
&(offset[i]),
parse));
}
int cntls_failed_cnt = 0;
for (int i = 0; i < shard_num; ++i) {
brpc::Join(cntls[i]->call_id());
if (cntls[i]->Failed()) {
++cntls_failed_cnt;
LOG(WARNING) << "cube seek from shard [" << i << "] failed ["
<< cntls[i]->ErrorText() << "]";
}
}
int ret = CubeError::E_OK;
info->cube_request_num << 1;
if (cntls_failed_cnt > 0) {
info->cube_rpcfail_num << 1;
if (cntls_failed_cnt == shard_num) {
ret = CubeError::E_ALL_SEEK_FAILED;
} else {
ret = CubeError::E_SEEK_FAILED;
}
}
// merge
size_t miss_cnt = 0;
for (int i = 0; i < shard_num; ++i) {
if (!cntls[i]->Failed()) {
for (int j = 0; j < rpc_data->sub_res[i].values().size(); ++j) {
if (static_cast<int>(rpc_data->sub_res[i].values(j).status()) ==
CubeError::E_NO_SUCH_KEY) {
++miss_cnt;
}
}
}
}
// bvar stats
g_cube_keys_num << keys.size();
if (keys.size() > 0) {
g_cube_keys_miss_num << miss_cnt;
}
// cleanup
for (int i = 0; i < shard_num; ++i) {
delete stubs[i];
stubs[i] = NULL;
delete cntls[i];
cntls[i] = NULL;
rpc_data->sub_reqs[i].Clear();
rpc_data->sub_res[i].Clear();
}
return ret;
}
int CubeAPI::seek(const std::string& dict_name,
const std::vector<uint64_t>& keys,
std::vector<CubeValue>* vals,
std::string* version) {
// Meta* meta = Meta::instance();
if (_meta == NULL) {
LOG(ERROR) << "seek, meta is null";
return -1;
}
MetaInfo* info = _meta->get_meta(dict_name);
if (info == NULL) {
LOG(ERROR) << "get meta [" << dict_name << "] failed";
return -1;
}
int shard_num = info->shard_num;
DictRpcData* rpc_data =
static_cast<DictRpcData*>(bthread_getspecific(_tls_key));
if (rpc_data == NULL) {
rpc_data = new DictRpcData;
CHECK_EQ(0, bthread_setspecific(_tls_key, rpc_data));
}
rpc_data->sub_reqs.resize(shard_num);
rpc_data->sub_res.resize(shard_num);
std::vector<std::vector<int>> offset;
offset.resize(shard_num);
int init_cnt = keys.size() * 2 / shard_num;
for (int i = 0; i < shard_num; ++i) {
offset[i].reserve(init_cnt);
rpc_data->sub_reqs[i].set_version_required(true);
}
for (size_t i = 0; i < keys.size(); ++i) {
uint64_t shard_id = keys[i] % shard_num;
rpc_data->sub_reqs[shard_id].add_keys(keys[i]);
offset[shard_id].push_back(i);
}
std::vector<DictService_Stub*> stubs(shard_num);
std::vector<brpc::Controller*> cntls(shard_num);
for (int i = 0; i < shard_num; ++i) {
::brpc::Channel* chan = info->cube_conn[i];
stubs[i] = new DictService_Stub(chan);
cntls[i] = new ::brpc::Controller();
}
DoNothing do_nothing;
for (int i = 0; i < shard_num; ++i) {
stubs[i]->seek(
cntls[i], &rpc_data->sub_reqs[i], &rpc_data->sub_res[i], &do_nothing);
}
int cntls_failed_cnt = 0;
for (int i = 0; i < shard_num; ++i) {
brpc::Join(cntls[i]->call_id());
if (cntls[i]->Failed()) {
++cntls_failed_cnt;
LOG(WARNING) << "cube seek from shard [" << i << "] failed ["
<< cntls[i]->ErrorText() << "]";
}
}
int ret = CubeError::E_OK;
info->cube_request_num << 1;
if (cntls_failed_cnt > 0) {
info->cube_rpcfail_num << 1;
if (cntls_failed_cnt == shard_num) {
ret = CubeError::E_ALL_SEEK_FAILED;
} else {
ret = CubeError::E_SEEK_FAILED;
}
}
vals->resize(keys.size());
// merge
size_t miss_cnt = 0;
for (int i = 0; i < shard_num; ++i) {
if (cntls[i]->Failed()) {
for (int j = 0; j < rpc_data->sub_res[i].values().size(); ++j) {
(*vals)[offset[i][j]].error = CubeError::E_SEEK_FAILED;
(*vals)[offset[i][j]].buff.assign("");
}
} else {
for (int j = 0; j < rpc_data->sub_res[i].values().size(); ++j) {
DictValue* val = rpc_data->sub_res[i].mutable_values(j);
if (static_cast<int>(val->status()) == CubeError::E_NO_SUCH_KEY) {
miss_cnt += 1;
}
(*vals)[offset[i][j]].error = val->status();
(*vals)[offset[i][j]].buff.swap(*val->mutable_value());
}
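// Keep the largest shard version (string comparison) as the overall version.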
if (version->compare(rpc_data->sub_res[i].version()) < 0) {
*version = rpc_data->sub_res[i].version();
}
}
}
// bvar stats
g_cube_keys_num << keys.size();
if (keys.size() > 0) {
g_cube_keys_miss_num << miss_cnt;
g_cube_value_size << (*vals)[0].buff.size();
}
// cleanup
for (int i = 0; i < shard_num; ++i) {
delete stubs[i];
stubs[i] = NULL;
delete cntls[i];
cntls[i] = NULL;
rpc_data->sub_reqs[i].Clear();
rpc_data->sub_res[i].Clear();
}
return ret;
}
int CubeAPI::opt_seek(const std::string& dict_name,
const std::vector<uint64_t>& keys,
std::function<void(DictValue*, size_t)> parse,
std::string* version) {
if (_meta == NULL) {
LOG(ERROR) << "seek, meta is null";
return -1;
}
MetaInfo* info = _meta->get_meta(dict_name);
if (info == NULL) {
LOG(ERROR) << "get meta [" << dict_name << "] failed";
return -1;
}
int shard_num = info->shard_num;
DictRpcData* rpc_data =
static_cast<DictRpcData*>(bthread_getspecific(_tls_key));
if (rpc_data == NULL) {
rpc_data = new DictRpcData;
CHECK_EQ(0, bthread_setspecific(_tls_key, rpc_data));
}
rpc_data->sub_reqs.resize(shard_num);
rpc_data->sub_res.resize(shard_num);
std::vector<std::vector<int>> offset;
offset.resize(shard_num);
int init_cnt = keys.size() * 2 / shard_num;
for (int i = 0; i < shard_num; ++i) {
offset[i].reserve(init_cnt);
rpc_data->sub_reqs[i].set_version_required(true);
}
for (size_t i = 0; i < keys.size(); ++i) {
uint64_t shard_id = keys[i] % shard_num;
rpc_data->sub_reqs[shard_id].add_keys(keys[i]);
offset[shard_id].push_back(i);
}
std::vector<DictService_Stub*> stubs(shard_num);
std::vector<brpc::Controller*> cntls(shard_num);
for (int i = 0; i < shard_num; ++i) {
::brpc::Channel* chan = info->cube_conn[i];
stubs[i] = new DictService_Stub(chan);
cntls[i] = new ::brpc::Controller();
}
for (int i = 0; i < shard_num; ++i) {
stubs[i]->seek(cntls[i],
&rpc_data->sub_reqs[i],
&rpc_data->sub_res[i],
brpc::NewCallback(sub_seek_done,
&rpc_data->sub_res[i],
cntls[i],
&(offset[i]),
parse));
}
int cntls_failed_cnt = 0;
for (int i = 0; i < shard_num; ++i) {
brpc::Join(cntls[i]->call_id());
if (cntls[i]->Failed()) {
++cntls_failed_cnt;
LOG(WARNING) << "cube seek from shard [" << i << "] failed ["
<< cntls[i]->ErrorText() << "]";
}
}
int ret = CubeError::E_OK;
info->cube_request_num << 1;
if (cntls_failed_cnt > 0) {
info->cube_rpcfail_num << 1;
if (cntls_failed_cnt == shard_num) {
ret = CubeError::E_ALL_SEEK_FAILED;
} else {
ret = CubeError::E_SEEK_FAILED;
}
}
// merge
size_t miss_cnt = 0;
for (int i = 0; i < shard_num; ++i) {
if (!cntls[i]->Failed()) {
for (int j = 0; j < rpc_data->sub_res[i].values().size(); ++j) {
if (static_cast<int>(rpc_data->sub_res[i].values(j).status()) ==
CubeError::E_NO_SUCH_KEY) {
++miss_cnt;
}
}
if (version->compare(rpc_data->sub_res[i].version()) < 0) {
*version = rpc_data->sub_res[i].version();
}
}
}
// bvar stats
g_cube_keys_num << keys.size();
if (keys.size() > 0) {
g_cube_keys_miss_num << miss_cnt;
}
// cleanup
for (int i = 0; i < shard_num; ++i) {
delete stubs[i];
stubs[i] = NULL;
delete cntls[i];
cntls[i] = NULL;
rpc_data->sub_reqs[i].Clear();
rpc_data->sub_res[i].Clear();
}
return ret;
}
} // namespace mcube
} // namespace rec
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <gflags/gflags.h>
#include <atomic>
#include "cube/cube-api/include/cube_api.h"
#define TIME_FLAG(flag) \
struct timeval flag; \
gettimeofday(&(flag), NULL);
DEFINE_string(config_file, "./cube.conf", "m-cube config file");
DEFINE_string(keys, "keys", "keys to seek");
DEFINE_string(dict, "dict", "dict to seek");
DEFINE_uint64(batch, 500, "batch size");
DEFINE_int32(timeout, 200, "timeout in ms");
DEFINE_int32(retry, 3, "retry times");
DEFINE_bool(print_output, false, "print output flag");
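// Example invocation (hypothetical paths; flags as defined above):
//   ./cube-cli --config_file=./cube.conf --dict=table_test_1 \
//       --keys=./keys.txt --batch=500 --print_output=true
// The keys file is read line by line; each line holds one decimal uint64 key.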
namespace {
inline uint64_t time_diff(const struct timeval& start_time,
const struct timeval& end_time) {
return (end_time.tv_sec - start_time.tv_sec) * 1000000 +
(end_time.tv_usec - start_time.tv_usec);
}
}
namespace rec {
namespace mcube {
std::string string_to_hex(const std::string& input) {
static const char* const lut = "0123456789ABCDEF";
size_t len = input.length();
std::string output;
output.reserve(2 * len);
for (size_t i = 0; i < len; ++i) {
const unsigned char c = input[i];
output.push_back(lut[c >> 4]);
output.push_back(lut[c & 15]);
}
return output;
}
int run(int argc, char** argv) {
google::ParseCommandLineFlags(&argc, &argv, true);
CubeAPI* cube = CubeAPI::instance();
int ret = cube->init(FLAGS_config_file.c_str());
if (ret != 0) {
LOG(ERROR) << "init cube api failed err=" << ret;
return ret;
}
FILE* key_file = fopen(FLAGS_keys.c_str(), "r");
if (key_file == NULL) {
LOG(ERROR) << "open key file [" << FLAGS_keys << "] failed";
return -1;
}
std::atomic<uint64_t> seek_counter(0);
std::atomic<uint64_t> seek_cost_total(0);
uint64_t seek_cost_max = 0;
uint64_t seek_cost_min = 500000;
char buffer[1024];
std::vector<uint64_t> keys;
std::vector<CubeValue> values;
while (fgets(buffer, 1024, key_file)) {
uint64_t key = strtoul(buffer, NULL, 10);
keys.push_back(key);
int ret = 0;
if (keys.size() > FLAGS_batch) {
TIME_FLAG(seek_start);
ret = cube->seek(FLAGS_dict, keys, &values);
TIME_FLAG(seek_end);
if (ret != 0) {
LOG(WARNING) << "cube seek failed";
} else if (FLAGS_print_output) {
for (size_t i = 0; i < keys.size(); ++i) {
fprintf(stdout,
"key:%lu value:%s\n",
keys[i],
string_to_hex(values[i].buff).c_str());
}
}
++seek_counter;
uint64_t seek_cost = time_diff(seek_start, seek_end);
seek_cost_total += seek_cost;
if (seek_cost > seek_cost_max) {
seek_cost_max = seek_cost;
}
if (seek_cost < seek_cost_min) {
seek_cost_min = seek_cost;
}
keys.clear();
values.clear();
}
}
if (keys.size() > 0) {
int ret = 0;
values.resize(keys.size());
TIME_FLAG(seek_start);
ret = cube->seek(FLAGS_dict, keys, &values);
TIME_FLAG(seek_end);
if (ret != 0) {
LOG(WARNING) << "cube seek failed";
} else if (FLAGS_print_output) {
for (size_t i = 0; i < keys.size(); ++i) {
fprintf(stdout,
"key:%lu value:%s\n",
keys[i],
string_to_hex(values[i].buff).c_str());
}
}
++seek_counter;
uint64_t seek_cost = time_diff(seek_start, seek_end);
seek_cost_total += seek_cost;
if (seek_cost > seek_cost_max) {
seek_cost_max = seek_cost;
}
if (seek_cost < seek_cost_min) {
seek_cost_min = seek_cost;
}
}
fclose(key_file);
ret = cube->destroy();
if (ret != 0) {
LOG(WARNING) << "destroy cube api failed err=" << ret;
}
uint64_t seek_cost_avg =
    seek_counter > 0 ? seek_cost_total / seek_counter : 0;
LOG(INFO) << "seek cost avg = " << seek_cost_avg;
LOG(INFO) << "seek cost max = " << seek_cost_max;
LOG(INFO) << "seek cost min = " << seek_cost_min;
return 0;
}
} // namespace mcube
} // namespace rec
int main(int argc, char** argv) { return ::rec::mcube::run(argc, argv); }
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "cube/cube-api/include/meta.h"
#include <google/protobuf/descriptor.h>
#include <string.h>
#include <fstream>
#include <new>
#include <sstream>
#include "cube/cube-api/cube.pb.h"
namespace {
static ::rec::mcube::Meta* g_ins = NULL;
}
namespace rec {
namespace mcube {
Meta* Meta::instance() {
if (g_ins == NULL) {
g_ins = new Meta();
}
return g_ins;
}
Meta::~Meta() {}
int Meta::init(const char* conf_file) {
std::ifstream ifs(conf_file);
if (!ifs.is_open()) {
LOG(ERROR) << "open conf file [" << conf_file << "]";
return -1;
}
std::string content((std::istreambuf_iterator<char>(ifs)),
std::istreambuf_iterator<char>());
BUTIL_RAPIDJSON_NAMESPACE::Document document;
document.Parse(content.c_str());
int ret = 0;
if (document.IsArray()) {
for (auto it = document.Begin(); it != document.End(); ++it) {
MetaInfo* info = create_meta(*it);
if (info == NULL) {
LOG(ERROR) << "create dict meta failed";
ret = -1;
break;
}
LOG(INFO) << "init cube dict [" << info->dict_name << "] succ";
_metas[info->dict_name] = info;
}
} else {
LOG(ERROR)
<< "conf file [" << conf_file
<< "] json schema error, please ensure that the json is an array";
}
if (_metas.size() == 0) {
LOG(ERROR) << "no valid cube";
ret = -1;
}
return ret;
}
int Meta::destroy() {
for (auto it = _metas.begin(); it != _metas.end(); ++it) {
if (it->second != NULL) {
delete it->second;
it->second = NULL;
}
}
_metas.clear();
return 0;
}
MetaInfo* Meta::get_meta(const std::string& dict_name) {
auto iter = _metas.find(dict_name);
if (iter != _metas.end()) {
return iter->second;
}
return NULL;
}
void Meta::reset_bvar(const std::string& dict_name) {
auto iter = _metas.find(dict_name);
if (iter != _metas.end()) {
iter->second->cube_request_num.reset();
iter->second->cube_rpcfail_num.reset();
} else {
LOG(WARNING) << "reset_bvar to invalid dict [" << dict_name << "]";
}
}
MetaInfo* Meta::create_meta(const BUTIL_RAPIDJSON_NAMESPACE::Value& config) {
if (!config.HasMember("dict_name") || !config.HasMember("shard") ||
!config.HasMember("dup") || !config.HasMember("timeout") ||
!config.HasMember("retry") || !config.HasMember("backup_request") ||
!config.HasMember("type")) {
LOG(ERROR) << "create meta failed, required fields miss";
return NULL;
}
if (!config["dict_name"].IsString() || !config["shard"].IsInt() ||
!config["dup"].IsInt() || !config["timeout"].IsInt() ||
!config["retry"].IsInt() || !config["backup_request"].IsInt() ||
!config["type"].IsString()) {
LOG(ERROR) << "create meta failed, required fields type error";
return NULL;
}
MetaInfo* info = new MetaInfo();
info->dict_name = config["dict_name"].GetString();
info->shard_num = config["shard"].GetInt();
info->dup_num = config["dup"].GetInt();
int timeout = config["timeout"].GetInt();
int retry = config["retry"].GetInt();
int backup_request = config["backup_request"].GetInt();
std::string type = config["type"].GetString();
std::string load_balancer = "rr";
if (config.HasMember("load_balancer") && config["load_balancer"].IsString()) {
load_balancer = config["load_balancer"].GetString();
}
int ret = 0;
if (type.compare("ipport") == 0) {
ret = create_meta_from_ipport(
info, config["nodes"], timeout, retry, backup_request);
} else if (type.compare("ipport_list") == 0) {
ret = create_meta_from_ipport_list(
info, config["nodes"], timeout, retry, backup_request, load_balancer);
} else if (type.compare("ipport_parallel") == 0) {
ret = create_meta_from_ipport_parallel(
info, config["nodes"], timeout, retry, backup_request);
} else {
ret = -1;
}
if (ret != 0) {
LOG(ERROR) << "create meta failed error=" << ret;
delete info;
return NULL;
}
return info;
}
int Meta::create_meta_from_ipport(MetaInfo* meta,
const BUTIL_RAPIDJSON_NAMESPACE::Value& nodes,
int timeout,
int retry,
int backup_request) {
brpc::ChannelOptions options;
options.timeout_ms = timeout;
options.max_retry = retry;
if (backup_request > 0) {
options.backup_request_ms = backup_request;
}
meta->cube_conn.resize(meta->shard_num, NULL);
for (int i = 0; i < static_cast<int>(nodes.Size()); ++i) {
const BUTIL_RAPIDJSON_NAMESPACE::Value& node = nodes[i];
const std::string& node_ip = node["ip"].GetString();
int node_port = node["port"].GetInt();
::brpc::Channel* chan = new ::brpc::Channel();
if (chan->Init(node_ip.c_str(), node_port, &options) != 0) {
LOG(ERROR) << "Init connection to [" << node_ip << ":" << node_port
<< "] failed";
delete chan;
return -1;
}
meta->cube_conn[i] = chan;
}
return 0;
}
int Meta::create_meta_from_ipport_list(
MetaInfo* meta,
const BUTIL_RAPIDJSON_NAMESPACE::Value& nodes,
int timeout,
int retry,
int backup_request,
const std::string& load_balancer) {
brpc::ChannelOptions options;
options.timeout_ms = timeout;
options.max_retry = retry;
if (backup_request > 0) {
options.backup_request_ms = backup_request;
}
meta->cube_conn.resize(meta->shard_num, NULL);
for (int i = 0; i < static_cast<int>(nodes.Size()); ++i) {
const BUTIL_RAPIDJSON_NAMESPACE::Value& node = nodes[i];
const std::string& ipport_list = node["ipport_list"].GetString();
::brpc::Channel* chan = new ::brpc::Channel();
if (chan->Init(ipport_list.c_str(), load_balancer.c_str(), &options) != 0) {
LOG(ERROR) << "Init connection to [" << ipport_list << "] failed";
delete chan;
return -1;
}
meta->cube_conn[i] = chan;
}
return 0;
}
int Meta::create_meta_from_ipport_parallel(
MetaInfo* meta,
const BUTIL_RAPIDJSON_NAMESPACE::Value& nodes,
int timeout,
int retry,
int backup_request) {
if (nodes.Size() < 1) {
LOG(ERROR) << "Config nodes size less than 0";
return -1;
}
brpc::ChannelOptions options;
options.timeout_ms = timeout;
options.max_retry = retry;
if (backup_request > 0) {
options.backup_request_ms = backup_request;
}
meta->cube_conn.resize(meta->shard_num, NULL);
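// Note: in "ipport_parallel" mode every shard channel is initialized from
// nodes[0], so all shard_num connections point at the same ip:port.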
for (int i = 0; i < meta->shard_num; ++i) {
const BUTIL_RAPIDJSON_NAMESPACE::Value& node = nodes[0];
const std::string& node_ip = node["ip"].GetString();
int node_port = node["port"].GetInt();
::brpc::Channel* chan = new ::brpc::Channel();
if (chan->Init(node_ip.c_str(), node_port, &options) != 0) {
LOG(ERROR) << "Init connection to [" << node_ip << ":" << node_port
<< "] failed";
delete chan;
return -1;
}
meta->cube_conn[i] = chan;
}
return 0;
}
const std::vector<const MetaInfo*> Meta::metas() {
std::vector<const MetaInfo*> metas;
for (auto i : _metas) {
metas.push_back(i.second);
}
return metas;
}
} // namespace mcube
} // namespace rec
# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
cmake_minimum_required(VERSION 2.8.10)
project(Cube C CXX)
option(CUBE_LINK_SO "Whether cube is linked dynamically" OFF)
execute_process(
COMMAND bash -c "find ${PROJECT_SOURCE_DIR}/../.. -type d -regex \".*output/include$\" | head -n1 | xargs dirname | tr -d '\n'"
OUTPUT_VARIABLE OUTPUT_PATH
)
set(CMAKE_PREFIX_PATH ${OUTPUT_PATH})
include_directories(SYSTEM ${CMAKE_CURRENT_LIST_DIR}/include)
include_directories(SYSTEM ${CMAKE_CURRENT_BINARY_DIR}/../)
include(FindProtobuf)
protobuf_generate_cpp(PROTO_SRC PROTO_HEADER proto/cube.proto proto/control.proto)
# include PROTO_HEADER
include_directories(${CMAKE_CURRENT_BINARY_DIR})
include(FindThreads)
# Search for libthrift* by best effort. If it is not found and brpc is
# compiled with thrift protocol enabled, a link error would be reported.
find_library(THRIFT_LIB NAMES thrift)
if (NOT THRIFT_LIB)
set(THRIFT_LIB "")
endif()
find_library(THRIFTNB_LIB NAMES thriftnb)
if (NOT THRIFTNB_LIB)
set(THRIFTNB_LIB "")
endif()
set(CMAKE_CXX_FLAGS "${CMAKE_CPP_FLAGS} -DNDEBUG -O2 -D__const__= -pipe -W -Wall -Wno-unused-parameter -fPIC -fno-omit-frame-pointer")
if(CMAKE_VERSION VERSION_LESS "3.1.3")
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
endif()
if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
endif()
else()
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
endif()
find_path(LEVELDB_INCLUDE_PATH NAMES leveldb/db.h)
find_library(LEVELDB_LIB NAMES leveldb)
if ((NOT LEVELDB_INCLUDE_PATH) OR (NOT LEVELDB_LIB))
message(FATAL_ERROR "Failed to find leveldb")
endif()
include_directories(${LEVELDB_INCLUDE_PATH})
find_library(SSL_LIB NAMES ssl)
if (NOT SSL_LIB)
message(FATAL_ERROR "Failed to find ssl")
endif()
find_library(CRYPTO_LIB NAMES crypto)
if (NOT CRYPTO_LIB)
message(FATAL_ERROR "Failed to find crypto")
endif()
set(SRC_LIST src/server.cpp
src/cube_bvar.cpp
src/dict.cpp
src/dict_set.cpp
src/recycle.cpp
src/framework.cpp
src/control.cpp)
add_executable(cube ${SRC_LIST} src/main.cpp
${PROTO_SRC} ${PROTO_HEADER})
set(DYNAMIC_LIB
${CMAKE_THREAD_LIBS_INIT}
${PROTOBUF_LIBRARIES}
${LEVELDB_LIB}
${SSL_LIB}
${CRYPTO_LIB}
${THRIFT_LIB}
${THRIFTNB_LIB}
)
if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
set(DYNAMIC_LIB ${DYNAMIC_LIB}
"-framework CoreFoundation"
"-framework CoreGraphics"
"-framework CoreData"
"-framework CoreText"
"-framework Security"
"-framework Foundation"
"-Wl,-U,_MallocExtension_ReleaseFreeMemory"
"-Wl,-U,_ProfilerStart"
"-Wl,-U,_ProfilerStop")
endif()
target_link_libraries(cube ${DYNAMIC_LIB} brpc -lpthread -ldl -lz)
add_executable(cube_test ${SRC_LIST} test/cube_test.cpp
${PROTO_SRC} ${PROTO_HEADER})
target_link_libraries(cube_test ${DYNAMIC_LIB} brpc gtest -lpthread -ldl -lz)
# install
install(TARGETS cube
RUNTIME DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/bin
)
--port=8027
--dict_split=1
--in_mem=true
--log_dir=./log/
\ No newline at end of file
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <string>
#include "cube-server/control.pb.h"
#include "butil/third_party/rapidjson/document.h"
#include "butil/third_party/rapidjson/prettywriter.h"
#include "butil/third_party/rapidjson/stringbuffer.h"
namespace rec {
namespace mcube {
class Control : public ControlService {
public:
Control();
virtual ~Control();
virtual void cmd(::google::protobuf::RpcController* controller,
const ::rec::mcube::HttpRequest* request,
::rec::mcube::HttpResponse* response,
::google::protobuf::Closure* done);
private:
int handle_status(const BUTIL_RAPIDJSON_NAMESPACE::Document& cmd,
BUTIL_RAPIDJSON_NAMESPACE::Document* res);
int handle_reload_base(const BUTIL_RAPIDJSON_NAMESPACE::Document& cmd,
const std::string& v_path);
int handle_reload_patch(const BUTIL_RAPIDJSON_NAMESPACE::Document& cmd,
const std::string& v_path);
int handle_bg_load_base(const BUTIL_RAPIDJSON_NAMESPACE::Document& cmd,
const std::string& v_path);
int handle_bg_load_patch(const BUTIL_RAPIDJSON_NAMESPACE::Document& cmd,
const std::string& v_path);
int handle_bg_unload(const BUTIL_RAPIDJSON_NAMESPACE::Document& cmd);
int handle_bg_switch(const BUTIL_RAPIDJSON_NAMESPACE::Document& cmd);
int handle_enable(const BUTIL_RAPIDJSON_NAMESPACE::Document& cmd);
std::mutex _cmd_mutex;
}; // class Control
} // namespace mcube
} // namespace rec
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "bvar/bvar.h"
#include "bvar/recorder.h"
#include "bvar/window.h"
namespace bvar {
DECLARE_bool(bvar_dump);
DECLARE_string(bvar_dump_file);
}
namespace rec {
namespace mcube {
extern bvar::IntRecorder g_keys_num;
extern bvar::Adder<uint64_t> g_request_num;
extern bvar::IntRecorder g_data_load_time;
extern bvar::IntRecorder g_data_size;
extern bvar::Adder<uint64_t> g_long_value_num;
extern bvar::Adder<uint64_t> g_unfound_key_num;
extern bvar::Adder<uint64_t> g_total_key_num;
} // namespace mcube
} // namespace rec
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <stdint.h>
#include <atomic>
#include <memory>
#include <string>
#include <vector>
#include "cube/rw_lock.h"
#include "cube/slim_hash_map.h"
#include "cube/virtual_dict.h"
namespace rec {
namespace mcube {
class Dict : public VirtualDict {
public:
Dict();
virtual ~Dict();
/*
void set_base_dict(const Dict * dict){
_base_dict = dict;
}
*/
virtual void set_base_dict(const VirtualDict* dict) {
_base_dict = static_cast<const Dict*>(dict);
}
int load(const std::string& dict_path,
bool in_mem,
const std::string& v_path);
int destroy();
bool seek(uint64_t key, char* buff, uint64_t* buff_size);
const std::string& version(); // no lock, used by framework seek
std::string guard_version();
void atom_inc_seek_num();
void atom_dec_seek_num();
uint32_t atom_seek_num();
private:
int load_index(const std::string& dict_path, const std::string& v_path);
int load_data(const std::string& dict_path, const std::string& v_path);
int load_data_mmap(const std::string& dict_path, const std::string& v_path);
void set_version(const std::string& v_path);
private:
struct DataBlock {
DataBlock() : size(0), fd(-1) {}
std::shared_ptr<char> s_data;
uint32_t size;
int fd;
};
private:
// boost::unordered_map<uint64_t, uint64_t> _table;
slim_hash_map<uint64_t, uint64_t> _slim_table;
std::vector<DataBlock> _block_set;
std::atomic<uint32_t> _seek_num;
const Dict* _base_dict;
std::string _version;
RWLock _rw_lock;
}; // class Dict
typedef std::shared_ptr<Dict> DictPtr;
} // namespace mcube
} // namespace rec
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <atomic>
#include <string>
#include <vector>
#include "cube/dict.h"
#include "cube/virtual_dict.h"
namespace rec {
namespace mcube {
class DictSet : public VirtualDict {
public:
explicit DictSet(int dict_split);
virtual ~DictSet();
virtual int load(const std::vector<std::string>& dict_path,
bool in_mem,
const std::string& v_path);
virtual int destroy();
virtual const std::string& version();
virtual std::string guard_version();
virtual void set_base_dict(const VirtualDict* dict);
virtual bool seek(uint64_t key, char* buff, uint64_t* buff_size);
virtual void atom_inc_seek_num();
virtual void atom_dec_seek_num();
virtual uint32_t atom_seek_num();
private:
std::atomic<uint32_t> _seek_num{0};
std::vector<DictPtr> _dict_set;
int _dict_split{0};
std::string _version{""};
RWLock _rw_lock;
}; // DictSet
} // namespace mcube
} // namespace rec
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
namespace rec {
namespace mcube {
/*
struct Error {
enum ErrorCode {
E_OK = 0,
E_CONF_ERROR,
E_PARAM_ERROR,
E_SYS_ERROR,
E_INTERNAL_ERROR,
E_NETWORK_ERROR,
E_DATA_ERROR,
E_NO_IMPL,
E_UNKNOWN,
E_NUM,
E_NOT_FOUND,
E_NO_RES,
E_NO_REQ_ERROR,
E_TYPE_CONVERT_ERROR
}; // enum ErrorCode
}; // struct Error
*/
const int E_OK = 0;
const int E_DATA_ERROR = -1;
const int E_OOM = -2;
const int E_NOT_IMPL = -3;
} // namespace mcube
} // namespace rec
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <atomic>
#include <string>
#include <vector>
#include "butil/third_party/rapidjson/document.h"
#include "butil/third_party/rapidjson/prettywriter.h"
#include "butil/third_party/rapidjson/stringbuffer.h"
#include "cube-server/cube.pb.h"
#include "cube/rw_lock.h"
#include "cube/virtual_dict.h"
namespace rec {
namespace mcube {
struct Status {
enum StatusCode { F_RUNNING = 0, F_LOADING };
};
class Framework {
public:
static Framework* instance();
public:
~Framework();
int init(uint32_t dict_split, bool in_mem);
int destroy();
int status(BUTIL_RAPIDJSON_NAMESPACE::Document* res);
int seek(const DictRequest* req, DictResponse* res);
int reload(const std::string& v_path);
int patch(const std::string& v_path);
// load dict base
int bg_load_base(const std::string& v_path);
// load dict patch
int bg_load_patch(const std::string& v_path);
int bg_unload();
int bg_switch();
int enable(const std::string& version);
private:
void init_dict(uint32_t dict_split);
VirtualDict* create_dict();
private:
VirtualDict* get_cur_dict() {
_rw_lock.r_lock();
VirtualDict* dict = _dict[_dict_idx];
dict->atom_inc_seek_num();
_rw_lock.unlock();
return dict;
}
std::string get_cur_version() {
_rw_lock.r_lock();
VirtualDict* dict = _dict[_dict_idx];
std::string version = dict->version();
_rw_lock.unlock();
return version;
}
VirtualDict* get_bg_dict() const { return _dict[1 - _dict_idx]; }
std::string get_bg_version() {
_bg_rw_lock.r_lock();
VirtualDict* dict = _dict[1 - _dict_idx];
std::string version = "";
if (dict) {
version = dict->guard_version();
}
_bg_rw_lock.unlock();
return version;
}
void set_bg_dict(VirtualDict* dict) {
_bg_rw_lock.w_lock();
_dict[1 - _dict_idx] = dict;
_bg_rw_lock.unlock();
}
void release(VirtualDict* dict);
private:
Framework() : _dict_idx(0) {}
Framework(const Framework&) {}
private:
VirtualDict* _dict[2]{nullptr, nullptr};
int _dict_idx{0};
std::string _dict_path{""};
bool _in_mem{true};
std::atomic_int _status;
RWLock _rw_lock;
RWLock _bg_rw_lock;
uint32_t _max_val_size{0};
uint32_t _dict_split{0};
std::vector<std::string> _dict_set_path;
}; // class Framework
} // namespace mcube
} // namespace rec
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <pthread.h>
#include <queue>
#include "cube/dict.h"
namespace rec {
namespace mcube {
class Recycle {
public:
static Recycle* get_instance();
public:
~Recycle();
/**
* @brief init recycle module
*/
int init();
/**
* @brief destroy recycle module and wait recycle quit
*/
int destroy();
/**
* @brief send dict to recycle module
*/
// void recycle(Dict* dict);
/**
* @brief send dict to recycle module
*/
void recycle(VirtualDict* dict);
private:
static void* recycle_func(void*);
Recycle();
/**
* @brief lock recycle list
*/
void lock();
/**
* @brief unlock recycle list
*/
void unlock();
private:
pthread_t _recycle_thread;
pthread_mutex_t _recycle_mutex;
// std::queue<Dict*> _recycle_list;
std::queue<VirtualDict*> _recycle_list;
bool _running;
}; // class Recycle
} // namespace mcube
} // namespace rec
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <pthread.h>
namespace rec {
namespace mcube {
class RWLock {
public:
RWLock() { pthread_rwlock_init(&_lock, NULL); }
~RWLock() { pthread_rwlock_destroy(&_lock); }
void r_lock() { pthread_rwlock_rdlock(&_lock); }
void w_lock() { pthread_rwlock_wrlock(&_lock); }
void unlock() { pthread_rwlock_unlock(&_lock); }
private:
pthread_rwlock_t _lock;
}; // class RWLock
} // namespace mcube
} // namespace rec
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "cube-server/cube.pb.h"
namespace rec {
namespace mcube {
class Server : public DictService {
public:
Server();
virtual ~Server();
virtual void seek(::google::protobuf::RpcController* controller,
const ::rec::mcube::DictRequest* request,
::rec::mcube::DictResponse* response,
::google::protobuf::Closure* done);
};
} // namespace mcube
} // namespace rec
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <map>
#include <utility>
#include <vector>
#ifdef BCLOUD
#include "base/logging.h"
#else
#include "butil/logging.h"
#endif
/**
* hash node
*/
#pragma pack(push, 1)
template <typename key_t, typename value_t>
struct slim_hash_node_t {
std::pair<key_t, value_t> data;
uint32_t next;
};
#pragma pack(pop)
/**
* hash map
*/
template <typename key_t, typename value_t>
class slim_hash_map {
public:
typedef key_t key_type;
typedef value_t mapped_type;
typedef std::pair<key_t, value_t> value_type;
typedef size_t size_type;
private:
typedef slim_hash_node_t<key_t, value_t> hash_node_t;
static const size_type MAX_BLOCK_NUM = 512;
static const size_type BLOCK_SIZE = 8388608;
static const uint32_t DUMP_GRANULARITY =
1000; /**< the granularity of every save job */
static const int iterm_size = sizeof(key_t) + sizeof(value_t);
hash_node_t* m_blockAddr[MAX_BLOCK_NUM];
uint32_t m_nFreeEntries;
uint32_t m_nNextEntry;
size_type m_nBlockNum;
uint32_t* m_hashTable;
size_type m_nHashSize;
size_type m_nSize;
std::vector<uint64_t> m_fileLens;
struct DumpBar {
FILE* fp; /**< the fp of the file to dump */
    char* buf;      /**< buffer to serialize the data */
int buf_size; /**< the size of buf */
// int startIndex; /**< the begin index of the last dump job */
int lastIndex; /**< the index where last dump job ended */
DumpBar() : fp(NULL), buf(NULL) {}
DumpBar(FILE* _fp, char* _buf, int _buf_size, int _lastIndex)
: fp(_fp), buf(_buf), buf_size(_buf_size), lastIndex(_lastIndex) {}
~DumpBar() { reset(); }
void reset() {
if (fp) {
fclose(fp);
}
fp = NULL;
free(buf);
buf = NULL;
buf_size = 0;
lastIndex = 0;
return;
}
};
DumpBar dumpBar;
public:
struct iterator {
friend class slim_hash_map;
private:
slim_hash_map* container;
hash_node_t* node;
uint32_t index;
public:
iterator(slim_hash_map* container, uint32_t index, hash_node_t* node) {
this->container = container;
this->index = index;
this->node = node;
}
hash_node_t* get_node() { return node; }
uint32_t cur_index() { return index; }
std::pair<key_t, value_t>* operator->() { return &node->data; }
std::pair<key_t, value_t>& operator*() { return node->data; }
iterator& operator++() {
if (node == NULL) {
return *this;
}
if (node->next == 0) {
++index;
while (index < container->m_nHashSize) {
if (container->m_hashTable[index] != 0) {
break;
}
++index;
}
if (index == container->m_nHashSize) {
index = -1;
node = NULL;
return *this;
}
node = container->get_node(container->m_hashTable[index]);
} else {
node = container->get_node(node->next);
}
return *this;
}
iterator operator++(int) {
iterator it_bak = *this;
this->operator++();
return it_bak;
}
enum IteratorStatus { END = 0, OneBlockEND = 1, CONTINUE = 2 };
IteratorStatus IncrIterSenseBlock() {
if (NULL == node) {
return END;
}
if (0 == node->next) {
++index;
while (index < container->m_nHashSize) {
if (container->m_hashTable[index] != 0) {
break;
}
++index;
}
if (index == container->m_nHashSize) {
index = -1;
node = NULL;
return END;
}
node = container->get_node(container->m_hashTable[index]);
return OneBlockEND;
}
node = container->get_node(node->next);
return CONTINUE;
}
bool operator==(const iterator& it) const {
return container == it.container && index == it.index && node == it.node;
}
bool operator!=(const iterator& it) const { return !operator==(it); }
};
slim_hash_map() {
m_nFreeEntries = 0;
m_nNextEntry = 1;
memset(m_blockAddr, 0, sizeof(m_blockAddr));
m_nBlockNum = 0;
m_hashTable = 0;
m_nSize = 0;
m_nHashSize = 0;
}
void destroy() {
delete[] m_hashTable;
m_hashTable = NULL;
m_nHashSize = 0;
for (size_type i = 0; i < m_nBlockNum; ++i) {
delete[] m_blockAddr[i];
m_blockAddr[i] = NULL;
}
m_nBlockNum = 0;
}
~slim_hash_map() { destroy(); }
int copy_data_from(const slim_hash_map& rhs) {
destroy();
if (rhs.m_nHashSize > 0) {
m_hashTable = new (std::nothrow) uint32_t[rhs.m_nHashSize];
if (!m_hashTable) {
LOG(ERROR) << "new m_hashTable failed, size "
<< sizeof(uint32_t) * rhs.m_nHashSize;
return -1;
}
memcpy(m_hashTable, rhs.m_hashTable, sizeof(uint32_t) * rhs.m_nHashSize);
}
m_nHashSize = rhs.m_nHashSize;
for (m_nBlockNum = 0; m_nBlockNum < rhs.m_nBlockNum; ++m_nBlockNum) {
m_blockAddr[m_nBlockNum] = new (std::nothrow) hash_node_t[BLOCK_SIZE];
if (!m_blockAddr[m_nBlockNum]) {
LOG(ERROR) << "new m_blockAddr[" << m_nBlockNum << "] failed, size "
<< sizeof(hash_node_t) * BLOCK_SIZE;
return -1;
}
memcpy(m_blockAddr[m_nBlockNum],
rhs.m_blockAddr[m_nBlockNum],
sizeof(hash_node_t) * BLOCK_SIZE);
}
m_nFreeEntries = rhs.m_nFreeEntries;
m_nNextEntry = rhs.m_nNextEntry;
m_nSize = rhs.m_nSize;
m_fileLens = rhs.m_fileLens;
return 0;
}
size_type size() const { return m_nSize; }
size_type max_size() const { return (size_type)-1; }
bool empty() const { return m_nSize == 0; }
size_type bucket_count() const { return m_nHashSize; }
iterator begin(uint32_t index = 0) {
for (size_type i = index; i < m_nHashSize; ++i) {
if (m_hashTable[i] != 0) {
return iterator(this, i, get_node(m_hashTable[i]));
}
}
return end();
}
iterator end() { return iterator(this, -1, NULL); }
iterator find(const key_type& key) {
    if (m_nHashSize == 0) {
      return end();
    }
size_type index = key % m_nHashSize;
hash_node_t* node = get_node(m_hashTable[index]);
while (node != NULL && node->data.first != key) {
node = get_node(node->next);
}
if (node == NULL) {
return end();
}
return iterator(this, index, node);
}
size_type erase(const key_type& key) {
size_type index = key % m_nHashSize;
uint32_t* last_node_pointer = &m_hashTable[index];
uint32_t addr = m_hashTable[index];
hash_node_t* node = get_node(addr);
while (node != NULL && node->data.first != key) {
last_node_pointer = &node->next;
addr = node->next;
node = get_node(addr);
}
if (node == NULL) {
return 0;
}
*last_node_pointer = node->next;
release_node(addr, node);
return 1;
}
void erase(const iterator& it) {
uint32_t* last_node_pointer = &m_hashTable[it.index];
uint32_t addr = m_hashTable[it.index];
hash_node_t* node = get_node(addr);
while (node != it.node) {
last_node_pointer = &node->next;
addr = node->next;
node = get_node(addr);
}
*last_node_pointer = node->next;
release_node(addr, node);
return;
}
static uint64_t next_prime(uint64_t n) {
static const uint64_t s_prime_list[] = {
53ul, 97ul, 193ul, 389ul, 769ul,
1543ul, 3079ul, 6151ul, 12289ul, 24593ul,
49157ul, 98317ul, 196613ul, 393241ul, 786433ul,
1572869ul, 3145739ul, 6291469ul, 12582917ul, 25165843ul,
50331653ul, 100663319ul, 200000033ul, 201326611ul, 210000047ul,
220000051ul, 230000059ul, 240000073ul, 250000103ul, 260000137ul,
270000161ul, 280000241ul, 290000251ul, 300000277ul, 310000283ul,
320000287ul, 330000371ul, 340000387ul, 350000411ul, 360000451ul,
370000489ul, 380000519ul, 390000521ul, 400000543ul, 402653189ul,
805306457ul, 1610612741ul, 3221225473ul, 4294967291ul};
const size_t s_num_primes = sizeof(s_prime_list) / sizeof(s_prime_list[0]);
const uint64_t* first = s_prime_list;
const uint64_t* last = s_prime_list + static_cast<int>(s_num_primes);
const uint64_t* pos = std::lower_bound(first, last, n);
return (pos == last) ? *(last - 1) : *pos;
}
void resize(size_type size) {
uint32_t* hashTable;
size = next_prime(size);
if (size <= m_nHashSize) {
LOG(INFO) << "next prime[" << size << "] <= m_nHashSize[" << m_nHashSize
<< "], no need to resize";
return;
}
LOG(INFO) << "resize m_nHashSize[" << m_nHashSize << "] to next prime["
<< size << "]";
try {
hashTable = new uint32_t[size];
} catch (std::exception& e) {
LOG(ERROR) << "std::exception[" << e.what() << "] was thrown!";
return;
} catch (...) {
LOG(ERROR) << "unkown exception was thrown!";
return;
}
memset(hashTable, 0, sizeof(uint32_t) * size);
if (m_hashTable == NULL) {
m_hashTable = hashTable;
} else {
// rehash
for (size_type bucket = 0; bucket < m_nHashSize; ++bucket) {
uint32_t first_addr = m_hashTable[bucket];
hash_node_t* first = get_node(first_addr);
while (first) {
size_type new_bucket = first->data.first % size;
uint32_t next_addr = first->next;
m_hashTable[bucket] = next_addr;
first->next = hashTable[new_bucket];
hashTable[new_bucket] = first_addr;
first_addr = next_addr;
first = get_node(first_addr);
}
}
delete[] m_hashTable;
m_hashTable = hashTable;
}
m_nHashSize = size;
}
mapped_type& operator[](const key_type& key) {
uint32_t index = key % m_nHashSize;
hash_node_t* node = get_node(m_hashTable[index]);
while (node != NULL && node->data.first != key) {
node = get_node(node->next);
}
if (node != NULL) {
return node->data.second;
}
return add_node(index, key)->data.second;
}
void clear() {
memset(m_hashTable, 0, sizeof(uint32_t) * m_nHashSize);
m_nNextEntry = 1;
m_nFreeEntries = 0;
m_nSize = 0;
}
bool load(const char* file) {
// clear();
int size = sizeof(key_t) + sizeof(value_t);
FILE* fp = fopen(file, "rb");
char* buf = reinterpret_cast<char*>(malloc(size * 100000));
if (fp == NULL || buf == NULL) {
return false;
}
size_t read_count;
bool err = false;
key_t key;
value_t value;
while ((read_count = fread(buf, size, 100000, fp)) != 0) {
if (ferror(fp) != 0) {
err = true;
break;
}
for (int i = 0; i < static_cast<int>(read_count); ++i) {
key = *(reinterpret_cast<key_t*>(buf + i * size));
value = *(reinterpret_cast<value_t*>(buf + i * size + sizeof(key_t)));
(*this)[key] = value;
}
}
if (err) {
clear();
}
fclose(fp);
free(buf);
return !err;
}
  /**
   * @brief dump the in-memory data into a file.
   *        Up to DUMP_GRANULARITY hash buckets are written per invocation.
   *
   * @param file the file name to store the data; pass NULL on subsequent
   *             calls to continue an unfinished dump
   *
   * @return -1: failed to dump data;
   *         0 : dump completed;
   *         greater than 0: index of the next hash bucket to dump (the dump
   *         is not finished yet)
   */
int save(const char* file) {
static const int FACTOR = 1000;
bool writerErr = false;
hash_node_t* node = NULL;
if (NULL == file && NULL == dumpBar.fp) {
return -1;
}
if (file) { // the begin of the dump job
dumpBar.fp = fopen(file, "wb");
if (NULL == dumpBar.fp) {
goto ERR_RET;
}
dumpBar.buf_size = iterm_size * DUMP_GRANULARITY * FACTOR;
dumpBar.buf = reinterpret_cast<char*>(malloc(dumpBar.buf_size));
if (NULL == dumpBar.buf) {
goto ERR_RET;
}
}
for (uint32_t i = 0;
i < DUMP_GRANULARITY && (size_type)dumpBar.lastIndex < m_nHashSize;
i++, dumpBar.lastIndex++) {
node = get_node(m_hashTable[dumpBar.lastIndex]);
for (uint32_t j = 0; NULL != node;) {
*(reinterpret_cast<key_t*>(dumpBar.buf + j * iterm_size)) =
node->data.first;
*(reinterpret_cast<value_t*>(dumpBar.buf + j * iterm_size +
sizeof(key_t))) = node->data.second;
++j;
node = get_node(node->next);
if (j == DUMP_GRANULARITY * FACTOR || NULL == node) {
if (fwrite(dumpBar.buf, iterm_size, j, dumpBar.fp) != j) {
writerErr = true;
goto ERR_RET;
}
j = 0;
}
} // end of for
} // end of for
if ((size_type)dumpBar.lastIndex == m_nHashSize) { // mission complete
dumpBar.reset();
return 0;
}
// not complete yet
return dumpBar.lastIndex;
ERR_RET:
if (writerErr) {
remove(file);
}
dumpBar.reset();
return -1;
}
void add_file_len(uint64_t len) { m_fileLens.push_back(len); }
// return true if m_fileLens is prefix of lens
bool check_file_len(const std::vector<uint64_t>& lens,
size_t* old_size) const {
*old_size = m_fileLens.size();
if (*old_size > lens.size()) {
LOG(WARNING) << "old file_len size[" << *old_size
<< "] > new file_len size[" << lens.size()
<< "] in patch mode";
return false;
}
for (size_t i = 0; i < *old_size; ++i) {
if (m_fileLens[i] != lens[i]) {
LOG(WARNING) << "old file_len[" << m_fileLens[i] << "] != new file_len["
<< lens[i] << "] of pos " << i << " in patch mode";
return false;
}
}
return true;
}
protected:
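  // Node addresses are packed into 32 bits: the high 9 bits select one of up
  // to MAX_BLOCK_NUM blocks and the low 23 bits index into that block
  // (BLOCK_SIZE == 2^23). Address 0 is reserved as the NULL sentinel, which
  // is why m_nNextEntry starts at 1.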
hash_node_t* get_node(uint32_t addr) {
if (addr == 0) {
return NULL;
}
uint32_t block = ((addr & 0xFF800000) >> 23);
uint32_t index = (addr & 0x7FFFFF);
return &m_blockAddr[block][index];
}
void release_node(uint32_t addr, hash_node_t* node) {
--m_nSize;
node->next = m_nFreeEntries;
m_nFreeEntries = addr;
return;
}
hash_node_t* add_node(uint32_t index, const key_type& key) {
++m_nSize;
if (m_nFreeEntries) {
uint32_t addr = m_nFreeEntries;
hash_node_t* node = get_node(addr);
m_nFreeEntries = node->next;
node->next = m_hashTable[index];
m_hashTable[index] = addr;
node->data.first = key;
return node;
}
uint32_t block = ((m_nNextEntry & 0xFF800000) >> 23);
if (block >= m_nBlockNum) {
try {
m_blockAddr[m_nBlockNum++] = new hash_node_t[BLOCK_SIZE];
} catch (std::exception& e) {
LOG(ERROR) << "std::exception[" << e.what() << "] was thrown!";
return NULL;
} catch (...) {
LOG(ERROR) << "unkown exception was thrown!";
return NULL;
}
}
uint32_t addr = m_nNextEntry;
++m_nNextEntry;
hash_node_t* node = get_node(addr);
node->next = m_hashTable[index];
m_hashTable[index] = addr;
node->data.first = key;
return node;
}
};
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <stdint.h>
#include <sys/time.h>
namespace rec {
namespace mcube {
#define TIME_FLAG(flag) \
struct timeval flag; \
gettimeofday(&(flag), NULL);
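// Elapsed time between two gettimeofday() samples, in milliseconds.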
inline uint64_t time_diff(const struct timeval& start_time,
const struct timeval& end_time) {
return (end_time.tv_sec - start_time.tv_sec) * 1000 +
(end_time.tv_usec - start_time.tv_usec) / 1000;
}
} // namespace mcube
} // namespace rec
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <string>
#include <vector>
#include "cube/error.h"
namespace rec {
namespace mcube {
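// Abstract dictionary interface shared by Dict (a single shard) and DictSet
// (dict_split shards). The two load() overloads take either one dict path or
// one path per shard; the default implementations return E_NOT_IMPL so that
// subclasses only override the variant they support.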
class VirtualDict {
public:
VirtualDict() {}
virtual ~VirtualDict() {}
virtual int load(const std::string& /*dict_path*/,
bool /*in_mem*/,
const std::string& /*v_path*/) {
return E_NOT_IMPL;
}
virtual int load(const std::vector<std::string>& /*dict_path*/,
bool /*in_mem*/,
const std::string& /*v_path*/) {
return E_NOT_IMPL;
}
virtual int destroy() { return E_NOT_IMPL; }
virtual const std::string& version() {
static std::string UNKNOWN_VERSION = "UNKNOWN";
return UNKNOWN_VERSION;
}
virtual std::string guard_version() {
static std::string UNKNOWN_VERSION = "UNKNOWN";
return UNKNOWN_VERSION;
}
virtual void set_base_dict(const VirtualDict* dict) = 0;
virtual bool seek(uint64_t key, char* buff, uint64_t* buff_size) = 0;
virtual void atom_inc_seek_num() = 0;
virtual void atom_dec_seek_num() = 0;
virtual uint32_t atom_seek_num() = 0;
}; // class VirtualDict
} // namespace mcube
} // namespace rec
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto2";
package rec.mcube;
option cc_generic_services = true;
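// The control channel carries its real payload as a JSON document in the
// HTTP body rather than in protobuf fields, so both messages are
// intentionally empty (see Control::cmd in control.cpp).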
message HttpRequest {};
message HttpResponse {};
service ControlService { rpc cmd(HttpRequest) returns (HttpResponse); };
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto2";
package rec.mcube;
option cc_generic_services = true;
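// Seek protocol: "keys" lists the uint64 ids to look up; the response
// carries one DictValue per key, in request order, with status 0 on a hit
// and a non-zero status (and empty value) on a miss. "version" is filled
// only when "version_required" is set.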
message DictRequest {
repeated uint64 keys = 1;
optional bool version_required = 2 [ default = false ];
};
message DictValue {
required uint32 status = 1;
required bytes value = 2;
};
message DictResponse {
repeated DictValue values = 1;
optional string version = 2;
};
service DictService { rpc seek(DictRequest) returns (DictResponse); };
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <brpc/server.h>
#include "cube/control.h"
#include "cube/framework.h"
namespace rec {
namespace mcube {
using ::rec::mcube::HttpRequest;
using ::rec::mcube::HttpResponse;
using ::google::protobuf::RpcController;
using ::google::protobuf::Closure;
using ::brpc::HttpHeader;
using ::brpc::URI;
using ::brpc::Controller;
using ::brpc::ClosureGuard;
using BUTIL_RAPIDJSON_NAMESPACE::Document;
std::string rapidjson_value_to_string(
const BUTIL_RAPIDJSON_NAMESPACE::Value& value) {
BUTIL_RAPIDJSON_NAMESPACE::StringBuffer buffer;
BUTIL_RAPIDJSON_NAMESPACE::PrettyWriter<
BUTIL_RAPIDJSON_NAMESPACE::StringBuffer>
writer(buffer);
value.Accept(writer);
return buffer.GetString();
}
Control::Control() {}
Control::~Control() {}
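// Handles HTTP control commands. The request body is a JSON object such as
// {"cmd": "bg_load_base", "version_path": "/20190101"}; "status" is served
// without locking, while the load/switch/enable commands are serialized
// through _cmd_mutex so only one reload runs at a time. A hypothetical
// invocation (brpc maps pb services to /ServiceName/MethodName over HTTP;
// the port and path here are illustrative):
//   curl -d '{"cmd":"status"}' http://127.0.0.1:8000/ControlService/cmd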
void Control::cmd(::google::protobuf::RpcController* cntl_base,
const ::rec::mcube::HttpRequest* /*request*/,
::rec::mcube::HttpResponse* /*response*/,
::google::protobuf::Closure* done) {
ClosureGuard done_guard(done);
Controller* cntl = static_cast<Controller*>(cntl_base);
std::string cmd_str = cntl->request_attachment().to_string();
Document cmd;
cmd.Parse(cmd_str.c_str());
LOG(INFO) << "HANDLE CMD: " << cmd_str;
if (!cmd.IsObject()) {
LOG(ERROR) << "parse command failed";
cntl->http_response().set_status_code(brpc::HTTP_STATUS_BAD_REQUEST);
return;
}
if (!cmd.HasMember("cmd") || !cmd["cmd"].IsString()) {
cntl->http_response().set_status_code(brpc::HTTP_STATUS_BAD_REQUEST);
return;
}
std::string cmd_name = cmd["cmd"].GetString();
std::string version_path = "";
if (cmd.HasMember("version_path") && cmd["version_path"].IsString()) {
version_path = cmd["version_path"].GetString();
}
int ret = 0;
Document response;
if (cmd_name.compare("status") == 0) {
ret = handle_status(cmd, &response);
} else if (_cmd_mutex.try_lock()) {
if (cmd_name.compare("reload_base") == 0) {
ret = handle_reload_base(cmd, version_path);
} else if (cmd_name.compare("reload_patch") == 0) {
ret = handle_reload_patch(cmd, version_path);
} else if (cmd_name.compare("bg_load_base") == 0) {
ret = handle_bg_load_base(cmd, version_path);
} else if (cmd_name.compare("bg_load_patch") == 0) {
ret = handle_bg_load_patch(cmd, version_path);
} else if (cmd_name.compare("bg_unload") == 0) {
ret = handle_bg_unload(cmd);
} else if (cmd_name.compare("bg_switch") == 0) {
ret = handle_bg_switch(cmd);
} else if (cmd_name.compare("enable") == 0) {
ret = handle_enable(cmd);
} else {
ret = -1;
LOG(ERROR) << "unknown cmd: " << cmd_name;
}
_cmd_mutex.unlock();
} else {
LOG(ERROR) << "try to get cmd mutex failed cmd: " << cmd_name;
ret = -1;
}
cntl->response_attachment().append(rapidjson_value_to_string(response));
if (ret == 0) {
cntl->http_response().set_status_code(brpc::HTTP_STATUS_OK);
} else {
cntl->http_response().set_status_code(
brpc::HTTP_STATUS_INTERNAL_SERVER_ERROR);
}
LOG(INFO) << "CMD DONE: " << cmd_str;
return;
}
int Control::handle_status(const Document& /*cmd*/, Document* res) {
Framework* framework = Framework::instance();
return framework->status(res);
}
int Control::handle_reload_patch(const Document& /*cmd*/,
const std::string& v_path) {
Framework* framework = Framework::instance();
return framework->patch(v_path);
}
int Control::handle_reload_base(const Document& /*cmd*/,
const std::string& v_path) {
Framework* framework = Framework::instance();
return framework->reload(v_path);
}
int Control::handle_bg_load_patch(const Document& /*cmd*/,
const std::string& v_path) {
Framework* framework = Framework::instance();
return framework->bg_load_patch(v_path);
}
int Control::handle_bg_load_base(const Document& /*cmd*/,
const std::string& v_path) {
Framework* framework = Framework::instance();
return framework->bg_load_base(v_path);
}
int Control::handle_bg_unload(const Document& /*cmd*/) {
Framework* framework = Framework::instance();
return framework->bg_unload();
}
int Control::handle_bg_switch(const Document& /*cmd*/) {
Framework* framework = Framework::instance();
return framework->bg_switch();
}
int Control::handle_enable(const Document& cmd) {
if (!cmd.HasMember("version") || !cmd["version"].IsString()) {
return -1;
}
Framework* framework = Framework::instance();
return framework->enable(cmd["version"].GetString());
}
} // namespace mcube
} // namespace rec
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "cube/cube_bvar.h"
namespace rec {
namespace mcube {
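// Service metrics exported through brpc's bvar: per-request key counts,
// request / miss / oversized-value counters with one-minute windows, plus
// dict load time and total data size (in MB).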
bvar::IntRecorder g_keys_num;
bvar::Window<bvar::IntRecorder> g_keys_win("keys_per_request_num",
&g_keys_num,
bvar::FLAGS_bvar_dump_interval);
bvar::Adder<uint64_t> g_request_num("request_num");
bvar::Window<bvar::Adder<uint64_t>> g_request_num_minute("request_num_minute",
&g_request_num,
60);
bvar::IntRecorder g_data_load_time("data_load_time");
bvar::IntRecorder g_data_size("data_size");
bvar::Adder<uint64_t> g_long_value_num("long_value_num");
bvar::Window<bvar::Adder<uint64_t>> g_long_value_num_minute(
"long_value_num_minute", &g_long_value_num, 60);
bvar::Adder<uint64_t> g_unfound_key_num("unfound_key_num");
bvar::Window<bvar::Adder<uint64_t>> g_unfound_key_num_minute(
"unfound_key_num_minute", &g_unfound_key_num, 60);
bvar::Adder<uint64_t> g_total_key_num("total_key_num");
bvar::Window<bvar::Adder<uint64_t>> g_total_key_num_minute(
"total_key_num_minute", &g_total_key_num, 60);
} // namespace mcube
} // namespace rec
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <functional>
#include "cube/cube_bvar.h"
#include "cube/dict.h"
#include "cube/error.h"
#include "cube/util.h"
namespace rec {
namespace mcube {
static void munmap_deleter(void* data, uint32_t size) {
if (data != MAP_FAILED) {
munmap(data, size);
}
}
Dict::Dict() : _seek_num(0), _base_dict(NULL) {}
Dict::~Dict() {}
void Dict::atom_dec_seek_num() { --_seek_num; }
void Dict::atom_inc_seek_num() { ++_seek_num; }
uint32_t Dict::atom_seek_num() { return _seek_num; }
int Dict::load(const std::string& dict_path,
bool in_mem,
const std::string& v_path) {
TIME_FLAG(load_start);
int ret = load_index(dict_path, v_path);
if (ret != E_OK) {
LOG(WARNING) << "load index failed";
return ret;
}
if (in_mem) {
ret = load_data(dict_path, v_path);
if (ret != E_OK) {
LOG(ERROR) << "load data failed";
return ret;
}
} else {
ret = load_data_mmap(dict_path, v_path);
if (ret != E_OK) {
LOG(ERROR) << "load data failed";
return ret;
}
}
set_version(v_path);
TIME_FLAG(load_end);
g_data_load_time << time_diff(load_start, load_end);
return E_OK;
}
int Dict::load_index(const std::string& dict_path, const std::string& v_path) {
std::string index_n_path(dict_path);
index_n_path.append(v_path);
index_n_path.append("/index.n");
LOG(INFO) << "index file path: " << index_n_path;
std::unique_ptr<FILE, decltype(&fclose)> pf(fopen(index_n_path.c_str(), "rb"),
&fclose);
if (pf.get() == NULL) {
LOG(WARNING) << "open index: " << index_n_path << " failed";
return E_DATA_ERROR;
}
int type = 0;
if (fread(reinterpret_cast<void*>(&type), sizeof(int), 1, pf.get()) != 1) {
LOG(ERROR) << "index syntax error";
return E_DATA_ERROR;
}
uint32_t count = 0;
if (fread(reinterpret_cast<void*>(&count), sizeof(uint32_t), 1, pf.get()) !=
1) {
LOG(ERROR) << "index syntax error";
return E_DATA_ERROR;
}
uint32_t file_num = 0;
if (fread(
reinterpret_cast<void*>(&file_num), sizeof(uint32_t), 1, pf.get()) !=
1) {
LOG(ERROR) << "index syntax error";
return E_DATA_ERROR;
}
LOG(INFO) << "index type:" << type << ", count:" << count
<< ", file_num:" << file_num;
// read file_lens begin
uint32_t file_cnt = file_num;
uint64_t len[1024];
std::vector<uint64_t> file_lens;
file_lens.reserve(file_num);
while (file_cnt > 0) {
uint32_t to_read = file_cnt > 1024 ? 1024 : file_cnt;
file_cnt -= to_read;
if (fread(reinterpret_cast<void*>(&len),
sizeof(uint64_t),
to_read,
pf.get()) != to_read) {
return E_DATA_ERROR;
}
for (uint32_t i = 0; i < to_read; ++i) {
file_lens.push_back(len[i]);
}
}
if (file_lens.size() != file_num) {
LOG(ERROR) << "file_num[" << file_num << "] != file_lens size["
<< file_lens.size() << "], shouldn't happen";
return E_DATA_ERROR;
}
// try patch mode
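  // If the base dict's recorded file lengths form a prefix of the new
  // index's file lengths, only the index files appended since the base was
  // built need to be loaded (file_idx skips the shared prefix); otherwise
  // every index file is reloaded from scratch.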
size_t file_idx = 0;
if (_base_dict) {
if (_base_dict->_slim_table.check_file_len(file_lens, &file_idx)) {
LOG(INFO) << "index check file len ok in patch mode, set file_idx to "
<< file_idx;
if (_slim_table.copy_data_from(_base_dict->_slim_table) != 0) {
LOG(ERROR) << "copy data from old index failed in patch mode";
return E_DATA_ERROR;
}
} else {
file_idx = 0;
LOG(INFO)
<< "index check file len failed in patch mode, set file_idx to 0";
}
}
_slim_table.resize(count / 2);
char file[1024];
struct stat fstat;
for (; file_idx < file_num; ++file_idx) {
snprintf(file,
sizeof(file),
"%s%s/index.%lu",
dict_path.c_str(),
v_path.c_str(),
file_idx);
if (stat(file, &fstat) < 0) {
if (errno == ENOENT) {
LOG(WARNING) << "index." << file_idx << " not exist";
_slim_table.add_file_len(0);
continue;
}
return E_DATA_ERROR;
}
if ((uint64_t)fstat.st_size != file_lens[file_idx]) {
LOG(ERROR) << "load_index failed, expect index file[" << file_idx
<< "] size is " << file_lens[file_idx] << ", actual size is "
<< (uint64_t)fstat.st_size;
return E_DATA_ERROR;
}
LOG(INFO) << "loading from index." << file_idx;
if (!_slim_table.load(file) || _slim_table.size() > count) {
return E_DATA_ERROR;
}
_slim_table.add_file_len(file_lens[file_idx]);
}
return E_OK;
}
int Dict::load_data(const std::string& dict_path, const std::string& v_path) {
if (_base_dict) {
_block_set = _base_dict->_block_set;
}
std::string data_n_path(dict_path);
data_n_path.append(v_path);
data_n_path.append("/data.n");
FILE* pf = fopen(data_n_path.c_str(), "rb");
if (pf == NULL) {
LOG(ERROR) << "open data [" << data_n_path << "] failed";
return E_DATA_ERROR;
}
uint32_t count = 0;
if (fread(reinterpret_cast<void*>(&count), sizeof(uint32_t), 1, pf) != 1) {
LOG(ERROR) << "data syntax error";
fclose(pf);
return E_DATA_ERROR;
}
std::vector<uint32_t> block_size;
uint64_t total_data_size = 0;
for (uint32_t i = 0; i < count; ++i) {
uint32_t size = 0;
if (fread(reinterpret_cast<void*>(&size), sizeof(uint32_t), 1, pf) != 1) {
LOG(ERROR) << "data syntax error";
fclose(pf);
return E_DATA_ERROR;
}
block_size.push_back(size);
total_data_size += size;
}
g_data_size << (total_data_size / 1024 / 1024);
fclose(pf);
pf = NULL;
uint32_t old_size = _block_set.size();
for (size_t i = 0; i < old_size; ++i) {
if (_block_set[i].size != block_size[i]) {
old_size = 0;
break;
}
}
_block_set.resize(count);
for (size_t i = old_size; i < _block_set.size(); ++i) {
char data_path[1024];
LOG(INFO) << "load from data." << i;
snprintf(
data_path, 1024, "%s%s/data.%lu", dict_path.c_str(), v_path.c_str(), i);
FILE* data_file = fopen(data_path, "rb");
if (data_file == NULL) {
LOG(WARNING) << "open data file [" << data_path << " failed";
_block_set[i].s_data.reset();
_block_set[i].size = 0;
continue;
}
_block_set[i].s_data.reset(
reinterpret_cast<char*>(malloc(block_size[i] * sizeof(char))));
if (_block_set[i].s_data.get() == NULL) {
LOG(ERROR) << "malloc data failed";
fclose(data_file);
return E_OOM;
}
_block_set[i].size = block_size[i];
if (fread(reinterpret_cast<void*>(_block_set[i].s_data.get()),
sizeof(char),
_block_set[i].size,
data_file) != _block_set[i].size) {
LOG(ERROR) << "read data failed";
fclose(data_file);
return E_DATA_ERROR;
}
fclose(data_file);
}
return E_OK;
}
int Dict::load_data_mmap(const std::string& dict_path,
const std::string& v_path) {
std::string data_n_path(dict_path);
data_n_path.append(v_path);
data_n_path.append("/data.n");
FILE* pf = fopen(data_n_path.c_str(), "rb");
if (pf == NULL) {
LOG(ERROR) << "open data [" << data_n_path << "] failed";
return E_DATA_ERROR;
}
uint32_t count = 0;
if (fread(reinterpret_cast<void*>(&count), sizeof(uint32_t), 1, pf) != 1) {
LOG(ERROR) << "data syntax error";
fclose(pf);
return E_DATA_ERROR;
}
std::vector<uint32_t> block_size;
uint64_t total_data_size = 0;
for (uint32_t i = 0; i < count; ++i) {
uint32_t size = 0;
if (fread(reinterpret_cast<void*>(&size), sizeof(uint32_t), 1, pf) != 1) {
LOG(ERROR) << "data syntax error";
fclose(pf);
return E_DATA_ERROR;
}
block_size.push_back(size);
total_data_size += size;
}
g_data_size << (total_data_size / 1024 / 1024);
fclose(pf);
pf = NULL;
uint32_t old_size = _block_set.size();
_block_set.resize(count);
for (size_t i = old_size; i < _block_set.size(); ++i) {
char data_path[1024];
LOG(INFO) << "load from data." << i;
snprintf(
data_path, 1024, "%s%s/data.%lu", dict_path.c_str(), v_path.c_str(), i);
int data_fd = open(data_path,
O_RDONLY | O_NONBLOCK,
S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
if (data_fd < 0) {
LOG(WARNING) << "open data file [" << data_path << "] failed";
_block_set[i].s_data.reset();
_block_set[i].size = 0;
continue;
}
_block_set[i].s_data.reset(
reinterpret_cast<char*>(
mmap(NULL, block_size[i], PROT_READ, MAP_SHARED, data_fd, 0)),
std::bind(munmap_deleter, std::placeholders::_1, block_size[i]));
if (_block_set[i].s_data.get() == MAP_FAILED) {
LOG(WARNING) << "map data file [" << data_path << "] failed";
_block_set[i].s_data.reset();
_block_set[i].size = 0;
continue;
}
_block_set[i].size = block_size[i];
_block_set[i].fd = data_fd;
}
return E_OK;
}
int Dict::destroy() {
for (size_t i = 0; i < _block_set.size(); ++i) {
if (_block_set[i].fd > 0) {
close(_block_set[i].fd);
_block_set[i].fd = -1;
}
_block_set[i].size = 0;
}
return E_OK;
}
void Dict::set_version(const std::string& v_path) {
_rw_lock.w_lock();
_version = (v_path == "") ? "" : v_path.substr(1);
_rw_lock.unlock();
}
const std::string& Dict::version() { return _version; }
std::string Dict::guard_version() {
_rw_lock.r_lock();
std::string version = _version;
_rw_lock.unlock();
return version;
}
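// Looks up a single key. The slim hash map stores a 64-bit flag per key: the
// high 32 bits select a data block and the low 32 bits are the offset inside
// it. Each record starts with a uint32 length (which includes the length
// field itself) followed by the payload. On a miss or error, *buff is set to
// a zero uint32 and false is returned.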
bool Dict::seek(uint64_t key, char* buff, uint64_t* buff_size) {
slim_hash_map<uint64_t, uint64_t>::iterator it = _slim_table.find(key);
if (it.get_node() == NULL) {
*(reinterpret_cast<uint32_t*>(buff)) = 0;
*buff_size = sizeof(uint32_t);
g_unfound_key_num << 1;
return false;
}
if (it == _slim_table.end()) {
*(reinterpret_cast<uint32_t*>(buff)) = 0;
*buff_size = sizeof(uint32_t);
return false;
}
uint64_t flag = it->second;
uint32_t id = (uint32_t)(flag >> 32);
uint64_t addr = (uint32_t)(flag);
if (_block_set.size() > id) {
uint32_t block_size = _block_set[id].size;
char* block_data = NULL;
block_data = _block_set[id].s_data.get();
if (block_data && addr + sizeof(uint32_t) <= block_size) {
uint32_t len = *(reinterpret_cast<uint32_t*>(block_data + addr));
if (addr + len <= block_size && len >= sizeof(uint32_t)) {
uint64_t default_buffer_size = *buff_size;
*buff_size = len - sizeof(uint32_t);
if (*buff_size > default_buffer_size) {
g_long_value_num << 1;
LOG(ERROR) << "value len is " << *buff_size
<< ", larger than default_buffer_size "
<< default_buffer_size;
return false;
}
memcpy(buff,
(block_data + addr + sizeof(uint32_t)),
len - sizeof(uint32_t));
return true;
} else {
*(reinterpret_cast<uint32_t*>(buff)) = 0;
*buff_size = sizeof(uint32_t);
return false;
}
} else {
*(reinterpret_cast<uint32_t*>(buff)) = 0;
*buff_size = sizeof(uint32_t);
return false;
}
} else {
*(reinterpret_cast<uint32_t*>(buff)) = 0;
*buff_size = sizeof(uint32_t);
return false;
}
return false;
}
} // namespace mcube
} // namespace rec
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "cube/dict_set.h"
namespace rec {
namespace mcube {
DictSet::DictSet(int dict_split) : VirtualDict(), _dict_split(dict_split) {
_dict_set.resize(_dict_split);
}
DictSet::~DictSet() {}
void DictSet::set_base_dict(const VirtualDict* dict) {
const DictSet* dict_set = static_cast<const DictSet*>(dict);
for (size_t i = 0; i < _dict_set.size(); ++i) {
if (!_dict_set[i]) {
_dict_set[i] = std::make_shared<Dict>();
}
_dict_set[i]->set_base_dict(dict_set->_dict_set[i].get());
}
}
int DictSet::load(const std::vector<std::string>& dict_path,
bool in_mem,
const std::string& v_path) {
if ((uint32_t)_dict_split != dict_path.size()) {
return E_DATA_ERROR;
}
for (size_t i = 0; i < dict_path.size(); ++i) {
if (!_dict_set[i]) {
_dict_set[i] = std::make_shared<Dict>();
}
if (_dict_set[i]->load(dict_path[i], in_mem, v_path) != E_OK) {
LOG(ERROR) << "dict split[" << i << "] load failed";
return E_DATA_ERROR;
}
}
_rw_lock.w_lock();
_version = (v_path == "") ? "" : v_path.substr(1);
_rw_lock.unlock();
return E_OK;
}
int DictSet::destroy() {
for (size_t i = 0; i < _dict_set.size(); ++i) {
if (_dict_set[i]->destroy() != E_OK) {
LOG(WARNING) << "dict split[" << i << "] destory failed";
}
}
return E_OK;
}
const std::string& DictSet::version() { return _version; }
std::string DictSet::guard_version() {
_rw_lock.r_lock();
std::string version = _version;
_rw_lock.unlock();
return version;
}
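// Each key is routed to its shard by key % _dict_split; the shard layout is
// expected to match the split used when the dict files were generated.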
bool DictSet::seek(uint64_t key, char* buff, uint64_t* buff_size) {
return _dict_set[key % _dict_split]->seek(key, buff, buff_size);
}
void DictSet::atom_inc_seek_num() { ++_seek_num; }
void DictSet::atom_dec_seek_num() { --_seek_num; }
uint32_t DictSet::atom_seek_num() { return _seek_num; }
} // namespace mcube
} // namespace rec
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <pthread.h>
#include <fstream>
#include <sstream>
#include "cube/cube_bvar.h"
#include "cube/dict.h"
#include "cube/dict_set.h"
#include "cube/framework.h"
#include "cube/recycle.h"
using BUTIL_RAPIDJSON_NAMESPACE::Document;
using BUTIL_RAPIDJSON_NAMESPACE::Value;
using BUTIL_RAPIDJSON_NAMESPACE::StringRef;
namespace {
static ::rec::mcube::Framework* g_instance = NULL;
}
namespace rec {
namespace mcube {
Framework* Framework::instance() {
if (g_instance == NULL) {
g_instance = new Framework();
}
return g_instance;
}
Framework::~Framework() {}
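// Initializes the framework: starts the Recycle thread, creates the
// foreground dict (a plain Dict, or a DictSet when dict_split > 1), then
// loads data from ./data. An optional ./data/VERSION file names the version
// sub-directory to load; its content is prefixed with "/" and appended to
// the dict path.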
int Framework::init(uint32_t dict_split, bool in_mem) {
Recycle* rec = Recycle::get_instance();
int ret = rec->init();
if (ret != 0) {
LOG(ERROR) << "init recycle failed";
return ret;
}
/*
_dict[0] = new (std::nothrow) Dict();
_dict[1] = NULL;
*/
init_dict(dict_split);
VirtualDict* cur_dict = _dict[_dict_idx];
_dict_path = "./data";
_max_val_size = 1024;
_in_mem = in_mem;
std::string version_file = _dict_path + "/VERSION";
std::string version_path = "";
std::ifstream input(version_file.c_str());
if (!std::getline(input, version_path)) {
version_path = "";
} else {
version_path = "/" + version_path;
}
input.close();
LOG(INFO) << "load dict from" << _dict_path << version_path;
if (_dict_split > 1) {
_dict_set_path.clear();
_dict_set_path.resize(_dict_split);
std::stringstream dict_set_path_buf;
for (size_t i = 0; i < _dict_split; ++i) {
dict_set_path_buf.str(std::string());
dict_set_path_buf.clear();
dict_set_path_buf << _dict_path << "/" << i;
_dict_set_path[i] = dict_set_path_buf.str();
}
ret = cur_dict->load(_dict_set_path, _in_mem, version_path);
} else {
ret = cur_dict->load(_dict_path, _in_mem, version_path);
}
if (ret != 0) {
LOG(WARNING) << "init: load dict data failed err=" << ret
<< ". starting service with empty data.";
} else {
LOG(INFO) << "load dict from " << _dict_path << version_path << " done";
}
_status = Status::F_RUNNING;
return 0;
}
int Framework::destroy() {
Recycle* recycle = Recycle::get_instance();
int ret = recycle->destroy();
if (ret != 0) {
LOG(WARNING) << "destroy recycle failed";
}
return 0;
}
void Framework::init_dict(uint32_t dict_split) {
_dict_split = dict_split;
if (_dict_split <= 1) {
_dict[0] = new (std::nothrow) Dict();
_dict[1] = NULL;
} else {
_dict[0] = new (std::nothrow) DictSet(_dict_split);
_dict[1] = NULL;
}
}
VirtualDict* Framework::create_dict() {
if (_dict_split > 1) {
return new (std::nothrow) DictSet(_dict_split);
} else {
return new (std::nothrow) Dict();
}
}
void Framework::release(VirtualDict* dict) { dict->atom_dec_seek_num(); }
int Framework::status(Document* res) {
res->SetObject();
Document::AllocatorType& allocator = res->GetAllocator();
  Value cur_version;
  Value bg_version;
  // Copy the strings; StringRef on a temporary std::string would dangle.
  std::string cur_str = get_cur_version();
  std::string bg_str = get_bg_version();
  cur_version.SetString(cur_str.c_str(), cur_str.size(), allocator);
  bg_version.SetString(bg_str.c_str(), bg_str.size(), allocator);
res->AddMember("cur_version", cur_version, allocator);
res->AddMember("bg_version", bg_version, allocator);
res->AddMember("status", _status.load(), allocator);
return 0;
}
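// Serves one DictRequest: takes a reference on the foreground dict, looks up
// every key into a reusable buffer of _max_val_size bytes and fills one
// DictValue per key in request order (status 0 = hit, -1 = miss), then
// releases the reference so Recycle can reclaim swapped-out dicts.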
int Framework::seek(const DictRequest* req, DictResponse* res) {
g_request_num << 1;
VirtualDict* cur_dict = get_cur_dict();
char* val_buf = new char[_max_val_size];
g_keys_num << req->keys_size();
g_total_key_num << req->keys_size();
std::vector<DictValue*> values(req->keys_size());
for (int i = 0; i < req->keys_size(); ++i) {
values[i] = res->add_values();
}
for (int i = 0; i < req->keys_size(); ++i) {
uint64_t val_size = _max_val_size;
// DictValue* val = res->add_values();
DictValue* val = values[i];
if (cur_dict->seek(req->keys(i), val_buf, &val_size)) {
val->set_status(0);
val->set_value(val_buf, val_size);
} else {
val->set_status(-1);
val->set_value("");
}
}
if (req->version_required()) {
res->set_version(cur_dict->version());
}
// delete [] keys;
delete[] val_buf;
release(cur_dict);
return 0;
}
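// reload(): load a complete base into the background slot, switch it to the
// foreground, then recycle the old dict. patch() follows the same sequence,
// but the background dict reuses the current dict as its base so only the
// newly appended index/data files are loaded (see Dict::load_index).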
int Framework::reload(const std::string& v_path) {
int ret = bg_load_base(v_path);
if (ret != 0) {
LOG(WARNING) << "background load dict base failed";
} else {
LOG(INFO) << "background load dict base succ";
}
ret = bg_switch();
if (ret != 0) {
LOG(WARNING) << "switch background dict failed";
} else {
LOG(INFO) << "switch background dict succ";
}
ret = bg_unload();
if (ret != 0) {
LOG(WARNING) << "unload background dict failed";
} else {
LOG(INFO) << "unload background dict succ";
}
return ret;
}
int Framework::patch(const std::string& v_path) {
int ret = bg_load_patch(v_path);
if (ret != 0) {
LOG(WARNING) << "background load dict patch failed";
} else {
LOG(INFO) << "background load dict patch succ";
}
ret = bg_switch();
if (ret != 0) {
LOG(WARNING) << "switch background dict failed";
} else {
LOG(INFO) << "switch background dict succ";
}
ret = bg_unload();
if (ret != 0) {
LOG(WARNING) << "unload background dict failed";
} else {
LOG(INFO) << "unload background dict succ";
}
return ret;
}
int Framework::bg_load_base(const std::string& v_path) {
int ret = bg_unload();
if (ret != 0) {
LOG(WARNING) << "unload background dict failed";
}
VirtualDict* bg_dict = create_dict();
if (!bg_dict) {
LOG(ERROR) << "create Dict failed";
return -1;
}
_status = Status::F_LOADING;
if (_dict_split > 1) {
ret = bg_dict->load(_dict_set_path, _in_mem, v_path);
} else {
ret = bg_dict->load(_dict_path, _in_mem, v_path);
}
_status = Status::F_RUNNING;
if (ret != 0) {
LOG(WARNING) << "load background dict failed";
delete bg_dict;
bg_dict = NULL;
return ret;
} else {
LOG(INFO) << "load background dict succ";
set_bg_dict(bg_dict);
}
return ret;
}
int Framework::bg_load_patch(const std::string& v_path) {
int ret = bg_unload();
if (ret != 0) {
LOG(WARNING) << "unload background dict failed";
}
VirtualDict* bg_dict = create_dict();
if (!bg_dict) {
LOG(ERROR) << "create Dict failed";
return -1;
}
_status = Status::F_LOADING;
if (_dict[_dict_idx]) {
bg_dict->set_base_dict(_dict[_dict_idx]);
LOG(INFO) << "set base dict from current dict " << _dict_idx;
}
if (_dict_split > 1) {
ret = bg_dict->load(_dict_set_path, _in_mem, v_path);
} else {
ret = bg_dict->load(_dict_path, _in_mem, v_path);
}
_status = Status::F_RUNNING;
if (ret != 0) {
LOG(WARNING) << "load background dict failed";
delete bg_dict;
bg_dict = NULL;
return ret;
} else {
LOG(INFO) << "load background dict succ";
set_bg_dict(bg_dict);
}
return ret;
}
int Framework::bg_unload() {
VirtualDict* bg_dict = get_bg_dict();
if (bg_dict != NULL) {
set_bg_dict(NULL);
Recycle* recycle = Recycle::get_instance();
recycle->recycle(bg_dict);
}
LOG(INFO) << "unload background dict succ";
return 0;
}
int Framework::bg_switch() {
_rw_lock.w_lock();
int bg_idx = 1 - _dict_idx;
if (!_dict[bg_idx]) {
LOG(WARNING) << "switch dict failed because NULL";
_rw_lock.unlock();
return -1;
}
_dict_idx = bg_idx;
_rw_lock.unlock();
return 0;
}
int Framework::enable(const std::string& version) {
int ret = 0;
if (version != "" && version == get_cur_version()) {
ret = 0;
} else if (version == get_bg_version()) {
ret = bg_switch();
} else {
LOG(WARNING) << "bg dict version not matched";
ret = -1;
}
return ret;
}
} // namespace mcube
} // namespace rec
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <sys/stat.h>
#include <brpc/server.h>
#include <gflags/gflags.h>
#ifdef BCLOUD
#include "base/logging.h"
#else
#include "butil/logging.h"
#endif
#include "cube/control.h"
#include "cube/framework.h"
#include "cube/server.h"
DEFINE_int32(port, 8000, "TCP Port of this server");
DEFINE_int32(dict_split, 1, "data dict split for dictset");
DEFINE_bool(in_mem,
true,
"True[load data into memory] False[mmap data in disk]");
DECLARE_string(flagfile);
namespace rec {
namespace mcube {
bool g_signal_quit = false;
static void sigint_handler(int) { g_signal_quit = true; } // sigint_handler
int run(int argc, char** argv) {
google::ParseCommandLineFlags(&argc, &argv, true);
// initialize logger instance
#ifdef BCLOUD
logging::LoggingSettings settings;
settings.logging_dest = logging::LOG_TO_FILE;
std::string filename(argv[0]);
filename = filename.substr(filename.find_last_of('/') + 1);
settings.log_file =
strdup((std::string("./log/") + filename + ".log").c_str());
settings.delete_old = logging::DELETE_OLD_LOG_FILE;
logging::InitLogging(settings);
logging::ComlogSinkOptions cso;
cso.process_name = filename;
cso.enable_wf_device = true;
logging::ComlogSink::GetInstance()->Setup(&cso);
#else
if (FLAGS_log_dir == "") {
FLAGS_log_dir = "./log";
}
struct stat st_buf;
int ret = 0;
if ((ret = stat(FLAGS_log_dir.c_str(), &st_buf)) != 0) {
mkdir(FLAGS_log_dir.c_str(), 0777);
ret = stat(FLAGS_log_dir.c_str(), &st_buf);
if (ret != 0) {
LOG(WARNING) << "Log path " << FLAGS_log_dir
<< " not exist, and create fail";
return -1;
}
}
google::InitGoogleLogging(strdup(argv[0]));
FLAGS_logbufsecs = 0;
FLAGS_logbuflevel = -1;
#endif
LOG(INFO) << "Succ initialize logger";
Framework* framework = Framework::instance();
ret = framework->init(FLAGS_dict_split, FLAGS_in_mem);
if (ret != 0) {
LOG(ERROR) << "init predict framework failed";
return ret;
}
Server cube;
Control cntl;
brpc::Server server;
server.set_version("Cube Service");
brpc::ServerOptions option;
if (server.AddService(&cube, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
LOG(ERROR) << "Failed to add predict service";
return -1;
}
if (server.AddService(&cntl, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
LOG(ERROR) << "Failed to add predict service";
return -1;
}
if (server.Start(FLAGS_port, &option) != 0) {
LOG(ERROR) << "Fail to start service";
return -1;
}
LOG(INFO) << "cube service start";
signal(SIGINT, sigint_handler);
while (!g_signal_quit) {
sleep(1);
}
return 0;
}
} // namespace mcube
} // namespace rec
int main(int argc, char** argv) {
if (google::SetCommandLineOption("bvar_dump", "true").empty()) {
LOG(ERROR) << "Failed to dump bvar file";
return -1;
}
google::SetCommandLineOption("flagfile", "conf/gflags.conf");
return ::rec::mcube::run(argc, argv);
}
/* vim: set ts=4 sw=4 sts=4 tw=100 */
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "cube/recycle.h"
namespace {
using rec::mcube::Recycle;
static Recycle* g_instance = NULL;
}
namespace rec {
namespace mcube {
Recycle* Recycle::get_instance() {
if (g_instance == NULL) {
g_instance = new Recycle();
}
return g_instance;
}
Recycle::Recycle() : _running(false) {}
Recycle::~Recycle() {}
int Recycle::init() {
// init mutex lock;
if (pthread_mutex_init(&_recycle_mutex, NULL) != 0) {
LOG(ERROR) << "init recycle lock failed";
return -1;
}
_running = true;
// init thread;
if (pthread_create(&_recycle_thread,
NULL,
Recycle::recycle_func,
reinterpret_cast<void*>(this)) != 0) {
LOG(ERROR) << "init recycle thread failed";
return -1;
}
return 0;
}
int Recycle::destroy() {
_running = false;
// join thread;
if (pthread_join(_recycle_thread, NULL) != 0) {
LOG(WARNING) << "join recycle thread failed";
}
// destroy lock
if (pthread_mutex_destroy(&_recycle_mutex) != 0) {
LOG(WARNING) << "destroy recycle lock failed";
}
return 0;
}
/*
void Recycle::recycle(Dict* dict) {
lock();
_recycle_list.push(dict);
unlock();
}
*/
void Recycle::recycle(VirtualDict* dict) {
lock();
_recycle_list.push(dict);
unlock();
}
void Recycle::lock() { pthread_mutex_lock(&_recycle_mutex); }
void Recycle::unlock() { pthread_mutex_unlock(&_recycle_mutex); }
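// Background reclamation loop: pops retired dicts off the queue and waits
// until a dict's seek counter drops to zero (no in-flight request still
// holds it) before destroying and deleting it.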
void* Recycle::recycle_func(void* arg) {
Recycle* recycle = reinterpret_cast<Recycle*>(arg);
std::queue<VirtualDict*>& recycle_list = recycle->_recycle_list;
while (recycle->_running) {
recycle->lock();
if (recycle_list.empty()) {
recycle->unlock();
sleep(1);
continue;
}
VirtualDict* dict = recycle_list.front();
recycle_list.pop();
recycle->unlock();
while (dict->atom_seek_num() != 0) {
sleep(1);
}
int ret = dict->destroy();
if (ret != 0) {
LOG(WARNING) << "destroy dict failed";
}
delete dict;
}
return NULL;
}
} // namespace mcube
} // namespace rec
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <brpc/server.h>
#include "cube/framework.h"
#include "cube/server.h"
namespace rec {
namespace mcube {
Server::Server() {}
Server::~Server() {}
void Server::seek(::google::protobuf::RpcController* /*cntl_base*/,
const ::rec::mcube::DictRequest* request,
::rec::mcube::DictResponse* response,
::google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
Framework* framework = Framework::instance();
int ret = framework->seek(request, response);
if (ret != 0) {
LOG(ERROR) << "seek failed err=" << ret;
}
}
} // namespace mcube
} // namespace rec
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <brpc/channel.h>
#include "gtest/gtest.h"
#include "cube/control.h"
namespace rec {
namespace mcube {
namespace unittest {
struct DoNothing : public google::protobuf::Closure {
void Run() {}
};
class ControlTest : public ::testing::Test {
protected:
ControlTest() {}
virtual ~ControlTest() {}
virtual void SetUp() {}
virtual void TearDown() {}
}; // class ControlTest
TEST_F(ControlTest, control_cmd) {
brpc::Controller cntl;
DoNothing do_nothing;
Control control;
control.cmd(&cntl, NULL, NULL, &do_nothing);
ASSERT_EQ(brpc::HTTP_STATUS_BAD_REQUEST, cntl.http_response().status_code());
cntl.Reset();
}
int run(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
} // namespace unittest
} // namespace mcube
} // namespace rec
int main(int argc, char** argv) {
return ::rec::mcube::unittest::run(argc, argv);
}