diff --git a/CMakeLists.txt b/CMakeLists.txt
index 177f4f07d78df6d1416255bc57d4dd005838b95c..8542345a8ed3b86fbe58f35eac5ba7719d807c40 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -105,6 +105,7 @@ if (NOT CLIENT_ONLY)
   list(APPEND EXTERNAL_LIBS opencv)
 endif()
 
+add_subdirectory(cube)  # builds cube/cube-server and cube/cube-api
 add_subdirectory(configure)
 add_subdirectory(pdcodegen)
 add_subdirectory(sdk-cpp)
diff --git a/cube/CMakeLists.txt b/cube/CMakeLists.txt
new file mode 100644
index 0000000000000000000000000000000000000000..a3b3573dc38f7910735463292b13120e0647d80a
--- /dev/null
+++ b/cube/CMakeLists.txt
@@ -0,0 +1,16 @@
+# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License
+# Sub-projects of the cube key/value service.
+add_subdirectory(cube-server)
+add_subdirectory(cube-api)
diff --git a/cube/cube-api/CMakeLists.txt b/cube/cube-api/CMakeLists.txt
new file mode 100644
index 0000000000000000000000000000000000000000..a80c5a250c95d1c9091756ff63c426c4e71a93a3
--- /dev/null
+++ b/cube/cube-api/CMakeLists.txt
@@ -0,0 +1,133 @@
+# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License
+# Build script for the cube client library (cube-api) and its CLI (cube-cli).
+cmake_minimum_required(VERSION 2.8.10)
+project(CubeAPI C CXX)
+
+option(CUBE_API_LINK_SO "Whether cube are linked dynamically" OFF)
+# Locate the first "output/include" directory two levels above this project and
+execute_process(  # store its parent directory (newline stripped) in OUTPUT_PATH.
+    COMMAND bash -c "find ${PROJECT_SOURCE_DIR}/../.. -type d -regex \".*output/include$\" | head -n1 | xargs dirname | tr -d '\n'"
+    OUTPUT_VARIABLE OUTPUT_PATH
+)
+
+# OUTPUT_PATH is empty when no such directory exists; find_* then fall back to defaults.
+set(CMAKE_PREFIX_PATH ${OUTPUT_PATH})
+
+include(FindThreads)
+include(FindProtobuf)
+protobuf_generate_cpp(PROTO_SRC PROTO_HEADER idl/cube.proto idl/control.proto)
+# include PROTO_HEADER
+include_directories(${CMAKE_CURRENT_BINARY_DIR})
+
+# Search for libthrift* by best effort. If it is not found and brpc is
+# compiled with thrift protocol enabled, a link error would be reported.
+find_library(THRIFT_LIB NAMES thrift)
+if (NOT THRIFT_LIB)
+    set(THRIFT_LIB "")
+endif()
+find_library(THRIFTNB_LIB NAMES thriftnb)
+if (NOT THRIFTNB_LIB)
+    set(THRIFTNB_LIB "")
+endif()
+
+find_path(GFLAGS_INCLUDE_PATH gflags/gflags.h)
+find_library(GFLAGS_LIBRARY NAMES gflags libgflags)
+if((NOT GFLAGS_INCLUDE_PATH) OR (NOT GFLAGS_LIBRARY))
+    message(FATAL_ERROR "Fail to find gflags")
+endif()
+include_directories(${GFLAGS_INCLUDE_PATH})
+
+if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
+    include(CheckFunctionExists)
+    CHECK_FUNCTION_EXISTS(clock_gettime HAVE_CLOCK_GETTIME)
+    if(NOT HAVE_CLOCK_GETTIME)
+        set(DEFINE_CLOCK_GETTIME "-DNO_CLOCK_GETTIME_IN_MAC")
+    endif()
+endif()
+# DEFINE_CLOCK_GETTIME is empty except on macOS without clock_gettime; it must reach the compiler.
+set(CMAKE_CXX_FLAGS "${CMAKE_CPP_FLAGS} ${DEFINE_CLOCK_GETTIME} -DNDEBUG -O2 -D__const__= -pipe -W -Wall -Wno-unused-parameter -fPIC -fno-omit-frame-pointer")
+
+if(CMAKE_VERSION VERSION_LESS "3.1.3")
+    if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
+        set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
+    endif()
+    if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
+        set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
+    endif()
+else()
+    set(CMAKE_CXX_STANDARD 11)
+    set(CMAKE_CXX_STANDARD_REQUIRED ON)
+endif()
+
+find_path(LEVELDB_INCLUDE_PATH NAMES leveldb/db.h)
+find_library(LEVELDB_LIB NAMES leveldb)
+if ((NOT LEVELDB_INCLUDE_PATH) OR (NOT LEVELDB_LIB))
+    message(FATAL_ERROR "Fail to find leveldb")
+endif()
+include_directories(${LEVELDB_INCLUDE_PATH})
+
+find_library(SSL_LIB NAMES ssl)
+if (NOT SSL_LIB)
+    message(FATAL_ERROR "Fail to find ssl")
+endif()
+
+find_library(CRYPTO_LIB NAMES crypto)
+if (NOT CRYPTO_LIB)
+    message(FATAL_ERROR "Fail to find crypto")
+endif()
+
+add_executable(cube-cli src/cube_cli.cpp src/cube_api.cpp src/meta.cpp
+    ${PROTO_SRC} ${PROTO_HEADER})
+
+add_library(cube-api STATIC src/cube_api.cpp src/meta.cpp
+    ${PROTO_SRC} ${PROTO_HEADER})
+
+set(DYNAMIC_LIB
+    ${CMAKE_THREAD_LIBS_INIT}
+    ${GFLAGS_LIBRARY}
+    ${PROTOBUF_LIBRARIES}
+    ${LEVELDB_LIB}
+    ${SSL_LIB}
+    ${CRYPTO_LIB}
+    ${THRIFT_LIB}
+    ${THRIFTNB_LIB}
+    )
+
+if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
+    set(DYNAMIC_LIB ${DYNAMIC_LIB}
+        "-framework CoreFoundation"
+        "-framework CoreGraphics"
+        "-framework CoreData"
+        "-framework CoreText"
+        "-framework Security"
+        "-framework Foundation"
+        "-Wl,-U,_MallocExtension_ReleaseFreeMemory"
+        "-Wl,-U,_ProfilerStart"
+        "-Wl,-U,_ProfilerStop")
+endif()
+
+target_link_libraries(cube-cli ${DYNAMIC_LIB} brpc -lpthread -ldl -lz)
+target_link_libraries(cube-api ${DYNAMIC_LIB} brpc -lpthread -ldl -lz)
+
+# install
+install(TARGETS cube-api
+        ARCHIVE DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/lib
+        )
+install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/include
+        DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/include/cube-api)
+
+FILE(GLOB inc ${CMAKE_CURRENT_BINARY_DIR}/*.pb.h)
+install(FILES ${inc}
+        DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/include/cube-api)
diff --git a/cube/cube-api/conf/cube.conf b/cube/cube-api/conf/cube.conf
new file mode 100644
index 0000000000000000000000000000000000000000..4d18315e053c95d8bfd9482da4ac938d84811912
--- /dev/null
+++ b/cube/cube-api/conf/cube.conf
@@ -0,0 +1,26 @@
+[{
+    "dict_name": "table_test_1",
+    "shard": 5,
+    "dup": 1,
+    "timeout": 200,
+    "retry": 3,
+    "backup_request": 10,
+    "type": "ipport",
+    "load_balancer": "la",
+    "nodes": [{
+        "ip": "127.0.0.1",
+        "port": 8950
+    },{
+        "ip": "127.0.0.1",
+        "port": 8951
+    },{
+        "ip": "127.0.0.1",
+        "port": 8952
+    },{
+        "ip": "127.0.0.1",
+        "port": 8953
+    },{
+        "ip": "127.0.0.1",
+        "port": 8954
+    }]
+}]
diff --git a/cube/cube-api/conf/cube.conf.ipport_list b/cube/cube-api/conf/cube.conf.ipport_list
new file mode 100644
index 0000000000000000000000000000000000000000..d8c3a07308bf53b42fc9dade1adc183068410fe4
--- /dev/null
+++ b/cube/cube-api/conf/cube.conf.ipport_list
@@ -0,0 +1,13 @@
+[{
+    "dict_name": "test_dict",
+    "shard": 1,
+    "dup": 1,
+    "timeout": 200,
+    "retry": 3,
+    "backup_request": 100,
+    "type": "ipport_list",
+    "load_balancer": "rr",
+    "nodes": [{
+        "ipport_list": "list://127.0.0.1:8000,127.0.0.1:8001"
+    }]
+}]
diff --git a/cube/cube-api/idl/control.proto b/cube/cube-api/idl/control.proto
new file mode 100644
index 0000000000000000000000000000000000000000..ec4a2a72bf7109f2ed983f93be9a877682045523
--- /dev/null
+++ b/cube/cube-api/idl/control.proto
@@ -0,0 +1,24 @@
+// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto2";
+
+package rec.mcube;
+
+option cc_generic_services = true;
+// Both messages are intentionally field-less; the service exposes one `cmd` RPC.
+message HttpRequest {};
+message HttpResponse {};
+
+service ControlService { rpc cmd(HttpRequest) returns (HttpResponse); };
diff --git a/cube/cube-api/idl/cube.proto b/cube/cube-api/idl/cube.proto
new file mode 100644
index 0000000000000000000000000000000000000000..2b81cb9d07d68a8f00eb7ef7eaeb38461ddd1809
--- /dev/null
+++ b/cube/cube-api/idl/cube.proto
@@ -0,0 +1,43 @@
+// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto2";
+
+package rec.mcube;
+
+option cc_generic_services = true;
+// Lookup request: the keys to fetch and whether the data version is wanted.
+message DictRequest {
+  repeated uint64 keys = 1;
+  optional bool version_required = 2 [ default = false ];
+};
+// One lookup result. The client compares `status` against CubeError codes after casting to int.
+message DictValue {
+  required uint32 status = 1;
+  required bytes value = 2;
+};
+// Results for one DictRequest; the client pairs values[i] with the i-th key it sent.
+message DictResponse {
+  repeated DictValue values = 1;
+  optional string version = 2;
+};
+// NOTE(review): `requests` uses field number 2 and there is no field 1 — confirm this is intended.
+message ParallelDictRequest { repeated DictRequest requests = 2; };
+
+message ParallelDictResponse { repeated DictResponse responses = 1; };
+// Dictionary service: `seek` takes one request, `parallel_seek` a batch of them.
+service DictService {
+  rpc seek(DictRequest) returns (DictResponse);
+  rpc parallel_seek(ParallelDictRequest) returns (ParallelDictResponse);
+};
diff --git a/cube/cube-api/include/cube_api.h b/cube/cube-api/include/cube_api.h
new file mode 100644
index 0000000000000000000000000000000000000000..aee82f6dd1d009d5b5b53b3f6fe8de0d591e5d51
--- /dev/null
+++ b/cube/cube-api/include/cube_api.h
@@ -0,0 +1,110 @@
+// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <functional>
+#include <string>
+#include <vector>
+
+#include "brpc/server.h"
+#include "cube/cube-api/cube.pb.h"
+#include "cube/cube-api/include/meta.h"
+
+namespace rec {
+namespace mcube {
+
+// Result of one key lookup.
+struct CubeValue {
+  int error;         // status of this key (CubeError::Code or server status)
+  std::string buff;  // raw value bytes; empty when the lookup failed
+};
+
+// Client for the cube dictionary service. Keys are routed to shards by
+// `key % shard_num` of the dictionary named in each call.
+class CubeAPI {
+ public:
+  // Lazily creates the process-wide instance (creation is not synchronised).
+  static CubeAPI* instance();
+
+  // Destroys and deletes the process-wide instance, if any.
+  static void cleanup();
+
+ public:
+  CubeAPI();
+  ~CubeAPI();
+
+  /**
+   * @brief: init api, not thread safe, should be called before seek.
+   * @param [in] conf_file: filepath of config file.
+   * @return: error code. 0 for succ.
+   */
+  int init(const char* conf_file);
+
+  /**
+   * @brief: destroy api, should be called before program exit.
+   * @return: error code. 0 for succ.
+   */
+  int destroy();
+
+  /**
+   * @brief: get data from cube server, thread safe.
+   * @param [in] dict_name: dict name.
+   * @param [in] key: key to seek.
+   * @param [out] val: value of key.
+   * @return: error code. 0 for succ.
+   */
+  int seek(const std::string& dict_name, const uint64_t& key, CubeValue* val);
+
+  /**
+   * @brief: get data from cube server, thread safe.
+   * @param [in] dict_name: dict name.
+   * @param [in] keys: keys to seek.
+   * @param [out] vals: value of keys, resized to keys.size().
+   * @return: 0 for succ; -1 if the api is not initialised or the dict is
+   *          unknown; CubeError::E_SEEK_FAILED if some shards failed;
+   *          CubeError::E_ALL_SEEK_FAILED if every shard failed.
+   */
+  int seek(const std::string& dict_name,
+           const std::vector<uint64_t>& keys,
+           std::vector<CubeValue>* vals);
+
+  /**
+   * @brief: same lookup as the batch seek, but each result is handed to
+   *         `parse` instead of being copied into a vector.
+   * @param [in] parse: called as parse(value, index of the key in `keys`);
+   *         it runs from the per-shard RPC completion callbacks.
+   * @return: same codes as the batch seek.
+   */
+  int opt_seek(const std::string& dict_name,
+               const std::vector<uint64_t>& keys,
+               std::function<void(DictValue*, size_t)> parse);
+
+  /**
+   * @brief: get data from cube server, thread safe.
+   * @param [in] dict_name: dict name.
+   * @param [in] keys: keys to seek.
+   * @param [out] vals: value of keys.
+   * @param [out] version: data version; raised to the greatest version
+   *         string (std::string::compare order) reported by any shard.
+   * @return: same codes as the batch seek without version.
+   */
+  int seek(const std::string& dict_name,
+           const std::vector<uint64_t>& keys,
+           std::vector<CubeValue>* vals,
+           std::string* version);
+
+  // Callback flavour of the versioned batch seek; see opt_seek above.
+  int opt_seek(const std::string& dict_name,
+               const std::vector<uint64_t>& keys,
+               std::function<void(DictValue*, size_t)> parse,
+               std::string* version);
+
+ public:
+  static const char* error_msg(int error_code);
+
+ private:
+  // Private so that callers cannot copy the client.
+  CubeAPI(const CubeAPI&) {}
+
+ private:
+  Meta* _meta;             // dictionary metadata; NULL until init()
+  bthread_key_t _tls_key;  // key of the per-bthread request/response buffers
+  // void split(const std::vector<uint64_t>& keys);
+};
+
+}  // namespace mcube
+}  // namespace rec
diff --git a/cube/cube-api/include/cube_api_bvar.h b/cube/cube-api/include/cube_api_bvar.h
new file mode 100644
index 0000000000000000000000000000000000000000..b366c9ed63a0ac63b27186efbc295ad11d61c056
--- /dev/null
+++ b/cube/cube-api/include/cube_api_bvar.h
@@ -0,0 +1,33 @@
+// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include "bvar/bvar.h"
+#include "bvar/recorder.h"
+#include "bvar/window.h"
+
+// WARNING: the variables below are definitions with external linkage, so this
+// header must be included from exactly one translation unit; a second
+// inclusion produces duplicate-symbol link errors.
+// NOTE(review): the Adder element type was lost in extraction; uint64_t is
+// assumed because the counters are fed size_t values — confirm.
+namespace rec {
+namespace mcube {
+// Total number of keys requested, plus its 60-second window.
+bvar::Adder<uint64_t> g_cube_keys_num("cube_keys_num");
+bvar::Window<bvar::Adder<uint64_t> > g_cube_keys_num_minute(
+    "cube_keys_num_minute", &g_cube_keys_num, 60);
+// Number of keys answered with E_NO_SUCH_KEY, plus its 60-second window.
+bvar::Adder<uint64_t> g_cube_keys_miss_num("cube_keys_miss_num");
+bvar::Window<bvar::Adder<uint64_t> > g_cube_keys_miss_num_minute(
+    "cube_keys_miss_num_minute", &g_cube_keys_miss_num, 60);
+// Size of the first value of each batch, windowed over the bvar dump interval.
+bvar::IntRecorder g_cube_value_size("cube_value_size");
+bvar::Window<bvar::IntRecorder> g_cube_value_size_win(
+    "cube_value_size_win", &g_cube_value_size, bvar::FLAGS_bvar_dump_interval);
+}  // namespace mcube
+}  // namespace rec
diff --git a/cube/cube-api/include/error.h b/cube/cube-api/include/error.h
new file mode 100644
index 0000000000000000000000000000000000000000..b437a1d737ca8ca5bde5a34bb31428313eb9655a
--- /dev/null
+++ b/cube/cube-api/include/error.h
@@ -0,0 +1,32 @@
+// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+namespace rec {
+namespace mcube {
+// Status codes used by the cube client, both per key and as call results.
+struct CubeError {
+  enum Code {
+    E_OK = 0,                // success
+    E_NO_SUCH_KEY = -1,      // per-key status counted as a miss by the client
+    E_SEEK_FAILED = -2,      // a key's lookup failed, or some shards failed
+    E_ALL_SEEK_FAILED = -3,  // every shard (or the only request) failed
+  };  // enum Code
+  // Maps a code to text; the definition is not part of this header.
+  static const char* error_msg(Code code);
+};  // struct CubeError
+
+}  // namespace mcube
+}  // namespace rec
diff --git a/cube/cube-api/include/meta.h b/cube/cube-api/include/meta.h
new file mode 100644
index 0000000000000000000000000000000000000000..ec891720c55503ad77de24cd178db53b2170023e
--- /dev/null
+++ b/cube/cube-api/include/meta.h
@@ -0,0 +1,100 @@
+// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+// NOTE(review): the four header names below were lost in extraction — restore from the original file.
+#include
+#include
+#include
+#include
+
+#include "brpc/channel.h"
+#include "brpc/parallel_channel.h"
+#include "butil/third_party/rapidjson/document.h"
+#include "bvar/bvar.h"
+
+namespace rec {
+namespace mcube {
+// Connection data for one dictionary; owns the channels stored in cube_conn.
+struct MetaInfo {
+  MetaInfo() : dict_name(""), shard_num(0), dup_num(0) {}
+  ~MetaInfo() {  // deletes every channel, then empties the vector
+    for (size_t i = 0; i != cube_conn.size(); ++i) {
+      if (cube_conn[i] != NULL) {
+        delete cube_conn[i];
+        cube_conn[i] = NULL;
+      }
+    }
+    cube_conn.clear();
+  }
+
+  std::string dict_name;
+  int shard_num;  // the client routes a key to shard `key % shard_num`
+  int dup_num;
+  std::vector<::brpc::Channel*> cube_conn;  // indexed by shard id by the client
+  bvar::Adder cube_request_num;  // NOTE(review): template argument lost in extraction
+  bvar::Adder cube_rpcfail_num;  // NOTE(review): template argument lost in extraction
+};
+// Registry of MetaInfo objects, built from a config file and looked up by dict name.
+class Meta {
+ public:
+  static Meta* instance();
+
+ public:
+  ~Meta();
+
+  int init(const char* conf_file);
+
+  int destroy();
+  // The client treats a NULL result as "dictionary unknown".
+  MetaInfo* get_meta(const std::string& dict_name);
+
+  const std::vector metas();  // NOTE(review): element type lost in extraction
+
+  void reset_bvar(const std::string& dict_name);
+
+ private:
+  MetaInfo* create_meta(const BUTIL_RAPIDJSON_NAMESPACE::Value& meta_config);
+
+  int create_meta_from_ipport(MetaInfo* meta,
+                              const BUTIL_RAPIDJSON_NAMESPACE::Value& nodes,
+                              int timeout,
+                              int retry,
+                              int backup_request);
+
+  int create_meta_from_ipport_list(
+      MetaInfo* meta,
+      const BUTIL_RAPIDJSON_NAMESPACE::Value& nodes,
+      int timeout,
+      int retry,
+      int backup_request,
+      const std::string& load_balancer);
+
+  int create_meta_from_ipport_parallel(
+      MetaInfo* meta,
+      const BUTIL_RAPIDJSON_NAMESPACE::Value& nodes,
+      int timeout,
+      int retry,
+      int backup_request);
+
+ private:
+  Meta() {}
+  Meta(const Meta&) {}
+
+ private:
+  std::unordered_map _metas;  // NOTE(review): key/value types lost in extraction
+};  // class Meta
+
+}  // namespace mcube
+}  // namespace rec
diff --git a/cube/cube-api/src/cube_api.cpp b/cube/cube-api/src/cube_api.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..8a9cebb9aa92f1bdb13c47cb7c065eaf2738c73f
--- /dev/null
+++ b/cube/cube-api/src/cube_api.cpp
@@ -0,0 +1,676 @@
+// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "cube/cube-api/include/cube_api.h"
+// NOTE(review): the next three header names were lost in extraction and are
+// reconstructed from what this file uses — confirm against the original.
+#include <brpc/channel.h>
+#include <brpc/parallel_channel.h>
+#include <google/protobuf/descriptor.h>
+#include "cube/cube-api/include/cube_api_bvar.h"
+
+#include "cube/cube-api/include/error.h"
+#include "cube/cube-api/include/meta.h"
+
+namespace {
+static ::rec::mcube::CubeAPI* g_ins = NULL;
+}
+
+namespace rec {
+namespace mcube {
+
+// Signature of the per-value callback accepted by opt_seek().
+typedef std::function<void(DictValue*, size_t)> ParseFunc;
+
+// Per-bthread request/response buffers, one slot per shard, reused across
+// calls so that the protobuf messages are not reallocated every time.
+struct DictRpcData {
+  std::vector<DictRequest> sub_reqs;
+  std::vector<DictResponse> sub_res;
+};
+
+// Destructor registered with the bthread-local key.
+static void dict_rpc_deleter(void* d) { delete static_cast<DictRpcData*>(d); }
+
+// Completion callback of one shard request issued by opt_seek().
+// `offset` holds, for each key sent to this shard, its index in the caller's
+// key list; `parse` receives every value together with that index.
+static void sub_seek_done(DictResponse* response,
+                          brpc::Controller* cntl,
+                          std::vector<size_t>* offset,
+                          ParseFunc parse) {
+  if (cntl->Failed()) {
+    // A failed RPC carries no values, so synthesise one failed value per key
+    // that was sent; otherwise the caller would never hear about these keys.
+    DictValue failed;
+    for (size_t i = 0; i < offset->size(); ++i) {
+      // Reset on every round: `parse` is free to modify or swap the value.
+      failed.set_status(CubeError::E_SEEK_FAILED);
+      failed.set_value("");
+      parse(&failed, (*offset)[i]);
+    }
+    return;
+  }
+
+  // Never trust the server to return exactly as many values as keys sent.
+  size_t got = static_cast<size_t>(response->values().size());
+  if (got > offset->size()) {
+    got = offset->size();
+  }
+  for (size_t i = 0; i < got; ++i) {
+    parse(response->mutable_values(i), (*offset)[i]);
+  }
+}
+
+// Non-NULL `done` closure that makes a stub call asynchronous without doing
+// any work on completion; the caller waits with brpc::Join instead.
+struct DoNothing : public google::protobuf::Closure {
+  void Run() {}
+};
+
+// True when `info` has at least one shard and a channel for every shard.
+static bool shard_layout_ok(const MetaInfo* info) {
+  return info->shard_num > 0 &&
+         info->cube_conn.size() >= static_cast<size_t>(info->shard_num);
+}
+
+// Shared implementation of the four batch lookups.
+//   vals    - filled with one CubeValue per key; NULL for the opt_seek flavour.
+//   parse   - per-value callback; NULL for the seek flavour.
+//   with_version / version - request the data version and keep the greatest
+//             one reported by a successful shard (version may be NULL).
+// Returns -1 on setup errors, otherwise a CubeError code.
+static int seek_impl(Meta* meta,
+                     bthread_key_t tls_key,
+                     const std::string& dict_name,
+                     const std::vector<uint64_t>& keys,
+                     std::vector<CubeValue>* vals,
+                     const ParseFunc* parse,
+                     bool with_version,
+                     std::string* version) {
+  if (meta == NULL) {
+    LOG(ERROR) << "seek, meta is null";
+    return -1;
+  }
+  if (tls_key == INVALID_BTHREAD_KEY) {
+    LOG(ERROR) << "seek, cube api is not initialized";
+    return -1;
+  }
+
+  MetaInfo* info = meta->get_meta(dict_name);
+  if (info == NULL) {
+    LOG(ERROR) << "get meta [" << dict_name << "] failed";
+    return -1;
+  }
+  // Guards the modulo below and the cube_conn[] accesses.
+  if (!shard_layout_ok(info)) {
+    LOG(ERROR) << "meta [" << dict_name << "] has an invalid shard layout";
+    return -1;
+  }
+
+  const int shard_num = info->shard_num;
+
+  DictRpcData* rpc_data =
+      static_cast<DictRpcData*>(bthread_getspecific(tls_key));
+  if (rpc_data == NULL) {
+    rpc_data = new DictRpcData;
+    CHECK_EQ(0, bthread_setspecific(tls_key, rpc_data));
+  }
+  rpc_data->sub_reqs.resize(shard_num);
+  rpc_data->sub_res.resize(shard_num);
+
+  // offset[s][j] is the index in `keys` of the j-th key sent to shard s.
+  std::vector<std::vector<size_t> > offset(shard_num);
+  const size_t init_cnt = keys.size() * 2 / shard_num;
+  for (int i = 0; i < shard_num; ++i) {
+    offset[i].reserve(init_cnt);
+    if (with_version) {
+      rpc_data->sub_reqs[i].set_version_required(true);
+    }
+  }
+  for (size_t i = 0; i < keys.size(); ++i) {
+    const size_t shard_id = keys[i] % shard_num;
+    rpc_data->sub_reqs[shard_id].add_keys(keys[i]);
+    offset[shard_id].push_back(i);
+  }
+
+  std::vector<DictService_Stub*> stubs(shard_num);
+  std::vector<brpc::Controller*> cntls(shard_num);
+  for (int i = 0; i < shard_num; ++i) {
+    stubs[i] = new DictService_Stub(info->cube_conn[i]);
+    cntls[i] = new ::brpc::Controller();
+  }
+
+  // Issue every shard request before waiting on any of them.
+  DoNothing do_nothing;
+  for (int i = 0; i < shard_num; ++i) {
+    google::protobuf::Closure* done = &do_nothing;
+    if (parse != NULL) {
+      done = brpc::NewCallback(
+          sub_seek_done, &rpc_data->sub_res[i], cntls[i], &offset[i], *parse);
+    }
+    stubs[i]->seek(
+        cntls[i], &rpc_data->sub_reqs[i], &rpc_data->sub_res[i], done);
+  }
+
+  int cntls_failed_cnt = 0;
+  for (int i = 0; i < shard_num; ++i) {
+    brpc::Join(cntls[i]->call_id());
+
+    if (cntls[i]->Failed()) {
+      ++cntls_failed_cnt;
+      LOG(WARNING) << "cube seek from shard [" << i << "] failed ["
+                   << cntls[i]->ErrorText() << "]";
+    }
+  }
+
+  int ret = CubeError::E_OK;
+  info->cube_request_num << 1;
+  if (cntls_failed_cnt > 0) {
+    info->cube_rpcfail_num << 1;
+    if (cntls_failed_cnt == shard_num) {
+      ret = CubeError::E_ALL_SEEK_FAILED;
+    } else {
+      ret = CubeError::E_SEEK_FAILED;
+    }
+  }
+
+  if (vals != NULL) {
+    // Start every slot as failed; successful shards overwrite theirs below.
+    // This covers failed shards and responses shorter than the request.
+    vals->resize(keys.size());
+    for (size_t k = 0; k < vals->size(); ++k) {
+      (*vals)[k].error = CubeError::E_SEEK_FAILED;
+      (*vals)[k].buff.clear();
+    }
+  }
+
+  // merge
+  size_t miss_cnt = 0;
+  for (int i = 0; i < shard_num; ++i) {
+    if (cntls[i]->Failed()) {
+      continue;
+    }
+    DictResponse& res = rpc_data->sub_res[i];
+    size_t got = static_cast<size_t>(res.values().size());
+    if (got > offset[i].size()) {
+      got = offset[i].size();
+    }
+    for (size_t j = 0; j < got; ++j) {
+      DictValue* val = res.mutable_values(j);
+      if (static_cast<int>(val->status()) == CubeError::E_NO_SUCH_KEY) {
+        ++miss_cnt;
+      }
+      if (vals != NULL) {
+        CubeValue& out = (*vals)[offset[i][j]];
+        out.error = val->status();
+        out.buff.swap(*val->mutable_value());
+      }
+    }
+    if (version != NULL && version->compare(res.version()) < 0) {
+      *version = res.version();
+    }
+  }
+
+  // bvar stats
+  g_cube_keys_num << keys.size();
+  if (keys.size() > 0) {
+    g_cube_keys_miss_num << miss_cnt;
+    if (vals != NULL) {
+      g_cube_value_size << (*vals)[0].buff.size();
+    }
+  }
+
+  // cleanup
+  for (int i = 0; i < shard_num; ++i) {
+    delete stubs[i];
+    stubs[i] = NULL;
+    delete cntls[i];
+    cntls[i] = NULL;
+    rpc_data->sub_reqs[i].Clear();
+    rpc_data->sub_res[i].Clear();
+  }
+
+  return ret;
+}
+
+// Returns the process-wide instance, creating it on first use.
+// Creation is not synchronised; call it once before starting worker threads.
+CubeAPI* CubeAPI::instance() {
+  if (g_ins == NULL) {
+    g_ins = new CubeAPI();
+  }
+
+  return g_ins;
+}
+
+// Destroys and deletes the process-wide instance, if any.
+void CubeAPI::cleanup() {
+  if (g_ins != NULL) {
+    g_ins->destroy();
+    delete g_ins;
+    g_ins = NULL;
+  }
+}
+
+// The key starts out invalid so the destructor can tell whether init() ran.
+CubeAPI::CubeAPI() : _meta(NULL), _tls_key(INVALID_BTHREAD_KEY) {}
+
+CubeAPI::~CubeAPI() {
+  // Only delete a key that init() actually created.
+  if (!(_tls_key == INVALID_BTHREAD_KEY)) {
+    CHECK_EQ(0, bthread_key_delete(_tls_key));
+  }
+}
+
+// Loads the dictionary metadata from `conf_file` and creates the
+// bthread-local key. Returns 0 on success or the Meta::init error code.
+int CubeAPI::init(const char* conf_file) {
+  // init meta
+  _meta = Meta::instance();
+  int ret = _meta->init(conf_file);
+
+  if (ret != 0) {
+    LOG(ERROR) << "Init cube meta failed";
+    return ret;
+  }
+
+  // Create the key once, even if init() is called again.
+  if (_tls_key == INVALID_BTHREAD_KEY) {
+    CHECK_EQ(0, bthread_key_create(&_tls_key, dict_rpc_deleter));
+  }
+
+  return 0;
+}
+
+// Releases the metadata. Always returns 0; failures are only logged.
+int CubeAPI::destroy() {
+  if (_meta == NULL) {
+    LOG(WARNING) << "Destroy, cube meta is null";
+    return 0;
+  }
+
+  int ret = _meta->destroy();
+
+  if (ret != 0) {
+    LOG(WARNING) << "Destroy cube meta failed";
+  }
+
+  return 0;
+}
+
+// Looks up a single key with one synchronous RPC to the owning shard.
+// Returns -1 on setup errors, E_ALL_SEEK_FAILED when the RPC failed or
+// returned no value, E_OK otherwise; the per-key status goes to val->error.
+int CubeAPI::seek(const std::string& dict_name,
+                  const uint64_t& key,
+                  CubeValue* val) {
+  if (_meta == NULL) {
+    LOG(ERROR) << "seek, meta is null";
+    return -1;
+  }
+  if (val == NULL) {
+    LOG(ERROR) << "seek, val is null";
+    return -1;
+  }
+
+  MetaInfo* info = _meta->get_meta(dict_name);
+  if (info == NULL) {
+    LOG(ERROR) << "get meta [" << dict_name << "] failed";
+    return -1;
+  }
+  if (!shard_layout_ok(info)) {
+    LOG(ERROR) << "meta [" << dict_name << "] has an invalid shard layout";
+    return -1;
+  }
+
+  const int shard_id = static_cast<int>(key % info->shard_num);
+  DictRequest req;
+  DictResponse res;
+  req.add_keys(key);
+
+  // Stack objects: nothing to leak on any path.
+  DictService_Stub stub(info->cube_conn[shard_id]);
+  ::brpc::Controller cntl;
+
+  // done == NULL makes the call synchronous.
+  stub.seek(&cntl, &req, &res, NULL);
+
+  int ret = CubeError::E_OK;
+  if (cntl.Failed()) {
+    info->cube_rpcfail_num << 1;
+
+    val->error = CubeError::E_SEEK_FAILED;
+    val->buff.assign("");
+
+    ret = CubeError::E_ALL_SEEK_FAILED;
+    LOG(WARNING) << "cube seek from shard [" << shard_id << "] failed ["
+                 << cntl.ErrorText() << "]";
+  } else if (res.values().size() > 0) {
+    DictValue* res_val = res.mutable_values(0);
+    if (static_cast<int>(res_val->status()) == CubeError::E_NO_SUCH_KEY) {
+      g_cube_keys_miss_num << 1;
+    }
+    val->error = res_val->status();
+    val->buff.swap(*res_val->mutable_value());
+  } else {
+    val->error = CubeError::E_SEEK_FAILED;
+    val->buff.assign("");
+    ret = CubeError::E_ALL_SEEK_FAILED;
+  }
+  info->cube_request_num << 1;
+  g_cube_keys_num << 1;
+
+  return ret;
+}
+
+// Batch lookup; fills `vals` with one entry per key, in key order.
+int CubeAPI::seek(const std::string& dict_name,
+                  const std::vector<uint64_t>& keys,
+                  std::vector<CubeValue>* vals) {
+  if (vals == NULL) {
+    LOG(ERROR) << "seek, vals is null";
+    return -1;
+  }
+  return seek_impl(_meta, _tls_key, dict_name, keys, vals, NULL, false, NULL);
+}
+
+// Batch lookup that hands every value to `parse` instead of copying it.
+int CubeAPI::opt_seek(const std::string& dict_name,
+                      const std::vector<uint64_t>& keys,
+                      std::function<void(DictValue*, size_t)> parse) {
+  if (!parse) {
+    LOG(ERROR) << "seek, parse callback is empty";
+    return -1;
+  }
+  return seek_impl(
+      _meta, _tls_key, dict_name, keys, NULL, &parse, false, NULL);
+}
+
+// Batch lookup that also reports the greatest data version seen.
+int CubeAPI::seek(const std::string& dict_name,
+                  const std::vector<uint64_t>& keys,
+                  std::vector<CubeValue>* vals,
+                  std::string* version) {
+  if (vals == NULL) {
+    LOG(ERROR) << "seek, vals is null";
+    return -1;
+  }
+  return seek_impl(
+      _meta, _tls_key, dict_name, keys, vals, NULL, true, version);
+}
+
+// Callback flavour of the versioned batch lookup.
+int CubeAPI::opt_seek(const std::string& dict_name,
+                      const std::vector<uint64_t>& keys,
+                      std::function<void(DictValue*, size_t)> parse,
+                      std::string* version) {
+  if (!parse) {
+    LOG(ERROR) << "seek, parse callback is empty";
+    return -1;
+  }
+  return seek_impl(
+      _meta, _tls_key, dict_name, keys, NULL, &parse, true, version);
+}
+
+}  // namespace mcube
+}  // namespace rec
diff --git a/cube/cube-api/src/cube_cli.cpp b/cube/cube-api/src/cube_cli.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..6df52710782e9a9e6ff1db9985a031669869080d
--- /dev/null
+++ b/cube/cube-api/src/cube_cli.cpp
@@ -0,0 +1,159 @@
+// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include "cube/cube-api/include/cube_api.h" + +#define TIME_FLAG(flag) \ + struct timeval flag; \ + gettimeofday(&(flag), NULL); + +DEFINE_string(config_file, "./cube.conf", "m-cube config file"); +DEFINE_string(keys, "keys", "keys to seek"); +DEFINE_string(dict, "dict", "dict to seek"); +DEFINE_uint64(batch, 500, "batch size"); +DEFINE_int32(timeout, 200, "timeout in ms"); +DEFINE_int32(retry, 3, "retry times"); +DEFINE_bool(print_output, false, "print output flag"); + +namespace { +inline uint64_t time_diff(const struct timeval& start_time, + const struct timeval& end_time) { + return (end_time.tv_sec - start_time.tv_sec) * 1000000 + + (end_time.tv_usec - start_time.tv_usec); +} +} + +namespace rec { +namespace mcube { +std::string string_to_hex(const std::string& input) { + static const char* const lut = "0123456789ABCDEF"; + size_t len = input.length(); + + std::string output; + output.reserve(2 * len); + for (size_t i = 0; i < len; ++i) { + const unsigned char c = input[i]; + output.push_back(lut[c >> 4]); + output.push_back(lut[c & 15]); + } + return output; +} + +int run(int argc, char** argv) { + google::ParseCommandLineFlags(&argc, &argv, true); + + CubeAPI* cube = CubeAPI::instance(); + int ret = cube->init(FLAGS_config_file.c_str()); + if (ret != 0) { + LOG(ERROR) << "init cube api failed err=" << ret; + return ret; + } + + FILE* key_file = fopen(FLAGS_keys.c_str(), "r"); + 
if (key_file == NULL) { + LOG(ERROR) << "open key file [" << FLAGS_keys << "] failed"; + return -1; + } + + std::atomic seek_counter(0); + std::atomic seek_cost_total(0); + uint64_t seek_cost_max = 0; + uint64_t seek_cost_min = 500000; + + char buffer[1024]; + std::vector keys; + std::vector values; + + while (fgets(buffer, 1024, key_file)) { + uint64_t key = strtoul(buffer, NULL, 10); + keys.push_back(key); + int ret = 0; + if (keys.size() > FLAGS_batch) { + TIME_FLAG(seek_start); + ret = cube->seek(FLAGS_dict, keys, &values); + TIME_FLAG(seek_end); + if (ret != 0) { + LOG(WARNING) << "cube seek failed"; + } else if (FLAGS_print_output) { + for (size_t i = 0; i < keys.size(); ++i) { + fprintf(stdout, + "key:%lu value:%s\n", + keys[i], + string_to_hex(values[i].buff).c_str()); + } + } + ++seek_counter; + uint64_t seek_cost = time_diff(seek_start, seek_end); + seek_cost_total += seek_cost; + if (seek_cost > seek_cost_max) { + seek_cost_max = seek_cost; + } + if (seek_cost < seek_cost_min) { + seek_cost_min = seek_cost; + } + + keys.clear(); + values.clear(); + } + } + + if (keys.size() > 0) { + int ret = 0; + values.resize(keys.size()); + TIME_FLAG(seek_start); + ret = cube->seek(FLAGS_dict, keys, &values); + TIME_FLAG(seek_end); + if (ret != 0) { + LOG(WARNING) << "cube seek failed"; + } else if (FLAGS_print_output) { + for (size_t i = 0; i < keys.size(); ++i) { + fprintf(stdout, + "key:%lu value:%s\n", + keys[i], + string_to_hex(values[i].buff).c_str()); + } + } + + ++seek_counter; + uint64_t seek_cost = time_diff(seek_start, seek_end); + seek_cost_total += seek_cost; + if (seek_cost > seek_cost_max) { + seek_cost_max = seek_cost; + } + if (seek_cost < seek_cost_min) { + seek_cost_min = seek_cost; + } + } + fclose(key_file); + + ret = cube->destroy(); + if (ret != 0) { + LOG(WARNING) << "destroy cube api failed err=" << ret; + } + + uint64_t seek_cost_avg = seek_cost_total / seek_counter; + LOG(INFO) << "seek cost avg = " << seek_cost_avg; + LOG(INFO) << "seek 
cost max = " << seek_cost_max; + LOG(INFO) << "seek cost min = " << seek_cost_min; + + return 0; +} + +} // namespace mcube +} // namespace rec + +int main(int argc, char** argv) { return ::rec::mcube::run(argc, argv); } diff --git a/cube/cube-api/src/meta.cpp b/cube/cube-api/src/meta.cpp new file mode 100644 index 0000000000000000000000000000000000000000..69ce43a08e0f5460dfa4e440958ff247458f6140 --- /dev/null +++ b/cube/cube-api/src/meta.cpp @@ -0,0 +1,293 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "cube/cube-api/include/meta.h" + +#include +#include +#include +#include +#include + +#include "cube/cube-api/cube.pb.h" + +namespace { +static ::rec::mcube::Meta* g_ins = NULL; +} + +namespace rec { +namespace mcube { + +Meta* Meta::instance() { + if (g_ins == NULL) { + g_ins = new Meta(); + } + + return g_ins; +} + +Meta::~Meta() {} + +int Meta::init(const char* conf_file) { + std::ifstream ifs(conf_file); + if (!ifs.is_open()) { + LOG(ERROR) << "open conf file [" << conf_file << "]"; + return -1; + } + + std::string content((std::istreambuf_iterator(ifs)), + std::istreambuf_iterator()); + + BUTIL_RAPIDJSON_NAMESPACE::Document document; + document.Parse(content.c_str()); + + int ret = 0; + if (document.IsArray()) { + for (auto it = document.Begin(); it != document.End(); ++it) { + MetaInfo* info = create_meta(*it); + + if (info == NULL) { + LOG(ERROR) << "create dict meta failed"; + ret = -1; + break; + } + + LOG(INFO) << "init cube dict [" << info->dict_name << "] succ"; + _metas[info->dict_name] = info; + } + } else { + LOG(ERROR) + << "conf file [" << conf_file + << "] json schema error, please ensure that the json is an array"; + } + + if (_metas.size() == 0) { + LOG(ERROR) << "no valid cube"; + ret = -1; + } + + return ret; +} + +int Meta::destroy() { + for (auto it = _metas.begin(); it != _metas.end(); ++it) { + if (it->second != NULL) { + delete it->second; + it->second = NULL; + } + } + _metas.clear(); + return 0; +} + +MetaInfo* Meta::get_meta(const std::string& dict_name) { + auto iter = _metas.find(dict_name); + + if (iter != _metas.end()) { + return iter->second; + } + + return NULL; +} + +void Meta::reset_bvar(const std::string& dict_name) { + auto iter = _metas.find(dict_name); + + if (iter != _metas.end()) { + iter->second->cube_request_num.reset(); + iter->second->cube_rpcfail_num.reset(); + } else { + LOG(WARNING) << "reset_bvar to invalid dict [" << dict_name << "]"; + } +} + +MetaInfo* Meta::create_meta(const 
BUTIL_RAPIDJSON_NAMESPACE::Value& config) { + if (!config.HasMember("dict_name") || !config.HasMember("shard") || + !config.HasMember("dup") || !config.HasMember("timeout") || + !config.HasMember("retry") || !config.HasMember("backup_request") || + !config.HasMember("type")) { + LOG(ERROR) << "create meta failed, required fields miss"; + return NULL; + } + + if (!config["dict_name"].IsString() || !config["shard"].IsInt() || + !config["dup"].IsInt() || !config["timeout"].IsInt() || + !config["retry"].IsInt() || !config["backup_request"].IsInt() || + !config["type"].IsString()) { + LOG(ERROR) << "create meta failed, required fields type error"; + return NULL; + } + + MetaInfo* info = new MetaInfo(); + info->dict_name = config["dict_name"].GetString(); + info->shard_num = config["shard"].GetInt(); + info->dup_num = config["dup"].GetInt(); + + int timeout = config["timeout"].GetInt(); + int retry = config["retry"].GetInt(); + int backup_request = config["backup_request"].GetInt(); + std::string type = config["type"].GetString(); + std::string load_balancer = "rr"; + + if (config.HasMember("load_balancer") && config["load_balancer"].IsString()) { + load_balancer = config["load_balancer"].GetString(); + } + + int ret = 0; + + if (type.compare("ipport") == 0) { + ret = create_meta_from_ipport( + info, config["nodes"], timeout, retry, backup_request); + } else if (type.compare("ipport_list") == 0) { + ret = create_meta_from_ipport_list( + info, config["nodes"], timeout, retry, backup_request, load_balancer); + } else if (type.compare("ipport_parallel") == 0) { + ret = create_meta_from_ipport_parallel( + info, config["nodes"], timeout, retry, backup_request); + } else { + ret = -1; + } + + if (ret != 0) { + LOG(ERROR) << "create meta failed error=" << ret; + delete info; + return NULL; + } + + return info; +} + +int Meta::create_meta_from_ipport(MetaInfo* meta, + const BUTIL_RAPIDJSON_NAMESPACE::Value& nodes, + int timeout, + int retry, + int backup_request) { + 
brpc::ChannelOptions options; + options.timeout_ms = timeout; + options.max_retry = retry; + + if (backup_request > 0) { + options.backup_request_ms = backup_request; + } + + meta->cube_conn.resize(meta->shard_num, NULL); + + for (int i = 0; i < static_cast(nodes.Size()); ++i) { + const BUTIL_RAPIDJSON_NAMESPACE::Value& node = nodes[i]; + const std::string& node_ip = node["ip"].GetString(); + int node_port = node["port"].GetInt(); + + ::brpc::Channel* chan = new ::brpc::Channel(); + + if (chan->Init(node_ip.c_str(), node_port, &options) != 0) { + LOG(ERROR) << "Init connection to [" << node_ip << ":" << node_port + << "] failed"; + delete chan; + return -1; + } + + meta->cube_conn[i] = chan; + } + + return 0; +} + +int Meta::create_meta_from_ipport_list( + MetaInfo* meta, + const BUTIL_RAPIDJSON_NAMESPACE::Value& nodes, + int timeout, + int retry, + int backup_request, + const std::string& load_balancer) { + brpc::ChannelOptions options; + options.timeout_ms = timeout; + options.max_retry = retry; + + if (backup_request > 0) { + options.backup_request_ms = backup_request; + } + + meta->cube_conn.resize(meta->shard_num, NULL); + + for (int i = 0; i < static_cast(nodes.Size()); ++i) { + const BUTIL_RAPIDJSON_NAMESPACE::Value& node = nodes[i]; + const std::string& ipport_list = node["ipport_list"].GetString(); + + ::brpc::Channel* chan = new ::brpc::Channel(); + + if (chan->Init(ipport_list.c_str(), load_balancer.c_str(), &options) != 0) { + LOG(ERROR) << "Init connection to [" << ipport_list << "] failed"; + delete chan; + return -1; + } + + meta->cube_conn[i] = chan; + } + + return 0; +} + +int Meta::create_meta_from_ipport_parallel( + MetaInfo* meta, + const BUTIL_RAPIDJSON_NAMESPACE::Value& nodes, + int timeout, + int retry, + int backup_request) { + if (nodes.Size() < 1) { + LOG(ERROR) << "Config nodes size less than 0"; + return -1; + } + + brpc::ChannelOptions options; + options.timeout_ms = timeout; + options.max_retry = retry; + + if (backup_request > 0) { + 
options.backup_request_ms = backup_request; + } + + meta->cube_conn.resize(meta->shard_num, NULL); + + for (int i = 0; i < meta->shard_num; ++i) { + const BUTIL_RAPIDJSON_NAMESPACE::Value& node = nodes[0]; + const std::string& node_ip = node["ip"].GetString(); + int node_port = node["port"].GetInt(); + + ::brpc::Channel* chan = new ::brpc::Channel(); + + if (chan->Init(node_ip.c_str(), node_port, &options) != 0) { + LOG(ERROR) << "Init connection to [" << node_ip << ":" << node_port + << "] failed"; + delete chan; + return -1; + } + + meta->cube_conn[i] = chan; + } + + return 0; +} + +const std::vector Meta::metas() { + std::vector metas; + + for (auto i : _metas) { + metas.push_back(i.second); + } + + return metas; +} + +} // namespace mcube +} // namespace rec diff --git a/cube/cube-server/CMakeLists.txt b/cube/cube-server/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..6d80ef013372cce19e048b784e32dbd440ec2a06 --- /dev/null +++ b/cube/cube-server/CMakeLists.txt @@ -0,0 +1,123 @@ +# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License + +cmake_minimum_required(VERSION 2.8.10) +project(Cube C CXX) + +option(CUBE_LINK_SO "Whether cube are linked dynamically" OFF) + +execute_process( + COMMAND bash -c "find ${PROJECT_SOURCE_DIR}/../.. 
-type d -regex \".*output/include$\" | head -n1 | xargs dirname | tr -d '\n'" + OUTPUT_VARIABLE OUTPUT_PATH +) + + +set(CMAKE_PREFIX_PATH ${OUTPUT_PATH}) + +include_directories(SYSTEM ${CMAKE_CURRENT_LIST_DIR}/include) +include_directories(SYSTEM ${CMAKE_CURRENT_BINARY_DIR}/../) + +include(FindProtobuf) +protobuf_generate_cpp(PROTO_SRC PROTO_HEADER proto/cube.proto proto/control.proto) +# include PROTO_HEADER +include_directories(${CMAKE_CURRENT_BINARY_DIR}) +include(FindThreads) + +# compiled with thrift protocol enabled, a link error would be reported. +find_library(THRIFT_LIB NAMES thrift) +if (NOT THRIFT_LIB) + set(THRIFT_LIB "") +endif() +find_library(THRIFTNB_LIB NAMES thriftnb) +if (NOT THRIFTNB_LIB) + set(THRIFTNB_LIB "") +endif() + +set(CMAKE_CXX_FLAGS "${CMAKE_CPP_FLAGS} -DNDEBUG -O2 -D__const__= -pipe -W -Wall -Wno-unused-parameter -fPIC -fno-omit-frame-pointer") + +if(CMAKE_VERSION VERSION_LESS "3.1.3") + if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") + endif() + if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") + endif() +else() + set(CMAKE_CXX_STANDARD 11) + set(CMAKE_CXX_STANDARD_REQUIRED ON) +endif() + +find_path(LEVELDB_INCLUDE_PATH NAMES leveldb/db.h) +find_library(LEVELDB_LIB NAMES leveldb) +if ((NOT LEVELDB_INCLUDE_PATH) OR (NOT LEVELDB_LIB)) + message(FATAL_ERROR "Fail to find leveldb") +endif() +include_directories(${LEVELDB_INCLUDE_PATH}) + +find_library(SSL_LIB NAMES ssl) +if (NOT SSL_LIB) + message(FATAL_ERROR "Fail to find ssl") +endif() + +find_library(CRYPTO_LIB NAMES crypto) +if (NOT CRYPTO_LIB) + message(FATAL_ERROR "Fail to find crypto") +endif() + +set(SRC_LIST src/server.cpp + src/cube_bvar.cpp + src/dict.cpp + src/dict_set.cpp + src/recycle.cpp + src/framework.cpp + src/control.cpp + src/server.cpp) + +add_executable(cube ${SRC_LIST} src/main.cpp + ${PROTO_SRC} ${PROTO_HEADER}) + +set(DYNAMIC_LIB + ${CMAKE_THREAD_LIBS_INIT} + 
${PROTOBUF_LIBRARIES} + ${LEVELDB_LIB} + ${SSL_LIB} + ${CRYPTO_LIB} + ${THRIFT_LIB} + ${THRIFTNB_LIB} + ) + +if(CMAKE_SYSTEM_NAME STREQUAL "Darwin") + set(DYNAMIC_LIB ${DYNAMIC_LIB} + "-framework CoreFoundation" + "-framework CoreGraphics" + "-framework CoreData" + "-framework CoreText" + "-framework Security" + "-framework Foundation" + "-Wl,-U,_MallocExtension_ReleaseFreeMemory" + "-Wl,-U,_ProfilerStart" + "-Wl,-U,_ProfilerStop") +endif() + +target_link_libraries(cube ${DYNAMIC_LIB} brpc -lpthread -ldl -lz) + +add_executable(cube_test ${SRC_LIST} test/cube_test.cpp + ${PROTO_SRC} ${PROTO_HEADER}) + +target_link_libraries(cube_test ${DYNAMIC_LIB} brpc gtest -lpthread -ldl -lz) + +# install +install(TARGETS cube + RUNTIME DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/bin + ) diff --git a/cube/cube-server/conf/gflags.conf b/cube/cube-server/conf/gflags.conf new file mode 100644 index 0000000000000000000000000000000000000000..011bb107d2a210e2dff74f550e19a0d49fe90e03 --- /dev/null +++ b/cube/cube-server/conf/gflags.conf @@ -0,0 +1,4 @@ +--port=8027 +--dict_split=1 +--in_mem=true +--log_dir=./log/ \ No newline at end of file diff --git a/cube/cube-server/include/cube/control.h b/cube/cube-server/include/cube/control.h new file mode 100644 index 0000000000000000000000000000000000000000..91e2f635e48896eb88db7edd91b53c42ccef28f8 --- /dev/null +++ b/cube/cube-server/include/cube/control.h @@ -0,0 +1,63 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +#include "cube-server/control.pb.h" + +#include "butil/third_party/rapidjson/document.h" +#include "butil/third_party/rapidjson/prettywriter.h" +#include "butil/third_party/rapidjson/stringbuffer.h" + +namespace rec { +namespace mcube { + +class Control : public ControlService { + public: + Control(); + + virtual ~Control(); + + virtual void cmd(::google::protobuf::RpcController* controller, + const ::rec::mcube::HttpRequest* request, + ::rec::mcube::HttpResponse* response, + ::google::protobuf::Closure* done); + + private: + int handle_status(const BUTIL_RAPIDJSON_NAMESPACE::Document& cmd, + BUTIL_RAPIDJSON_NAMESPACE::Document* res); + + int handle_reload_base(const BUTIL_RAPIDJSON_NAMESPACE::Document& cmd, + const std::string& v_path); + int handle_reload_patch(const BUTIL_RAPIDJSON_NAMESPACE::Document& cmd, + const std::string& v_path); + + int handle_bg_load_base(const BUTIL_RAPIDJSON_NAMESPACE::Document& cmd, + const std::string& v_path); + int handle_bg_load_patch(const BUTIL_RAPIDJSON_NAMESPACE::Document& cmd, + const std::string& v_path); + + int handle_bg_unload(const BUTIL_RAPIDJSON_NAMESPACE::Document& cmd); + + int handle_bg_switch(const BUTIL_RAPIDJSON_NAMESPACE::Document& cmd); + + int handle_enable(const BUTIL_RAPIDJSON_NAMESPACE::Document& cmd); + + std::mutex _cmd_mutex; +}; // class Control + +} // namespace mcube +} // namespace rec diff --git a/cube/cube-server/include/cube/cube_bvar.h b/cube/cube-server/include/cube/cube_bvar.h new file mode 100644 index 0000000000000000000000000000000000000000..5859c7670e5c0b7ee5d55991d3636e1c273c8836 --- /dev/null +++ b/cube/cube-server/include/cube/cube_bvar.h @@ -0,0 +1,38 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include "bvar/bvar.h" +#include "bvar/recorder.h" +#include "bvar/window.h" + +namespace bvar { +DECLARE_bool(bvar_dump); +DECLARE_string(bvar_dump_file); +} + +namespace rec { +namespace mcube { + +extern bvar::IntRecorder g_keys_num; +extern bvar::Adder g_request_num; +extern bvar::IntRecorder g_data_load_time; +extern bvar::IntRecorder g_data_size; +extern bvar::Adder g_long_value_num; +extern bvar::Adder g_unfound_key_num; +extern bvar::Adder g_total_key_num; + +} // namespace mcube +} // namespace rec diff --git a/cube/cube-server/include/cube/dict.h b/cube/cube-server/include/cube/dict.h new file mode 100644 index 0000000000000000000000000000000000000000..7f8a08de5077602bd712ac134ff7a4f12bc4cdfa --- /dev/null +++ b/cube/cube-server/include/cube/dict.h @@ -0,0 +1,96 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +#include +#include +#include +#include + +#include "cube/rw_lock.h" +#include "cube/slim_hash_map.h" +#include "cube/virtual_dict.h" + +namespace rec { +namespace mcube { + +class Dict : public VirtualDict { + public: + Dict(); + + virtual ~Dict(); + + /* + void set_base_dict(const Dict * dict){ + _base_dict = dict; + } + */ + + virtual void set_base_dict(const VirtualDict* dict) { + _base_dict = static_cast(dict); + } + + int load(const std::string& dict_path, + bool in_mem, + const std::string& v_path); + + int destroy(); + + bool seek(uint64_t key, char* buff, uint64_t* buff_size); + + const std::string& version(); // no lock, used by framework seek + + std::string guard_version(); + + void atom_inc_seek_num(); + + void atom_dec_seek_num(); + + uint32_t atom_seek_num(); + + private: + int load_index(const std::string& dict_path, const std::string& v_path); + + int load_data(const std::string& dict_path, const std::string& v_path); + + int load_data_mmap(const std::string& dict_path, const std::string& v_path); + + void set_version(const std::string& v_path); + + private: + struct DataBlock { + DataBlock() : size(0), fd(-1) {} + + std::shared_ptr s_data; + uint32_t size; + int fd; + }; + + private: + // boost::unordered_map _table; + slim_hash_map _slim_table; + std::vector _block_set; + std::atomic _seek_num; + const Dict* _base_dict; + std::string _version; + RWLock _rw_lock; +}; // class Dict + +typedef std::shared_ptr DictPtr; + +} // namespace mcube +} // namespace rec diff --git a/cube/cube-server/include/cube/dict_set.h b/cube/cube-server/include/cube/dict_set.h new file mode 100644 index 0000000000000000000000000000000000000000..4a9037fa04ee5edb327f75c80cac3974e9632de6 --- /dev/null +++ b/cube/cube-server/include/cube/dict_set.h @@ -0,0 +1,62 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include + +#include "cube/dict.h" +#include "cube/virtual_dict.h" + +namespace rec { +namespace mcube { + +class DictSet : public VirtualDict { + public: + explicit DictSet(int dict_split); + + virtual ~DictSet(); + + virtual int load(const std::vector& dict_path, + bool in_mem, + const std::string& v_path); + + virtual int destroy(); + + virtual const std::string& version(); + + virtual std::string guard_version(); + + virtual void set_base_dict(const VirtualDict* dict); + + virtual bool seek(uint64_t key, char* buff, uint64_t* buff_size); + + virtual void atom_inc_seek_num(); + + virtual void atom_dec_seek_num(); + + virtual uint32_t atom_seek_num(); + + private: + std::atomic _seek_num{0}; + std::vector _dict_set; + int _dict_split{0}; + std::string _version{""}; + RWLock _rw_lock; +}; // DictSet + +} // namespace mcube +} // namespace rec diff --git a/cube/cube-server/include/cube/error.h b/cube/cube-server/include/cube/error.h new file mode 100644 index 0000000000000000000000000000000000000000..de1f417fec1291453d46a2d98e65283559f47e7e --- /dev/null +++ b/cube/cube-server/include/cube/error.h @@ -0,0 +1,47 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +namespace rec { +namespace mcube { + +/* +struct Error { + enum ErrorCode { + E_OK = 0, + E_CONF_ERROR, + E_PARAM_ERROR, + E_SYS_ERROR, + E_INTERNAL_ERROR, + E_NETWORK_ERROR, + E_DATA_ERROR, + E_NO_IMPL, + E_UNKNOWN, + E_NUM, + E_NOT_FOUND, + E_NO_RES, + E_NO_REQ_ERROR, + E_TYPE_CONVERT_ERROR + }; // enum ErrorCode +}; // struct Error +*/ + +const int E_OK = 0; +const int E_DATA_ERROR = -1; +const int E_OOM = -2; +const int E_NOT_IMPL = -3; + +} // namespace mcube +} // namespace rec diff --git a/cube/cube-server/include/cube/framework.h b/cube/cube-server/include/cube/framework.h new file mode 100644 index 0000000000000000000000000000000000000000..5debe94559a2830e2f4e6ded8a0543c6be80706a --- /dev/null +++ b/cube/cube-server/include/cube/framework.h @@ -0,0 +1,128 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include +#include +#include + +#include "butil/third_party/rapidjson/document.h" +#include "butil/third_party/rapidjson/prettywriter.h" +#include "butil/third_party/rapidjson/stringbuffer.h" + +#include "cube-server/cube.pb.h" +#include "cube/rw_lock.h" +#include "cube/virtual_dict.h" + +namespace rec { +namespace mcube { + +struct Status { + enum StatusCode { F_RUNNING = 0, F_LOADING }; +}; + +class Framework { + public: + static Framework* instance(); + + public: + ~Framework(); + + int init(uint32_t dict_split, bool in_mem); + + int destroy(); + + int status(BUTIL_RAPIDJSON_NAMESPACE::Document* res); + + int seek(const DictRequest* req, DictResponse* res); + + int reload(const std::string& v_path); + + int patch(const std::string& v_path); + + // load dict base + int bg_load_base(const std::string& v_path); + + // load dict patch + int bg_load_patch(const std::string& v_path); + + int bg_unload(); + + int bg_switch(); + + int enable(const std::string& version); + + private: + void init_dict(uint32_t dict_split); + + VirtualDict* create_dict(); + + private: + VirtualDict* get_cur_dict() { + _rw_lock.r_lock(); + VirtualDict* dict = _dict[_dict_idx]; + dict->atom_inc_seek_num(); + _rw_lock.unlock(); + return dict; + } + + std::string get_cur_version() { + _rw_lock.r_lock(); + VirtualDict* dict = _dict[_dict_idx]; + std::string version = dict->version(); + _rw_lock.unlock(); + return version; + } + + VirtualDict* get_bg_dict() const { return _dict[1 - _dict_idx]; } + + std::string get_bg_version() { + _bg_rw_lock.r_lock(); + VirtualDict* dict = _dict[1 - _dict_idx]; + std::string version = ""; + if (dict) { + version = dict->guard_version(); + } + _bg_rw_lock.unlock(); + return version; + } + + void set_bg_dict(VirtualDict* dict) { + _bg_rw_lock.w_lock(); + _dict[1 - _dict_idx] = dict; + _bg_rw_lock.unlock(); + } + + void release(VirtualDict* dict); + + private: + Framework() : _dict_idx(0) {} + Framework(const Framework&) {} + + private: + 
VirtualDict* _dict[2]{nullptr, nullptr}; + int _dict_idx{0}; + std::string _dict_path{""}; + bool _in_mem{true}; + std::atomic_int _status; + RWLock _rw_lock; + RWLock _bg_rw_lock; + uint32_t _max_val_size{0}; + uint32_t _dict_split{0}; + std::vector _dict_set_path; +}; // class Framework + +} // namespace mcube +} // namespace rec diff --git a/cube/cube-server/include/cube/recycle.h b/cube/cube-server/include/cube/recycle.h new file mode 100644 index 0000000000000000000000000000000000000000..a520bd875a0b2d38e4df71874c1cc4949f813af9 --- /dev/null +++ b/cube/cube-server/include/cube/recycle.h @@ -0,0 +1,76 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include +#include + +#include "cube/dict.h" + +namespace rec { +namespace mcube { + +class Recycle { + public: + static Recycle* get_instance(); + + public: + ~Recycle(); + + /** + * @brief init recycle module + */ + int init(); + + /** + * @brief destroy recycle module and wait recycle quit + */ + int destroy(); + + /** + * @brief send dict to recycle module + */ + // void recycle(Dict* dict); + + /** + * @brief send dict to recycle module + */ + void recycle(VirtualDict* dict); + + private: + static void* recycle_func(void*); + + Recycle(); + + /** + * @brief lock recycle list + */ + void lock(); + + /** + * @brief unlock recycle list + */ + void unlock(); + + private: + pthread_t _recycle_thread; + pthread_mutex_t _recycle_mutex; + // std::queue _recycle_list; + std::queue _recycle_list; + bool _running; +}; // class Recycle + +} // namespace mcube +} // namespace rec diff --git a/cube/cube-server/include/cube/rw_lock.h b/cube/cube-server/include/cube/rw_lock.h new file mode 100644 index 0000000000000000000000000000000000000000..9343f74e8cf0388422b7db8f7a9fe39ceac2c9e9 --- /dev/null +++ b/cube/cube-server/include/cube/rw_lock.h @@ -0,0 +1,39 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
#pragma once

