diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..fe34bf0ef8c3d23f0762da9fbdff8458967a2011 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +build +.baidu* +.bcloud* +output diff --git a/CMakeLists.txt b/CMakeLists.txt index d01be6d74f8c0b12b8b50e5ec3e63795a9ef4a09..177f4f07d78df6d1416255bc57d4dd005838b95c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -26,6 +26,7 @@ message(STATUS "CXX compiler: ${CMAKE_CXX_COMPILER}, version: " message(STATUS "C compiler: ${CMAKE_C_COMPILER}, version: " "${CMAKE_C_COMPILER_ID} ${CMAKE_C_COMPILER_VERSION}") + find_package(Git REQUIRED) find_package(Threads REQUIRED) find_package(CUDA QUIET) @@ -60,6 +61,7 @@ if (NOT DEFINED WITH_MKLDNN) endif() include(external/leveldb) +include(external/rocksdb) include(external/zlib) include(external/boost) include(external/protobuf) @@ -67,6 +69,7 @@ include(external/brpc) include(external/gflags) include(external/glog) include(external/snappy) +include(external/gtest) include(generic) include(flags) @@ -81,6 +84,7 @@ include_directories(${PADDLE_SERVING_BINARY_DIR}) set(EXTERNAL_LIBS gflags + rocksdb glog protobuf paddlepaddle @@ -105,6 +109,7 @@ add_subdirectory(configure) add_subdirectory(pdcodegen) add_subdirectory(sdk-cpp) add_subdirectory(demo-client) +add_subdirectory(kvdb) if (NOT CLIENT_ONLY) add_subdirectory(predictor) diff --git a/cmake/external/gtest.cmake b/cmake/external/gtest.cmake index 9be625b620287cd4c644ae6908000fd5eec5d5c7..82af574574dae191aedc785e7aa4cedcf4c88330 100644 --- a/cmake/external/gtest.cmake +++ b/cmake/external/gtest.cmake @@ -12,72 +12,33 @@ # See the License for the specific language governing permissions and # limitations under the License. -#FIXME:(gongwb) Move brpc's gtest dependency. -IF(WITH_TESTING OR (WITH_DISTRIBUTE AND NOT WITH_GRPC)) - IF(WITH_TESTING) - ENABLE_TESTING() - ENDIF(WITH_TESTING) +INCLUDE(ExternalProject) + +SET(GTEST_SOURCES_DIR ${THIRD_PARTY_PATH}/gtest) +SET(GTEST_INSTALL_DIR ${THIRD_PARTY_PATH}/install/gtest) +SET(GTEST_INCLUDE_DIR "${GTEST_INSTALL_DIR}/include" CACHE PATH "gtest include directory." FORCE) +SET(GTEST_LIBRARIES "${GTEST_INSTALL_DIR}/lib/libgtest.a" CACHE FILEPATH "gtest library." FORCE) +INCLUDE_DIRECTORIES(${GTEST_INCLUDE_DIR}) + +ExternalProject_Add( + extern_gtest + ${EXTERNAL_PROJECT_LOG_ARGS} + PREFIX ${GTEST_SOURCES_DIR} + GIT_REPOSITORY "https://github.com/google/googletest" + GIT_TAG master + UPDATE_COMMAND "" + CONFIGURE_COMMAND "" + BUILD_COMMAND CXXFLAGS=-fPIC && mkdir build && cd build && cmake .. && make -j ${NUM_OF_PROCESSOR} gtest + INSTALL_COMMAND mkdir -p ${GTEST_INSTALL_DIR}/lib/ + && cp ${GTEST_SOURCES_DIR}/src/extern_gtest/build/lib/libgtest.a ${GTEST_LIBRARIES} + && cp -r ${GTEST_SOURCES_DIR}/src/extern_gtest/googletest/include ${GTEST_INSTALL_DIR}/ + BUILD_IN_SOURCE 1 +) + +ADD_DEPENDENCIES(extern_gtest snappy) + +ADD_LIBRARY(gtest STATIC IMPORTED GLOBAL) +SET_PROPERTY(TARGET gtest PROPERTY IMPORTED_LOCATION ${GTEST_LIBRARIES}) + +LIST(APPEND external_project_dependencies gtest) - INCLUDE(ExternalProject) - - SET(GTEST_SOURCES_DIR ${THIRD_PARTY_PATH}/gtest) - SET(GTEST_INSTALL_DIR ${THIRD_PARTY_PATH}/install/gtest) - SET(GTEST_INCLUDE_DIR "${GTEST_INSTALL_DIR}/include" CACHE PATH "gtest include directory." FORCE) - - INCLUDE_DIRECTORIES(${GTEST_INCLUDE_DIR}) - - IF(WIN32) - set(GTEST_LIBRARIES - "${GTEST_INSTALL_DIR}/lib/gtest.lib" CACHE FILEPATH "gtest libraries." FORCE) - set(GTEST_MAIN_LIBRARIES - "${GTEST_INSTALL_DIR}/lib/gtest_main.lib" CACHE FILEPATH "gtest main libraries." FORCE) - ELSE(WIN32) - set(GTEST_LIBRARIES - "${GTEST_INSTALL_DIR}/lib/libgtest.a" CACHE FILEPATH "gtest libraries." FORCE) - set(GTEST_MAIN_LIBRARIES - "${GTEST_INSTALL_DIR}/lib/libgtest_main.a" CACHE FILEPATH "gtest main libraries." FORCE) - ENDIF(WIN32) - - IF(WITH_MKLML) - # wait for mklml downloading completed - SET(GTEST_DEPENDS ${MKLML_PROJECT}) - ENDIF() - - ExternalProject_Add( - extern_gtest - ${EXTERNAL_PROJECT_LOG_ARGS} - DEPENDS ${GTEST_DEPENDS} - GIT_REPOSITORY "https://github.com/google/googletest.git" - GIT_TAG "release-1.8.0" - PREFIX ${GTEST_SOURCES_DIR} - UPDATE_COMMAND "" - CMAKE_ARGS -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER} - -DCMAKE_C_COMPILER=${CMAKE_C_COMPILER} - -DCMAKE_CXX_FLAGS=${CMAKE_CXX_FLAGS} - -DCMAKE_CXX_FLAGS_RELEASE=${CMAKE_CXX_FLAGS_RELEASE} - -DCMAKE_CXX_FLAGS_DEBUG=${CMAKE_CXX_FLAGS_DEBUG} - -DCMAKE_C_FLAGS=${CMAKE_C_FLAGS} - -DCMAKE_C_FLAGS_DEBUG=${CMAKE_C_FLAGS_DEBUG} - -DCMAKE_C_FLAGS_RELEASE=${CMAKE_C_FLAGS_RELEASE} - -DCMAKE_INSTALL_PREFIX=${GTEST_INSTALL_DIR} - -DCMAKE_POSITION_INDEPENDENT_CODE=ON - -DBUILD_GMOCK=ON - -Dgtest_disable_pthreads=ON - -Dgtest_force_shared_crt=ON - -DCMAKE_BUILD_TYPE=${THIRD_PARTY_BUILD_TYPE} - ${EXTERNAL_OPTIONAL_ARGS} - CMAKE_CACHE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=${GTEST_INSTALL_DIR} - -DCMAKE_POSITION_INDEPENDENT_CODE:BOOL=ON - -DCMAKE_BUILD_TYPE:STRING=${THIRD_PARTY_BUILD_TYPE} - ) - - ADD_LIBRARY(gtest STATIC IMPORTED GLOBAL) - SET_PROPERTY(TARGET gtest PROPERTY IMPORTED_LOCATION ${GTEST_LIBRARIES}) - ADD_DEPENDENCIES(gtest extern_gtest) - - ADD_LIBRARY(gtest_main STATIC IMPORTED GLOBAL) - SET_PROPERTY(TARGET gtest_main PROPERTY IMPORTED_LOCATION ${GTEST_MAIN_LIBRARIES}) - ADD_DEPENDENCIES(gtest_main extern_gtest) - - LIST(APPEND external_project_dependencies gtest gtest_main) -ENDIF(WITH_TESTING OR (WITH_DISTRIBUTE AND NOT WITH_GRPC)) diff --git a/cmake/external/protobuf.cmake b/cmake/external/protobuf.cmake index e05b7694ddf1e1652b00f156cde1a2d433c9fc46..0422e5fed94d69f2f86139d9f229030450f638bc 100644 --- a/cmake/external/protobuf.cmake +++ b/cmake/external/protobuf.cmake @@ -122,6 +122,15 @@ macro(PROMPT_PROTOBUF_LIB) # make `protobuf_generate_cpp` happy. SET(Protobuf_PROTOC_EXECUTABLE ${PROTOBUF_PROTOC_EXECUTABLE}) + # For CMake 3.9 and above + if(NOT TARGET protobuf::protoc) + add_executable(protobuf::protoc IMPORTED) + if(EXISTS "${Protobuf_PROTOC_EXECUTABLE}") + set_target_properties(protobuf::protoc PROPERTIES + IMPORTED_LOCATION "${Protobuf_PROTOC_EXECUTABLE}") + endif() + endif() + FOREACH(dep ${protobuf_DEPS}) ADD_DEPENDENCIES(protobuf ${dep}) ADD_DEPENDENCIES(protobuf_lite ${dep}) diff --git a/cmake/external/redis++.cmake b/cmake/external/redis++.cmake new file mode 100644 index 0000000000000000000000000000000000000000..1809035f5cacc4507dade593a58b274b322698d7 --- /dev/null +++ b/cmake/external/redis++.cmake @@ -0,0 +1,45 @@ +# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +INCLUDE(ExternalProject) + +SET(REDISCLIENT_SOURCES_DIR ${THIRD_PARTY_PATH}/redis++) +SET(REDISCLIENT_INSTALL_DIR ${THIRD_PARTY_PATH}/install/redis++) +SET(REDISCLIENT_INCLUDE_DIR "${REDISCLIENT_INSTALL_DIR}/include" CACHE PATH "redis++ include directory." FORCE) +SET(REDISCLIENT_LIBRARIES "${REDISCLIENT_INSTALL_DIR}/lib/libredis++.a" CACHE FILEPATH "redis++ library." FORCE) +INCLUDE_DIRECTORIES(${REDISCLIENT_INCLUDE_DIR}) + +ExternalProject_Add( + extern_redis++ + ${EXTERNAL_PROJECT_LOG_ARGS} + PREFIX ${REDISCLIENT_SOURCES_DIR} + GIT_REPOSITORY "https://github.com/sewenew/redis-plus-plus" + GIT_TAG master + UPDATE_COMMAND "" + CONFIGURE_COMMAND "" + BUILD_COMMAND cmake . && CXXFLAGS=-fPIC make -j ${NUM_OF_PROCESSOR} static + INSTALL_COMMAND mkdir -p ${REDISCLIENT_INSTALL_DIR}/lib/ + && cp ${REDISCLIENT_SOURCES_DIR}/src/extern_redis++/lib/libredis++.a ${REDISCLIENT_LIBRARIES} + && cp -r ${REDISCLIENT_SOURCES_DIR}/src/extern_redis++/src/sw/redis++/ ${REDISCLIENT_INSTALL_DIR}/include/redis++/ + BUILD_IN_SOURCE 1 +) + +ADD_DEPENDENCIES(extern_redis++ snappy) + +ADD_LIBRARY(redis++ STATIC IMPORTED GLOBAL) +SET_PROPERTY(TARGET redis++ PROPERTY IMPORTED_LOCATION ${ROCKSDB_LIBRARIES}) +ADD_DEPENDENCIES(redis++ extern_redis++) + +LIST(APPEND external_project_dependencies redis++) + diff --git a/cmake/external/rocksdb.cmake b/cmake/external/rocksdb.cmake new file mode 100644 index 0000000000000000000000000000000000000000..ae1d590634078a71c3d2dd082c2b6475bf22cfe8 --- /dev/null +++ b/cmake/external/rocksdb.cmake @@ -0,0 +1,45 @@ +# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +INCLUDE(ExternalProject) + +SET(ROCKSDB_SOURCES_DIR ${THIRD_PARTY_PATH}/rocksdb) +SET(ROCKSDB_INSTALL_DIR ${THIRD_PARTY_PATH}/install/rocksdb) +SET(ROCKSDB_INCLUDE_DIR "${ROCKSDB_INSTALL_DIR}/include" CACHE PATH "rocksdb include directory." FORCE) +SET(ROCKSDB_LIBRARIES "${ROCKSDB_INSTALL_DIR}/lib/librocksdb.a" CACHE FILEPATH "rocksdb library." FORCE) +INCLUDE_DIRECTORIES(${ROCKSDB_INCLUDE_DIR}) + +ExternalProject_Add( + extern_rocksdb + ${EXTERNAL_PROJECT_LOG_ARGS} + PREFIX ${ROCKSDB_SOURCES_DIR} + GIT_REPOSITORY "https://github.com/facebook/rocksdb" + GIT_TAG 6.2.fb + UPDATE_COMMAND "" + CONFIGURE_COMMAND "" + BUILD_COMMAND CXXFLAGS=-fPIC make -j ${NUM_OF_PROCESSOR} static_lib + INSTALL_COMMAND mkdir -p ${ROCKSDB_INSTALL_DIR}/lib/ + && cp ${ROCKSDB_SOURCES_DIR}/src/extern_rocksdb/librocksdb.a ${ROCKSDB_LIBRARIES} + && cp -r ${ROCKSDB_SOURCES_DIR}/src/extern_rocksdb/include ${ROCKSDB_INSTALL_DIR}/ + BUILD_IN_SOURCE 1 +) + +ADD_DEPENDENCIES(extern_rocksdb snappy) + +ADD_LIBRARY(rocksdb STATIC IMPORTED GLOBAL) +SET_PROPERTY(TARGET rocksdb PROPERTY IMPORTED_LOCATION ${ROCKSDB_LIBRARIES}) +ADD_DEPENDENCIES(rocksdb extern_rocksdb) + +LIST(APPEND external_project_dependencies rocksdb) + diff --git a/demo-client/CMakeLists.txt b/demo-client/CMakeLists.txt index 5e7208090ca4c47f724be38e92b8685684367501..d56bd9687755167efd8e00df3eb997ca1127bcd9 100644 --- a/demo-client/CMakeLists.txt +++ b/demo-client/CMakeLists.txt @@ -30,6 +30,11 @@ target_link_libraries(echo -Wl,--whole-archive sdk-cpp -Wl,--no-whole-archive -lpthread -lcrypto -lm -lrt -lssl -ldl -lz) +add_executable(echo_kvdb ${CMAKE_CURRENT_LIST_DIR}/src/echo_kvdb.cpp) +target_link_libraries(echo_kvdb -Wl,--whole-archive sdk-cpp -Wl,--no-whole-archive + -lpthread -lcrypto -lm -lrt -lssl -ldl + -lz) + add_executable(dense_format ${CMAKE_CURRENT_LIST_DIR}/src/dense_format.cpp) target_link_libraries(dense_format -Wl,--whole-archive sdk-cpp -Wl,--no-whole-archive -lpthread -lcrypto -lm -lrt -lssl -ldl -lz) @@ -69,6 +74,11 @@ install(TARGETS echo install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/conf DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/demo/client/echo/) +install(TARGETS echo_kvdb + RUNTIME DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/demo/client/echo_kvdb/bin) +install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/conf DESTINATION + ${PADDLE_SERVING_INSTALL_DIR}/demo/client/echo_kvdb/) + install(TARGETS dense_format RUNTIME DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/demo/client/dense_format/bin) diff --git a/demo-client/conf/predictors.prototxt b/demo-client/conf/predictors.prototxt index da59ad7be3779cd55d3f9b879a1663dbebfa9c93..45ecbc3e7613a1c2cdaddc636d3d13a5c03e5503 100644 --- a/demo-client/conf/predictors.prototxt +++ b/demo-client/conf/predictors.prototxt @@ -109,3 +109,18 @@ predictors { } } } + +predictors { + name: "echo_kvdb_service" + service_name: "baidu.paddle_serving.predictor.echo_kvdb_service.EchoKVDBService" + endpoint_router: "WeightedRandomRender" + weighted_random_render_conf { + variant_weight_list: "50" + } + variants { + tag: "var1" + naming_conf { + cluster: "list://127.0.0.1:8010" + } + } +} diff --git a/demo-client/src/echo_kvdb.cpp b/demo-client/src/echo_kvdb.cpp new file mode 100644 index 0000000000000000000000000000000000000000..ae53f8838bee596e63fc3c6bbc7007fe5cf7a0ea --- /dev/null +++ b/demo-client/src/echo_kvdb.cpp @@ -0,0 +1,155 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +#include +#include "sdk-cpp/builtin_format.pb.h" +#include "sdk-cpp/echo_kvdb_service.pb.h" +#include "sdk-cpp/include/common.h" +#include "sdk-cpp/include/predictor_sdk.h" +#include +using baidu::paddle_serving::sdk_cpp::Predictor; +using baidu::paddle_serving::sdk_cpp::PredictorApi; +using baidu::paddle_serving::predictor::echo_kvdb_service::Request; +using baidu::paddle_serving::predictor::echo_kvdb_service::Response; +using baidu::paddle_serving::predictor::format::KVDBReq; +using baidu::paddle_serving::predictor::format::KVDBRes; +int global_key = 0; +int create_req(Request& req) { // NOLINT + KVDBReq* key = req.mutable_reqs()->Add(); + key->set_op("SET"); + key->set_key(std::to_string(global_key)); + key->set_value(std::to_string(global_key * 1)); + + key = req.mutable_reqs()->Add(); + key->set_op("GET"); + key->set_key(std::to_string(global_key)); + global_key++; + return 0; +} + +void print_res(const Request& req, + const Response& res, + std::string route_tag, + uint64_t elapse_ms) { + LOG(INFO) << "Receive Response size: " << res.ress_size(); + for (size_t i = 0; i < res.ress_size(); i++) { + KVDBRes val = res.ress(i); + LOG(INFO) << "Receive value from demo-server: " << val.value(); +} + LOG(INFO) << "Succ call predictor[echo_kvdb_service], the tag is: " << route_tag + << ", elapse_ms: " << elapse_ms; +} + +int main(int argc, char** argv) { + PredictorApi api; + +// initialize logger instance +#ifdef BCLOUD + logging::LoggingSettings settings; + settings.logging_dest = logging::LOG_TO_FILE; + + std::string filename(argv[0]); + filename = filename.substr(filename.find_last_of('/') + 1); + settings.log_file = (std::string("./log/") + filename + ".log").c_str(); + settings.delete_old = logging::DELETE_OLD_LOG_FILE; + logging::InitLogging(settings); + + logging::ComlogSinkOptions cso; + cso.process_name = filename; + cso.enable_wf_device = true; + logging::ComlogSink::GetInstance()->Setup(&cso); +#else + struct stat st_buf; + int ret = 0; + if ((ret = stat("./log", &st_buf)) != 0) { + mkdir("./log", 0777); + ret = stat("./log", &st_buf); + if (ret != 0) { + LOG(WARNING) << "Log path ./log not exist, and create fail"; + return -1; + } + } + FLAGS_log_dir = "./log"; + google::InitGoogleLogging(strdup(argv[0])); +#endif + + if (api.create("./conf", "predictors.prototxt") != 0) { + LOG(ERROR) << "Failed create predictors api!"; + return -1; + } + + Request req; + Response res; + + api.thrd_initialize(); + + while (true) { + if (global_key > 10000) { + break; + } + timeval start; + gettimeofday(&start, NULL); + + api.thrd_clear(); + + Predictor* predictor = api.fetch_predictor("echo_kvdb_service"); + if (!predictor) { + LOG(ERROR) << "Failed fetch predictor: echo_kvdb_service"; + return -1; + } + + req.Clear(); + res.Clear(); + + if (create_req(req) != 0) { + return -1; + } + + butil::IOBufBuilder debug_os; + if (predictor->debug(&req, &res, &debug_os) != 0) { + LOG(ERROR) << "failed call predictor with req:" << req.ShortDebugString(); + return -1; + } + + butil::IOBuf debug_buf; + debug_os.move_to(debug_buf); + LOG(INFO) << "Debug string: " << debug_buf; + + timeval end; + gettimeofday(&end, NULL); + + uint64_t elapse_ms = (end.tv_sec * 1000 + end.tv_usec / 1000) - + (start.tv_sec * 1000 + start.tv_usec / 1000); + + print_res(req, res, predictor->tag(), elapse_ms); + res.Clear(); + + usleep(50); + } // while (true) + + api.thrd_finalize(); + api.destroy(); + +#ifndef BCLOUD + google::ShutdownGoogleLogging(); +#endif + + return 0; +} + +/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */ diff --git a/demo-serving/CMakeLists.txt b/demo-serving/CMakeLists.txt index 238fadf8934ca6bc8051e3f2d662c6fbe99170db..b90d394b8f71e9abcea2e861040ebe60c2d8a1a4 100644 --- a/demo-serving/CMakeLists.txt +++ b/demo-serving/CMakeLists.txt @@ -12,6 +12,7 @@ if (NOT EXISTS ) endif() +include_directories(SYSTEM ${CMAKE_CURRENT_LIST_DIR}/../kvdb/include) find_library(MKLML_LIBS NAMES libmklml_intel.so libiomp5.so) include(op/CMakeLists.txt) include(proto/CMakeLists.txt) @@ -26,13 +27,27 @@ target_include_directories(serving PUBLIC ) if(WITH_GPU) - target_link_libraries(serving ${CUDA_LIBRARIES} -Wl,--whole-archive fluid_gpu_engine + target_link_libraries(serving -Wl,--whole-archive fluid_gpu_engine -Wl,--no-whole-archive) endif() + +target_link_libraries(serving -Wl,--whole-archive fluid_cpu_engine + -Wl,--no-whole-archive) + +target_link_libraries(serving paddle_fluid ${paddle_depend_libs}) + target_link_libraries(serving opencv_imgcodecs - ${opencv_depend_libs} -Wl,--whole-archive fluid_cpu_engine - -Wl,--no-whole-archive pdserving paddle_fluid ${paddle_depend_libs} - ${MKLML_LIB} ${MKLML_IOMP_LIB} -lpthread -lcrypto -lm -lrt -lssl -ldl -lz) + ${opencv_depend_libs}) + +target_link_libraries(serving pdserving) + +target_link_libraries(serving kvdb rocksdb) + +if(WITH_GPU) + target_link_libraries(serving ${CUDA_LIBRARIES}) +endif() +target_link_libraries(serving ${MKLML_LIB} ${MKLML_IOMP_LIB} -lpthread + -lcrypto -lm -lrt -lssl -ldl -lz -lbz2) install(TARGETS serving RUNTIME DESTINATION diff --git a/demo-serving/conf/service.prototxt b/demo-serving/conf/service.prototxt index b9b2c52c325df4618935c8930af043abfe98b31f..3f98211a94c0abf1c5d4418c2337480ee5da362e 100644 --- a/demo-serving/conf/service.prototxt +++ b/demo-serving/conf/service.prototxt @@ -27,3 +27,7 @@ services { workflows: "workflow6" } +services { + name: "EchoKVDBService" + workflows: "workflow7" +} diff --git a/demo-serving/conf/workflow.prototxt b/demo-serving/conf/workflow.prototxt index 3353865442def6f8186c7dfb8672e17da147b5a3..afd0db676218558ff03a7079810b5e46db7c0c4a 100644 --- a/demo-serving/conf/workflow.prototxt +++ b/demo-serving/conf/workflow.prototxt @@ -67,4 +67,11 @@ workflows { type: "TextClassificationOp" } } - +workflows { + name: "workflow7" + workflow_type: "Sequence" + nodes { + name: "echo_kvdb_service_op" + type: "KVDBEchoOp" + } +} diff --git a/demo-serving/op/kvdb_echo_op.cpp b/demo-serving/op/kvdb_echo_op.cpp new file mode 100644 index 0000000000000000000000000000000000000000..1c4d013a815a9708fae0384873952d625ecb32be --- /dev/null +++ b/demo-serving/op/kvdb_echo_op.cpp @@ -0,0 +1,59 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "demo-serving/op/kvdb_echo_op.h" + +namespace baidu { +namespace paddle_serving { +namespace predictor { + +using baidu::paddle_serving::predictor::format::KVDBReq; +using baidu::paddle_serving::predictor::format::KVDBRes; +using baidu::paddle_serving::predictor::echo_kvdb_service::Request; +using baidu::paddle_serving::predictor::echo_kvdb_service::Response; + +int KVDBEchoOp::inference() { + debug(); +} +int KVDBEchoOp::debug() { + //TODO: implement DEBUG mode + baidu::paddle_serving::predictor::Resource& resource = baidu::paddle_serving::predictor::Resource::instance(); + std::shared_ptr db = resource.getDB(); + const Request* req = dynamic_cast(get_request_message()); + Response* res = mutable_data(); + LOG(INFO) << "Receive request in KVDB echo service: " << req->ShortDebugString(); + for (size_t i = 0; i < req->reqs_size(); i++) { + auto kvdbreq = req->reqs(i); + std::string op = kvdbreq.op(); + std::string key = kvdbreq.key(); + std::string val = kvdbreq.value(); + if (op == "SET") { + db->Put(key, val); + KVDBRes* kvdb_value_res = res->mutable_ress()->Add(); + kvdb_value_res -> set_value("OK"); + } else if (op == "GET") { + std::string getvalue = db->Get(key); + KVDBRes* kvdb_value_res = res->mutable_ress()->Add(); + kvdb_value_res -> set_value(getvalue); + } + } + return 0; +} + + + +DEFINE_OP(KVDBEchoOp); +} +} +} diff --git a/demo-serving/op/kvdb_echo_op.h b/demo-serving/op/kvdb_echo_op.h new file mode 100644 index 0000000000000000000000000000000000000000..7c7502a11716fc3cf64e059481127f09f8bbb0ea --- /dev/null +++ b/demo-serving/op/kvdb_echo_op.h @@ -0,0 +1,37 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once +#include "demo-serving/echo_kvdb_service.pb.h" +#include "predictor/framework/resource.h" +#include "predictor/common/inner_common.h" +#include "predictor/framework/channel.h" +#include "predictor/framework/op_repository.h" +#include "predictor/op/op.h" +#include "kvdb/paddle_rocksdb.h" + +namespace baidu { +namespace paddle_serving { +namespace predictor { + +class KVDBEchoOp: public OpWithChannel { + public: + DECLARE_OP(KVDBEchoOp); + int inference(); + int debug(); + +}; +} // namespace predictor +} // namespace paddle_serving +} // namespace baidu diff --git a/demo-serving/proto/CMakeLists.txt b/demo-serving/proto/CMakeLists.txt index 5b7b7cb37063421462ae525aa5741ed5d91f0be1..2edd5e9eacde35b2add7fd11f676662e4c218e45 100644 --- a/demo-serving/proto/CMakeLists.txt +++ b/demo-serving/proto/CMakeLists.txt @@ -3,6 +3,7 @@ LIST(APPEND protofiles ${CMAKE_CURRENT_LIST_DIR}/dense_service.proto ${CMAKE_CURRENT_LIST_DIR}/sparse_service.proto ${CMAKE_CURRENT_LIST_DIR}/echo_service.proto + ${CMAKE_CURRENT_LIST_DIR}/echo_kvdb_service.proto ${CMAKE_CURRENT_LIST_DIR}/int64tensor_service.proto ${CMAKE_CURRENT_LIST_DIR}/text_classification.proto ) diff --git a/demo-serving/proto/echo_kvdb_service.proto b/demo-serving/proto/echo_kvdb_service.proto new file mode 100644 index 0000000000000000000000000000000000000000..58ffb21bd870185bc84df566fb57b58a63f59c49 --- /dev/null +++ b/demo-serving/proto/echo_kvdb_service.proto @@ -0,0 +1,34 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto2"; +import "pds_option.proto"; +import "builtin_format.proto"; +package baidu.paddle_serving.predictor.echo_kvdb_service; + +option cc_generic_services = true; + +message Request { + repeated baidu.paddle_serving.predictor.format.KVDBReq reqs = 1; +}; + +message Response { + repeated baidu.paddle_serving.predictor.format.KVDBRes ress = 1; +}; + +service EchoKVDBService { + rpc inference(Request) returns (Response); + rpc debug(Request) returns (Response); + option (pds.options).generate_impl = true; +}; diff --git a/kvdb/CMakeLists.txt b/kvdb/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..51470669e0d69fa909c15890c2e52bfc4bc9a100 --- /dev/null +++ b/kvdb/CMakeLists.txt @@ -0,0 +1,53 @@ + +include_directories(SYSTEM ${CMAKE_CURRENT_LIST_DIR}/include) + +set(SRC_LIST ${CMAKE_CURRENT_LIST_DIR}/src/test_rocksdb.cpp + ${CMAKE_CURRENT_LIST_DIR}/src/rockskvdb_impl.cpp + ${CMAKE_CURRENT_LIST_DIR}/src/param_dict_mgr_impl.cpp + ${CMAKE_CURRENT_LIST_DIR}/src/mock_param_dict_impl.cpp + ${CMAKE_CURRENT_LIST_DIR}/src/paddle_rocksdb.cpp + ${CMAKE_CURRENT_LIST_DIR}/src/gtest_kvdb.cpp) + +add_library(kvdb ${SRC_LIST}) +install(TARGETS kvdb ARCHIVE DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/lib/) + +add_executable(kvdb_test ${SRC_LIST}) +add_dependencies(kvdb_test rocksdb) +target_link_libraries(kvdb_test rocksdb bz2 snappy zlib gtest) +#target_include_directories(kvdb_test PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include/kvdb) + +set(SRC_LIST2 ${CMAKE_CURRENT_LIST_DIR}/src/gtest_db_thread.cpp + ${CMAKE_CURRENT_LIST_DIR}/src/rockskvdb_impl.cpp + ${CMAKE_CURRENT_LIST_DIR}/src/param_dict_mgr_impl.cpp + ${CMAKE_CURRENT_LIST_DIR}/src/mock_param_dict_impl.cpp + ${CMAKE_CURRENT_LIST_DIR}/src/paddle_rocksdb.cpp) + + +add_executable(db_thread ${SRC_LIST2}) +add_dependencies(db_thread rocksdb) +target_link_libraries(db_thread rocksdb bz2 snappy zlib gtest) +#target_include_directories(db_thread PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include/kvdb) + + +set(SRC_LIST3 ${CMAKE_CURRENT_LIST_DIR}/src/gtest_db_func.cpp + ${CMAKE_CURRENT_LIST_DIR}/src/rockskvdb_impl.cpp + ${CMAKE_CURRENT_LIST_DIR}/src/param_dict_mgr_impl.cpp + ${CMAKE_CURRENT_LIST_DIR}/src/mock_param_dict_impl.cpp + ${CMAKE_CURRENT_LIST_DIR}/src/paddle_rocksdb.cpp) + +add_executable(db_func ${SRC_LIST3}) +add_dependencies(db_func rocksdb) +target_link_libraries(db_func rocksdb bz2 snappy zlib gtest) +#target_include_directories(db_func PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include/kvdb) + +install(TARGETS kvdb_test + RUNTIME DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/demo/kvdb_test) +install(TARGETS db_thread + RUNTIME DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/demo/db_thread) +install(TARGETS db_func + RUNTIME DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/demo/db_func) + + +file(GLOB kvdb_headers "${CMAKE_CURRENT_LIST_DIR}/include/kvdb/*.h") +install(FILES ${kvdb_headers} DESTINATION + ${PADDLE_SERVING_INSTALL_DIR}/include/kvdb/) diff --git a/kvdb/include/kvdb/kvdb_impl.h b/kvdb/include/kvdb/kvdb_impl.h new file mode 100644 index 0000000000000000000000000000000000000000..b610120afc15314a46134a88db1c5979ba6b1287 --- /dev/null +++ b/kvdb/include/kvdb/kvdb_impl.h @@ -0,0 +1,130 @@ + +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once +#include +#include +#include +#include +class AbstractKVDB; +class FileReader; +class ParamDict; + +typedef std::shared_ptr AbsKVDBPtr; +typedef std::shared_ptr FileReaderPtr; +typedef std::shared_ptr ParamDictPtr; + +class AbstractKVDB { +public: + virtual void CreateDB() = 0; + virtual void SetDBName(std::string) = 0; + virtual void Set(std::string key, std::string value) = 0; + virtual std::string Get(std::string key) = 0; + virtual ~AbstractKVDB() = 0; +}; + +// TODO: Implement RedisKVDB +//class RedisKVDB; + +class FileReader { +public: + inline virtual std::string GetFileName() { + return this->filename_; + } + + inline virtual void SetFileName(std::string filename) { + this->filename_ = filename; + this->last_md5_val_ = this->GetMD5(); + this->time_stamp_ = std::chrono::system_clock::now(); + } + + inline virtual std::string GetMD5() { + auto getCmdOut = [] (std::string cmd) { + std::string data; + FILE *stream = nullptr; + const int max_buffer = 256; + char buffer[max_buffer]; + cmd.append(" 2>&1"); + stream = popen(cmd.c_str(), "r"); + if (stream) { + if (fgets(buffer, max_buffer, stream) != NULL) { + data.append(buffer); + } + } + return data; + }; + std::string cmd = "md5sum " + this->filename_; + //TODO: throw exception if error occurs during execution of shell command + std::string md5val = getCmdOut(cmd); + this->time_stamp_ = md5val == this->last_md5_val_? this->time_stamp_: std::chrono::system_clock::now(); + this->last_md5_val_ = md5val; + return md5val; + } + + inline virtual bool CheckDiff() { + return this->GetMD5() == this->last_md5_val_; + } + + inline virtual std::chrono::system_clock::time_point GetTimeStamp() { + return this->time_stamp_; + } + + inline virtual ~FileReader() {}; +protected: + std::string filename_; + std::string last_md5_val_; + std::chrono::system_clock::time_point time_stamp_; +}; + + +class ParamDict { + typedef std::string Key; + typedef std::vector Value; +public: + virtual std::vector GetDictReaderLst(); + virtual void SetFileReaderLst(std::vector lst); + + virtual std::vector GetSparseValue(int64_t, int64_t); + virtual std::vector GetSparseValue(std::string, std::string); + + virtual bool InsertSparseValue(int64_t, int64_t, const std::vector&); + virtual bool InsertSparseValue(std::string, std::string, const std::vector&); + + virtual void SetReader(std::function(std::string)>); + virtual void UpdateBaseModel(); + virtual void UpdateDeltaModel(); + + virtual std::pair GetKVDB(); + virtual void SetKVDB(std::pair); + virtual void CreateKVDB(); + + virtual ~ParamDict(); +protected: + std::function(std::string)> read_func_; + std::vector file_reader_lst_; + AbsKVDBPtr front_db, back_db; +}; + + + +class ParamDictMgr { +public: + void UpdateAll(); + void InsertParamDict(std::string, ParamDictPtr); + +protected: + std::unordered_map ParamDictMap; +}; + diff --git a/kvdb/include/kvdb/paddle_rocksdb.h b/kvdb/include/kvdb/paddle_rocksdb.h new file mode 100644 index 0000000000000000000000000000000000000000..5ae1a9dd4a061f5dc57ad0d0c30762f7e2a1d3a0 --- /dev/null +++ b/kvdb/include/kvdb/paddle_rocksdb.h @@ -0,0 +1,40 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once +#include +#include +#include "rocksdb/db.h" +#include "rocksdb/options.h" +#include "rocksdb/slice.h" +#include "rocksdb/sst_file_writer.h" +#include "rocksdb/table.h" +#include "rocksdb/compaction_filter.h" +#include "rocksdb/filter_policy.h" + + +class RocksDBWrapper { +public: + RocksDBWrapper(std::string db_name); + std::string Get(std::string key); + + bool Put(std::string key, std::string value); + void SetDBName(std::string db_name); + static std::shared_ptr RocksDBWrapperFactory(std::string db_name = "SparseMatrix"); + +protected: + rocksdb::DB *db_; + std::string db_name_; +}; + diff --git a/kvdb/include/kvdb/rocksdb_impl.h b/kvdb/include/kvdb/rocksdb_impl.h new file mode 100644 index 0000000000000000000000000000000000000000..71adb23a5175d4b4542f71947f415b9313db74f9 --- /dev/null +++ b/kvdb/include/kvdb/rocksdb_impl.h @@ -0,0 +1,35 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once +#include "kvdb_impl.h" +#include "paddle_rocksdb.h" +class RocksKVDB: public AbstractKVDB { +public: + void CreateDB(); + void SetDBName(std::string); + void Set(std::string key, std::string value); + std::string Get(std::string key); + ~RocksKVDB(); + +protected: + std::shared_ptr db_; + +public: + static int db_count; +}; + + + + diff --git a/kvdb/src/gtest_db_func.cpp b/kvdb/src/gtest_db_func.cpp new file mode 100644 index 0000000000000000000000000000000000000000..f987c960b7165ebfc4b47fcfb1d669da59db00e8 --- /dev/null +++ b/kvdb/src/gtest_db_func.cpp @@ -0,0 +1,76 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "kvdb/rocksdb_impl.h" +#include "kvdb/kvdb_impl.h" +#include "kvdb/paddle_rocksdb.h" +#include +#include +#include +#include +#include +#include +class KVDBTest : public ::testing::Test { +protected: + void SetUp() override{ + + } + + static void SetUpTestCase() { + } + + +}; +int my_argc; +char** my_argv; + +std::vector StringSplit(std::string str, char split) { + std::vector strs; + std::istringstream f(str); + std:: string s; + while (getline(f, s, split)) { + strs.push_back(s); + } + return strs; +} + +TEST_F(KVDBTest, AbstractKVDB_Func_Test) { + AbsKVDBPtr kvdb = std::make_shared(); + kvdb->CreateDB(); + std::string set_list = "setlist.txt"; + std::string get_list = "getlist.txt"; + std::ifstream set_file(set_list); + std::ifstream get_file(get_list); + for (std::string line; getline(set_file, line); ) + { + std::vector strs = StringSplit (line, ' '); + kvdb->Set(strs[0], strs[1]); + } + + for (std::string line; getline(get_file, line); ) { + std::vector strs = StringSplit(line, ' '); + std::string val = kvdb->Get(strs[0]); + ASSERT_EQ(val, strs[1]); + } +} + + + +int main(int argc, char** argv) { + my_argc = argc; + my_argv = argv; + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + diff --git a/kvdb/src/gtest_db_thread.cpp b/kvdb/src/gtest_db_thread.cpp new file mode 100644 index 0000000000000000000000000000000000000000..2e8a7f5bbe46f732cdcdd336a728a4b6be5bbedb --- /dev/null +++ b/kvdb/src/gtest_db_thread.cpp @@ -0,0 +1,74 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "kvdb/rocksdb_impl.h" +#include "kvdb/kvdb_impl.h" +#include "kvdb/paddle_rocksdb.h" +#include +#include +#include +#include +#include +class KVDBTest : public ::testing::Test { +protected: + void SetUp() override{ + + } + + static void SetUpTestCase() { + + } +}; +int my_argc; +char** my_argv; + + +void db_thread_test(AbsKVDBPtr kvdb, int size) { + for (int i = 0; i < size; i++) { + kvdb->Set(std::to_string(i), std::to_string(i)); + kvdb->Get(std::to_string(i)); + } +} + +TEST_F(KVDBTest, AbstractKVDB_Thread_Test) { + if (my_argc != 3) { + std::cerr << "illegal input! should be db_thread ${num_of_thread} ${num_of_ops_each_thread}" << std::endl; + return; + } + int num_of_thread = atoi(my_argv[1]); + int nums_of_ops_each_thread = atoi(my_argv[2]); + std::vector kvdbptrs; + for (int i= 0; i < num_of_thread; i++) { + kvdbptrs.push_back(std::make_shared()); + kvdbptrs[i]->CreateDB(); + } + std::vector tarr; + for (int i = 0; i< num_of_thread; i++) { + tarr.push_back(std::thread(db_thread_test, kvdbptrs[i], nums_of_ops_each_thread)); + } + for (int i = 0; i< num_of_thread; i++) { + tarr[i].join(); + } + return; +} + + + +int main(int argc, char** argv) { + my_argc = argc; + my_argv = argv; + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + diff --git a/kvdb/src/gtest_kvdb.cpp b/kvdb/src/gtest_kvdb.cpp new file mode 100644 index 0000000000000000000000000000000000000000..4e7d0aca5ae778a17ed5a4040f6424ee96579c47 --- /dev/null +++ b/kvdb/src/gtest_kvdb.cpp @@ -0,0 +1,163 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "kvdb/rocksdb_impl.h" +#include "kvdb/kvdb_impl.h" +#include "kvdb/paddle_rocksdb.h" +#include +#include +#include +#include +#include +#include +class KVDBTest : public ::testing::Test { +protected: + void SetUp() override{ + + } + + static void SetUpTestCase() { + kvdb = std::make_shared(); + dict_reader = std::make_shared(); + param_dict = std::make_shared(); + } + + static AbsKVDBPtr kvdb; + static FileReaderPtr dict_reader; + static ParamDictPtr param_dict; + static ParamDictMgr dict_mgr; + +}; +AbsKVDBPtr KVDBTest::kvdb; +FileReaderPtr KVDBTest::dict_reader; +ParamDictPtr KVDBTest::param_dict; +ParamDictMgr KVDBTest::dict_mgr; + +void GenerateTestIn(std::string); +void UpdateTestIn(std::string); + +TEST_F(KVDBTest, AbstractKVDB_Unit_Test) { + kvdb->CreateDB(); + kvdb->SetDBName("test_kvdb"); + for (int i = 0; i < 100; i++) { + kvdb->Set(std::to_string(i), std::to_string(i * 2)); + } + for (int i = 0; i < 100; i++) { + std::string val = kvdb->Get(std::to_string(i)); + ASSERT_EQ(val, std::to_string(i * 2)); + } +} + +TEST_F(KVDBTest, FileReader_Unit_Test) { + std::string test_in_filename = "abs_dict_reader_test_in.txt"; + GenerateTestIn(test_in_filename); + dict_reader->SetFileName(test_in_filename); + + std::string md5_1 = dict_reader->GetMD5(); + std::chrono::system_clock::time_point timestamp_1 = dict_reader->GetTimeStamp(); + + std::string md5_2 = dict_reader->GetMD5(); + std::chrono::system_clock::time_point timestamp_2 = dict_reader->GetTimeStamp(); + + ASSERT_EQ(md5_1, md5_2); + ASSERT_EQ(timestamp_1, timestamp_2); + + UpdateTestIn(test_in_filename); + + std::string md5_3 = dict_reader->GetMD5(); + std::chrono::system_clock::time_point timestamp_3 = dict_reader->GetTimeStamp(); + + ASSERT_NE(md5_2, md5_3); + ASSERT_NE(timestamp_2, timestamp_3); +} +#include +TEST_F(KVDBTest, ParamDict_Unit_Test) { + std::string test_in_filename = "abs_dict_reader_test_in.txt"; + param_dict->SetFileReaderLst({test_in_filename}); + param_dict->SetReader( + [] (std::string text) { + auto split = [](const std::string& s, + std::vector& sv, + const char* delim = " ") { + sv.clear(); + char* buffer = new char[s.size() + 1]; + std::copy(s.begin(), s.end(), buffer); + char* p = strtok(buffer, delim); + do { + sv.push_back(p); + } while ((p = strtok(NULL, delim))); + return; + }; + std::vector text_split; + split(text, text_split, " "); + std::string key = text_split[0]; + text_split.erase(text_split.begin()); + return make_pair(key, text_split); + }); + param_dict->CreateKVDB(); + GenerateTestIn(test_in_filename); + + param_dict->UpdateBaseModel(); + + std::this_thread::sleep_for(std::chrono::seconds(2)); + + std::vector test_vec = param_dict->GetSparseValue("1", ""); + + ASSERT_LT(fabs(test_vec[0] - 1.0), 1e-2); + + UpdateTestIn(test_in_filename); + param_dict->UpdateDeltaModel(); +} + +void GenerateTestIn(std::string filename) { + std::ifstream in_file(filename); + if (in_file.good()) { + in_file.close(); + std::string cmd = "rm -rf "+ filename; + system(cmd.c_str()); + } + std::ofstream out_file(filename); + for (size_t i = 0; i < 100000; i++) { + out_file << i << " " << i << " "; + for (size_t j = 0; j < 3; j++) { + out_file << i << " "; + } + out_file << std::endl; + } + out_file.close(); +} + +void UpdateTestIn(std::string filename) { + std::ifstream in_file(filename); + if (in_file.good()) { + in_file.close(); + std::string cmd = "rm -rf " + filename; + system(cmd.c_str()); + } + std::ofstream out_file(filename); + for (size_t i = 0; i < 10000; i++) { + out_file << i << " " << i << " "; + for (size_t j = 0; j < 3; j++) { + out_file << i + 1 << " "; + } + out_file << std::endl; + } + out_file.close(); +} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + diff --git a/kvdb/src/mock_param_dict_impl.cpp b/kvdb/src/mock_param_dict_impl.cpp new file mode 100644 index 0000000000000000000000000000000000000000..e1763b2b6bd1b668a3a2d3baa25ccdbbd63e95fe --- /dev/null +++ b/kvdb/src/mock_param_dict_impl.cpp @@ -0,0 +1,150 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "kvdb/rocksdb_impl.h" +#include +#include +#include +#include +#include + +std::vector ParamDict::GetDictReaderLst() { + return this->file_reader_lst_; +} + +void ParamDict::SetFileReaderLst(std::vector lst) { + for (size_t i = 0; i < lst.size(); i++) { + FileReaderPtr fr = std::make_shared(); + fr->SetFileName(lst[i]); + this->file_reader_lst_.push_back(fr); + } +} + +std::vector ParamDict::GetSparseValue(std::string feasign, std::string slot) { + auto BytesToFloat = [](uint8_t* byteArray){ + return *((float*)byteArray); + }; + //TODO: the concatation of feasign and slot is TBD. + std::string result = front_db->Get(feasign + slot); + std::vector value; + if (result == "NOT_FOUND") + return value; + uint8_t* raw_values_ptr = reinterpret_cast(&result[0]); + for (size_t i = 0; i < result.size(); i += 4) { + float temp = BytesToFloat(raw_values_ptr + i); + value.push_back(temp); + } + return value; +} + +void ParamDict::SetReader(std::function(std::string)> func) { + read_func_ = func; +} + +std::vector ParamDict::GetSparseValue(int64_t feasign, int64_t slot) { + return this->GetSparseValue(std::to_string(feasign), std::to_string(slot)); +} + +bool ParamDict::InsertSparseValue(int64_t feasign, int64_t slot, const std::vector& values) { + return this->InsertSparseValue(std::to_string(feasign), std::to_string(slot), values); +} + +bool ParamDict::InsertSparseValue(std::string feasign, std::string slot, const std::vector& values) { + auto FloatToBytes = [](float fvalue, uint8_t *arr){ + unsigned char *pf = nullptr; + unsigned char *px = nullptr; + unsigned char i = 0; + pf =(unsigned char *)&fvalue; + px = arr; + for (i = 0; i < 4; i++) + { + *(px+i)=*(pf+i); + } + }; + + std::string key = feasign + slot; + uint8_t* values_ptr = new uint8_t[values.size() * 4]; + std::string value; + for (size_t i = 0; i < values.size(); i++) { + FloatToBytes(values[i], values_ptr + 4 * i); + } + char* raw_values_ptr = reinterpret_cast(values_ptr); + for (size_t i = 0; i < values.size()*4; i++) { + value.push_back(raw_values_ptr[i]); + } + back_db->Set(key, value); +//TODO: change stateless to stateful + return true; +} + +void ParamDict::UpdateBaseModel() { + auto is_number = [] (const std::string& s) + { + return !s.empty() && std::find_if(s.begin(), + s.end(), [](char c) { return !std::isdigit(c); }) == s.end(); + }; + std::thread t([&] () { + for (FileReaderPtr file_reader: this->file_reader_lst_) { + std::string line; + std::ifstream infile(file_reader->GetFileName()); + if (infile.is_open()) { + while (getline(infile, line)) { + std::pair kvpair = read_func_(line); + std::vector nums; + for (size_t i = 0; i < kvpair.second.size(); i++) { + if (is_number(kvpair.second[i])) { + nums.push_back(std::stof(kvpair.second[i])); + } + } + this->InsertSparseValue(kvpair.first, "", nums); + } + } + infile.close(); + } + AbsKVDBPtr temp = front_db; + front_db = back_db; + back_db = temp; + }); + t.detach(); +} + + +void ParamDict::UpdateDeltaModel() { + UpdateBaseModel(); +} + +std::pair ParamDict::GetKVDB() { + return {front_db, back_db}; +} + +void ParamDict::SetKVDB(std::pair kvdbs) { + this->front_db = kvdbs.first; + this->back_db = kvdbs.second; +} + +void ParamDict::CreateKVDB() { + this->front_db = std::make_shared(); + this->back_db = std::make_shared(); + this->front_db->CreateDB(); + this->back_db->CreateDB(); +} + +ParamDict::~ParamDict() { + +} + + + + + diff --git a/kvdb/src/paddle_rocksdb.cpp b/kvdb/src/paddle_rocksdb.cpp new file mode 100644 index 0000000000000000000000000000000000000000..55728ba65285b04ed09708ffc1de179522d3f235 --- /dev/null +++ b/kvdb/src/paddle_rocksdb.cpp @@ -0,0 +1,53 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "kvdb/paddle_rocksdb.h" + +RocksDBWrapper::RocksDBWrapper(std::string db_name) { + rocksdb::Options options; + options.create_if_missing = true; + db_name_ = db_name; + db_ = nullptr; + rocksdb::Status s = rocksdb::DB::Open(options, db_name, &db_); + return; +} + +std::string RocksDBWrapper::Get(std::string key) { + rocksdb::ReadOptions options; + options.verify_checksums = true; + std::string result; + rocksdb::Status s = db_->Get(options, key, &result); + if (s.IsNotFound()) { + result = "NOT_FOUND"; + } + return result; +} + +bool RocksDBWrapper::Put(std::string key, std::string value) { + rocksdb::WriteOptions options; + rocksdb::Status s = db_->Put(options, key, value); + if (s.ok()) { + return true; + } else { + return false; + } +} + +void RocksDBWrapper::SetDBName(std::string db_name) { + this->db_name_ = db_name; +} + +std::shared_ptr RocksDBWrapper::RocksDBWrapperFactory(std::string db_name) { + return std::make_shared(db_name); +} diff --git a/kvdb/src/param_dict_mgr_impl.cpp b/kvdb/src/param_dict_mgr_impl.cpp new file mode 100644 index 0000000000000000000000000000000000000000..67fd8744fe8dfb0ebe028650506432e5f5ac3186 --- /dev/null +++ b/kvdb/src/param_dict_mgr_impl.cpp @@ -0,0 +1,28 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "kvdb/kvdb_impl.h" + +void ParamDictMgr::UpdateAll() { + for (auto it = this->ParamDictMap.begin(); it!= this->ParamDictMap.end(); ++it) { + it->second->UpdateBaseModel(); + } + +} + +void ParamDictMgr::InsertParamDict(std::string key, ParamDictPtr value) { + this->ParamDictMap.insert(std::make_pair(key, value)); +} + +AbstractKVDB::~AbstractKVDB() {} diff --git a/kvdb/src/rockskvdb_impl.cpp b/kvdb/src/rockskvdb_impl.cpp new file mode 100644 index 0000000000000000000000000000000000000000..08371abf337878414c1617aad5bbb2ac63aa0a11 --- /dev/null +++ b/kvdb/src/rockskvdb_impl.cpp @@ -0,0 +1,44 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "kvdb/rocksdb_impl.h" + +int RocksKVDB::db_count; +void RocksKVDB::CreateDB() { + this->db_ = RocksDBWrapper::RocksDBWrapperFactory("RocksDB_" + std::to_string(RocksKVDB::db_count)); + RocksKVDB::db_count ++; + return; +} + +void RocksKVDB::SetDBName(std::string db_name) { + this->db_->SetDBName(db_name); + return; +} + +void RocksKVDB::Set(std::string key, std::string value) { + this->db_->Put(key, value); + return; +} + +std::string RocksKVDB::Get(std::string key) { + return this->db_->Get(key); +} + +RocksKVDB::~RocksKVDB() { + +} + + + + diff --git a/kvdb/src/test_rocksdb.cpp b/kvdb/src/test_rocksdb.cpp new file mode 100644 index 0000000000000000000000000000000000000000..ca1d714ac412e63d0f1c1e5e0b1ac0c75efc618a --- /dev/null +++ b/kvdb/src/test_rocksdb.cpp @@ -0,0 +1,48 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "kvdb/rocksdb_impl.h" +#include "kvdb/paddle_rocksdb.h" +#include +void test_rockskvdb() { + RocksKVDB db; + db.CreateDB(); + db.SetDBName("Sparse Matrix"); + db.Set("1", "One"); + std::cout << db.Get("1") << std::endl; + return ; +} + +void test_rocksdbwrapper() { + std::shared_ptr db = RocksDBWrapper::RocksDBWrapperFactory("TEST"); + for (size_t i = 0; i < 1000; i++) { + db->Put(std::to_string(i), std::to_string(i * 2)); + } + for (size_t i = 0; i < 1000; i++) { + std::string res = db->Get(std::to_string(i)); + std::cout << res << " "; + } + std::cout << std::endl; +} + +#ifdef RAW_TEST +int main() { + test_rockskvdb(); + test_rocksdbwrapper(); +} +#endif + + + + diff --git a/predictor/CMakeLists.txt b/predictor/CMakeLists.txt index c70c5dbebe62b0ac0b67a11f4822da57fc495352..652211448f01c830ee956406cdd2ba07c91df635 100644 --- a/predictor/CMakeLists.txt +++ b/predictor/CMakeLists.txt @@ -6,15 +6,17 @@ include(framework/CMakeLists.txt) #include(plugin/CMakeLists.txt) include(src/CMakeLists.txt) +include_directories(SYSTEM ${CMAKE_CURRENT_LIST_DIR}/../kvdb/include) + add_library(pdserving ${pdserving_srcs}) set_source_files_properties( ${pdserving_srcs} PROPERTIES COMPILE_FLAGS "-Wno-strict-aliasing -Wno-unused-variable -Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor") -add_dependencies(pdserving protobuf boost brpc leveldb pdcodegen configure) +add_dependencies(pdserving protobuf kvdb boost brpc leveldb pdcodegen configure) target_link_libraries(pdserving - brpc protobuf boost leveldb configure -lpthread -lcrypto -lm -lrt -lssl -ldl -lz) + brpc protobuf boost leveldb configure kvdb -lpthread -lcrypto -lm -lrt -lssl -ldl -lz) # install install(TARGETS pdserving diff --git a/predictor/conf/echo_kvdb_dag.conf b/predictor/conf/echo_kvdb_dag.conf new file mode 100644 index 0000000000000000000000000000000000000000..770479c7118e75b29610135d27e1b249de10ac0d --- /dev/null +++ b/predictor/conf/echo_kvdb_dag.conf @@ -0,0 +1,4 @@ +workflow_type: Sequence +[@Node] +name: echo_kvdb_op +type: KVDBEchoOp diff --git a/predictor/conf/workflow.conf b/predictor/conf/workflow.conf index 29d4737b50afed31a7d3c8a2679d1925eb26b960..2a7fc02a8bf258d451f7be29cc9079fd3874f05a 100644 --- a/predictor/conf/workflow.conf +++ b/predictor/conf/workflow.conf @@ -13,3 +13,7 @@ name: workflow3 path: ./conf file: echo_dag.conf +[@Workflow] +name: workflow7 +path: ./conf +file: echo_kvdb_dag.conf diff --git a/predictor/framework/resource.cpp b/predictor/framework/resource.cpp index 9ced89ffa4373e1cebc5640aefcde4645291a6e1..9b545c9150a4e4f4ed3f2a548bdfe1e288348730 100644 --- a/predictor/framework/resource.cpp +++ b/predictor/framework/resource.cpp @@ -36,7 +36,13 @@ DynamicResource::DynamicResource() {} DynamicResource::~DynamicResource() {} -int DynamicResource::initialize() { return 0; } +int DynamicResource::initialize() { + return 0; +} + +std::shared_ptr Resource::getDB() { + return db; +} int DynamicResource::clear() { return 0; } @@ -80,6 +86,11 @@ int Resource::initialize(const std::string& path, const std::string& file) { LOG(ERROR) << "unable to create tls_bthread_key of thrd_data"; return -1; } + //init rocksDB instance + if (db.get() == nullptr) { + db = RocksDBWrapper::RocksDBWrapperFactory("kvdb"); + } + THREAD_SETSPECIFIC(_tls_bspec_key, NULL); return 0; } diff --git a/predictor/framework/resource.h b/predictor/framework/resource.h index 3f3bdb9184dae5dbba418b09d12c65116622df5f..767b71d40c1d4478ab2ac2d2e32aedba0335621b 100644 --- a/predictor/framework/resource.h +++ b/predictor/framework/resource.h @@ -16,6 +16,7 @@ #include #include "predictor/common/inner_common.h" #include "predictor/framework/memory.h" +#include "kvdb/paddle_rocksdb.h" namespace baidu { namespace paddle_serving { @@ -30,6 +31,7 @@ struct DynamicResource { int initialize(); int clear(); + }; class Resource { @@ -53,6 +55,8 @@ class Resource { int finalize(); + std::shared_ptr getDB(); + DynamicResource* get_dynamic_resource() { return reinterpret_cast( THREAD_GETSPECIFIC(_tls_bspec_key)); @@ -60,7 +64,8 @@ class Resource { private: int thread_finalize() { return 0; } - + std::shared_ptr db; + THREAD_KEY_T _tls_bspec_key; }; diff --git a/predictor/proto/builtin_format.proto b/predictor/proto/builtin_format.proto index 8f0d1d8f01ffa6f026271a1f3d20b08ae072cc77..c2abb4fe882809e57ec5901bfebeccf23383564b 100644 --- a/predictor/proto/builtin_format.proto +++ b/predictor/proto/builtin_format.proto @@ -15,6 +15,19 @@ syntax = "proto2"; package baidu.paddle_serving.predictor.format; +// echo kvdb formant +message KVDBReq { + required string op = 1; + required string key = 2; + optional string value = 3; + +}; +message KVDBRes{ + required string value = 2; +}; + + + // dense format message DenseInstance { repeated float features = 1; }; diff --git a/sdk-cpp/proto/echo_kvdb_service.proto b/sdk-cpp/proto/echo_kvdb_service.proto new file mode 100644 index 0000000000000000000000000000000000000000..332e5c4ad9d7a710fdcc571059d616457866f593 --- /dev/null +++ b/sdk-cpp/proto/echo_kvdb_service.proto @@ -0,0 +1,34 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto2"; +import "pds_option.proto"; +import "builtin_format.proto"; +package baidu.paddle_serving.predictor.echo_kvdb_service; + +option cc_generic_services = true; + +message Request { + repeated baidu.paddle_serving.predictor.format.KVDBReq reqs = 1; +}; + +message Response { + repeated baidu.paddle_serving.predictor.format.KVDBRes ress = 1; +}; + +service EchoKVDBService { + rpc inference(Request) returns (Response); + rpc debug(Request) returns (Response); + option (pds.options).generate_stub = true; +};