#include <pthread.h>

namespace rec {
namespace mcube {

/**
 * @brief Minimal wrapper that owns one POSIX read-write lock.
 *
 * The lock is initialised with default attributes in the constructor and
 * destroyed in the destructor. Return codes of the pthread calls are not
 * inspected.
 *
 * The wrapper is non-copyable: a memberwise copy of pthread_rwlock_t is not
 * a usable lock, and two wrappers would both destroy it.
 */
class RWLock {
 public:
  RWLock() { pthread_rwlock_init(&_lock, NULL); }

  ~RWLock() { pthread_rwlock_destroy(&_lock); }

  RWLock(const RWLock&) = delete;
  RWLock& operator=(const RWLock&) = delete;

  /** @brief acquire the lock for reading (blocks) */
  void r_lock() { pthread_rwlock_rdlock(&_lock); }

  /** @brief acquire the lock for writing (blocks) */
  void w_lock() { pthread_rwlock_wrlock(&_lock); }

  /** @brief release a lock taken by r_lock() or w_lock() */
  void unlock() { pthread_rwlock_unlock(&_lock); }

 private:
  pthread_rwlock_t _lock;
};  // class RWLock

}  // namespace mcube
}  // namespace rec

// diff --git a/cube/cube-server/include/cube/server.h b/cube/cube-server/include/cube/server.h
// new file mode 100644
// index 0000000000000000000000000000000000000000..fa5a4a465d865c91a63647785b51b124db51b43e
// --- /dev/null
// +++ b/cube/cube-server/include/cube/server.h
// @@ -0,0 +1,35 @@
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
+ +#pragma once + +#include "cube-server/cube.pb.h" + +namespace rec { +namespace mcube { + +class Server : public DictService { + public: + Server(); + + virtual ~Server(); + + virtual void seek(::google::protobuf::RpcController* controller, + const ::rec::mcube::DictRequest* request, + ::rec::mcube::DictResponse* response, + ::google::protobuf::Closure* done); +}; + +} // namespace mcube +} // namespace rec diff --git a/cube/cube-server/include/cube/slim_hash_map.h b/cube/cube-server/include/cube/slim_hash_map.h new file mode 100644 index 0000000000000000000000000000000000000000..761ce9214f628a824f257611c07b07dab2503a48 --- /dev/null +++ b/cube/cube-server/include/cube/slim_hash_map.h @@ -0,0 +1,593 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include +#include +#include + +#include +#include +#include + +#ifdef BCLOUD +#include "base/logging.h" +#else +#include "butil/logging.h" +#endif + +/** + * hash node + */ +#pragma pack(push, 1) +template +struct slim_hash_node_t { + std::pair data; + uint32_t next; +}; +#pragma pack(pop) +/** + * hash map + */ +template +class slim_hash_map { + public: + typedef key_t key_type; + typedef value_t mapped_type; + typedef std::pair value_type; + typedef size_t size_type; + + private: + typedef slim_hash_node_t hash_node_t; + static const size_type MAX_BLOCK_NUM = 512; + static const size_type BLOCK_SIZE = 8388608; + static const uint32_t DUMP_GRANULARITY = + 1000; /**< the granularity of every save job */ + static const int iterm_size = sizeof(key_t) + sizeof(value_t); + hash_node_t* m_blockAddr[MAX_BLOCK_NUM]; + uint32_t m_nFreeEntries; + uint32_t m_nNextEntry; + size_type m_nBlockNum; + uint32_t* m_hashTable; + size_type m_nHashSize; + size_type m_nSize; + std::vector m_fileLens; + + struct DumpBar { + FILE* fp; /**< the fp of the file to dump */ + char* buf; /**< buffer to serialize the date */ + int buf_size; /**< the size of buf */ + // int startIndex; /**< the begin index of the last dump job */ + int lastIndex; /**< the index where last dump job ended */ + + DumpBar() : fp(NULL), buf(NULL) {} + + DumpBar(FILE* _fp, char* _buf, int _buf_size, int _lastIndex) + : fp(_fp), buf(_buf), buf_size(_buf_size), lastIndex(_lastIndex) {} + ~DumpBar() { reset(); } + + void reset() { + if (fp) { + fclose(fp); + } + + fp = NULL; + free(buf); + buf = NULL; + buf_size = 0; + lastIndex = 0; + + return; + } + }; + + DumpBar dumpBar; + + public: + struct iterator { + friend class slim_hash_map; + + private: + slim_hash_map* container; + hash_node_t* node; + uint32_t index; + + public: + iterator(slim_hash_map* container, uint32_t index, hash_node_t* node) { + this->container = container; + this->index = index; + this->node = node; + } + hash_node_t* get_node() 
{ return node; } + uint32_t cur_index() { return index; } + std::pair* operator->() { return &node->data; } + std::pair& operator*() { return node->data; } + iterator& operator++() { + if (node == NULL) { + return *this; + } + + if (node->next == 0) { + ++index; + + while (index < container->m_nHashSize) { + if (container->m_hashTable[index] != 0) { + break; + } + + ++index; + } + + if (index == container->m_nHashSize) { + index = -1; + node = NULL; + return *this; + } + + node = container->get_node(container->m_hashTable[index]); + } else { + node = container->get_node(node->next); + } + + return *this; + } + iterator operator++(int) { + iterator it_bak = *this; + this->operator++(); + return it_bak; + } + + enum IteratorStatus { END = 0, OneBlockEND = 1, CONTINUE = 2 }; + + IteratorStatus IncrIterSenseBlock() { + if (NULL == node) { + return END; + } + + if (0 == node->next) { + ++index; + + while (index < container->m_nHashSize) { + if (container->m_hashTable[index] != 0) { + break; + } + + ++index; + } + + if (index == container->m_nHashSize) { + index = -1; + node = NULL; + return END; + } + + node = container->get_node(container->m_hashTable[index]); + return OneBlockEND; + } + + node = container->get_node(node->next); + return CONTINUE; + } + bool operator==(const iterator& it) const { + return container == it.container && index == it.index && node == it.node; + } + bool operator!=(const iterator& it) const { return !operator==(it); } + }; + + slim_hash_map() { + m_nFreeEntries = 0; + m_nNextEntry = 1; + memset(m_blockAddr, 0, sizeof(m_blockAddr)); + m_nBlockNum = 0; + m_hashTable = 0; + m_nSize = 0; + m_nHashSize = 0; + } + + void destroy() { + delete[] m_hashTable; + m_hashTable = NULL; + m_nHashSize = 0; + + for (size_type i = 0; i < m_nBlockNum; ++i) { + delete[] m_blockAddr[i]; + m_blockAddr[i] = NULL; + } + m_nBlockNum = 0; + } + + ~slim_hash_map() { destroy(); } + + int copy_data_from(const slim_hash_map& rhs) { + destroy(); + + if (rhs.m_nHashSize > 
0) { + m_hashTable = new (std::nothrow) uint32_t[rhs.m_nHashSize]; + if (!m_hashTable) { + LOG(ERROR) << "new m_hashTable failed, size " + << sizeof(uint32_t) * rhs.m_nHashSize; + return -1; + } + memcpy(m_hashTable, rhs.m_hashTable, sizeof(uint32_t) * rhs.m_nHashSize); + } + m_nHashSize = rhs.m_nHashSize; + + for (m_nBlockNum = 0; m_nBlockNum < rhs.m_nBlockNum; ++m_nBlockNum) { + m_blockAddr[m_nBlockNum] = new (std::nothrow) hash_node_t[BLOCK_SIZE]; + if (!m_blockAddr[m_nBlockNum]) { + LOG(ERROR) << "new m_blockAddr[" << m_nBlockNum << "] failed, size " + << sizeof(hash_node_t) * BLOCK_SIZE; + return -1; + } + + memcpy(m_blockAddr[m_nBlockNum], + rhs.m_blockAddr[m_nBlockNum], + sizeof(hash_node_t) * BLOCK_SIZE); + } + + m_nFreeEntries = rhs.m_nFreeEntries; + m_nNextEntry = rhs.m_nNextEntry; + m_nSize = rhs.m_nSize; + m_fileLens = rhs.m_fileLens; + + return 0; + } + + size_type size() const { return m_nSize; } + size_type max_size() const { return (size_type)-1; } + bool empty() const { return m_nSize == 0; } + size_type bucket_count() const { return m_nHashSize; } + iterator begin(uint32_t index = 0) { + for (size_type i = index; i < m_nHashSize; ++i) { + if (m_hashTable[i] != 0) { + return iterator(this, i, get_node(m_hashTable[i])); + } + } + + return end(); + } + iterator end() { return iterator(this, -1, NULL); } + iterator find(const key_type& key) { + if (m_nHashSize == 0) { + return iterator(this, 0, NULL); + } + size_type index = key % m_nHashSize; + hash_node_t* node = get_node(m_hashTable[index]); + + while (node != NULL && node->data.first != key) { + node = get_node(node->next); + } + + if (node == NULL) { + return end(); + } + + return iterator(this, index, node); + } + size_type erase(const key_type& key) { + size_type index = key % m_nHashSize; + uint32_t* last_node_pointer = &m_hashTable[index]; + uint32_t addr = m_hashTable[index]; + hash_node_t* node = get_node(addr); + + while (node != NULL && node->data.first != key) { + last_node_pointer = 
&node->next; + addr = node->next; + node = get_node(addr); + } + + if (node == NULL) { + return 0; + } + + *last_node_pointer = node->next; + release_node(addr, node); + return 1; + } + void erase(const iterator& it) { + uint32_t* last_node_pointer = &m_hashTable[it.index]; + uint32_t addr = m_hashTable[it.index]; + hash_node_t* node = get_node(addr); + + while (node != it.node) { + last_node_pointer = &node->next; + addr = node->next; + node = get_node(addr); + } + + *last_node_pointer = node->next; + release_node(addr, node); + return; + } + static uint64_t next_prime(uint64_t n) { + static const uint64_t s_prime_list[] = { + 53ul, 97ul, 193ul, 389ul, 769ul, + 1543ul, 3079ul, 6151ul, 12289ul, 24593ul, + 49157ul, 98317ul, 196613ul, 393241ul, 786433ul, + 1572869ul, 3145739ul, 6291469ul, 12582917ul, 25165843ul, + 50331653ul, 100663319ul, 200000033ul, 201326611ul, 210000047ul, + 220000051ul, 230000059ul, 240000073ul, 250000103ul, 260000137ul, + 270000161ul, 280000241ul, 290000251ul, 300000277ul, 310000283ul, + 320000287ul, 330000371ul, 340000387ul, 350000411ul, 360000451ul, + 370000489ul, 380000519ul, 390000521ul, 400000543ul, 402653189ul, + 805306457ul, 1610612741ul, 3221225473ul, 4294967291ul}; + const size_t s_num_primes = sizeof(s_prime_list) / sizeof(s_prime_list[0]); + const uint64_t* first = s_prime_list; + const uint64_t* last = s_prime_list + static_cast(s_num_primes); + const uint64_t* pos = std::lower_bound(first, last, n); + return (pos == last) ? *(last - 1) : *pos; + } + + void resize(size_type size) { + uint32_t* hashTable; + size = next_prime(size); + + if (size <= m_nHashSize) { + LOG(INFO) << "next prime[" << size << "] <= m_nHashSize[" << m_nHashSize + << "], no need to resize"; + return; + } + LOG(INFO) << "resize m_nHashSize[" << m_nHashSize << "] to next prime[" + << size << "]"; + + try { + hashTable = new uint32_t[size]; + } catch (std::exception& e) { + LOG(ERROR) << "std::exception[" << e.what() << "] was thrown!"; + return; + } catch (...) 
{ + LOG(ERROR) << "unkown exception was thrown!"; + return; + } + + memset(hashTable, 0, sizeof(uint32_t) * size); + + if (m_hashTable == NULL) { + m_hashTable = hashTable; + } else { + // rehash + for (size_type bucket = 0; bucket < m_nHashSize; ++bucket) { + uint32_t first_addr = m_hashTable[bucket]; + hash_node_t* first = get_node(first_addr); + + while (first) { + size_type new_bucket = first->data.first % size; + uint32_t next_addr = first->next; + m_hashTable[bucket] = next_addr; + first->next = hashTable[new_bucket]; + hashTable[new_bucket] = first_addr; + first_addr = next_addr; + first = get_node(first_addr); + } + } + + delete[] m_hashTable; + m_hashTable = hashTable; + } + + m_nHashSize = size; + } + mapped_type& operator[](const key_type& key) { + uint32_t index = key % m_nHashSize; + hash_node_t* node = get_node(m_hashTable[index]); + + while (node != NULL && node->data.first != key) { + node = get_node(node->next); + } + + if (node != NULL) { + return node->data.second; + } + + return add_node(index, key)->data.second; + } + void clear() { + memset(m_hashTable, 0, sizeof(uint32_t) * m_nHashSize); + m_nNextEntry = 1; + m_nFreeEntries = 0; + m_nSize = 0; + } + bool load(const char* file) { + // clear(); + int size = sizeof(key_t) + sizeof(value_t); + FILE* fp = fopen(file, "rb"); + char* buf = reinterpret_cast(malloc(size * 100000)); + + if (fp == NULL || buf == NULL) { + return false; + } + + size_t read_count; + bool err = false; + key_t key; + value_t value; + + while ((read_count = fread(buf, size, 100000, fp)) != 0) { + if (ferror(fp) != 0) { + err = true; + break; + } + + for (int i = 0; i < static_cast(read_count); ++i) { + key = *(reinterpret_cast(buf + i * size)); + value = *(reinterpret_cast(buf + i * size + sizeof(key_t))); + (*this)[key] = value; + } + } + + if (err) { + clear(); + } + + fclose(fp); + free(buf); + return !err; + } + + /** + * @brief dump data in memory into file. + * DUMP_GRANULARITYs entry will dump every execution. 
+ * + * @param file the file name to store the data + * + * @return -1: failed to dump data; + * 0 : complete the dumping; + * greater than 0: the index of the entry in hashtable where dump job + * goes + */ + int save(const char* file) { + static const int FACTOR = 1000; + bool writerErr = false; + hash_node_t* node = NULL; + + if (NULL == file && NULL == dumpBar.fp) { + return -1; + } + + if (file) { // the begin of the dump job + dumpBar.fp = fopen(file, "wb"); + + if (NULL == dumpBar.fp) { + goto ERR_RET; + } + + dumpBar.buf_size = iterm_size * DUMP_GRANULARITY * FACTOR; + dumpBar.buf = reinterpret_cast(malloc(dumpBar.buf_size)); + + if (NULL == dumpBar.buf) { + goto ERR_RET; + } + } + + for (uint32_t i = 0; + i < DUMP_GRANULARITY && (size_type)dumpBar.lastIndex < m_nHashSize; + i++, dumpBar.lastIndex++) { + node = get_node(m_hashTable[dumpBar.lastIndex]); + + for (uint32_t j = 0; NULL != node;) { + *(reinterpret_cast(dumpBar.buf + j * iterm_size)) = + node->data.first; + *(reinterpret_cast(dumpBar.buf + j * iterm_size + + sizeof(key_t))) = node->data.second; + ++j; + + node = get_node(node->next); + + if (j == DUMP_GRANULARITY * FACTOR || NULL == node) { + if (fwrite(dumpBar.buf, iterm_size, j, dumpBar.fp) != j) { + writerErr = true; + goto ERR_RET; + } + + j = 0; + } + } // end of for + } // end of for + + if ((size_type)dumpBar.lastIndex == m_nHashSize) { // mission complete + dumpBar.reset(); + return 0; + } + + // not complete yet + return dumpBar.lastIndex; + + ERR_RET: + + if (writerErr) { + remove(file); + } + + dumpBar.reset(); + return -1; + } + + void add_file_len(uint64_t len) { m_fileLens.push_back(len); } + + // return true if m_fileLens is prefix of lens + bool check_file_len(const std::vector& lens, + size_t* old_size) const { + *old_size = m_fileLens.size(); + + if (*old_size > lens.size()) { + LOG(WARNING) << "old file_len size[" << *old_size + << "] > new file_len size[" << lens.size() + << "] in patch mode"; + return false; + } + + for 
(size_t i = 0; i < *old_size; ++i) { + if (m_fileLens[i] != lens[i]) { + LOG(WARNING) << "old file_len[" << m_fileLens[i] << "] != new file_len[" + << lens[i] << "] of pos " << i << " in patch mode"; + return false; + } + } + + return true; + } + + protected: + hash_node_t* get_node(uint32_t addr) { + if (addr == 0) { + return NULL; + } + + uint32_t block = ((addr & 0xFF800000) >> 23); + uint32_t index = (addr & 0x7FFFFF); + return &m_blockAddr[block][index]; + } + void release_node(uint32_t addr, hash_node_t* node) { + --m_nSize; + node->next = m_nFreeEntries; + m_nFreeEntries = addr; + return; + } + hash_node_t* add_node(uint32_t index, const key_type& key) { + ++m_nSize; + + if (m_nFreeEntries) { + uint32_t addr = m_nFreeEntries; + hash_node_t* node = get_node(addr); + m_nFreeEntries = node->next; + node->next = m_hashTable[index]; + m_hashTable[index] = addr; + node->data.first = key; + return node; + } + + uint32_t block = ((m_nNextEntry & 0xFF800000) >> 23); + + if (block >= m_nBlockNum) { + try { + m_blockAddr[m_nBlockNum++] = new hash_node_t[BLOCK_SIZE]; + } catch (std::exception& e) { + LOG(ERROR) << "std::exception[" << e.what() << "] was thrown!"; + return NULL; + } catch (...) { + LOG(ERROR) << "unkown exception was thrown!"; + return NULL; + } + } + + uint32_t addr = m_nNextEntry; + ++m_nNextEntry; + hash_node_t* node = get_node(addr); + node->next = m_hashTable[index]; + m_hashTable[index] = addr; + node->data.first = key; + return node; + } +}; diff --git a/cube/cube-server/include/cube/util.h b/cube/cube-server/include/cube/util.h new file mode 100644 index 0000000000000000000000000000000000000000..11d53a47810e6131ffc594f989123a61d7345c55 --- /dev/null +++ b/cube/cube-server/include/cube/util.h @@ -0,0 +1,30 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

// uint64_t, struct timeval and gettimeofday() are used below; include them
// here so the header does not depend on what its includer happened to pull in.
#include <stdint.h>
#include <sys/time.h>

namespace rec {
namespace mcube {

/**
 * Declare a `struct timeval` named `flag` in the current scope and fill it
 * with the current time of day.
 */
#define TIME_FLAG(flag) \
  struct timeval flag;  \
  gettimeofday(&(flag), NULL);

/**
 * @brief elapsed time between two timestamps, in whole milliseconds
 *
 * The difference is formed in microseconds first and then rounded down, so a
 * sub-millisecond interval that crosses a second boundary reports 0 rather
 * than 1.
 *
 * @param start_time earlier timestamp
 * @param end_time   later timestamp
 *
 * @return milliseconds from start_time to end_time; 0 if end_time is not
 *         after start_time (gettimeofday() is not monotonic, and a negative
 *         difference would otherwise wrap to a huge unsigned value)
 */
inline uint64_t time_diff(const struct timeval& start_time,
                          const struct timeval& end_time) {
  const int64_t elapsed_sec = static_cast<int64_t>(end_time.tv_sec) -
                              static_cast<int64_t>(start_time.tv_sec);
  const int64_t elapsed_usec = static_cast<int64_t>(end_time.tv_usec) -
                               static_cast<int64_t>(start_time.tv_usec);
  const int64_t total_usec = elapsed_sec * 1000000 + elapsed_usec;

  if (total_usec <= 0) {
    return 0;
  }

  return static_cast<uint64_t>(total_usec / 1000);
}
}  // namespace mcube
}  // namespace rec

// diff --git a/cube/cube-server/include/cube/virtual_dict.h b/cube/cube-server/include/cube/virtual_dict.h
// new file mode 100644
// index 0000000000000000000000000000000000000000..0618750dafbdc3317b9603744215fd69933eb28d
// --- /dev/null
// +++ b/cube/cube-server/include/cube/virtual_dict.h
// @@ -0,0 +1,66 @@
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
+ +#pragma once + +#include +#include + +#include "cube/error.h" + +namespace rec { +namespace mcube { + +class VirtualDict { + public: + VirtualDict() {} + + virtual ~VirtualDict() {} + + virtual int load(const std::string& /*dict_path*/, + bool /*in_mem*/, + const std::string& /*v_path*/) { + return E_NOT_IMPL; + } + + virtual int load(const std::vector& /*dict_path*/, + bool /*in_mem*/, + const std::string& /*v_path*/) { + return E_NOT_IMPL; + } + + virtual int destroy() { return E_NOT_IMPL; } + + virtual const std::string& version() { + static std::string UNKNOWN_VERSION = "UNKNOWN"; + return UNKNOWN_VERSION; + } + + virtual std::string guard_version() { + static std::string UNKNOWN_VERSION = "UNKNOWN"; + return UNKNOWN_VERSION; + } + + virtual void set_base_dict(const VirtualDict* dict) = 0; + + virtual bool seek(uint64_t key, char* buff, uint64_t* buff_size) = 0; + + virtual void atom_inc_seek_num() = 0; + + virtual void atom_dec_seek_num() = 0; + virtual uint32_t atom_seek_num() = 0; +}; // class VirtualDict + +} // namespace mcube +} // namespace rec diff --git a/cube/cube-server/proto/control.proto b/cube/cube-server/proto/control.proto new file mode 100644 index 0000000000000000000000000000000000000000..ec4a2a72bf7109f2ed983f93be9a877682045523 --- /dev/null +++ b/cube/cube-server/proto/control.proto @@ -0,0 +1,24 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
syntax = "proto2";

package rec.mcube;

// Generate the generic (abstract) C++ service classes for the services below.
option cc_generic_services = true;

// Both messages are intentionally empty: they carry no protobuf fields.
message HttpRequest {};
message HttpResponse {};

// Control endpoint with a single rpc, `cmd`.
service ControlService { rpc cmd(HttpRequest) returns (HttpResponse); };

// diff --git a/cube/cube-server/proto/cube.proto b/cube/cube-server/proto/cube.proto
// new file mode 100644
// index 0000000000000000000000000000000000000000..231e29d7d11eef2b2acd16aff872478d841c2267
// --- /dev/null
// +++ b/cube/cube-server/proto/cube.proto
// @@ -0,0 +1,36 @@
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto2";

package rec.mcube;

// Generate the generic (abstract) C++ service classes for the services below.
option cc_generic_services = true;

// Request of DictService.seek: a list of 64-bit keys, plus a flag that
// defaults to false.
message DictRequest {
  repeated uint64 keys = 1;
  optional bool version_required = 2 [ default = false ];
};

// One result entry: a status code and the value bytes. Both are required.
message DictValue {
  required uint32 status = 1;
  required bytes value = 2;
};

// Response of DictService.seek: the result entries and an optional version
// string.
message DictResponse {
  repeated DictValue values = 1;
  optional string version = 2;
};

// Dictionary service with a single rpc, `seek`.
service DictService { rpc seek(DictRequest) returns (DictResponse); };

// diff --git a/cube/cube-server/src/control.cpp b/cube/cube-server/src/control.cpp
// new file mode 100644
// index 0000000000000000000000000000000000000000..8d4074da20c45c1f5c8df8f1fc723945d3e6b2a9
// --- /dev/null
// +++ b/cube/cube-server/src/control.cpp
// @@ -0,0 +1,168 @@
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "cube/control.h" +#include "cube/framework.h" + +namespace rec { +namespace mcube { + +using ::rec::mcube::HttpRequest; +using ::rec::mcube::HttpResponse; + +using ::google::protobuf::RpcController; +using ::google::protobuf::Closure; + +using ::brpc::HttpHeader; +using ::brpc::URI; +using ::brpc::Controller; +using ::brpc::ClosureGuard; + +using BUTIL_RAPIDJSON_NAMESPACE::Document; + +std::string rapidjson_value_to_string( + const BUTIL_RAPIDJSON_NAMESPACE::Value& value) { + BUTIL_RAPIDJSON_NAMESPACE::StringBuffer buffer; + BUTIL_RAPIDJSON_NAMESPACE::PrettyWriter< + BUTIL_RAPIDJSON_NAMESPACE::StringBuffer> + writer(buffer); + value.Accept(writer); + return buffer.GetString(); +} + +Control::Control() {} + +Control::~Control() {} + +void Control::cmd(::google::protobuf::RpcController* cntl_base, + const ::rec::mcube::HttpRequest* /*request*/, + ::rec::mcube::HttpResponse* /*response*/, + ::google::protobuf::Closure* done) { + ClosureGuard done_guard(done); + Controller* cntl = static_cast(cntl_base); + + std::string cmd_str = cntl->request_attachment().to_string(); + Document cmd; + cmd.Parse(cmd_str.c_str()); + LOG(INFO) << "HANDLE CMD: " << cmd_str; + if (!cmd.IsObject()) { + LOG(ERROR) << "parse command failed"; + cntl->http_response().set_status_code(brpc::HTTP_STATUS_BAD_REQUEST); + return; + } + + if (!cmd.HasMember("cmd") || !cmd["cmd"].IsString()) { + 
cntl->http_response().set_status_code(brpc::HTTP_STATUS_BAD_REQUEST); + return; + } + + std::string cmd_name = cmd["cmd"].GetString(); + + std::string version_path = ""; + if (cmd.HasMember("version_path") && cmd["version_path"].IsString()) { + version_path = cmd["version_path"].GetString(); + } + + int ret = 0; + Document response; + if (cmd_name.compare("status") == 0) { + ret = handle_status(cmd, &response); + } else if (_cmd_mutex.try_lock()) { + if (cmd_name.compare("reload_base") == 0) { + ret = handle_reload_base(cmd, version_path); + } else if (cmd_name.compare("reload_patch") == 0) { + ret = handle_reload_patch(cmd, version_path); + } else if (cmd_name.compare("bg_load_base") == 0) { + ret = handle_bg_load_base(cmd, version_path); + } else if (cmd_name.compare("bg_load_patch") == 0) { + ret = handle_bg_load_patch(cmd, version_path); + } else if (cmd_name.compare("bg_unload") == 0) { + ret = handle_bg_unload(cmd); + } else if (cmd_name.compare("bg_switch") == 0) { + ret = handle_bg_switch(cmd); + } else if (cmd_name.compare("enable") == 0) { + ret = handle_enable(cmd); + } else { + ret = -1; + LOG(ERROR) << "unknown cmd: " << cmd_name; + } + _cmd_mutex.unlock(); + } else { + LOG(ERROR) << "try to get cmd mutex failed cmd: " << cmd_name; + ret = -1; + } + + cntl->response_attachment().append(rapidjson_value_to_string(response)); + if (ret == 0) { + cntl->http_response().set_status_code(brpc::HTTP_STATUS_OK); + } else { + cntl->http_response().set_status_code( + brpc::HTTP_STATUS_INTERNAL_SERVER_ERROR); + } + + LOG(INFO) << "CMD DONE: " << cmd_str; + return; +} + +int Control::handle_status(const Document& /*cmd*/, Document* res) { + Framework* framework = Framework::instance(); + return framework->status(res); +} + +int Control::handle_reload_patch(const Document& /*cmd*/, + const std::string& v_path) { + Framework* framework = Framework::instance(); + return framework->patch(v_path); +} + +int Control::handle_reload_base(const Document& /*cmd*/, + const 
std::string& v_path) { + Framework* framework = Framework::instance(); + return framework->reload(v_path); +} + +int Control::handle_bg_load_patch(const Document& /*cmd*/, + const std::string& v_path) { + Framework* framework = Framework::instance(); + return framework->bg_load_patch(v_path); +} + +int Control::handle_bg_load_base(const Document& /*cmd*/, + const std::string& v_path) { + Framework* framework = Framework::instance(); + return framework->bg_load_base(v_path); +} + +int Control::handle_bg_unload(const Document& /*cmd*/) { + Framework* framework = Framework::instance(); + return framework->bg_unload(); +} + +int Control::handle_bg_switch(const Document& /*cmd*/) { + Framework* framework = Framework::instance(); + return framework->bg_switch(); +} + +int Control::handle_enable(const Document& cmd) { + if (!cmd.HasMember("version") || !cmd["version"].IsString()) { + return -1; + } + Framework* framework = Framework::instance(); + return framework->enable(cmd["version"].GetString()); +} + +} // namespace mcube +} // namespace rec diff --git a/cube/cube-server/src/cube_bvar.cpp b/cube/cube-server/src/cube_bvar.cpp new file mode 100644 index 0000000000000000000000000000000000000000..557c43eb466fc4ec31263c30c265f5ca4ae19dc3 --- /dev/null +++ b/cube/cube-server/src/cube_bvar.cpp @@ -0,0 +1,46 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "cube/cube_bvar.h" + +namespace rec { +namespace mcube { + +bvar::IntRecorder g_keys_num; +bvar::Window g_keys_win("keys_per_request_num", + &g_keys_num, + bvar::FLAGS_bvar_dump_interval); + +bvar::Adder g_request_num("request_num"); +bvar::Window> g_request_num_minute("request_num_minute", + &g_request_num, + 60); + +bvar::IntRecorder g_data_load_time("data_load_time"); + +bvar::IntRecorder g_data_size("data_size"); + +bvar::Adder g_long_value_num("long_value_num"); +bvar::Window> g_long_value_num_minute( + "long_value_num_minute", &g_long_value_num, 60); + +bvar::Adder g_unfound_key_num("unfound_key_num"); +bvar::Window> g_unfound_key_num_minute( + "unfound_key_num_minute", &g_unfound_key_num, 60); + +bvar::Adder g_total_key_num("total_key_num"); +bvar::Window> g_total_key_num_minute( + "total_key_num_minute", &g_total_key_num, 60); +} // namespace mcube +} // namespace rec diff --git a/cube/cube-server/src/dict.cpp b/cube/cube-server/src/dict.cpp new file mode 100644 index 0000000000000000000000000000000000000000..5897d366ee2b4910ad9d0f8b331f6be541ee63c9 --- /dev/null +++ b/cube/cube-server/src/dict.cpp @@ -0,0 +1,432 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "cube/cube_bvar.h" +#include "cube/dict.h" +#include "cube/error.h" +#include "cube/util.h" + +namespace rec { +namespace mcube { + +static void munmap_deleter(void* data, uint32_t size) { + if (data != MAP_FAILED) { + munmap(data, size); + } +} + +Dict::Dict() : _seek_num(0), _base_dict(NULL) {} + +Dict::~Dict() {} + +void Dict::atom_dec_seek_num() { --_seek_num; } + +void Dict::atom_inc_seek_num() { ++_seek_num; } + +uint32_t Dict::atom_seek_num() { return _seek_num; } + +int Dict::load(const std::string& dict_path, + bool in_mem, + const std::string& v_path) { + TIME_FLAG(load_start); + + int ret = load_index(dict_path, v_path); + if (ret != E_OK) { + LOG(WARNING) << "load index failed"; + return ret; + } + + if (in_mem) { + ret = load_data(dict_path, v_path); + if (ret != E_OK) { + LOG(ERROR) << "load data failed"; + return ret; + } + } else { + ret = load_data_mmap(dict_path, v_path); + if (ret != E_OK) { + LOG(ERROR) << "load data failed"; + return ret; + } + } + set_version(v_path); + TIME_FLAG(load_end); + g_data_load_time << time_diff(load_start, load_end); + return E_OK; +} + +int Dict::load_index(const std::string& dict_path, const std::string& v_path) { + std::string index_n_path(dict_path); + index_n_path.append(v_path); + index_n_path.append("/index.n"); + LOG(INFO) << "index file path: " << index_n_path; + + std::unique_ptr pf(fopen(index_n_path.c_str(), "rb"), + &fclose); + if (pf.get() == NULL) { + LOG(WARNING) << "open index: " << index_n_path << " failed"; + return E_DATA_ERROR; + } + + int type = 0; + if (fread(reinterpret_cast(&type), sizeof(int), 1, pf.get()) != 1) { + LOG(ERROR) << "index syntax error"; + return E_DATA_ERROR; + } + + uint32_t count = 0; + if (fread(reinterpret_cast(&count), sizeof(uint32_t), 1, pf.get()) != + 1) { + LOG(ERROR) << "index syntax error"; + return E_DATA_ERROR; + } + + uint32_t file_num = 0; + if (fread( + 
reinterpret_cast(&file_num), sizeof(uint32_t), 1, pf.get()) != + 1) { + LOG(ERROR) << "index syntax error"; + return E_DATA_ERROR; + } + LOG(INFO) << "index type:" << type << ", count:" << count + << ", file_num:" << file_num; + + // read file_lens begin + uint32_t file_cnt = file_num; + uint64_t len[1024]; + std::vector file_lens; + file_lens.reserve(file_num); + while (file_cnt > 0) { + uint32_t to_read = file_cnt > 1024 ? 1024 : file_cnt; + file_cnt -= to_read; + if (fread(reinterpret_cast(&len), + sizeof(uint64_t), + to_read, + pf.get()) != to_read) { + return E_DATA_ERROR; + } + for (uint32_t i = 0; i < to_read; ++i) { + file_lens.push_back(len[i]); + } + } + + if (file_lens.size() != file_num) { + LOG(ERROR) << "file_num[" << file_num << "] != file_lens size[" + << file_lens.size() << "], shouldn't happen"; + return E_DATA_ERROR; + } + + // try patch mode + size_t file_idx = 0; + if (_base_dict) { + if (_base_dict->_slim_table.check_file_len(file_lens, &file_idx)) { + LOG(INFO) << "index check file len ok in patch mode, set file_idx to " + << file_idx; + + if (_slim_table.copy_data_from(_base_dict->_slim_table) != 0) { + LOG(ERROR) << "copy data from old index failed in patch mode"; + return E_DATA_ERROR; + } + } else { + file_idx = 0; + LOG(INFO) + << "index check file len failed in patch mode, set file_idx to 0"; + } + } + + _slim_table.resize(count / 2); + + char file[1024]; + struct stat fstat; + for (; file_idx < file_num; ++file_idx) { + snprintf(file, + sizeof(file), + "%s%s/index.%lu", + dict_path.c_str(), + v_path.c_str(), + file_idx); + if (stat(file, &fstat) < 0) { + if (errno == ENOENT) { + LOG(WARNING) << "index." 
<< file_idx << " not exist"; + _slim_table.add_file_len(0); + continue; + } + return E_DATA_ERROR; + } + if ((uint64_t)fstat.st_size != file_lens[file_idx]) { + LOG(ERROR) << "load_index failed, expect index file[" << file_idx + << "] size is " << file_lens[file_idx] << ", actual size is " + << (uint64_t)fstat.st_size; + return E_DATA_ERROR; + } + LOG(INFO) << "loading from index." << file_idx; + if (!_slim_table.load(file) || _slim_table.size() > count) { + return E_DATA_ERROR; + } + + _slim_table.add_file_len(file_lens[file_idx]); + } + + return E_OK; +} + +int Dict::load_data(const std::string& dict_path, const std::string& v_path) { + if (_base_dict) { + _block_set = _base_dict->_block_set; + } + + std::string data_n_path(dict_path); + data_n_path.append(v_path); + data_n_path.append("/data.n"); + FILE* pf = fopen(data_n_path.c_str(), "rb"); + if (pf == NULL) { + LOG(ERROR) << "open data [" << data_n_path << "] failed"; + return E_DATA_ERROR; + } + uint32_t count = 0; + if (fread(reinterpret_cast(&count), sizeof(uint32_t), 1, pf) != 1) { + LOG(ERROR) << "data syntax error"; + fclose(pf); + return E_DATA_ERROR; + } + + std::vector block_size; + uint64_t total_data_size = 0; + for (uint32_t i = 0; i < count; ++i) { + uint32_t size = 0; + if (fread(reinterpret_cast(&size), sizeof(uint32_t), 1, pf) != 1) { + LOG(ERROR) << "data syntax error"; + fclose(pf); + return E_DATA_ERROR; + } + block_size.push_back(size); + total_data_size += size; + } + g_data_size << (total_data_size / 1024 / 1024); + fclose(pf); + pf = NULL; + + uint32_t old_size = _block_set.size(); + for (size_t i = 0; i < old_size; ++i) { + if (_block_set[i].size != block_size[i]) { + old_size = 0; + break; + } + } + _block_set.resize(count); + for (size_t i = old_size; i < _block_set.size(); ++i) { + char data_path[1024]; + LOG(INFO) << "load from data." 
<< i; + snprintf( + data_path, 1024, "%s%s/data.%lu", dict_path.c_str(), v_path.c_str(), i); + + FILE* data_file = fopen(data_path, "rb"); + if (data_file == NULL) { + LOG(WARNING) << "open data file [" << data_path << " failed"; + _block_set[i].s_data.reset(); + _block_set[i].size = 0; + continue; + } + + _block_set[i].s_data.reset( + reinterpret_cast(malloc(block_size[i] * sizeof(char)))); + if (_block_set[i].s_data.get() == NULL) { + LOG(ERROR) << "malloc data failed"; + fclose(data_file); + return E_OOM; + } + _block_set[i].size = block_size[i]; + + if (fread(reinterpret_cast(_block_set[i].s_data.get()), + sizeof(char), + _block_set[i].size, + data_file) != _block_set[i].size) { + LOG(ERROR) << "read data failed"; + fclose(data_file); + return E_DATA_ERROR; + } + + fclose(data_file); + } + + return E_OK; +} + +int Dict::load_data_mmap(const std::string& dict_path, + const std::string& v_path) { + std::string data_n_path(dict_path); + data_n_path.append(v_path); + data_n_path.append("/data.n"); + FILE* pf = fopen(data_n_path.c_str(), "rb"); + if (pf == NULL) { + LOG(ERROR) << "open data [" << data_n_path << "] failed"; + return E_DATA_ERROR; + } + uint32_t count = 0; + if (fread(reinterpret_cast(&count), sizeof(uint32_t), 1, pf) != 1) { + LOG(ERROR) << "data syntax error"; + fclose(pf); + return E_DATA_ERROR; + } + + std::vector block_size; + uint64_t total_data_size = 0; + for (uint32_t i = 0; i < count; ++i) { + uint32_t size = 0; + if (fread(reinterpret_cast(&size), sizeof(uint32_t), 1, pf) != 1) { + LOG(ERROR) << "data syntax error"; + fclose(pf); + return E_DATA_ERROR; + } + block_size.push_back(size); + total_data_size += size; + } + g_data_size << (total_data_size / 1024 / 1024); + fclose(pf); + pf = NULL; + + uint32_t old_size = _block_set.size(); + _block_set.resize(count); + for (size_t i = old_size; i < _block_set.size(); ++i) { + char data_path[1024]; + LOG(INFO) << "load from data." 
<< i; + snprintf( + data_path, 1024, "%s%s/data.%lu", dict_path.c_str(), v_path.c_str(), i); + + int data_fd = open(data_path, + O_RDONLY | O_NONBLOCK, + S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); + if (data_fd < 0) { + LOG(WARNING) << "open data file [" << data_path << "] failed"; + _block_set[i].s_data.reset(); + _block_set[i].size = 0; + continue; + } + + _block_set[i].s_data.reset( + reinterpret_cast( + mmap(NULL, block_size[i], PROT_READ, MAP_SHARED, data_fd, 0)), + std::bind(munmap_deleter, std::placeholders::_1, block_size[i])); + + if (_block_set[i].s_data.get() == MAP_FAILED) { + LOG(WARNING) << "map data file [" << data_path << "] failed"; + _block_set[i].s_data.reset(); + _block_set[i].size = 0; + continue; + } + _block_set[i].size = block_size[i]; + _block_set[i].fd = data_fd; + } + + return E_OK; +} + +int Dict::destroy() { + for (size_t i = 0; i < _block_set.size(); ++i) { + if (_block_set[i].fd > 0) { + close(_block_set[i].fd); + _block_set[i].fd = -1; + } + _block_set[i].size = 0; + } + return E_OK; +} + +void Dict::set_version(const std::string& v_path) { + _rw_lock.w_lock(); + _version = (v_path == "") ? 
"" : v_path.substr(1); + _rw_lock.unlock(); +} + +const std::string& Dict::version() { return _version; } + +std::string Dict::guard_version() { + _rw_lock.r_lock(); + std::string version = _version; + _rw_lock.unlock(); + return version; +} + +bool Dict::seek(uint64_t key, char* buff, uint64_t* buff_size) { + slim_hash_map::iterator it = _slim_table.find(key); + if (it.get_node() == NULL) { + *(reinterpret_cast(buff)) = 0; + *buff_size = sizeof(uint32_t); + g_unfound_key_num << 1; + return false; + } + if (it == _slim_table.end()) { + *(reinterpret_cast(buff)) = 0; + *buff_size = sizeof(uint32_t); + return false; + } + + uint64_t flag = it->second; + uint32_t id = (uint32_t)(flag >> 32); + uint64_t addr = (uint32_t)(flag); + + if (_block_set.size() > id) { + uint32_t block_size = _block_set[id].size; + char* block_data = NULL; + block_data = _block_set[id].s_data.get(); + + if (block_data && addr + sizeof(uint32_t) <= block_size) { + uint32_t len = *(reinterpret_cast(block_data + addr)); + if (addr + len <= block_size && len >= sizeof(uint32_t)) { + uint64_t default_buffer_size = *buff_size; + + *buff_size = len - sizeof(uint32_t); + if (*buff_size > default_buffer_size) { + g_long_value_num << 1; + LOG(ERROR) << "value len is " << *buff_size + << ", larger than default_buffer_size " + << default_buffer_size; + return false; + } + memcpy(buff, + (block_data + addr + sizeof(uint32_t)), + len - sizeof(uint32_t)); + return true; + } else { + *(reinterpret_cast(buff)) = 0; + *buff_size = sizeof(uint32_t); + return false; + } + } else { + *(reinterpret_cast(buff)) = 0; + *buff_size = sizeof(uint32_t); + return false; + } + } else { + *(reinterpret_cast(buff)) = 0; + *buff_size = sizeof(uint32_t); + return false; + } + + return false; +} + +} // namespace mcube +} // namespace rec diff --git a/cube/cube-server/src/dict_set.cpp b/cube/cube-server/src/dict_set.cpp new file mode 100644 index 0000000000000000000000000000000000000000..90f42456b2b243e38750fb0bc7e4693d81fa5671 
--- /dev/null +++ b/cube/cube-server/src/dict_set.cpp @@ -0,0 +1,91 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "cube/dict_set.h" + +namespace rec { +namespace mcube { + +DictSet::DictSet(int dict_split) : VirtualDict(), _dict_split(dict_split) { + _dict_set.resize(_dict_split); +} + +DictSet::~DictSet() {} + +void DictSet::set_base_dict(const VirtualDict* dict) { + const DictSet* dict_set = static_cast(dict); + for (size_t i = 0; i < _dict_set.size(); ++i) { + if (!_dict_set[i]) { + _dict_set[i] = std::make_shared(); + } + _dict_set[i]->set_base_dict(dict_set->_dict_set[i].get()); + } +} + +int DictSet::load(const std::vector& dict_path, + bool in_mem, + const std::string& v_path) { + if ((uint32_t)_dict_split != dict_path.size()) { + return E_DATA_ERROR; + } + + for (size_t i = 0; i < dict_path.size(); ++i) { + if (!_dict_set[i]) { + _dict_set[i] = std::make_shared(); + } + + if (_dict_set[i]->load(dict_path[i], in_mem, v_path) != E_OK) { + LOG(ERROR) << "dict split[" << i << "] load failed"; + return E_DATA_ERROR; + } + } + + _rw_lock.w_lock(); + _version = (v_path == "") ? 
"" : v_path.substr(1); + _rw_lock.unlock(); + + return E_OK; +} + +int DictSet::destroy() { + for (size_t i = 0; i < _dict_set.size(); ++i) { + if (_dict_set[i]->destroy() != E_OK) { + LOG(WARNING) << "dict split[" << i << "] destory failed"; + } + } + + return E_OK; +} + +const std::string& DictSet::version() { return _version; } + +std::string DictSet::guard_version() { + _rw_lock.r_lock(); + std::string version = _version; + _rw_lock.unlock(); + return version; +} + +bool DictSet::seek(uint64_t key, char* buff, uint64_t* buff_size) { + return _dict_set[key % _dict_split]->seek(key, buff, buff_size); +} + +void DictSet::atom_inc_seek_num() { ++_seek_num; } + +void DictSet::atom_dec_seek_num() { --_seek_num; } + +uint32_t DictSet::atom_seek_num() { return _seek_num; } + +} // namespace mcube +} // namespace rec diff --git a/cube/cube-server/src/framework.cpp b/cube/cube-server/src/framework.cpp new file mode 100644 index 0000000000000000000000000000000000000000..649782013f691c081c57a604a3d9a0b9b11fa1fa --- /dev/null +++ b/cube/cube-server/src/framework.cpp @@ -0,0 +1,339 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include +#include +#include + +#include "cube/cube_bvar.h" +#include "cube/dict.h" +#include "cube/dict_set.h" +#include "cube/framework.h" +#include "cube/recycle.h" + +using BUTIL_RAPIDJSON_NAMESPACE::Document; +using BUTIL_RAPIDJSON_NAMESPACE::Value; +using BUTIL_RAPIDJSON_NAMESPACE::StringRef; + +namespace { +static ::rec::mcube::Framework* g_instance = NULL; +} + +namespace rec { +namespace mcube { + +Framework* Framework::instance() { + if (g_instance == NULL) { + g_instance = new Framework(); + } + + return g_instance; +} + +Framework::~Framework() {} + +int Framework::init(uint32_t dict_split, bool in_mem) { + Recycle* rec = Recycle::get_instance(); + int ret = rec->init(); + if (ret != 0) { + LOG(ERROR) << "init recycle failed"; + return ret; + } + + /* + _dict[0] = new (std::nothrow) Dict(); + _dict[1] = NULL; + */ + init_dict(dict_split); + VirtualDict* cur_dict = _dict[_dict_idx]; + _dict_path = "./data"; + _max_val_size = 1024; + _in_mem = in_mem; + + std::string version_file = _dict_path + "/VERSION"; + std::string version_path = ""; + std::ifstream input(version_file.c_str()); + if (!std::getline(input, version_path)) { + version_path = ""; + } else { + version_path = "/" + version_path; + } + input.close(); + + LOG(INFO) << "load dict from" << _dict_path << version_path; + if (_dict_split > 1) { + _dict_set_path.clear(); + _dict_set_path.resize(_dict_split); + std::stringstream dict_set_path_buf; + for (size_t i = 0; i < _dict_split; ++i) { + dict_set_path_buf.str(std::string()); + dict_set_path_buf.clear(); + dict_set_path_buf << _dict_path << "/" << i; + _dict_set_path[i] = dict_set_path_buf.str(); + } + ret = cur_dict->load(_dict_set_path, _in_mem, version_path); + } else { + ret = cur_dict->load(_dict_path, _in_mem, version_path); + } + + if (ret != 0) { + LOG(WARNING) << "init: load dict data failed err=" << ret + << ". 
starting service with empty data."; + } else { + LOG(INFO) << "load dict from " << _dict_path << version_path << " done"; + } + + _status = Status::F_RUNNING; + + return 0; +} + +int Framework::destroy() { + Recycle* recycle = Recycle::get_instance(); + int ret = recycle->destroy(); + if (ret != 0) { + LOG(WARNING) << "destroy recycle failed"; + } + return 0; +} + +void Framework::init_dict(uint32_t dict_split) { + _dict_split = dict_split; + + if (_dict_split <= 1) { + _dict[0] = new (std::nothrow) Dict(); + _dict[1] = NULL; + } else { + _dict[0] = new (std::nothrow) DictSet(_dict_split); + _dict[1] = NULL; + } +} + +VirtualDict* Framework::create_dict() { + if (_dict_split > 1) { + return new (std::nothrow) DictSet(_dict_split); + } else { + return new (std::nothrow) Dict(); + } +} + +void Framework::release(VirtualDict* dict) { dict->atom_dec_seek_num(); } + +int Framework::status(Document* res) { + res->SetObject(); + Document::AllocatorType& allocator = res->GetAllocator(); + Value cur_version; + Value bg_version; + cur_version.SetString(StringRef(get_cur_version().c_str())); + bg_version.SetString(StringRef((get_bg_version().c_str()))); + res->AddMember("cur_version", cur_version, allocator); + res->AddMember("bg_version", bg_version, allocator); + res->AddMember("status", _status.load(), allocator); + return 0; +} + +int Framework::seek(const DictRequest* req, DictResponse* res) { + g_request_num << 1; + VirtualDict* cur_dict = get_cur_dict(); + char* val_buf = new char[_max_val_size]; + g_keys_num << req->keys_size(); + g_total_key_num << req->keys_size(); + + std::vector values(req->keys_size()); + for (int i = 0; i < req->keys_size(); ++i) { + values[i] = res->add_values(); + } + + for (int i = 0; i < req->keys_size(); ++i) { + uint64_t val_size = _max_val_size; + // DictValue* val = res->add_values(); + DictValue* val = values[i]; + if (cur_dict->seek(req->keys(i), val_buf, &val_size)) { + val->set_status(0); + val->set_value(val_buf, val_size); + } else 
{ + val->set_status(-1); + val->set_value(""); + } + } + + if (req->version_required()) { + res->set_version(cur_dict->version()); + } + + // delete [] keys; + delete[] val_buf; + release(cur_dict); + return 0; +} + +int Framework::reload(const std::string& v_path) { + int ret = bg_load_base(v_path); + if (ret != 0) { + LOG(WARNING) << "background load dict base failed"; + } else { + LOG(INFO) << "background load dict base succ"; + } + + ret = bg_switch(); + if (ret != 0) { + LOG(WARNING) << "switch background dict failed"; + } else { + LOG(INFO) << "switch background dict succ"; + } + + ret = bg_unload(); + if (ret != 0) { + LOG(WARNING) << "unload background dict failed"; + } else { + LOG(INFO) << "unload background dict succ"; + } + + return ret; +} + +int Framework::patch(const std::string& v_path) { + int ret = bg_load_patch(v_path); + if (ret != 0) { + LOG(WARNING) << "background load dict patch failed"; + } else { + LOG(INFO) << "background load dict patch succ"; + } + + ret = bg_switch(); + if (ret != 0) { + LOG(WARNING) << "switch background dict failed"; + } else { + LOG(INFO) << "switch background dict succ"; + } + + ret = bg_unload(); + if (ret != 0) { + LOG(WARNING) << "unload background dict failed"; + } else { + LOG(INFO) << "unload background dict succ"; + } + + return ret; +} + +int Framework::bg_load_base(const std::string& v_path) { + int ret = bg_unload(); + if (ret != 0) { + LOG(WARNING) << "unload background dict failed"; + } + + VirtualDict* bg_dict = create_dict(); + + if (!bg_dict) { + LOG(ERROR) << "create Dict failed"; + return -1; + } + + _status = Status::F_LOADING; + if (_dict_split > 1) { + ret = bg_dict->load(_dict_set_path, _in_mem, v_path); + } else { + ret = bg_dict->load(_dict_path, _in_mem, v_path); + } + _status = Status::F_RUNNING; + if (ret != 0) { + LOG(WARNING) << "load background dict failed"; + delete bg_dict; + bg_dict = NULL; + return ret; + } else { + LOG(INFO) << "load background dict succ"; + set_bg_dict(bg_dict); + 
} + + return ret; +} + +int Framework::bg_load_patch(const std::string& v_path) { + int ret = bg_unload(); + if (ret != 0) { + LOG(WARNING) << "unload background dict failed"; + } + + VirtualDict* bg_dict = create_dict(); + + if (!bg_dict) { + LOG(ERROR) << "create Dict failed"; + return -1; + } + + _status = Status::F_LOADING; + if (_dict[_dict_idx]) { + bg_dict->set_base_dict(_dict[_dict_idx]); + LOG(INFO) << "set base dict from current dict " << _dict_idx; + } + + if (_dict_split > 1) { + ret = bg_dict->load(_dict_set_path, _in_mem, v_path); + } else { + ret = bg_dict->load(_dict_path, _in_mem, v_path); + } + _status = Status::F_RUNNING; + if (ret != 0) { + LOG(WARNING) << "load background dict failed"; + delete bg_dict; + bg_dict = NULL; + return ret; + } else { + LOG(INFO) << "load background dict succ"; + set_bg_dict(bg_dict); + } + return ret; +} + +int Framework::bg_unload() { + VirtualDict* bg_dict = get_bg_dict(); + if (bg_dict != NULL) { + set_bg_dict(NULL); + Recycle* recycle = Recycle::get_instance(); + recycle->recycle(bg_dict); + } + LOG(INFO) << "unload background dict succ"; + return 0; +} + +int Framework::bg_switch() { + _rw_lock.w_lock(); + int bg_idx = 1 - _dict_idx; + if (!_dict[bg_idx]) { + LOG(WARNING) << "switch dict failed because NULL"; + _rw_lock.unlock(); + return -1; + } + _dict_idx = bg_idx; + _rw_lock.unlock(); + return 0; +} + +int Framework::enable(const std::string& version) { + int ret = 0; + if (version != "" && version == get_cur_version()) { + ret = 0; + } else if (version == get_bg_version()) { + ret = bg_switch(); + } else { + LOG(WARNING) << "bg dict version not matched"; + ret = -1; + } + return ret; +} + +} // namespace mcube +} // namespace rec diff --git a/cube/cube-server/src/main.cpp b/cube/cube-server/src/main.cpp new file mode 100644 index 0000000000000000000000000000000000000000..bacfb1ea92032815522c3c2dc6ec1eeaa3f90dfc --- /dev/null +++ b/cube/cube-server/src/main.cpp @@ -0,0 +1,133 @@ +// Copyright (c) 2019 
PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#ifdef BCLOUD +#include "base/logging.h" +#else +#include "butil/logging.h" +#endif + +#include "cube/control.h" +#include "cube/framework.h" +#include "cube/server.h" + +DEFINE_int32(port, 8000, "TCP Port of this server"); +DEFINE_int32(dict_split, 1, "data dict split for dictset"); +DEFINE_bool(in_mem, + true, + "True[load data into memory] False[mmap data in disk]"); +DECLARE_string(flagfile); + +namespace rec { +namespace mcube { + +bool g_signal_quit = false; +static void sigint_handler(int) { g_signal_quit = true; } // sigint_handler + +int run(int argc, char** argv) { + google::ParseCommandLineFlags(&argc, &argv, true); + +// initialize logger instance +#ifdef BCLOUD + logging::LoggingSettings settings; + settings.logging_dest = logging::LOG_TO_FILE; + + std::string filename(argv[0]); + filename = filename.substr(filename.find_last_of('/') + 1); + settings.log_file = + strdup((std::string("./log/") + filename + ".log").c_str()); + settings.delete_old = logging::DELETE_OLD_LOG_FILE; + logging::InitLogging(settings); + + logging::ComlogSinkOptions cso; + cso.process_name = filename; + cso.enable_wf_device = true; + logging::ComlogSink::GetInstance()->Setup(&cso); +#else + if (FLAGS_log_dir == "") { + FLAGS_log_dir = "./log"; + } + + struct stat st_buf; + int ret = 0; + if ((ret = stat(FLAGS_log_dir.c_str(), &st_buf)) != 0) { + 
mkdir(FLAGS_log_dir.c_str(), 0777); + ret = stat(FLAGS_log_dir.c_str(), &st_buf); + if (ret != 0) { + LOG(WARNING) << "Log path " << FLAGS_log_dir + << " not exist, and create fail"; + return -1; + } + } + google::InitGoogleLogging(strdup(argv[0])); + FLAGS_logbufsecs = 0; + FLAGS_logbuflevel = -1; +#endif + LOG(INFO) << "Succ initialize logger"; + + Framework* framework = Framework::instance(); + ret = framework->init(FLAGS_dict_split, FLAGS_in_mem); + if (ret != 0) { + LOG(ERROR) << "init predict framework failed"; + return ret; + } + + Server cube; + Control cntl; + + brpc::Server server; + server.set_version("Cube Service"); + brpc::ServerOptions option; + + if (server.AddService(&cube, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) { + LOG(ERROR) << "Failed to add predict service"; + return -1; + } + + if (server.AddService(&cntl, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) { + LOG(ERROR) << "Failed to add predict service"; + return -1; + } + + if (server.Start(FLAGS_port, &option) != 0) { + LOG(ERROR) << "Fail to start service"; + return -1; + } + LOG(INFO) << "cube service start"; + + signal(SIGINT, sigint_handler); + while (!g_signal_quit) { + sleep(1); + } + + return 0; +} + +} // namespace mcube +} // namespace rec + +int main(int argc, char** argv) { + if (google::SetCommandLineOption("bvar_dump", "true").empty()) { + LOG(ERROR) << "Failed to dump bvar file"; + return -1; + } + google::SetCommandLineOption("flagfile", "conf/gflags.conf"); + return ::rec::mcube::run(argc, argv); +} + +/* vim: set ts=4 sw=4 sts=4 tw=100 */ diff --git a/cube/cube-server/src/recycle.cpp b/cube/cube-server/src/recycle.cpp new file mode 100644 index 0000000000000000000000000000000000000000..391148e2ec8769121ecb942a23bd7eff8bd67c60 --- /dev/null +++ b/cube/cube-server/src/recycle.cpp @@ -0,0 +1,121 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "cube/recycle.h" + +namespace { +using rec::mcube::Recycle; +static Recycle* g_instance = NULL; +} + +namespace rec { +namespace mcube { + +Recycle* Recycle::get_instance() { + if (g_instance == NULL) { + g_instance = new Recycle(); + } + + return g_instance; +} + +Recycle::Recycle() : _running(false) {} + +Recycle::~Recycle() {} + +int Recycle::init() { + // init mutex lock; + if (pthread_mutex_init(&_recycle_mutex, NULL) != 0) { + LOG(ERROR) << "init recycle lock failed"; + return -1; + } + + _running = true; + + // init thread; + if (pthread_create(&_recycle_thread, + NULL, + Recycle::recycle_func, + reinterpret_cast(this)) != 0) { + LOG(ERROR) << "init recycle thread failed"; + return -1; + } + return 0; +} + +int Recycle::destroy() { + _running = false; + // join thread; + if (pthread_join(_recycle_thread, NULL) != 0) { + LOG(WARNING) << "join recycle thread failed"; + } + // destroy lock + if (pthread_mutex_destroy(&_recycle_mutex) != 0) { + LOG(WARNING) << "destroy recycle lock failed"; + } + + return 0; +} + +/* +void Recycle::recycle(Dict* dict) { + lock(); + _recycle_list.push(dict); + unlock(); +} +*/ + +void Recycle::recycle(VirtualDict* dict) { + lock(); + _recycle_list.push(dict); + unlock(); +} + +void Recycle::lock() { pthread_mutex_lock(&_recycle_mutex); } + +void Recycle::unlock() { pthread_mutex_unlock(&_recycle_mutex); } + +void* Recycle::recycle_func(void* arg) { + Recycle* 
recycle = reinterpret_cast(arg); + std::queue& recycle_list = recycle->_recycle_list; + + while (recycle->_running) { + recycle->lock(); + if (recycle_list.empty()) { + recycle->unlock(); + sleep(1); + continue; + } + + VirtualDict* dict = recycle_list.front(); + recycle_list.pop(); + recycle->unlock(); + + while (dict->atom_seek_num() != 0) { + sleep(1); + } + + int ret = dict->destroy(); + if (ret != 0) { + LOG(WARNING) << "destroy dict failed"; + } + + delete dict; + } + + return NULL; +} + +} // namespace mcube +} // namespace rec diff --git a/cube/cube-server/src/server.cpp b/cube/cube-server/src/server.cpp new file mode 100644 index 0000000000000000000000000000000000000000..62174880d99e17aedc9ede1514944104c30fd4a2 --- /dev/null +++ b/cube/cube-server/src/server.cpp @@ -0,0 +1,41 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include + +#include "cube/framework.h" +#include "cube/server.h" + +namespace rec { +namespace mcube { + +Server::Server() {} + +Server::~Server() {} + +void Server::seek(::google::protobuf::RpcController* /*cntl_base*/, + const ::rec::mcube::DictRequest* request, + ::rec::mcube::DictResponse* response, + ::google::protobuf::Closure* done) { + brpc::ClosureGuard done_guard(done); + + Framework* framework = Framework::instance(); + int ret = framework->seek(request, response); + if (ret != 0) { + LOG(ERROR) << "seek failed err=" << ret; + } +} + +} // namespace mcube +} // namespace rec diff --git a/cube/cube-server/test/cube_test.cpp b/cube/cube-server/test/cube_test.cpp new file mode 100644 index 0000000000000000000000000000000000000000..fffe413d3ef37fe00106c009d08585fe06b3d27d --- /dev/null +++ b/cube/cube-server/test/cube_test.cpp @@ -0,0 +1,56 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include +#include "gtest/gtest.h" + +#include "cube/control.h" + +namespace rec { +namespace mcube { +namespace unittest { +struct DoNothing : public google::protobuf::Closure { + void Run() {} +}; + +class ControlTest : public ::testing::Test { + protected: + ControlTest() {} + virtual ~ControlTest() {} + virtual void SetUp() {} + virtual void TearDown() {} +}; // class ControlTest + +TEST_F(ControlTest, control_cmd) { + brpc::Controller cntl; + DoNothing do_nothing; + Control control; + + control.cmd(&cntl, NULL, NULL, &do_nothing); + ASSERT_EQ(brpc::HTTP_STATUS_BAD_REQUEST, cntl.http_response().status_code()); + cntl.Reset(); +} + +int run(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +} // namespace unittest +} // namespace mcube +} // namespace rec + +int main(int argc, char** argv) { + return ::rec::mcube::unittest::run(argc, argv); +}