未验证 提交 18ee0af3 编写于 作者: D Dong Daxiang 提交者: GitHub

Merge pull request #116 from PaddlePaddle/general_model_config

Add model-independent Python client. 
Add model-independent server implementation. 
Reorganize source code folders. 
Package the Python client as a wheel
......@@ -2,3 +2,4 @@ build
.baidu*
.bcloud*
output
*~
# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
......@@ -27,7 +27,7 @@ message(STATUS "CXX compiler: ${CMAKE_CXX_COMPILER}, version: "
message(STATUS "C compiler: ${CMAKE_C_COMPILER}, version: "
"${CMAKE_C_COMPILER_ID} ${CMAKE_C_COMPILER_VERSION}")
find_package(Git REQUIRED)
find_package(Threads REQUIRED)
find_package(CUDA QUIET)
......@@ -62,17 +62,22 @@ if (NOT DEFINED WITH_MKLDNN)
endif()
endif()
# Third-party dependency modules. Each external/*.cmake defines an
# ExternalProject and an imported target, so every module must be
# included exactly once: snappy, leveldb and gtest previously appeared
# both inside and outside the guard, redefining the same targets.
if (NOT CLIENT_ONLY)
  # Server-only dependencies.
  include(external/jsoncpp)
  include(external/rocksdb)
endif()
include(external/snappy)
include(external/leveldb)
include(external/zlib)
include(external/boost)
include(external/protobuf)
include(external/brpc)
include(external/gflags)
include(external/glog)
include(external/gtest)
include(external/pybind11)
include(external/python)
include(generic)
include(flags)
......@@ -82,50 +87,55 @@ include(paddlepaddle)
include(external/opencv)
endif()
message("paddle serving source dir: " ${PADDLE_SERVING_SOURCE_DIR})
include_directories(${PADDLE_SERVING_SOURCE_DIR})
include_directories(${PADDLE_SERVING_BINARY_DIR})
# Base libraries every serving binary links against.
# The previous code assigned EXTERNAL_LIBS twice: a CLIENT_ONLY-guarded
# set() that was immediately clobbered by an unconditional set(), and the
# surviving list repeated gflags/glog/protobuf/brpc. Server-only
# additions (MKL runtimes, paddlepaddle, opencv) are APPENDed below.
set(EXTERNAL_LIBS
    jsoncpp
    gflags
    rocksdb
    glog
    protobuf
    paddlepaddle
    brpc
    )
# Server-side (non-client) builds additionally link the MKL runtimes
# (when enabled) plus the Paddle inference library and OpenCV.
if(NOT CLIENT_ONLY)
  if(WITH_MKLML)
    list(APPEND EXTERNAL_LIBS ${MKLML_IOMP_LIB})
  endif()
  if(WITH_MKLDNN)
    list(APPEND EXTERNAL_LIBS ${MKLDNN_LIB})
  endif()
  list(APPEND EXTERNAL_LIBS paddlepaddle)
  list(APPEND EXTERNAL_LIBS opencv)
endif()
# Sub-project registration.
# NOTE(review): this region looks like a mangled diff merge -- the
# `if (NOT CLIENT_ONLY)` and `if (WITH_ELASTIC_CTR)` guards below are
# never closed with endif(), so as written this cannot configure.
# Verify against the merged upstream file before relying on it.
add_subdirectory(cube)
add_subdirectory(configure)
add_subdirectory(pdcodegen)
add_subdirectory(sdk-cpp)
add_subdirectory(demo-client)
add_subdirectory(kvdb)
if (NOT CLIENT_ONLY)
add_subdirectory(predictor)
add_subdirectory(inferencer-fluid-cpu)
if (WITH_GPU)
add_subdirectory(inferencer-fluid-gpu)
endif()
add_subdirectory(demo-serving)
add_subdirectory(core)
if(NOT CLIENT_ONLY)
add_subdirectory(paddle_inference)
endif()
# Paddle Serving Solutions
if (WITH_ELASTIC_CTR)
add_subdirectory(elastic-ctr)
if(CLIENT_ONLY)
add_subdirectory(python)
# PYTHON_INCLUDE / PYTHON_LIB are presumably user-supplied cache paths
# for the client wheel build -- confirm where they are defined.
set(PYTHON_INCLUDE_DIR ${PYTHON_INCLUDE})
set(PYTHON_LIBRARIES ${PYTHON_LIB})
endif()
add_subdirectory(examples)
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
include(ExternalProject)

set(PYBIND_SOURCE_DIR ${THIRD_PARTY_PATH}/pybind)

# pybind11 is header-only: only the sources are cloned and the include
# path pointed at them; configure/build/install steps are disabled.
include_directories(${PYBIND_SOURCE_DIR}/src/extern_pybind/include)

ExternalProject_Add(
        extern_pybind
        ${EXTERNAL_PROJECT_LOG_ARGS}
        GIT_REPOSITORY  "https://github.com/pybind/pybind11.git"
        GIT_TAG         "v2.2.4"
        PREFIX          ${PYBIND_SOURCE_DIR}
        UPDATE_COMMAND  ""
        CONFIGURE_COMMAND ""
        BUILD_COMMAND   ""
        INSTALL_COMMAND ""
        TEST_COMMAND    ""
)

# CMake < 3.3 cannot attach dependencies to an INTERFACE library, so a
# dummy static library is used there instead.
# (Fixed: the variable name is passed bare to if() -- the previous
# `if(${CMAKE_VERSION} ...)` expansion form is fragile.)
if(CMAKE_VERSION VERSION_LESS 3.3.0)
    set(dummyfile ${CMAKE_CURRENT_BINARY_DIR}/pybind_dummy.c)
    file(WRITE ${dummyfile} "const char * dummy_pybind = \"${dummyfile}\";")
    add_library(pybind STATIC ${dummyfile})
else()
    add_library(pybind INTERFACE)
endif()

add_dependencies(pybind extern_pybind)
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Locate the Python interpreter and development libraries for the
# requested PY_VERSION; both are required for building the client wheel.
FIND_PACKAGE(PythonInterp ${PY_VERSION} REQUIRED)
FIND_PACKAGE(PythonLibs ${PY_VERSION} REQUIRED)
# On Windows, FindPythonLibs can select the wrong import library, so the
# interpreter itself is queried for its prefix and library suffix.
if(WIN32)
execute_process(COMMAND "${PYTHON_EXECUTABLE}" "-c"
"from distutils import sysconfig as s;import sys;import struct;
print(sys.prefix);
print(s.get_config_var('LDVERSION') or s.get_config_var('VERSION'));
"
RESULT_VARIABLE _PYTHON_SUCCESS
OUTPUT_VARIABLE _PYTHON_VALUES
ERROR_VARIABLE _PYTHON_ERROR_VALUE)
# Non-zero exit status: the interpreter could not be queried.
if(NOT _PYTHON_SUCCESS MATCHES 0)
set(PYTHONLIBS_FOUND FALSE)
return()
endif()
# Convert the process output into a list (escape embedded semicolons,
# then split on newlines).
string(REGEX REPLACE ";" "\\\\;" _PYTHON_VALUES ${_PYTHON_VALUES})
string(REGEX REPLACE "\n" ";" _PYTHON_VALUES ${_PYTHON_VALUES})
list(GET _PYTHON_VALUES 0 PYTHON_PREFIX)
list(GET _PYTHON_VALUES 1 PYTHON_LIBRARY_SUFFIX)
# Make sure all directory separators are '/'
string(REGEX REPLACE "\\\\" "/" PYTHON_PREFIX ${PYTHON_PREFIX})
set(PYTHON_LIBRARY
"${PYTHON_PREFIX}/libs/Python${PYTHON_LIBRARY_SUFFIX}.lib")
# when run in a venv, PYTHON_PREFIX points to it. But the libraries remain in the
# original python installation. They may be found relative to PYTHON_INCLUDE_DIR.
if(NOT EXISTS "${PYTHON_LIBRARY}")
get_filename_component(_PYTHON_ROOT ${PYTHON_INCLUDE_DIR} DIRECTORY)
set(PYTHON_LIBRARY
"${_PYTHON_ROOT}/libs/Python${PYTHON_LIBRARY_SUFFIX}.lib")
endif()
# raise an error if the python libs are still not found.
if(NOT EXISTS "${PYTHON_LIBRARY}")
message(FATAL_ERROR "Python libraries not found")
endif()
SET(PYTHON_LIBRARIES "${PYTHON_LIBRARY}")
endif(WIN32)
# Fixme: Maybe find a static library. Get SHARED/STATIC by FIND_PACKAGE.
# Expose libpython to the rest of the build as the imported target `python`.
ADD_LIBRARY(python SHARED IMPORTED GLOBAL)
SET_PROPERTY(TARGET python PROPERTY IMPORTED_LOCATION ${PYTHON_LIBRARIES})
SET(py_env "")
INCLUDE_DIRECTORIES(${PYTHON_INCLUDE_DIR})
......@@ -838,9 +838,9 @@ function(PROTOBUF_GENERATE_SERVING_CPP FOR_SERVING_SIDE SRCS HDRS )
"${CMAKE_CURRENT_BINARY_DIR}/${FIL_WE}.pb.h"
COMMAND ${Protobuf_PROTOC_EXECUTABLE}
ARGS --cpp_out=${CMAKE_CURRENT_BINARY_DIR}
--pdcodegen_out=${CMAKE_CURRENT_BINARY_DIR}
--plugin=protoc-gen-pdcodegen=${CMAKE_BINARY_DIR}/pdcodegen/pdcodegen
--proto_path=${CMAKE_SOURCE_DIR}/predictor/proto
--pdcodegen_out=${CMAKE_CURRENT_BINARY_DIR}/core
--plugin=protoc-gen-pdcodegen=${CMAKE_BINARY_DIR}/core/pdcodegen/pdcodegen
--proto_path=${CMAKE_SOURCE_DIR}/core/predictor/proto
${_protobuf_include_path} ${ABS_FIL}
DEPENDS ${ABS_FIL} ${Protobuf_PROTOC_EXECUTABLE}
COMMENT "Running Paddle-serving C++ protocol buffer compiler on ${FIL}"
......
......@@ -20,6 +20,7 @@ SET(PADDLE_INSTALL_DIR ${THIRD_PARTY_PATH}/install/Paddle/)
SET(PADDLE_INCLUDE_DIR "${PADDLE_INSTALL_DIR}/include" CACHE PATH "PaddlePaddle include directory." FORCE)
SET(PADDLE_LIBRARIES "${PADDLE_INSTALL_DIR}/lib/libpaddle_fluid.a" CACHE FILEPATH "Paddle library." FORCE)
message("paddle install dir: " ${PADDLE_INSTALL_DIR})
# Reference https://stackoverflow.com/questions/45414507/pass-a-list-of-prefix-paths-to-externalproject-add-in-cmake-args
set(prefix_path "${THIRD_PARTY_PATH}/install/gflags|${THIRD_PARTY_PATH}/install/leveldb|${THIRD_PARTY_PATH}/install/snappy|${THIRD_PARTY_PATH}/install/gtest|${THIRD_PARTY_PATH}/install/protobuf|${THIRD_PARTY_PATH}/install/zlib|${THIRD_PARTY_PATH}/install/glog")
......@@ -62,7 +63,7 @@ ExternalProject_Add(
${CMAKE_COMMAND} -E copy_directory ${PADDLE_DOWNLOAD_DIR}/paddle/include ${PADDLE_INSTALL_DIR}/include &&
${CMAKE_COMMAND} -E copy_directory ${PADDLE_DOWNLOAD_DIR}/paddle/lib ${PADDLE_INSTALL_DIR}/lib &&
${CMAKE_COMMAND} -E copy_directory ${PADDLE_DOWNLOAD_DIR}/third_party ${PADDLE_INSTALL_DIR}/third_party &&
${CMAKE_COMMAND} -E copy ${PADDLE_INSTALL_DIR}/third_party/install/mkldnn/lib/libmkldnn.so.1 ${PADDLE_INSTALL_DIR}/third_party/install/mkldnn/lib/libmkldnn.so
${CMAKE_COMMAND} -E copy ${PADDLE_INSTALL_DIR}/third_party/install/mkldnn/lib/libmkldnn.so.0 ${PADDLE_INSTALL_DIR}/third_party/install/mkldnn/lib/libmkldnn.so
)
INCLUDE_DIRECTORIES(${PADDLE_INCLUDE_DIR})
......
# Build the `configure` library: protoc-generated message code plus the
# text-format parser helpers, and a small round-trip test binary.
LIST(APPEND protofiles
${CMAKE_CURRENT_LIST_DIR}/proto/server_configure.proto
${CMAKE_CURRENT_LIST_DIR}/proto/sdk_configure.proto
${CMAKE_CURRENT_LIST_DIR}/proto/inferencer_configure.proto
)
PROTOBUF_GENERATE_CPP(configure_proto_srcs configure_proto_hdrs ${protofiles})
list(APPEND configure_srcs ${configure_proto_srcs})
list(APPEND configure_srcs ${CMAKE_CURRENT_LIST_DIR}/src/configure_parser.cpp)
add_library(configure ${configure_srcs})
# NOTE(review): add_dependencies only orders the build (it does not
# link) -- presumably `configure` needs artifacts from the brpc
# ExternalProject; confirm before changing to target_link_libraries.
add_dependencies(configure brpc)
add_executable(test_configure
${CMAKE_CURRENT_LIST_DIR}/tests/test_configure.cpp)
target_link_libraries(test_configure configure protobuf)
install(TARGETS configure
ARCHIVE DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/lib
)
install(FILES ${CMAKE_CURRENT_LIST_DIR}/include/configure_parser.h
DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/include/configure/include)
# Install the protoc-generated headers alongside the parser header.
FILE(GLOB inc ${CMAKE_CURRENT_BINARY_DIR}/*.pb.h)
install(FILES ${inc}
DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/include/configure)
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto2";
package baidu.paddle_serving.configure;

// Per-endpoint RPC connection settings used by the C++ SDK.
message ConnectionConf {
required int32 connect_timeout_ms = 1;
required int32 rpc_timeout_ms = 2;
required int32 connect_retry_count = 3;
required int32 max_connection_per_host = 4;
required int32 hedge_request_timeout_ms = 5;
required int32 hedge_fetch_retry_count = 6;
required string connection_type = 7;
};

// Naming / load-balancing configuration for a variant.
message NamingConf {
optional string cluster_filter_strategy = 1;
optional string load_balance_strategy = 2;
optional string cluster = 3;
};

// brpc channel parameters for a variant.
message RpcParameter {
// 0-NONE, 1-SNAPPY, 2-GZIP, 3-ZLIB, 4-LZ4
required int32 compress_type = 1;
required int32 package_size = 2;
required string protocol = 3;
required int32 max_channel_per_request = 4;
};

// Request-splitting configuration (optional).
message SplitConf {
optional string split_tag_name = 1;
optional string tag_candidates = 2;
};

// One variant of a predictor endpoint; unset sub-messages fall back to
// SDKConf.default_variant_conf.
message VariantConf {
required string tag = 1;
optional ConnectionConf connection_conf = 2;
optional NamingConf naming_conf = 3;
optional RpcParameter rpc_parameter = 4;
optional SplitConf split_conf = 5;
optional string variant_router = 6;
};

// Weight list consumed by the WeightedRandomRender endpoint router.
message WeightedRandomRenderConf { required string variant_weight_list = 1; };

// A named service endpoint plus its variants.
message Predictor {
required string name = 1;
required string service_name = 2;
required string endpoint_router = 3;
required WeightedRandomRenderConf weighted_random_render_conf = 4;
repeated VariantConf variants = 5;
};

// SDK conf
message SDKConf {
required VariantConf default_variant_conf = 1;
repeated Predictor predictors = 2;
};
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "configure/include/configure_parser.h"
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <fstream>
#ifdef BCLOUD
#include "base/logging.h"
#else
#include "butil/logging.h"
#endif
#include "google/protobuf/io/zero_copy_stream_impl.h"
#include "google/protobuf/text_format.h"
namespace baidu {
namespace paddle_serving {
namespace configure {
// Parse the text-format protobuf file <conf_path>/<conf_file> into *conf.
// Returns 0 on success, -1 if the file is missing or fails to parse.
int read_proto_conf(const std::string &conf_path,
                    const std::string &conf_file,
                    google::protobuf::Message *conf) {
  std::string file_str = conf_path + "/" + conf_file;
  int fd = open(file_str.c_str(), O_RDONLY);
  if (fd == -1) {
    LOG(WARNING) << "File not found: " << file_str.c_str();
    return -1;
  }

  google::protobuf::io::FileInputStream input(fd);
  bool success = google::protobuf::TextFormat::Parse(&input, conf);
  close(fd);
  if (!success) {
    // Fix: parse failures were previously silent, making malformed
    // prototxt files impossible to diagnose from the logs.
    LOG(WARNING) << "Failed to parse prototxt: " << file_str.c_str();
    return -1;
  }
  return 0;
}
// Serialize *message in protobuf text format to <output_path>/<output_file>.
// Returns 0 on success, -1 on any I/O failure.
int write_proto_conf(google::protobuf::Message *message,
                     const std::string &output_path,
                     const std::string &output_file) {
  // Renamed from binary_str: the payload is text format, not binary.
  std::string text_str;
  google::protobuf::TextFormat::PrintToString(*message, &text_str);

  std::string file_str = output_path + "/" + output_file;
  std::ofstream fout(file_str.c_str());
  if (!fout) {
    LOG(WARNING) << "Open file error: " << file_str.c_str();
    return -1;
  }
  fout.write(text_str.c_str(), text_str.size());
  fout.close();
  // Fix: previously a failed or short write (e.g. disk full) still
  // returned success; close() latches any write error into the stream
  // state, so check it afterwards.
  if (!fout.good()) {
    LOG(WARNING) << "Write file error: " << file_str.c_str();
    return -1;
  }
  return 0;
}
} // namespace configure
} // namespace paddle_serving
} // namespace baidu
/* vim: set expandtab ts=2 sw=2 sts=2 tw=100: */
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <iostream>
#include "configure/include/configure_parser.h"
#include "configure/inferencer_configure.pb.h"
#include "configure/sdk_configure.pb.h"
#include "configure/server_configure.pb.h"
using baidu::paddle_serving::configure::EngineDesc;
using baidu::paddle_serving::configure::ModelToolkitConf;
using baidu::paddle_serving::configure::ResourceConf;
using baidu::paddle_serving::configure::DAGNodeDependency;
using baidu::paddle_serving::configure::DAGNode;
using baidu::paddle_serving::configure::Workflow;
using baidu::paddle_serving::configure::WorkflowConf;
using baidu::paddle_serving::configure::InferService;
using baidu::paddle_serving::configure::InferServiceConf;
using baidu::paddle_serving::configure::ConnectionConf;
using baidu::paddle_serving::configure::WeightedRandomRenderConf;
using baidu::paddle_serving::configure::NamingConf;
using baidu::paddle_serving::configure::RpcParameter;
using baidu::paddle_serving::configure::Predictor;
using baidu::paddle_serving::configure::VariantConf;
using baidu::paddle_serving::configure::SDKConf;
using baidu::paddle_serving::configure::SigmoidConf;
const char *output_dir = "./conf/";
const char *model_toolkit_conf_file = "model_toolkit.prototxt";
const char *resource_conf_file = "resource.prototxt";
const char *workflow_conf_file = "workflow.prototxt";
const char *service_conf_file = "service.prototxt";
const char *sdk_conf_file = "predictors.prototxt";
const char *sigmoid_conf_file = "inferencer.prototxt";
int test_write_conf() {
// model_toolkit conf
ModelToolkitConf model_toolkit_conf;
// This engine has a default version
EngineDesc *engine = model_toolkit_conf.add_engines();
engine->set_name("image_classification_resnet");
engine->set_type("FLUID_CPU_NATIVE_DIR");
engine->set_reloadable_meta("./data/model/paddle/fluid_time_file");
engine->set_reloadable_type("timestamp_ne");
engine->set_model_data_path("./data/model/paddle/fluid/SE_ResNeXt50_32x4d");
engine->set_runtime_thread_num(0);
engine->set_batch_infer_size(0);
engine->set_enable_batch_align(0);
engine->set_sparse_param_service_type(EngineDesc::LOCAL);
engine->set_sparse_param_service_table_name("local_kv");
engine->set_enable_memory_optimization(true);
engine->set_static_optimization(false);
engine->set_force_update_static_cache(false);
int ret = baidu::paddle_serving::configure::write_proto_conf(
&model_toolkit_conf, output_dir, model_toolkit_conf_file);
if (ret != 0) {
return ret;
}
// resource conf
ResourceConf resource_conf;
resource_conf.set_model_toolkit_path(output_dir);
resource_conf.set_model_toolkit_file("model_toolkit.prototxt");
resource_conf.set_cube_config_file("./conf/cube.conf");
ret = baidu::paddle_serving::configure::write_proto_conf(
&resource_conf, output_dir, resource_conf_file);
if (ret != 0) {
return ret;
}
// workflow entries conf
WorkflowConf workflow_conf;
Workflow *workflow = workflow_conf.add_workflows();
workflow->set_name("workflow1");
workflow->set_workflow_type("Sequence");
DAGNode *dag_node = workflow->add_nodes();
dag_node->set_name("image_reader_op");
dag_node->set_type("ReaderOp");
dag_node = workflow->add_nodes();
dag_node->set_name("imag_classify_op");
dag_node->set_type("ClassifyOp");
DAGNodeDependency *node_dependency = dag_node->add_dependencies();
node_dependency->set_name("image_reader_op");
node_dependency->set_mode("RO");
dag_node = workflow->add_nodes();
dag_node->set_name("write_json_op");
dag_node->set_type("WriteOp");
node_dependency = dag_node->add_dependencies();
node_dependency->set_name("image_classify_op");
node_dependency->set_mode("RO");
workflow = workflow_conf.add_workflows();
workflow->set_name("workflow2");
workflow->set_workflow_type("Sequence");
dag_node = workflow->add_nodes();
dag_node->set_name("dense_op");
dag_node->set_type("DenseOp");
ret = baidu::paddle_serving::configure::write_proto_conf(
&workflow_conf, output_dir, workflow_conf_file);
if (ret != 0) {
return ret;
}
InferServiceConf infer_service_conf;
infer_service_conf.set_port(0);
InferService *infer_service = infer_service_conf.add_services();
infer_service->set_name("ImageClassifyService");
infer_service->add_workflows("workflow1");
infer_service->add_workflows("workflow2");
infer_service = infer_service_conf.add_services();
infer_service->set_name("BuiltinDenseFormatService");
infer_service->add_workflows("workflow2");
ret = baidu::paddle_serving::configure::write_proto_conf(
&infer_service_conf, output_dir, service_conf_file);
if (ret != 0) {
return ret;
}
SDKConf sdk_conf;
VariantConf *default_variant_conf = sdk_conf.mutable_default_variant_conf();
default_variant_conf->set_tag("default");
ConnectionConf *connection_conf =
default_variant_conf->mutable_connection_conf();
connection_conf->set_connect_timeout_ms(2000);
connection_conf->set_rpc_timeout_ms(20000);
connection_conf->set_connect_retry_count(2);
connection_conf->set_max_connection_per_host(100);
connection_conf->set_hedge_request_timeout_ms(-1);
connection_conf->set_hedge_fetch_retry_count(2);
connection_conf->set_connection_type("pooled");
NamingConf *naming_conf = default_variant_conf->mutable_naming_conf();
naming_conf->set_cluster_filter_strategy("Default");
naming_conf->set_load_balance_strategy("la");
RpcParameter *rpc_parameter = default_variant_conf->mutable_rpc_parameter();
rpc_parameter->set_compress_type(0);
rpc_parameter->set_package_size(20);
rpc_parameter->set_protocol("baidu_std");
rpc_parameter->set_max_channel_per_request(3);
Predictor *predictor = sdk_conf.add_predictors();
predictor->set_name("ximage");
predictor->set_service_name(
"baidu.paddle_serving.predictor.image_classification."
"ImageClassifyService");
predictor->set_endpoint_router("WeightedRandomRender");
WeightedRandomRenderConf *weighted_random_render_conf =
predictor->mutable_weighted_random_render_conf();
weighted_random_render_conf->set_variant_weight_list("50");
VariantConf *variant_conf = predictor->add_variants();
variant_conf->set_tag("var1");
naming_conf = variant_conf->mutable_naming_conf();
naming_conf->set_cluster("list://127.0.0.1:8010");
ret = baidu::paddle_serving::configure::write_proto_conf(
&sdk_conf, output_dir, sdk_conf_file);
if (ret != 0) {
return ret;
}
SigmoidConf sigmoid_conf;
sigmoid_conf.set_dnn_model_path("data/dnn_model");
sigmoid_conf.set_sigmoid_w_file("data/dnn_model/_sigmoid_.w_0");
sigmoid_conf.set_sigmoid_b_file("data/dnn_model/_sigmoid_.b_0");
sigmoid_conf.set_exp_max_input(0.75);
sigmoid_conf.set_exp_min_input(0.25);
ret = baidu::paddle_serving::configure::write_proto_conf(
&sigmoid_conf, output_dir, sigmoid_conf_file);
if (ret != 0) {
return ret;
}
return 0;
}
// Parse back every prototxt written by test_write_conf().
// Returns 0 when all six files parse cleanly, -1 on the first failure.
int test_read_conf() {
  int ret = 0;

  ModelToolkitConf model_toolkit_conf;
  ret = baidu::paddle_serving::configure::read_proto_conf(
      output_dir, model_toolkit_conf_file, &model_toolkit_conf);
  if (ret != 0) {
    std::cout << "Read conf fail: " << model_toolkit_conf_file << std::endl;
    return -1;
  }

  ResourceConf resource_conf;
  ret = baidu::paddle_serving::configure::read_proto_conf(
      output_dir, resource_conf_file, &resource_conf);
  if (ret != 0) {
    std::cout << "Read conf fail: " << resource_conf_file << std::endl;
    return -1;
  }

  WorkflowConf workflow_conf;
  ret = baidu::paddle_serving::configure::read_proto_conf(
      output_dir, workflow_conf_file, &workflow_conf);
  if (ret != 0) {
    std::cout << "Read conf fail: " << workflow_conf_file << std::endl;
    return -1;
  }

  InferServiceConf service_conf;
  ret = baidu::paddle_serving::configure::read_proto_conf(
      output_dir, service_conf_file, &service_conf);
  if (ret != 0) {
    std::cout << "Read conf fail: " << service_conf_file << std::endl;
    return -1;
  }

  SDKConf sdk_conf;
  ret = baidu::paddle_serving::configure::read_proto_conf(
      output_dir, sdk_conf_file, &sdk_conf);
  if (ret != 0) {
    std::cout << "Read conf fail: " << sdk_conf_file << std::endl;
    return -1;
  }

  SigmoidConf sigmoid_conf;
  ret = baidu::paddle_serving::configure::read_proto_conf(
      output_dir, sigmoid_conf_file, &sigmoid_conf);
  if (ret != 0) {
    // Fix: this message previously printed sdk_conf_file, misreporting
    // which file failed to parse.
    std::cout << "Read conf fail: " << sigmoid_conf_file << std::endl;
    return -1;
  }

  return 0;
}
// Round-trip driver: ensure ./conf exists, write all sample configs,
// then read each one back. Exits 0 on success, -1 on any failure.
int main() {
  struct stat stat_buf;
  if (stat(output_dir, &stat_buf) != 0) {
    // Fix: the old code declared a second `int ret` here, shadowing the
    // outer result variable; test the mkdir result directly instead.
    if (mkdir("./conf", 0777) != 0) {
      std::cout << "mkdir ./conf fail" << std::endl;
      return -1;
    }
    if (stat("./conf", &stat_buf) != 0) {
      std::cout << "./conf not exist and creating it failed" << std::endl;
      return -1;
    }
  }

  int ret = test_write_conf();
  if (ret != 0) {
    std::cout << "test_write_conf fail" << std::endl;
    return -1;
  }
  std::cout << "test_write_conf success" << std::endl;

  ret = test_read_conf();
  if (ret != 0) {
    std::cout << "test_read_conf fail" << std::endl;
    return -1;
  }
  std::cout << "test_read_conf success" << std::endl;

  return 0;
}
/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
# core/ sub-projects. CLIENT_ONLY switches between the thin client build
# (general-client) and the full server build (cube/kvdb/predictor).
if(NOT CLIENT_ONLY)
# server-side storage components
add_subdirectory(cube)
add_subdirectory(kvdb)
endif()
# shared by both client and server builds
add_subdirectory(configure)
add_subdirectory(pdcodegen)
add_subdirectory(sdk-cpp)
if(CLIENT_ONLY)
add_subdirectory(general-client)
endif()
if (NOT CLIENT_ONLY)
add_subdirectory(predictor)
endif()
# Build the `configure` library (generated protobuf code + text-format
# parser helpers), its round-trip test binary, and the Python protobuf
# bindings copied into the wheel staging directory.
LIST(APPEND protofiles
${CMAKE_CURRENT_LIST_DIR}/proto/server_configure.proto
${CMAKE_CURRENT_LIST_DIR}/proto/sdk_configure.proto
${CMAKE_CURRENT_LIST_DIR}/proto/inferencer_configure.proto
${CMAKE_CURRENT_LIST_DIR}/proto/general_model_config.proto
)
PROTOBUF_GENERATE_CPP(configure_proto_srcs configure_proto_hdrs ${protofiles})
list(APPEND configure_srcs ${configure_proto_srcs})
list(APPEND configure_srcs ${CMAKE_CURRENT_LIST_DIR}/src/configure_parser.cpp)
add_library(configure ${configure_srcs})
# NOTE(review): add_dependencies only orders the build; presumably the
# brpc ExternalProject must be installed first -- confirm.
add_dependencies(configure brpc)
add_executable(test_configure
${CMAKE_CURRENT_LIST_DIR}/tests/test_configure.cpp)
target_link_libraries(test_configure configure protobuf)
install(TARGETS configure
ARCHIVE DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/lib
)
install(FILES ${CMAKE_CURRENT_LIST_DIR}/include/configure_parser.h
DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/include/configure/include)
# Install the protoc-generated headers alongside the parser header.
FILE(GLOB inc ${CMAKE_CURRENT_BINARY_DIR}/*.pb.h)
install(FILES ${inc}
DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/include/configure)
# Python bindings for sdk_configure.proto, staged for the client wheel.
py_proto_compile(sdk_configure_py_proto SRCS proto/sdk_configure.proto)
# touch __init__.py so the generated directory is importable as a package
add_custom_target(sdk_configure_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py)
add_dependencies(sdk_configure_py_proto sdk_configure_py_proto_init)
add_custom_command(TARGET sdk_configure_py_proto POST_BUILD
COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving/proto
# NOTE(review): `cp *.py` relies on a POSIX shell and glob expansion;
# prefer ${CMAKE_COMMAND} -E copy for portability (breaks on Windows).
COMMAND cp *.py ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving/proto
COMMENT "Copy generated python proto into directory paddle_serving/proto."
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto2";
package baidu.paddle_serving.configure;

// Dense shape of one feed variable (one int32 per dimension).
message Shape {
repeated int32 shape = 1;
};

// Model-independent description of a model's feed interface. The
// repeated fields appear to be parallel arrays indexed by feed slot --
// confirm against the client code that populates them.
message GeneralModelConfig {
// whether the slot carries LoD (variable-length) data
repeated bool is_lod_feed = 1;
// numeric type tag of the slot; presumably a dtype enum -- confirm
repeated int32 feed_type = 2;
// NOTE(review): field number 3 is skipped here -- verify whether it is
// intentionally reserved before reusing it.
repeated Shape feed_shape = 4;
};
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto2";
package baidu.paddle_serving.configure;

// SDK endpoint configuration (proto2 text format).
// NOTE(review): [default = ...] on `required` fields is of limited use:
// a required field must still be set explicitly for the message to
// validate; the default only applies when reading an unset field.
// Per-endpoint RPC connection settings.
message ConnectionConf {
required int32 connect_timeout_ms = 1 [default = 2000];
required int32 rpc_timeout_ms = 2 [default = 20000];
required int32 connect_retry_count = 3 [default = 2];
required int32 max_connection_per_host = 4 [default = 100];
required int32 hedge_request_timeout_ms = 5 [default = -1];
required int32 hedge_fetch_retry_count = 6 [default = 2];
required string connection_type = 7 [default = "pooled"];
};

// Naming / load-balancing configuration for a variant.
message NamingConf {
optional string cluster_filter_strategy = 1 [default="Default"];
optional string load_balance_strategy = 2 [default = "la"];
optional string cluster = 3;
};

// brpc channel parameters for a variant.
message RpcParameter {
// 0-NONE, 1-SNAPPY, 2-GZIP, 3-ZLIB, 4-LZ4
required int32 compress_type = 1 [default = 0];
required int32 package_size = 2 [default = 20];
required string protocol = 3 [default = "baidu_std"];
required int32 max_channel_per_request = 4 [default = 3];
};

// Request-splitting configuration (optional).
message SplitConf {
optional string split_tag_name = 1;
optional string tag_candidates = 2;
};

// One variant of a predictor endpoint; unset sub-messages fall back to
// SDKConf.default_variant_conf.
message VariantConf {
required string tag = 1;
optional ConnectionConf connection_conf = 2;
optional NamingConf naming_conf = 3;
optional RpcParameter rpc_parameter = 4;
optional SplitConf split_conf = 5;
optional string variant_router = 6;
};

// Weight list consumed by the WeightedRandomRender endpoint router.
message WeightedRandomRenderConf { required string variant_weight_list = 1 [default= "50" ]; };

// A named service endpoint plus its variants.
message Predictor {
required string name = 1 [default="general_model"];
required string service_name = 2 [default="baidu.paddle_serving.predictor.general_model.GeneralModelService"];
required string endpoint_router = 3 [default="WeightedRandomRender"];
required WeightedRandomRenderConf weighted_random_render_conf = 4;
repeated VariantConf variants = 5;
};

// SDK conf
message SDKConf {
required VariantConf default_variant_conf = 1;
repeated Predictor predictors = 2;
};
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "core/configure/include/configure_parser.h"
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <fstream>
#ifdef BCLOUD
#include "base/logging.h"
#else
#include "butil/logging.h"
#endif
#include "google/protobuf/io/zero_copy_stream_impl.h"
#include "google/protobuf/text_format.h"
namespace baidu {
namespace paddle_serving {
namespace configure {
// Parse the text-format protobuf file <conf_path>/<conf_file> into *conf.
// Returns 0 on success, -1 if the file is missing or fails to parse.
int read_proto_conf(const std::string &conf_path,
                    const std::string &conf_file,
                    google::protobuf::Message *conf) {
  std::string file_str = conf_path + "/" + conf_file;
  int fd = open(file_str.c_str(), O_RDONLY);
  if (fd == -1) {
    LOG(WARNING) << "File not found: " << file_str.c_str();
    return -1;
  }

  google::protobuf::io::FileInputStream input(fd);
  bool success = google::protobuf::TextFormat::Parse(&input, conf);
  close(fd);
  if (!success) {
    // Fix: parse failures were previously silent, making malformed
    // prototxt files impossible to diagnose from the logs.
    LOG(WARNING) << "Failed to parse prototxt: " << file_str.c_str();
    return -1;
  }
  return 0;
}
// Serialize *message in protobuf text format to <output_path>/<output_file>.
// Returns 0 on success, -1 on any I/O failure.
int write_proto_conf(google::protobuf::Message *message,
                     const std::string &output_path,
                     const std::string &output_file) {
  // Renamed from binary_str: the payload is text format, not binary.
  std::string text_str;
  google::protobuf::TextFormat::PrintToString(*message, &text_str);

  std::string file_str = output_path + "/" + output_file;
  std::ofstream fout(file_str.c_str());
  if (!fout) {
    LOG(WARNING) << "Open file error: " << file_str.c_str();
    return -1;
  }
  fout.write(text_str.c_str(), text_str.size());
  fout.close();
  // Fix: previously a failed or short write (e.g. disk full) still
  // returned success; close() latches any write error into the stream
  // state, so check it afterwards.
  if (!fout.good()) {
    LOG(WARNING) << "Write file error: " << file_str.c_str();
    return -1;
  }
  return 0;
}
} // namespace configure
} // namespace paddle_serving
} // namespace baidu
/* vim: set expandtab ts=2 sw=2 sts=2 tw=100: */
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <iostream>
#include "core/configure/include/configure_parser.h"
#include "core/configure/inferencer_configure.pb.h"
#include "core/configure/sdk_configure.pb.h"
#include "core/configure/server_configure.pb.h"
using baidu::paddle_serving::configure::EngineDesc;
using baidu::paddle_serving::configure::ModelToolkitConf;
using baidu::paddle_serving::configure::ResourceConf;
using baidu::paddle_serving::configure::DAGNodeDependency;
using baidu::paddle_serving::configure::DAGNode;
using baidu::paddle_serving::configure::Workflow;
using baidu::paddle_serving::configure::WorkflowConf;
using baidu::paddle_serving::configure::InferService;
using baidu::paddle_serving::configure::InferServiceConf;
using baidu::paddle_serving::configure::ConnectionConf;
using baidu::paddle_serving::configure::WeightedRandomRenderConf;
using baidu::paddle_serving::configure::NamingConf;
using baidu::paddle_serving::configure::RpcParameter;
using baidu::paddle_serving::configure::Predictor;
using baidu::paddle_serving::configure::VariantConf;
using baidu::paddle_serving::configure::SDKConf;
using baidu::paddle_serving::configure::SigmoidConf;
// Output directory and per-config-type file names shared by the
// write/read round-trip tests below: test_write_conf() emits one prototxt
// per config type into `output_dir`, test_read_conf() parses them back.
const char *output_dir = "./conf/";
const char *model_toolkit_conf_file = "model_toolkit.prototxt";
const char *resource_conf_file = "resource.prototxt";
const char *workflow_conf_file = "workflow.prototxt";
const char *service_conf_file = "service.prototxt";
const char *sdk_conf_file = "predictors.prototxt";
const char *sigmoid_conf_file = "inferencer.prototxt";
// Build one in-memory instance of every config type and write each to
// ./conf/ in prototxt form via write_proto_conf(). Returns 0 on success.
int test_write_conf() {
  // model_toolkit conf
  ModelToolkitConf model_toolkit_conf;
  // This engine has a default version
  EngineDesc *engine = model_toolkit_conf.add_engines();
  engine->set_name("image_classification_resnet");
  engine->set_type("FLUID_CPU_NATIVE_DIR");
  engine->set_reloadable_meta("./data/model/paddle/fluid_time_file");
  engine->set_reloadable_type("timestamp_ne");
  engine->set_model_data_path("./data/model/paddle/fluid/SE_ResNeXt50_32x4d");
  engine->set_runtime_thread_num(0);
  engine->set_batch_infer_size(0);
  engine->set_enable_batch_align(0);
  engine->set_sparse_param_service_type(EngineDesc::LOCAL);
  engine->set_sparse_param_service_table_name("local_kv");
  engine->set_enable_memory_optimization(true);
  engine->set_static_optimization(false);
  engine->set_force_update_static_cache(false);
  int ret = baidu::paddle_serving::configure::write_proto_conf(
      &model_toolkit_conf, output_dir, model_toolkit_conf_file);
  if (ret != 0) {
    return ret;
  }
  // resource conf
  ResourceConf resource_conf;
  resource_conf.set_model_toolkit_path(output_dir);
  resource_conf.set_model_toolkit_file("model_toolkit.prototxt");
  resource_conf.set_cube_config_file("./conf/cube.conf");
  ret = baidu::paddle_serving::configure::write_proto_conf(
      &resource_conf, output_dir, resource_conf_file);
  if (ret != 0) {
    return ret;
  }
  // workflow entries conf: workflow1 is reader -> classify -> writer
  WorkflowConf workflow_conf;
  Workflow *workflow = workflow_conf.add_workflows();
  workflow->set_name("workflow1");
  workflow->set_workflow_type("Sequence");
  DAGNode *dag_node = workflow->add_nodes();
  dag_node->set_name("image_reader_op");
  dag_node->set_type("ReaderOp");
  dag_node = workflow->add_nodes();
  // Fixed typo: was "imag_classify_op", which broke the dependency edge
  // declared by write_json_op below ("image_classify_op").
  dag_node->set_name("image_classify_op");
  dag_node->set_type("ClassifyOp");
  DAGNodeDependency *node_dependency = dag_node->add_dependencies();
  node_dependency->set_name("image_reader_op");
  node_dependency->set_mode("RO");
  dag_node = workflow->add_nodes();
  dag_node->set_name("write_json_op");
  dag_node->set_type("WriteOp");
  node_dependency = dag_node->add_dependencies();
  node_dependency->set_name("image_classify_op");
  node_dependency->set_mode("RO");
  // workflow2 is a single dense op
  workflow = workflow_conf.add_workflows();
  workflow->set_name("workflow2");
  workflow->set_workflow_type("Sequence");
  dag_node = workflow->add_nodes();
  dag_node->set_name("dense_op");
  dag_node->set_type("DenseOp");
  ret = baidu::paddle_serving::configure::write_proto_conf(
      &workflow_conf, output_dir, workflow_conf_file);
  if (ret != 0) {
    return ret;
  }
  // service conf: two services mapped onto the workflows above
  InferServiceConf infer_service_conf;
  infer_service_conf.set_port(0);
  InferService *infer_service = infer_service_conf.add_services();
  infer_service->set_name("ImageClassifyService");
  infer_service->add_workflows("workflow1");
  infer_service->add_workflows("workflow2");
  infer_service = infer_service_conf.add_services();
  infer_service->set_name("BuiltinDenseFormatService");
  infer_service->add_workflows("workflow2");
  ret = baidu::paddle_serving::configure::write_proto_conf(
      &infer_service_conf, output_dir, service_conf_file);
  if (ret != 0) {
    return ret;
  }
  // sdk (client-side predictors) conf
  SDKConf sdk_conf;
  VariantConf *default_variant_conf = sdk_conf.mutable_default_variant_conf();
  default_variant_conf->set_tag("default");
  ConnectionConf *connection_conf =
      default_variant_conf->mutable_connection_conf();
  connection_conf->set_connect_timeout_ms(2000);
  connection_conf->set_rpc_timeout_ms(20000);
  connection_conf->set_connect_retry_count(2);
  connection_conf->set_max_connection_per_host(100);
  connection_conf->set_hedge_request_timeout_ms(-1);
  connection_conf->set_hedge_fetch_retry_count(2);
  connection_conf->set_connection_type("pooled");
  NamingConf *naming_conf = default_variant_conf->mutable_naming_conf();
  naming_conf->set_cluster_filter_strategy("Default");
  naming_conf->set_load_balance_strategy("la");
  RpcParameter *rpc_parameter = default_variant_conf->mutable_rpc_parameter();
  rpc_parameter->set_compress_type(0);
  rpc_parameter->set_package_size(20);
  rpc_parameter->set_protocol("baidu_std");
  rpc_parameter->set_max_channel_per_request(3);
  Predictor *predictor = sdk_conf.add_predictors();
  predictor->set_name("ximage");
  predictor->set_service_name(
      "baidu.paddle_serving.predictor.image_classification."
      "ImageClassifyService");
  predictor->set_endpoint_router("WeightedRandomRender");
  WeightedRandomRenderConf *weighted_random_render_conf =
      predictor->mutable_weighted_random_render_conf();
  weighted_random_render_conf->set_variant_weight_list("50");
  VariantConf *variant_conf = predictor->add_variants();
  variant_conf->set_tag("var1");
  naming_conf = variant_conf->mutable_naming_conf();
  naming_conf->set_cluster("list://127.0.0.1:8010");
  ret = baidu::paddle_serving::configure::write_proto_conf(
      &sdk_conf, output_dir, sdk_conf_file);
  if (ret != 0) {
    return ret;
  }
  // sigmoid inferencer conf
  SigmoidConf sigmoid_conf;
  sigmoid_conf.set_dnn_model_path("data/dnn_model");
  sigmoid_conf.set_sigmoid_w_file("data/dnn_model/_sigmoid_.w_0");
  sigmoid_conf.set_sigmoid_b_file("data/dnn_model/_sigmoid_.b_0");
  sigmoid_conf.set_exp_max_input(0.75);
  sigmoid_conf.set_exp_min_input(0.25);
  ret = baidu::paddle_serving::configure::write_proto_conf(
      &sigmoid_conf, output_dir, sigmoid_conf_file);
  if (ret != 0) {
    return ret;
  }
  return 0;
}
// Parse back every prototxt written by test_write_conf(); any parse
// failure is reported with the offending file name. Returns 0 on success.
int test_read_conf() {
  int ret = 0;
  ModelToolkitConf model_toolkit_conf;
  ret = baidu::paddle_serving::configure::read_proto_conf(
      output_dir, model_toolkit_conf_file, &model_toolkit_conf);
  if (ret != 0) {
    std::cout << "Read conf fail: " << model_toolkit_conf_file << std::endl;
    return -1;
  }
  ResourceConf resource_conf;
  ret = baidu::paddle_serving::configure::read_proto_conf(
      output_dir, resource_conf_file, &resource_conf);
  if (ret != 0) {
    std::cout << "Read conf fail: " << resource_conf_file << std::endl;
    return -1;
  }
  WorkflowConf workflow_conf;
  ret = baidu::paddle_serving::configure::read_proto_conf(
      output_dir, workflow_conf_file, &workflow_conf);
  if (ret != 0) {
    std::cout << "Read conf fail: " << workflow_conf_file << std::endl;
    return -1;
  }
  InferServiceConf service_conf;
  ret = baidu::paddle_serving::configure::read_proto_conf(
      output_dir, service_conf_file, &service_conf);
  if (ret != 0) {
    std::cout << "Read conf fail: " << service_conf_file << std::endl;
    return -1;
  }
  SDKConf sdk_conf;
  ret = baidu::paddle_serving::configure::read_proto_conf(
      output_dir, sdk_conf_file, &sdk_conf);
  if (ret != 0) {
    std::cout << "Read conf fail: " << sdk_conf_file << std::endl;
    return -1;
  }
  SigmoidConf sigmoid_conf;
  ret = baidu::paddle_serving::configure::read_proto_conf(
      output_dir, sigmoid_conf_file, &sigmoid_conf);
  if (ret != 0) {
    // Fixed copy-paste bug: this used to report sdk_conf_file.
    std::cout << "Read conf fail: " << sigmoid_conf_file << std::endl;
    return -1;
  }
  return 0;
}
// Ensure ./conf exists, then run the write and read round-trip tests.
int main() {
  int ret = 0;
  struct stat stat_buf;
  if (stat(output_dir, &stat_buf) != 0) {
    // Reuse the outer `ret`; a second `int ret` here would shadow it.
    ret = mkdir("./conf", 0777);
    if (ret != 0) {
      std::cout << "mkdir ./conf fail" << std::endl;
      return -1;
    }
    if (stat("./conf", &stat_buf) != 0) {
      std::cout << "./conf not exist and creating it failed" << std::endl;
      return -1;
    }
  }
  ret = test_write_conf();
  if (ret != 0) {
    std::cout << "test_write_conf fail" << std::endl;
    return -1;
  }
  std::cout << "test_write_conf success" << std::endl;
  ret = test_read_conf();
  if (ret != 0) {
    std::cout << "test_read_conf fail" << std::endl;
    return -1;
  }
  std::cout << "test_read_conf success" << std::endl;
  return 0;
}
/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */
logex @ 1e897307
Subproject commit 1e897307c07004df144afedbf975228745a5a9e8
pipeline @ c5d633d2
Subproject commit c5d633d2d3353a6638b07f46055a6413d50db520
docopt-go @ ee0de3bc
Subproject commit ee0de3bc6815ee19d4a46c7eb90f829db0e014b1
rfw @ 6f0a6f32
Subproject commit 6f0a6f3266ba1058df9ef0c94cda1cecd2e62852
# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
# Generate C++ bindings from the cube RPC IDL; PROTO_SRC / PROTO_HEADER
# receive the generated .pb.cc / .pb.h paths consumed by the targets below.
protobuf_generate_cpp(PROTO_SRC PROTO_HEADER idl/cube.proto idl/control.proto)
# include PROTO_HEADER
include_directories(${CMAKE_CURRENT_BINARY_DIR}/core)

# Older macOS lacks clock_gettime(); define a fallback macro so the sources
# can compensate. NOTE(review): DEFINE_CLOCK_GETTIME is set but not visibly
# applied to any target here — presumably consumed elsewhere; confirm.
if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
  include(CheckFunctionExists)
  CHECK_FUNCTION_EXISTS(clock_gettime HAVE_CLOCK_GETTIME)
  if(NOT HAVE_CLOCK_GETTIME)
    set(DEFINE_CLOCK_GETTIME "-DNO_CLOCK_GETTIME_IN_MAC")
  endif()
endif()
# Global compile flags for the cube client targets.
# NOTE(review): CMAKE_CPP_FLAGS is not a standard CMake variable — presumably
# set by the enclosing project; confirm it is not a typo for CMAKE_CXX_FLAGS.
# -D__const__= works around a const-attribute issue with bthread headers.
set(CMAKE_CXX_FLAGS "${CMAKE_CPP_FLAGS} -DNDEBUG -O2 -D__const__= -pipe -W -Wall -Wno-unused-parameter -fPIC -fno-omit-frame-pointer")

# Request C++11: via CMAKE_CXX_STANDARD where supported (CMake >= 3.1.3),
# otherwise by appending -std=c++11 for GCC/Clang directly.
if(CMAKE_VERSION VERSION_LESS "3.1.3")
  if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
  endif()
  if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
  endif()
else()
  set(CMAKE_CXX_STANDARD 11)
  set(CMAKE_CXX_STANDARD_REQUIRED ON)
endif()
# cube-cli: standalone CLI/benchmark client; cube-api: static client library
# installed for embedding in the serving runtime.
add_executable(cube-cli src/cube_cli.cpp src/cube_api.cpp src/meta.cpp
               ${PROTO_SRC} ${PROTO_HEADER})
add_library(cube-api STATIC src/cube_api.cpp src/meta.cpp
            ${PROTO_SRC} ${PROTO_HEADER})

# Link dependencies shared by both targets.
# NOTE(review): raw -lssl/-lcrypto (and -lpthread/-ldl/-lz below) rely on
# system library search paths; imported targets (OpenSSL::SSL,
# Threads::Threads, ZLIB::ZLIB) would be more robust — confirm before
# switching, since the vendored brpc build may expect these exact flags.
set(DYNAMIC_LIB
    gflags
    protobuf
    leveldb
    -lssl
    -lcrypto
    )
# macOS: frameworks required by brpc, plus weak tcmalloc/profiler symbols.
if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
  set(DYNAMIC_LIB ${DYNAMIC_LIB}
      "-framework CoreFoundation"
      "-framework CoreGraphics"
      "-framework CoreData"
      "-framework CoreText"
      "-framework Security"
      "-framework Foundation"
      "-Wl,-U,_MallocExtension_ReleaseFreeMemory"
      "-Wl,-U,_ProfilerStart"
      "-Wl,-U,_ProfilerStop")
endif()
target_link_libraries(cube-cli brpc ${DYNAMIC_LIB} -lpthread -ldl -lz)
target_link_libraries(cube-api ${DYNAMIC_LIB} brpc -lpthread -ldl -lz)

# install: the static lib plus its public and generated headers.
install(TARGETS cube-api
        ARCHIVE DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/lib
        )
install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/include
        DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/include/cube-api)
FILE(GLOB inc ${CMAKE_CURRENT_BINARY_DIR}/*.pb.h)
install(FILES ${inc}
        DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/include/cube-api)
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <stdint.h>
#include <string>
#include <vector>
#ifdef BCLOUD
#include "baidu/rpc/server.h"
#else
#include "brpc/server.h"
#endif
#include "core/cube/cube-api/cube.pb.h"
#include "core/cube/cube-api/include/meta.h"
namespace rec {
namespace mcube {
// Result of one key lookup: `error` is a CubeError code (0 == success),
// `buff` holds the raw value bytes (empty when the lookup failed).
struct CubeValue {
  int error;
  std::string buff;
};
// Client entry point to the cube distributed KV service.
// Typical use: CubeAPI::instance()->init(conf) once at startup, seek() from
// any bthread, then destroy()/cleanup() before process exit.
class CubeAPI {
 public:
  // Process-wide singleton accessor (lazy; see .cpp for init caveats).
  static CubeAPI* instance();
  // Destroys and frees the singleton returned by instance().
  static void cleanup();

 public:
  CubeAPI();
  ~CubeAPI();
  /**
   * @brief: init api, not thread safe, should be called before seek.
   * @param [in] conf_file: filepath of config file.
   * @return: error code. 0 for succ.
   */
  int init(const char* conf_file);
  /**
   * @brief: destroy api, should be called before program exit.
   * @return: error code. 0 for succ.
   */
  int destroy();
  /** brief: get data from cube server, thread safe.
   * @param [in] dict_name: dict name.
   * @param [in] key: key to seek.
   * @param [out] val: value of key.
   * @return: error code. 0 for succ.
   */
  int seek(const std::string& dict_name, const uint64_t& key, CubeValue* val);
  /**
   * @brief: get data from cube server, thread safe.
   * @param [in] dict_name: dict name.
   * @param [in] keys: keys to seek.
   * @param [out] vals: value of keys, same order as `keys`.
   * @return: error code. 0 for succ; see error codes for partial failures.
   */
  int seek(const std::string& dict_name,
           const std::vector<uint64_t>& keys,
           std::vector<CubeValue>* vals);
  // Callback-style batched seek: `parse` is invoked once per returned value
  // with the DictValue and the key's index in `keys` (avoids a copy step).
  int opt_seek(const std::string& dict_name,
               const std::vector<uint64_t>& keys,
               std::function<void(DictValue*, size_t)> parse);
  /**
   * @brief: get data from cube server, thread safe.
   * @param [in] dict_name: dict name.
   * @param [in] keys: keys to seek.
   * @param [out] vals: value of keys, same order as `keys`.
   * @param [out] version: data version (max across responding shards).
   * @return: error code. 0 for succ; see error codes for partial failures.
   */
  int seek(const std::string& dict_name,
           const std::vector<uint64_t>& keys,
           std::vector<CubeValue>* vals,
           std::string* version);
  // Callback-style variant of the versioned batched seek above.
  int opt_seek(const std::string& dict_name,
               const std::vector<uint64_t>& keys,
               std::function<void(DictValue*, size_t)> parse,
               std::string* version);

 public:
  // Human-readable message for a CubeError code.
  static const char* error_msg(int error_code);

 private:
  // Non-copyable (private, intentionally unimplemented copy semantics).
  CubeAPI(const CubeAPI&) {}

 private:
  Meta* _meta;             // shard/channel metadata singleton (not owned)
  bthread_key_t _tls_key;  // bthread-local DictRpcData scratch buffers
  // void split(const std::vector<uint64_t>& keys);
};
} // namespace mcube
} // namespace rec
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "core/cube/cube-api/include/cube_api.h"
#ifdef BCLOUD
#include <baidu/rpc/channel.h>
#include <baidu/rpc/parallel_channel.h>
#else
#include <brpc/channel.h>
#include <brpc/parallel_channel.h>
#endif
#include <google/protobuf/descriptor.h>
#include "core/cube/cube-api/include/cube_api_bvar.h"
#include "core/cube/cube-api/include/error.h"
#include "core/cube/cube-api/include/meta.h"
namespace {
// Process-wide singleton instance managed by CubeAPI::instance()/cleanup().
static ::rec::mcube::CubeAPI* g_ins = NULL;
}  // namespace
#ifdef BCLOUD
namespace brpc = baidu::rpc;
#endif
namespace rec {
namespace mcube {
// Per-bthread scratch buffers: one request/response pair per shard, reused
// across seek() calls to avoid re-allocating protobuf messages.
struct DictRpcData {
  std::vector<DictRequest> sub_reqs;
  std::vector<DictResponse> sub_res;
};
// bthread-local-storage destructor for the DictRpcData slot.
static void dict_rpc_deleter(void* d) { delete static_cast<DictRpcData*>(d); }
// Completion callback for one shard's async seek in opt_seek(): feeds each
// returned DictValue to the caller-supplied `parse`, mapping position j in
// this shard's response back to the original key index via `offset`.
// NOTE(review): the ownership guards are commented out because the response
// and controller live in caller-owned (bthread-local) buffers — confirm
// before re-enabling them.
static void sub_seek_done(DictResponse* response,
                          brpc::Controller* cntl,
                          std::vector<int>* offset,
                          std::function<void(DictValue*, size_t)> parse) {
  // std::unique_ptr<DictResponse> response_guard(response);
  // std::unique_ptr<brpc::Controller> cntl_guard(cntl);
  if (cntl->Failed()) {
    // NOTE(review): a failed RPC may leave the response with fewer values
    // than keys requested (possibly zero), so some keys may never reach
    // `parse` — verify callers tolerate missing callbacks on failure.
    for (int i = 0; i < response->values().size(); ++i) {
      DictValue* val = response->mutable_values(i);
      val->set_status(CubeError::E_SEEK_FAILED);
      *val->mutable_value() = "";
      parse(val, (*offset)[i]);
    }
  } else {
    for (int i = 0; i < response->values().size(); ++i) {
      DictValue* val = response->mutable_values(i);
      parse(val, (*offset)[i]);
    }
  }
}
// No-op closure passed to async stub calls whose responses are consumed
// after brpc::Join() rather than in a callback.
struct DoNothing : public google::protobuf::Closure {
  void Run() {}
};
// Lazily create and return the process-wide CubeAPI singleton.
// (Lazy init is not synchronized; callers are expected to first touch it
// from a single thread, matching init()'s documented contract.)
CubeAPI* CubeAPI::instance() {
  if (g_ins != NULL) {
    return g_ins;
  }
  g_ins = new CubeAPI();
  return g_ins;
}
// Destroy and free the singleton created by instance(); safe to call when
// no instance exists.
void CubeAPI::cleanup() {
  if (g_ins == NULL) {
    return;
  }
  g_ins->destroy();
  delete g_ins;
  g_ins = NULL;
}
// _tls_key is created in init(); the destructor tears it down.
CubeAPI::CubeAPI() : _meta(NULL) {}
// NOTE(review): deletes the bthread TLS key unconditionally — presumably
// only safe after a successful init(); confirm destruction order.
CubeAPI::~CubeAPI() { CHECK_EQ(0, bthread_key_delete(_tls_key)); }
// Initialize the client: load shard metadata from `conf_file` and create
// the bthread-local scratch-buffer key. Not thread safe; call before seek().
// Returns 0 on success, the Meta error code otherwise.
int CubeAPI::init(const char* conf_file) {
  _meta = Meta::instance();
  const int meta_ret = _meta->init(conf_file);
  if (meta_ret != 0) {
    LOG(ERROR) << "Init cube meta failed";
    return meta_ret;
  }
  CHECK_EQ(0, bthread_key_create(&_tls_key, dict_rpc_deleter));
  return 0;
}
// Tear down the shared Meta. Failures are logged but the call always
// reports success (0) to the caller.
int CubeAPI::destroy() {
  if (_meta == NULL) {
    LOG(WARNING) << "Destroy, cube meta is null";
    return 0;
  }
  if (_meta->destroy() != 0) {
    LOG(WARNING) << "Destroy cube meta failed";
  }
  return 0;
}
// Single-key lookup: routes `key` to shard (key % shard_num) and performs
// one synchronous RPC. On failure `val` gets E_SEEK_FAILED with an empty
// buffer. Returns E_OK, E_ALL_SEEK_FAILED, or -1 on missing metadata.
int CubeAPI::seek(const std::string& dict_name,
                  const uint64_t& key,
                  CubeValue* val) {
  if (_meta == NULL) {
    LOG(ERROR) << "seek, meta is null";
    return -1;
  }
  MetaInfo* info = _meta->get_meta(dict_name);
  if (info == NULL) {
    LOG(ERROR) << "get meta [" << dict_name << "] failed";
    return -1;
  }
  int shard_id = key % info->shard_num;
  DictRequest req;
  DictResponse res;
  req.add_keys(key);
  ::brpc::Channel* chan = info->cube_conn[shard_id];
  // The call is synchronous (NULL done closure), so the stub and controller
  // need not outlive this frame: automatic storage replaces the original
  // new/delete pair.
  DictService_Stub stub(chan);
  ::brpc::Controller cntl;
  stub.seek(&cntl, &req, &res, NULL);
  int ret = CubeError::E_OK;
  if (cntl.Failed()) {
    info->cube_rpcfail_num << 1;  // bvar counter
    val->error = CubeError::E_SEEK_FAILED;
    val->buff.assign("");
    ret = CubeError::E_ALL_SEEK_FAILED;
    LOG(WARNING) << "cube seek from shard [" << shard_id << "] failed ["
                 << cntl.ErrorText() << "]";
  } else if (res.values().size() > 0) {
    DictValue* res_val = res.mutable_values(0);
    if (static_cast<int>(res_val->status()) == CubeError::E_NO_SUCH_KEY) {
      g_cube_keys_miss_num << 1;
    }
    val->error = res_val->status();
    val->buff.swap(*res_val->mutable_value());
  } else {
    // RPC succeeded but returned no value: treat as a seek failure.
    val->error = CubeError::E_SEEK_FAILED;
    val->buff.assign("");
    ret = CubeError::E_ALL_SEEK_FAILED;
  }
  info->cube_request_num << 1;
  g_cube_keys_num << 1;
  return ret;
}
int CubeAPI::seek(const std::string& dict_name,
const std::vector<uint64_t>& keys,
std::vector<CubeValue>* vals) {
// Meta* meta = Meta::instance();
if (_meta == NULL) {
LOG(ERROR) << "seek, meta is null";
return -1;
}
MetaInfo* info = _meta->get_meta(dict_name);
if (info == NULL) {
LOG(ERROR) << "get meta [" << dict_name << "] failed";
return -1;
}
int shard_num = info->shard_num;
DictRpcData* rpc_data =
static_cast<DictRpcData*>(bthread_getspecific(_tls_key));
if (rpc_data == NULL) {
rpc_data = new DictRpcData;
CHECK_EQ(0, bthread_setspecific(_tls_key, rpc_data));
}
rpc_data->sub_reqs.resize(shard_num);
rpc_data->sub_res.resize(shard_num);
std::vector<std::vector<int>> offset;
offset.resize(shard_num);
int init_cnt = keys.size() * 2 / shard_num;
for (int i = 0; i < shard_num; ++i) {
offset[i].reserve(init_cnt);
}
for (size_t i = 0; i < keys.size(); ++i) {
uint64_t shard_id = keys[i] % shard_num;
rpc_data->sub_reqs[shard_id].add_keys(keys[i]);
offset[shard_id].push_back(i);
}
std::vector<DictService_Stub*> stubs(shard_num);
std::vector<brpc::Controller*> cntls(shard_num);
for (int i = 0; i < shard_num; ++i) {
::brpc::Channel* chan = info->cube_conn[i];
stubs[i] = new DictService_Stub(chan);
cntls[i] = new ::brpc::Controller();
}
DoNothing do_nothing;
for (int i = 0; i < shard_num; ++i) {
stubs[i]->seek(
cntls[i], &rpc_data->sub_reqs[i], &rpc_data->sub_res[i], &do_nothing);
}
int cntls_failed_cnt = 0;
for (int i = 0; i < shard_num; ++i) {
brpc::Join(cntls[i]->call_id());
if (cntls[i]->Failed()) {
++cntls_failed_cnt;
LOG(WARNING) << "cube seek from shard [" << i << "] failed ["
<< cntls[i]->ErrorText() << "]";
}
}
int ret = CubeError::E_OK;
info->cube_request_num << 1;
if (cntls_failed_cnt > 0) {
info->cube_rpcfail_num << 1;
if (cntls_failed_cnt == shard_num) {
ret = CubeError::E_ALL_SEEK_FAILED;
} else {
ret = CubeError::E_SEEK_FAILED;
}
}
vals->resize(keys.size());
// merge
size_t miss_cnt = 0;
for (int i = 0; i < shard_num; ++i) {
if (cntls[i]->Failed()) {
for (int j = 0; j < rpc_data->sub_res[i].values().size(); ++j) {
(*vals)[offset[i][j]].error = CubeError::E_SEEK_FAILED;
(*vals)[offset[i][j]].buff.assign("");
}
} else {
for (int j = 0; j < rpc_data->sub_res[i].values().size(); ++j) {
DictValue* val = rpc_data->sub_res[i].mutable_values(j);
if (static_cast<int>(val->status()) == CubeError::E_NO_SUCH_KEY) {
miss_cnt += 1;
}
(*vals)[offset[i][j]].error = val->status();
(*vals)[offset[i][j]].buff.swap(*val->mutable_value());
}
}
}
// bvar stats
g_cube_keys_num << keys.size();
if (keys.size() > 0) {
g_cube_keys_miss_num << miss_cnt;
g_cube_value_size << (*vals)[0].buff.size();
}
// cleanup
for (int i = 0; i < shard_num; ++i) {
delete stubs[i];
stubs[i] = NULL;
delete cntls[i];
cntls[i] = NULL;
rpc_data->sub_reqs[i].Clear();
rpc_data->sub_res[i].Clear();
}
return ret;
}
// Batched callback-style lookup: shards `keys`, fires one async RPC per
// shard, and lets sub_seek_done() feed each returned value to `parse`
// together with the key's index in `keys`. Returns E_OK, E_SEEK_FAILED or
// E_ALL_SEEK_FAILED (-1 on missing metadata).
int CubeAPI::opt_seek(const std::string& dict_name,
                      const std::vector<uint64_t>& keys,
                      std::function<void(DictValue*, size_t)> parse) {
  if (_meta == NULL) {
    LOG(ERROR) << "seek, meta is null";
    return -1;
  }
  MetaInfo* info = _meta->get_meta(dict_name);
  if (info == NULL) {
    LOG(ERROR) << "get meta [" << dict_name << "] failed";
    return -1;
  }
  int shard_num = info->shard_num;
  // bthread-local scratch request/response buffers, reused across calls.
  DictRpcData* rpc_data =
      static_cast<DictRpcData*>(bthread_getspecific(_tls_key));
  if (rpc_data == NULL) {
    rpc_data = new DictRpcData;
    CHECK_EQ(0, bthread_setspecific(_tls_key, rpc_data));
  }
  rpc_data->sub_reqs.resize(shard_num);
  rpc_data->sub_res.resize(shard_num);
  // offset[shard][j]: index in `keys` of the j-th key routed to that shard.
  std::vector<std::vector<int>> offset;
  offset.resize(shard_num);
  int init_cnt = keys.size() * 2 / shard_num;
  for (int i = 0; i < shard_num; ++i) {
    offset[i].reserve(init_cnt);
  }
  for (size_t i = 0; i < keys.size(); ++i) {
    uint64_t shard_id = keys[i] % shard_num;
    rpc_data->sub_reqs[shard_id].add_keys(keys[i]);
    offset[shard_id].push_back(i);
  }
  std::vector<DictService_Stub*> stubs(shard_num);
  std::vector<brpc::Controller*> cntls(shard_num);
  for (int i = 0; i < shard_num; ++i) {
    ::brpc::Channel* chan = info->cube_conn[i];
    stubs[i] = new DictService_Stub(chan);
    cntls[i] = new ::brpc::Controller();
  }
  // Responses are parsed inside sub_seek_done (NewCallback self-deletes
  // after running); the Join loop below only waits for completion.
  for (int i = 0; i < shard_num; ++i) {
    stubs[i]->seek(cntls[i],
                   &rpc_data->sub_reqs[i],
                   &rpc_data->sub_res[i],
                   brpc::NewCallback(sub_seek_done,
                                     &rpc_data->sub_res[i],
                                     cntls[i],
                                     &(offset[i]),
                                     parse));
  }
  int cntls_failed_cnt = 0;
  for (int i = 0; i < shard_num; ++i) {
    brpc::Join(cntls[i]->call_id());
    if (cntls[i]->Failed()) {
      ++cntls_failed_cnt;
      LOG(WARNING) << "cube seek from shard [" << i << "] failed ["
                   << cntls[i]->ErrorText() << "]";
    }
  }
  int ret = CubeError::E_OK;
  info->cube_request_num << 1;
  if (cntls_failed_cnt > 0) {
    info->cube_rpcfail_num << 1;
    if (cntls_failed_cnt == shard_num) {
      ret = CubeError::E_ALL_SEEK_FAILED;
    } else {
      ret = CubeError::E_SEEK_FAILED;
    }
  }
  // merge: only key-miss counting remains; values were already delivered
  // to `parse` by the callbacks.
  size_t miss_cnt = 0;
  for (int i = 0; i < shard_num; ++i) {
    if (!cntls[i]->Failed()) {
      for (int j = 0; j < rpc_data->sub_res[i].values().size(); ++j) {
        if (static_cast<int>(rpc_data->sub_res[i].values(j).status()) ==
            CubeError::E_NO_SUCH_KEY) {
          ++miss_cnt;
        }
      }
    }
  }
  // bvar stats
  g_cube_keys_num << keys.size();
  if (keys.size() > 0) {
    g_cube_keys_miss_num << miss_cnt;
  }
  // cleanup
  for (int i = 0; i < shard_num; ++i) {
    delete stubs[i];
    stubs[i] = NULL;
    delete cntls[i];
    cntls[i] = NULL;
    rpc_data->sub_reqs[i].Clear();
    rpc_data->sub_res[i].Clear();
  }
  return ret;
}
// Batched lookup with data-version reporting: identical to the unversioned
// overload, but requests version info from every shard and returns the
// lexicographically greatest version string seen in `*version`.
// Returns E_OK, E_SEEK_FAILED or E_ALL_SEEK_FAILED (-1 on missing metadata).
int CubeAPI::seek(const std::string& dict_name,
                  const std::vector<uint64_t>& keys,
                  std::vector<CubeValue>* vals,
                  std::string* version) {
  if (_meta == NULL) {
    LOG(ERROR) << "seek, meta is null";
    return -1;
  }
  MetaInfo* info = _meta->get_meta(dict_name);
  if (info == NULL) {
    LOG(ERROR) << "get meta [" << dict_name << "] failed";
    return -1;
  }
  int shard_num = info->shard_num;
  // bthread-local scratch buffers, reused across calls.
  DictRpcData* rpc_data =
      static_cast<DictRpcData*>(bthread_getspecific(_tls_key));
  if (rpc_data == NULL) {
    rpc_data = new DictRpcData;
    CHECK_EQ(0, bthread_setspecific(_tls_key, rpc_data));
  }
  rpc_data->sub_reqs.resize(shard_num);
  rpc_data->sub_res.resize(shard_num);
  // offset[shard][j] is the index in `keys`/`vals` of the j-th key routed
  // to that shard.
  std::vector<std::vector<int>> offset;
  offset.resize(shard_num);
  int init_cnt = keys.size() * 2 / shard_num;  // heuristic reserve size
  for (int i = 0; i < shard_num; ++i) {
    offset[i].reserve(init_cnt);
    rpc_data->sub_reqs[i].set_version_required(true);
  }
  for (size_t i = 0; i < keys.size(); ++i) {
    uint64_t shard_id = keys[i] % shard_num;
    rpc_data->sub_reqs[shard_id].add_keys(keys[i]);
    offset[shard_id].push_back(i);
  }
  // Fire one asynchronous seek per shard, then join them all below.
  std::vector<DictService_Stub*> stubs(shard_num);
  std::vector<brpc::Controller*> cntls(shard_num);
  for (int i = 0; i < shard_num; ++i) {
    ::brpc::Channel* chan = info->cube_conn[i];
    stubs[i] = new DictService_Stub(chan);
    cntls[i] = new ::brpc::Controller();
  }
  DoNothing do_nothing;
  for (int i = 0; i < shard_num; ++i) {
    stubs[i]->seek(
        cntls[i], &rpc_data->sub_reqs[i], &rpc_data->sub_res[i], &do_nothing);
  }
  int cntls_failed_cnt = 0;
  for (int i = 0; i < shard_num; ++i) {
    brpc::Join(cntls[i]->call_id());
    if (cntls[i]->Failed()) {
      ++cntls_failed_cnt;
      LOG(WARNING) << "cube seek from shard [" << i << "] failed ["
                   << cntls[i]->ErrorText() << "]";
    }
  }
  int ret = CubeError::E_OK;
  info->cube_request_num << 1;
  if (cntls_failed_cnt > 0) {
    info->cube_rpcfail_num << 1;
    if (cntls_failed_cnt == shard_num) {
      ret = CubeError::E_ALL_SEEK_FAILED;
    } else {
      ret = CubeError::E_SEEK_FAILED;
    }
  }
  vals->resize(keys.size());
  // merge
  size_t miss_cnt = 0;
  for (int i = 0; i < shard_num; ++i) {
    if (cntls[i]->Failed()) {
      // Fix: a failed RPC may leave the response empty, so iterate the keys
      // actually routed to this shard (offset[i]) rather than the response;
      // otherwise the matching vals entries (including the int `error`
      // field) would stay uninitialized.
      for (size_t j = 0; j < offset[i].size(); ++j) {
        (*vals)[offset[i][j]].error = CubeError::E_SEEK_FAILED;
        (*vals)[offset[i][j]].buff.assign("");
      }
    } else {
      for (int j = 0; j < rpc_data->sub_res[i].values().size(); ++j) {
        DictValue* val = rpc_data->sub_res[i].mutable_values(j);
        if (static_cast<int>(val->status()) == CubeError::E_NO_SUCH_KEY) {
          miss_cnt += 1;
        }
        (*vals)[offset[i][j]].error = val->status();
        (*vals)[offset[i][j]].buff.swap(*val->mutable_value());
      }
      // Keep the greatest version string across responding shards.
      if (version->compare(rpc_data->sub_res[i].version()) < 0) {
        *version = rpc_data->sub_res[i].version();
      }
    }
  }
  // bvar stats
  g_cube_keys_num << keys.size();
  if (keys.size() > 0) {
    g_cube_keys_miss_num << miss_cnt;
    g_cube_value_size << (*vals)[0].buff.size();
  }
  // cleanup: stubs/controllers freed, scratch buffers cleared for reuse.
  for (int i = 0; i < shard_num; ++i) {
    delete stubs[i];
    stubs[i] = NULL;
    delete cntls[i];
    cntls[i] = NULL;
    rpc_data->sub_reqs[i].Clear();
    rpc_data->sub_res[i].Clear();
  }
  return ret;
}
// Callback-style batched lookup with version reporting: like opt_seek()
// above, but requests version info from each shard and returns the
// lexicographically greatest version string seen in `*version`.
int CubeAPI::opt_seek(const std::string& dict_name,
                      const std::vector<uint64_t>& keys,
                      std::function<void(DictValue*, size_t)> parse,
                      std::string* version) {
  if (_meta == NULL) {
    LOG(ERROR) << "seek, meta is null";
    return -1;
  }
  MetaInfo* info = _meta->get_meta(dict_name);
  if (info == NULL) {
    LOG(ERROR) << "get meta [" << dict_name << "] failed";
    return -1;
  }
  int shard_num = info->shard_num;
  // bthread-local scratch request/response buffers, reused across calls.
  DictRpcData* rpc_data =
      static_cast<DictRpcData*>(bthread_getspecific(_tls_key));
  if (rpc_data == NULL) {
    rpc_data = new DictRpcData;
    CHECK_EQ(0, bthread_setspecific(_tls_key, rpc_data));
  }
  rpc_data->sub_reqs.resize(shard_num);
  rpc_data->sub_res.resize(shard_num);
  // offset[shard][j]: index in `keys` of the j-th key routed to that shard.
  std::vector<std::vector<int>> offset;
  offset.resize(shard_num);
  int init_cnt = keys.size() * 2 / shard_num;
  for (int i = 0; i < shard_num; ++i) {
    offset[i].reserve(init_cnt);
    rpc_data->sub_reqs[i].set_version_required(true);
  }
  for (size_t i = 0; i < keys.size(); ++i) {
    uint64_t shard_id = keys[i] % shard_num;
    rpc_data->sub_reqs[shard_id].add_keys(keys[i]);
    offset[shard_id].push_back(i);
  }
  std::vector<DictService_Stub*> stubs(shard_num);
  std::vector<brpc::Controller*> cntls(shard_num);
  for (int i = 0; i < shard_num; ++i) {
    ::brpc::Channel* chan = info->cube_conn[i];
    stubs[i] = new DictService_Stub(chan);
    cntls[i] = new ::brpc::Controller();
  }
  // Responses are parsed inside sub_seek_done (NewCallback self-deletes
  // after running); the Join loop below only waits for completion.
  for (int i = 0; i < shard_num; ++i) {
    stubs[i]->seek(cntls[i],
                   &rpc_data->sub_reqs[i],
                   &rpc_data->sub_res[i],
                   brpc::NewCallback(sub_seek_done,
                                     &rpc_data->sub_res[i],
                                     cntls[i],
                                     &(offset[i]),
                                     parse));
  }
  int cntls_failed_cnt = 0;
  for (int i = 0; i < shard_num; ++i) {
    brpc::Join(cntls[i]->call_id());
    if (cntls[i]->Failed()) {
      ++cntls_failed_cnt;
      LOG(WARNING) << "cube seek from shard [" << i << "] failed ["
                   << cntls[i]->ErrorText() << "]";
    }
  }
  int ret = CubeError::E_OK;
  info->cube_request_num << 1;
  if (cntls_failed_cnt > 0) {
    info->cube_rpcfail_num << 1;
    if (cntls_failed_cnt == shard_num) {
      ret = CubeError::E_ALL_SEEK_FAILED;
    } else {
      ret = CubeError::E_SEEK_FAILED;
    }
  }
  // merge: count key misses and collect the max version; values were
  // already delivered to `parse` by the callbacks.
  size_t miss_cnt = 0;
  for (int i = 0; i < shard_num; ++i) {
    if (!cntls[i]->Failed()) {
      for (int j = 0; j < rpc_data->sub_res[i].values().size(); ++j) {
        if (static_cast<int>(rpc_data->sub_res[i].values(j).status()) ==
            CubeError::E_NO_SUCH_KEY) {
          ++miss_cnt;
        }
      }
      if (version->compare(rpc_data->sub_res[i].version()) < 0) {
        *version = rpc_data->sub_res[i].version();
      }
    }
  }
  // bvar stats
  g_cube_keys_num << keys.size();
  if (keys.size() > 0) {
    g_cube_keys_miss_num << miss_cnt;
  }
  // cleanup
  for (int i = 0; i < shard_num; ++i) {
    delete stubs[i];
    stubs[i] = NULL;
    delete cntls[i];
    cntls[i] = NULL;
    rpc_data->sub_reqs[i].Clear();
    rpc_data->sub_res[i].Clear();
  }
  return ret;
}
} // namespace mcube
} // namespace rec
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <gflags/gflags.h>
#include <atomic>
#include <fstream>
#include <thread> //NOLINT
#include "core/cube/cube-api/include/cube_api.h"
// Declares a `struct timeval` named `flag` and stamps it with the current
// wall-clock time (microsecond resolution).
#define TIME_FLAG(flag) \
  struct timeval flag;  \
  gettimeofday(&(flag), NULL);

// Command-line knobs for the cube CLI/benchmark client.
DEFINE_string(config_file, "./cube.conf", "m-cube config file");
DEFINE_string(keys, "keys", "keys to seek");
DEFINE_string(dict, "dict", "dict to seek");
DEFINE_uint64(batch, 500, "batch size");
DEFINE_int32(timeout, 200, "timeout in ms");
DEFINE_int32(retry, 3, "retry times");
DEFINE_bool(print_output, false, "print output flag");
DEFINE_int32(thread_num, 1, "thread num");

// Shared across worker threads: g_concurrency gates thread start-up against
// FLAGS_thread_num; time_list/request_list collect per-thread average
// latency and request counts (sized before the threads start).
std::atomic<int> g_concurrency(0);
std::vector<uint64_t> time_list;
std::vector<uint64_t> request_list;
namespace {
// Microseconds elapsed between `start_time` and `end_time`.
inline uint64_t time_diff(const struct timeval& start_time,
                          const struct timeval& end_time) {
  const int64_t sec_delta = end_time.tv_sec - start_time.tv_sec;
  const int64_t usec_delta = end_time.tv_usec - start_time.tv_usec;
  return sec_delta * 1000000 + usec_delta;
}
}  // namespace
namespace rec {
namespace mcube {
// Hex-encode every byte of `input` using uppercase digits, two characters
// per byte (e.g. "\x01\xAB" -> "01AB").
std::string string_to_hex(const std::string& input) {
  static const char* const kDigits = "0123456789ABCDEF";
  std::string encoded;
  encoded.reserve(input.length() * 2);
  for (size_t pos = 0; pos < input.length(); ++pos) {
    const unsigned char byte = static_cast<unsigned char>(input[pos]);
    encoded.push_back(kDigits[byte >> 4]);
    encoded.push_back(kDigits[byte & 15]);
  }
  return encoded;
}
// Worker body for one benchmark thread: reads keys from FLAGS_keys (one
// integer per line), seeks them against FLAGS_dict in batches, and records
// latency/request stats into time_list/request_list[thread_id].
int run(int argc, char** argv, int thread_id) {
  google::ParseCommandLineFlags(&argc, &argv, true);
  CubeAPI* cube = CubeAPI::instance();
  int ret = cube->init(FLAGS_config_file.c_str());
  if (ret != 0) {
    LOG(ERROR) << "init cube api failed err=" << ret;
    return ret;
  }
  /*
  FILE* key_file = fopen(FLAGS_keys.c_str(), "r");
  if (key_file == NULL) {
    LOG(ERROR) << "open key file [" << FLAGS_keys << "] failed";
    return -1;
  }
  */
  std::atomic<uint64_t> seek_counter(0);
  std::atomic<uint64_t> seek_cost_total(0);
  uint64_t seek_cost_max = 0;
  uint64_t seek_cost_min = 500000;  // sentinel upper bound, in microseconds
  char buffer[1024];  // NOTE(review): unused leftover from the fgets path
  std::vector<uint64_t> keys;
  std::vector<CubeValue> values;
  std::string line;
  std::vector<int64_t> key_list;
  std::ifstream key_file(FLAGS_keys.c_str());
  while (getline(key_file, line)) {
    key_list.push_back(std::stoll(line));
  }
  uint64_t file_size = key_list.size();
  uint64_t index = 0;
  uint64_t request = 0;
  // Busy-wait until a concurrency slot is free (gated by FLAGS_thread_num).
  while (g_concurrency.load() >= FLAGS_thread_num) {
  }
  g_concurrency++;
  while (index < file_size) {
    // uint64_t key = strtoul(buffer, NULL, 10);
    keys.push_back(key_list[index]);
    index += 1;
    int ret = 0;
    // Flushes once size exceeds FLAGS_batch, i.e. at batch+1 keys.
    if (keys.size() > FLAGS_batch) {
      TIME_FLAG(seek_start);
      ret = cube->seek(FLAGS_dict, keys, &values);
      TIME_FLAG(seek_end);
      request += 1;
      if (ret != 0) {
        LOG(WARNING) << "cube seek failed";
      } else if (FLAGS_print_output) {
        for (size_t i = 0; i < keys.size(); ++i) {
          fprintf(stdout,
                  "key:%lu value:%s\n",
                  keys[i],
                  string_to_hex(values[i].buff).c_str());
        }
      }
      ++seek_counter;
      uint64_t seek_cost = time_diff(seek_start, seek_end);
      seek_cost_total += seek_cost;
      if (seek_cost > seek_cost_max) {
        seek_cost_max = seek_cost;
      }
      if (seek_cost < seek_cost_min) {
        seek_cost_min = seek_cost;
      }
      keys.clear();
      values.clear();
    }
  }
  // NOTE(review): the disabled block below flushed the final partial batch;
  // with it commented out, trailing keys (< batch size) are never sought.
  /*
  if (keys.size() > 0) {
    int ret = 0;
    values.resize(keys.size());
    TIME_FLAG(seek_start);
    ret = cube->seek(FLAGS_dict, keys, &values);
    TIME_FLAG(seek_end);
    if (ret != 0) {
      LOG(WARNING) << "cube seek failed";
    } else if (FLAGS_print_output) {
      for (size_t i = 0; i < keys.size(); ++i) {
        fprintf(stdout,
                "key:%lu value:%s\n",
                keys[i],
                string_to_hex(values[i].buff).c_str());
      }
    }
    ++seek_counter;
    uint64_t seek_cost = time_diff(seek_start, seek_end);
    seek_cost_total += seek_cost;
    if (seek_cost > seek_cost_max) {
      seek_cost_max = seek_cost;
    }
    if (seek_cost < seek_cost_min) {
      seek_cost_min = seek_cost;
    }
  }
  */
  g_concurrency--;
  // fclose(key_file);
  // ret = cube->destroy();
  // NOTE(review): with destroy() commented out, `ret` here is still init's
  // result, so this warning text is misleading — confirm intent.
  if (ret != 0) {
    LOG(WARNING) << "destroy cube api failed err=" << ret;
  }
  // NOTE(review): seek_counter stays 0 when the key file holds fewer than
  // FLAGS_batch+1 keys, making this a divide-by-zero — confirm inputs.
  uint64_t seek_cost_avg = seek_cost_total / seek_counter;
  LOG(INFO) << "seek cost avg = " << seek_cost_avg;
  LOG(INFO) << "seek cost max = " << seek_cost_max;
  LOG(INFO) << "seek cost min = " << seek_cost_min;
  time_list[thread_id] = seek_cost_avg;
  request_list[thread_id] = request;
  return 0;
}
// Spawn FLAGS_thread_num workers running run(), join them, and report
// aggregate latency / QPS statistics collected in time_list/request_list.
//
// Fixes: the original fell off the end of a non-void function (undefined
// behavior per the C++ standard) and divided by mean_time without guarding
// against zero.
int run_m(int argc, char** argv) {
  google::ParseCommandLineFlags(&argc, &argv, true);
  int thread_num = FLAGS_thread_num;
  request_list.resize(thread_num);
  time_list.resize(thread_num);
  std::vector<std::thread*> thread_pool;
  for (int i = 0; i < thread_num; i++) {
    thread_pool.push_back(new std::thread(run, argc, argv, i));
  }
  for (int i = 0; i < thread_num; i++) {
    thread_pool[i]->join();
    delete thread_pool[i];
  }
  // Aggregate the per-thread averages recorded by the workers.
  uint64_t sum_time = 0;
  uint64_t max_time = 0;
  uint64_t min_time = 1000000;
  uint64_t request_num = 0;
  for (int i = 0; i < thread_num; i++) {
    sum_time += time_list[i];
    if (time_list[i] > max_time) {
      max_time = time_list[i];
    }
    if (time_list[i] < min_time) {
      min_time = time_list[i];
    }
    request_num += request_list[i];
  }
  uint64_t mean_time = sum_time / thread_num;
  LOG(INFO) << thread_num << " thread seek cost"
            << " avg = " << std::to_string(mean_time)
            << " max = " << std::to_string(max_time)
            << " min = " << std::to_string(min_time);
  // Guard the QPS division: a sub-microsecond mean would divide by zero.
  if (mean_time > 0) {
    LOG(INFO) << " total_request = " << std::to_string(request_num)
              << " speed = " << std::to_string(1000000 * thread_num / mean_time)
              << " query per second";
  } else {
    LOG(INFO) << " total_request = " << std::to_string(request_num);
  }
  return 0;
}
} // namespace mcube
} // namespace rec
int main(int argc, char** argv) { return ::rec::mcube::run_m(argc, argv); }
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "core/cube/cube-api/include/meta.h"
#include <google/protobuf/descriptor.h>
#include <string.h>
#include <fstream>
#include <new>
#include <sstream>
#include "core/cube/cube-api/cube.pb.h"
namespace {
// Lazily-created process-wide Meta singleton (see Meta::instance()).
// Intentionally never deleted; reclaimed by the OS at process exit.
static ::rec::mcube::Meta* g_ins = NULL;
}  // namespace
#ifdef BCLOUD
namespace brpc = baidu::rpc;
#endif
namespace rec {
namespace mcube {
// Return the process-wide Meta singleton, creating it on first use.
// NOTE(review): not thread-safe — two threads racing here can each allocate
// a Meta and leak one; confirm initialization happens on a single thread.
Meta* Meta::instance() {
  if (g_ins == NULL) {
    g_ins = new Meta();
  }
  return g_ins;
}
// MetaInfo entries are released explicitly via destroy(), not here.
Meta::~Meta() {}
// Parse the cube config file (a JSON array of dict descriptions) and build
// one MetaInfo per entry, keyed by dict name. Returns 0 on success, -1 when
// the file cannot be read, any entry is invalid, or no dict was configured.
int Meta::init(const char* conf_file) {
  std::ifstream ifs(conf_file);
  if (!ifs.is_open()) {
    LOG(ERROR) << "open conf file [" << conf_file << "]";
    return -1;
  }
  // Slurp the whole file, then parse with brpc's bundled rapidjson.
  std::string content((std::istreambuf_iterator<char>(ifs)),
                      std::istreambuf_iterator<char>());
  BUTIL_RAPIDJSON_NAMESPACE::Document document;
  document.Parse(content.c_str());
  int ret = 0;
  if (document.IsArray()) {
    for (auto it = document.Begin(); it != document.End(); ++it) {
      MetaInfo* info = create_meta(*it);
      if (info == NULL) {
        // Stop at the first bad entry; dicts created so far stay registered.
        LOG(ERROR) << "create dict meta failed";
        ret = -1;
        break;
      }
      LOG(INFO) << "init cube dict [" << info->dict_name << "] succ";
      _metas[info->dict_name] = info;
    }
  } else {
    LOG(ERROR)
        << "conf file [" << conf_file
        << "] json schema error, please ensure that the json is an array";
  }
  // An empty registry is a configuration error even if the JSON parsed.
  if (_metas.size() == 0) {
    LOG(ERROR) << "no valid cube";
    ret = -1;
  }
  return ret;
}
// Release every MetaInfo owned by the registry, then drop all entries.
// Always returns 0.
int Meta::destroy() {
  for (auto& entry : _metas) {
    delete entry.second;  // deleting NULL is a no-op, so no guard needed
    entry.second = NULL;
  }
  _metas.clear();
  return 0;
}
// Look up a dict's metadata; NULL when the dict was never configured.
MetaInfo* Meta::get_meta(const std::string& dict_name) {
  auto found = _metas.find(dict_name);
  return found == _metas.end() ? NULL : found->second;
}
// Reset the per-dict request / RPC-failure counters for `dict_name`;
// logs a warning when the dict is unknown.
void Meta::reset_bvar(const std::string& dict_name) {
  auto iter = _metas.find(dict_name);
  if (iter != _metas.end()) {
    iter->second->cube_request_num.reset();
    iter->second->cube_rpcfail_num.reset();
  } else {
    LOG(WARNING) << "reset_bvar to invalid dict [" << dict_name << "]";
  }
}
// Build a MetaInfo from one JSON dict entry. Validates the required scalar
// fields, then dispatches on "type" to create the shard connections.
// Returns NULL (freeing the partially-built info) on any error.
MetaInfo* Meta::create_meta(const BUTIL_RAPIDJSON_NAMESPACE::Value& config) {
  if (!config.HasMember("dict_name") || !config.HasMember("shard") ||
      !config.HasMember("dup") || !config.HasMember("timeout") ||
      !config.HasMember("retry") || !config.HasMember("backup_request") ||
      !config.HasMember("type")) {
    LOG(ERROR) << "create meta failed, required fields miss";
    return NULL;
  }
  if (!config["dict_name"].IsString() || !config["shard"].IsInt() ||
      !config["dup"].IsInt() || !config["timeout"].IsInt() ||
      !config["retry"].IsInt() || !config["backup_request"].IsInt() ||
      !config["type"].IsString()) {
    LOG(ERROR) << "create meta failed, required fields type error";
    return NULL;
  }
  // Fix: every branch below indexes config["nodes"], but its presence was
  // never validated — rapidjson's operator[] asserts on a missing member.
  if (!config.HasMember("nodes") || !config["nodes"].IsArray()) {
    LOG(ERROR) << "create meta failed, nodes field miss or not an array";
    return NULL;
  }
  MetaInfo* info = new MetaInfo();
  info->dict_name = config["dict_name"].GetString();
  info->shard_num = config["shard"].GetInt();
  info->dup_num = config["dup"].GetInt();
  int timeout = config["timeout"].GetInt();
  int retry = config["retry"].GetInt();
  int backup_request = config["backup_request"].GetInt();
  std::string type = config["type"].GetString();
  // The load balancer only applies to the "ipport_list" flavor; default "rr".
  std::string load_balancer = "rr";
  if (config.HasMember("load_balancer") && config["load_balancer"].IsString()) {
    load_balancer = config["load_balancer"].GetString();
  }
  int ret = 0;
  if (type.compare("ipport") == 0) {
    ret = create_meta_from_ipport(
        info, config["nodes"], timeout, retry, backup_request);
  } else if (type.compare("ipport_list") == 0) {
    ret = create_meta_from_ipport_list(
        info, config["nodes"], timeout, retry, backup_request, load_balancer);
  } else if (type.compare("ipport_parallel") == 0) {
    ret = create_meta_from_ipport_parallel(
        info, config["nodes"], timeout, retry, backup_request);
  } else {
    ret = -1;  // unknown connection type
  }
  if (ret != 0) {
    LOG(ERROR) << "create meta failed error=" << ret;
    delete info;
    return NULL;
  }
  return info;
}
// Create one brpc channel per node entry, stored shard-by-shard in
// meta->cube_conn. Returns 0 on success, -1 on any channel-init failure.
//
// Fix: cube_conn is sized to shard_num, but the loop iterated over
// nodes.Size(); a config listing more nodes than shards wrote past the end
// of the vector. Reject such configs explicitly.
int Meta::create_meta_from_ipport(MetaInfo* meta,
                                  const BUTIL_RAPIDJSON_NAMESPACE::Value& nodes,
                                  int timeout,
                                  int retry,
                                  int backup_request) {
  brpc::ChannelOptions options;
  options.timeout_ms = timeout;
  options.max_retry = retry;
  if (backup_request > 0) {
    options.backup_request_ms = backup_request;
  }
  meta->cube_conn.resize(meta->shard_num, NULL);
  int node_num = static_cast<int>(nodes.Size());
  if (node_num > meta->shard_num) {
    LOG(ERROR) << "nodes size [" << node_num << "] exceeds shard num ["
               << meta->shard_num << "]";
    return -1;
  }
  for (int i = 0; i < node_num; ++i) {
    const BUTIL_RAPIDJSON_NAMESPACE::Value& node = nodes[i];
    const std::string& node_ip = node["ip"].GetString();
    int node_port = node["port"].GetInt();
    ::brpc::Channel* chan = new ::brpc::Channel();
    if (chan->Init(node_ip.c_str(), node_port, &options) != 0) {
      LOG(ERROR) << "Init connection to [" << node_ip << ":" << node_port
                 << "] failed";
      delete chan;
      return -1;
    }
    meta->cube_conn[i] = chan;
  }
  return 0;
}
// Like create_meta_from_ipport, but each node entry is a naming-service /
// ip-port list string resolved by brpc with the given load balancer.
//
// Fix: same out-of-bounds defect as create_meta_from_ipport — cube_conn is
// sized to shard_num while the loop ran over nodes.Size().
int Meta::create_meta_from_ipport_list(
    MetaInfo* meta,
    const BUTIL_RAPIDJSON_NAMESPACE::Value& nodes,
    int timeout,
    int retry,
    int backup_request,
    const std::string& load_balancer) {
  brpc::ChannelOptions options;
  options.timeout_ms = timeout;
  options.max_retry = retry;
  if (backup_request > 0) {
    options.backup_request_ms = backup_request;
  }
  meta->cube_conn.resize(meta->shard_num, NULL);
  int node_num = static_cast<int>(nodes.Size());
  if (node_num > meta->shard_num) {
    LOG(ERROR) << "nodes size [" << node_num << "] exceeds shard num ["
               << meta->shard_num << "]";
    return -1;
  }
  for (int i = 0; i < node_num; ++i) {
    const BUTIL_RAPIDJSON_NAMESPACE::Value& node = nodes[i];
    const std::string& ipport_list = node["ipport_list"].GetString();
    ::brpc::Channel* chan = new ::brpc::Channel();
    if (chan->Init(ipport_list.c_str(), load_balancer.c_str(), &options) != 0) {
      LOG(ERROR) << "Init connection to [" << ipport_list << "] failed";
      delete chan;
      return -1;
    }
    meta->cube_conn[i] = chan;
  }
  return 0;
}
// Connect every shard to the same endpoint: nodes[0] is used for all
// shard_num channels (one endpoint serving all shards in parallel).
int Meta::create_meta_from_ipport_parallel(
    MetaInfo* meta,
    const BUTIL_RAPIDJSON_NAMESPACE::Value& nodes,
    int timeout,
    int retry,
    int backup_request) {
  if (nodes.Size() < 1) {
    // Fix: the original message said "less than 0", which is impossible for
    // a size — the condition actually rejects an empty node list.
    LOG(ERROR) << "Config nodes size less than 1";
    return -1;
  }
  brpc::ChannelOptions options;
  options.timeout_ms = timeout;
  options.max_retry = retry;
  if (backup_request > 0) {
    options.backup_request_ms = backup_request;
  }
  meta->cube_conn.resize(meta->shard_num, NULL);
  for (int i = 0; i < meta->shard_num; ++i) {
    const BUTIL_RAPIDJSON_NAMESPACE::Value& node = nodes[0];
    const std::string& node_ip = node["ip"].GetString();
    int node_port = node["port"].GetInt();
    ::brpc::Channel* chan = new ::brpc::Channel();
    if (chan->Init(node_ip.c_str(), node_port, &options) != 0) {
      LOG(ERROR) << "Init connection to [" << node_ip << ":" << node_port
                 << "] failed";
      delete chan;
      return -1;
    }
    meta->cube_conn[i] = chan;
  }
  return 0;
}
const std::vector<const MetaInfo*> Meta::metas() {
std::vector<const MetaInfo*> metas;
for (auto i : _metas) {
metas.push_back(i.second);
}
return metas;
}
} // namespace mcube
} // namespace rec
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include "core/cube/cube-builder/include/cube-builder/crovl_builder_increment.h"
#include "core/cube/cube-builder/include/cube-builder/define.h"
using std::string;
using std::vector;
// Build-job description for the cube dictionary builder: where the source
// data lives, where output goes, how many shards to produce, and whether
// this is a base or delta build (`mode` comes from define.h).
class Job {
 public:
  void set_shard_num(int num);
  int get_shard_num();
  void set_input_path(string path);
  string get_input_path();
  void set_output_path(string path);
  string get_output_path();
  void set_job_mode(mode mmode);
  mode get_job_mode();
  void set_dict_name(string name);
  string get_dict_name();

 private:
  int shard_num;       // number of output shards (partitions)
  string input_path;   // directory scanned recursively for input files
  string output_path;  // directory receiving per-shard build artifacts
  mode job_mode;       // base or delta build
  string dict_name;    // dictionary name used in output file names
};
void mapFileLocal(Job job,
string file,
vector<CROVLBuilderIncremental*> reduces);
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <memory>
#include <string>
#include "core/cube/cube-builder/include/cube-builder/define.h"
#include "core/cube/cube-builder/include/cube-builder/raw_reader.h"
// Reads records from a Hadoop-style sequence file through a FileRawReader
// that the path constructor allocates and the destructor frees.
class SequenceFileRecordReader : public RecordReader {
 public:
  SequenceFileRecordReader() {}
  explicit SequenceFileRecordReader(const std::string& path) {
    _path = path;
    _raw_reader = new FileRawReader(_path);
  }
  virtual ~SequenceFileRecordReader() {
    if (_raw_reader != nullptr) {
      delete _raw_reader;
    }
  }
  // Fix: the destructor frees the raw pointer _raw_reader, so copying an
  // instance would double-delete it — forbid copy construction/assignment.
  SequenceFileRecordReader(const SequenceFileRecordReader&) = delete;
  SequenceFileRecordReader& operator=(const SequenceFileRecordReader&) = delete;

  virtual int open();
  virtual int close();
  // Returns 0 per record, 1 at end of file, -1 on a malformed record.
  virtual int next(Record* record);
  const Header& get_header() { return _header; }
  // Parse the file header (version, key/value classes, metadata, sync
  // marker); must be called after open() and before next().
  int read_header();

 private:
  std::string _path{""};               // source file path (for logging)
  RawReader* _raw_reader{nullptr};     // owned; see deleted copy ops above
  Header _header;                      // parsed by read_header()
};
typedef std::shared_ptr<SequenceFileRecordReader> SequenceFileRecordReaderPtr;
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <string>
#include "core/cube/cube-builder/include/cube-builder/define.h"
#include "core/cube/cube-builder/include/cube-builder/raw_reader.h"
// Decoder for variable-length integers in the layout used by Hadoop
// SequenceFiles (presumably — the callers read sequence-file headers):
// the first byte either holds the value inline or encodes the sign and the
// number of payload bytes that follow.
class VInt {
 public:
  // Number of bytes the vint occupies, derived from its first byte:
  // bytes >= -112 are a one-byte inline value; otherwise the byte encodes
  // the payload length (and, via the range, the sign).
  static int32_t decode_vint_size(const char &value) {
    if (value >= -112) {
      return 1;
    } else if (value < -120) {
      return -119 - value;
    }
    return -111 - value;
  }
  // True when the first byte marks a negative multi-byte value.
  static bool is_negative_vint(const char &value) {
    return value < -120 || (value >= -112 && value < 0);
  }
  // Read one vint from `reader` into *vint.
  // Returns false on a short read; *vint is unspecified in that case.
  static bool read_vint(RawReader *reader, int32_t *vint) {
    char first_byte;
    if (reader->read(&first_byte) <= 0) {
      return false;
    }
    int32_t len = decode_vint_size(first_byte);
    if (len == 1) {
      // Single-byte form: the byte is the value itself.
      *vint = first_byte;
      return true;
    }
    // Multi-byte form: assemble the big-endian payload byte by byte,
    // masking each shifted byte into its 8-bit slot.
    char ch;
    int32_t bitlen = 0;
    int32_t i = 0, lch;
    for (int idx = len - 2; idx >= 0; idx--) {
      if (reader->read(&ch) <= 0) {
        return false;
      }
      bitlen = 8 * idx;
      lch = ch;
      i = i | ((lch << bitlen) & (0xFFL << bitlen));
    }
    // Negative values are stored as the one's complement of the payload.
    *vint = (is_negative_vint(first_byte) ? (i ^ (int32_t)(-1)) : i);
    return true;
  }
};
// String codec companion to VInt: on the wire a string is a vint length
// followed by that many raw bytes. encode/decode are unfinished stubs.
class VString {
 public:
  static const char *encode(std::string str) { return encode(str, true); }
  // NOTE(review): returns c_str() of the by-value parameter `str`, i.e. a
  // pointer into a temporary destroyed when this call returns — any use of
  // the result is a dangling read. Appears to be an unused stub ("todo");
  // must be fixed (e.g. return std::string) before anything calls it.
  static const char *encode(std::string str, bool /*replace*/) {
    // todo
    return str.c_str();
  }
  static std::string decode(char *bytes) { return decode(bytes, true); }
  // Assumes `bytes` is NUL-terminated; stub ("todo"), `replace` is ignored.
  static std::string decode(char *bytes, bool /*replace*/) {
    // todo
    return std::string(bytes);
  }
  // Read a vint-prefixed string from `reader` into *str; false on short
  // read of either the length or the payload.
  // NOTE(review): a corrupt negative length is passed straight to
  // read_buf — confirm the reader rejects negative sizes.
  static bool read_string(RawReader *reader, std::string *str) {
    int length;
    if (!VInt::read_vint(reader, &length)) {
      return false;
    }
    if (reader->read_buf(str, length) != length) {
      return false;
    }
    return true;
  }
};
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "core/cube/cube-builder/include/cube-builder/builder_job.h"
#include <stdio.h>
#include <iostream>
#include "butil/logging.h"
#include "core/cube/cube-builder/include/cube-builder/seqfile_reader.h"
#include "core/cube/cube-builder/include/cube-builder/util.h"
using std::string;
// Trivial setters/getters for the Job description declared in builder_job.h.
void Job::set_shard_num(int num) { shard_num = num; }
int Job::get_shard_num() { return shard_num; }
void Job::set_input_path(string path) { input_path = path; }
string Job::get_input_path() { return input_path; }
void Job::set_output_path(string path) { output_path = path; }
string Job::get_output_path() { return output_path; }
void Job::set_job_mode(mode mmode) { job_mode = mmode; }
mode Job::get_job_mode() { return job_mode; }
void Job::set_dict_name(string name) { dict_name = name; }
string Job::get_dict_name() { return dict_name; }
// Map phase for one input sequence file: stream every record and route it
// to the shard builder selected by key % shard_num. Errors are logged and
// abort processing of this file only.
void mapFileLocal(Job job,
                  string file,
                  vector<CROVLBuilderIncremental *> reduces) {
  SequenceFileRecordReader reader(file.c_str());
  if (reader.open() != 0) {
    LOG(ERROR) << "open file failed! " << file;
    return;
  }
  if (reader.read_header() != 0) {
    LOG(ERROR) << "read header error! " << file;
    reader.close();
    return;
  }
  Record record(reader.get_header());
  int total_count = 0;
  // next() returns 0 per record, 1 at EOF, -1 on error; both non-zero
  // outcomes end the loop here.
  while (reader.next(&record) == 0) {
    // NOTE(review): reinterprets the first 8 key bytes in host byte order —
    // assumes the producer wrote keys with matching endianness; confirm.
    uint64_t key =
        *reinterpret_cast<uint64_t *>(const_cast<char *>(record.key.data()));
    total_count++;
    int part = key % job.get_shard_num();
    // Value payload is everything after the key within the record.
    int64_t value_length = record.record_len - record.key_len;
    reduces[part]->add(key, value_length, record.value.c_str());
  }
  if (reader.close() != 0) {
    LOG(ERROR) << "close file failed! " << file;
    return;
  }
}
This diff has been collapsed.
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "core/cube/cube-builder/include/cube-builder/curl_simple.h"
#include <json/json.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <iostream>
#include <map>
#include <string>
// Capacity of the stack buffer that captures one HTTP response body
// (responses are truncated to BUFFER_SIZE - 1 bytes; see write_callback).
const size_t BUFFER_SIZE = 9096;
// Tie the libcurl easy handle's lifetime to the object's.
CurlSimple::CurlSimple() { _p_curl = curl_easy_init(); }
CurlSimple::~CurlSimple() { curl_easy_cleanup(_p_curl); }
/* *
 * @method write_callback
 * libcurl write callback: copies the received chunk into the caller's char
 * buffer (`userp`), NUL-terminated and capped at BUFFER_SIZE by snprintf.
 * NOTE(review): each invocation rewrites the buffer from the start, so a
 * response delivered in multiple chunks keeps only the LAST chunk, and a
 * body longer than BUFFER_SIZE - 1 bytes is silently truncated. It also
 * treats `contents` as NUL-terminated, which libcurl does not guarantee.
 * Confirm all expected responses are small single-chunk payloads.
 * @return:
 *     must return size * nmemb, otherwise libcurl aborts the transfer
 * */
size_t CurlSimple::write_callback(void *contents,
                                  size_t size,
                                  size_t nmemb,
                                  void *userp) {
  size_t realsize = size * nmemb;
  snprintf(static_cast<char *>(userp),
           BUFFER_SIZE,
           "%s",
           static_cast<char *>(contents));
  return realsize;
}  // end write_callback
/* *
 * @method curl_get
 * Perform an HTTP GET on `url` and return the response body, or an empty
 * string on transfer failure. The body is captured by write_callback into
 * a fixed stack buffer, so it is truncated at BUFFER_SIZE - 1 bytes.
 * */
std::string CurlSimple::curl_get(const char *url) {
  char buffer[BUFFER_SIZE];
  // Fix: for an empty response body libcurl never invokes the write
  // callback, and the original returned an uninitialized buffer (undefined
  // behavior). NUL-terminate up front so that case yields "".
  buffer[0] = '\0';
  CURLcode res;
  /* specify URL to get */
  curl_easy_setopt(_p_curl, CURLOPT_URL, url);
  /* send all data to this function */
  curl_easy_setopt(_p_curl, CURLOPT_WRITEFUNCTION, CurlSimple::write_callback);
  /* hand our stack buffer to the write callback */
  curl_easy_setopt(_p_curl, CURLOPT_WRITEDATA, static_cast<void *>(buffer));
  /* some servers don't like requests that are made without a user-agent
     field, so we provide one */
  curl_easy_setopt(_p_curl, CURLOPT_USERAGENT, "libcurl-agent/1.0");
  /* get it! */
  res = curl_easy_perform(_p_curl);
  /* check for errors */
  if (res != CURLE_OK) {
    LOG(ERROR) << "curl_easy_perform() failed: " << curl_easy_strerror(res);
    return "";
  }
  return buffer;
}  // end curl_get
/* *
 * @method curl_post
 * Perform an HTTP POST to `url` with `para_map` serialized as
 * "k1=v1&k2=v2...", retrying up to 3 times. Returns the response body, or
 * an empty string when every attempt fails.
 * NOTE(review): keys/values are concatenated without URL-encoding — confirm
 * callers never pass values containing '&', '=', or non-ASCII bytes.
 * */
std::string CurlSimple::curl_post(
    const char *url, const std::map<std::string, std::string> &para_map) {
  char buffer[BUFFER_SIZE];
  // Fix: if no attempt succeeds before the callback ever runs, the original
  // could read an uninitialized buffer; NUL-terminate up front.
  buffer[0] = '\0';
  CURLcode res;
  /* serialize the parameter map into an application/x-www-form-urlencoded
     style body (minus the actual percent-encoding, see NOTE above) */
  std::string para_url = "";
  std::map<std::string, std::string>::const_iterator para_iterator;
  bool is_first = true;
  for (para_iterator = para_map.begin(); para_iterator != para_map.end();
       para_iterator++) {
    if (is_first) {
      is_first = false;
    } else {
      para_url.append("&");
    }
    std::string key = para_iterator->first;
    std::string value = para_iterator->second;
    para_url.append(key);
    para_url.append("=");
    para_url.append(value);
  }
  LOG(INFO) << "para_url=" << para_url.c_str() << " size:" << para_url.size();
  /* specify URL to post to */
  curl_easy_setopt(_p_curl, CURLOPT_URL, url);
  /* send all response data to this function */
  curl_easy_setopt(_p_curl, CURLOPT_WRITEFUNCTION, CurlSimple::write_callback);
  /* attach the serialized body */
  curl_easy_setopt(_p_curl, CURLOPT_POSTFIELDS, para_url.c_str());
  /* hand our stack buffer to the write callback */
  curl_easy_setopt(
      _p_curl, CURLOPT_WRITEDATA, reinterpret_cast<void *>(buffer));
  /* some servers don't like requests that are made without a user-agent
     field, so we provide one */
  curl_easy_setopt(_p_curl, CURLOPT_USERAGENT, "libcurl-agent/1.0");
  /* perform with a small fixed retry budget */
  int retry_num = 3;
  bool is_succ = false;
  for (int i = 0; i < retry_num; ++i) {
    res = curl_easy_perform(_p_curl);
    if (res != CURLE_OK) {
      std::cerr << "curl_easy_perform() failed:" << curl_easy_strerror(res)
                << std::endl;
    } else {
      is_succ = true;
      break;
    }
  }
  return is_succ ? buffer : "";
}
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <gflags/gflags.h>
#include <time.h>
#include <iostream>
#include <vector>
#include "butil/logging.h"
#include "core/cube/cube-builder/include/cube-builder/builder_job.h"
#include "core/cube/cube-builder/include/cube-builder/crovl_builder_increment.h"
#include "core/cube/cube-builder/include/cube-builder/util.h"
// Command-line flags for the cube-builder tool. Versions interact with
// job_mode: "base" builds stamp their own version; "delta" builds require
// last_version and depend_version (see main below).
DEFINE_string(dict_name, "", "dict name, no need");
DEFINE_string(input_path, "", "source data input path");
DEFINE_string(output_path, "", "source data input path");
DEFINE_string(job_mode, "base", "job mode base/delta default:base");
DEFINE_int32(last_version, 0, "last version, job mode delta need");
DEFINE_int32(cur_version, 0, "current version, no need");
DEFINE_int32(depend_version, 0, "depend version, job mode delta need");
DEFINE_int32(shard_num, -1, "shard num");
DEFINE_string(master_address, "", "master address, no need");
DEFINE_bool(only_build, true, "wheather build need transfer");
// cube-builder entry point: validates flags, derives the version triple
// (last/cur/depend), shards every input sequence file into per-partition
// CROVLBuilderIncremental outputs, then archives and checksums each shard.
int main(int argc, char *argv[]) {
  google::SetVersionString("1.0.0.0");
  google::SetUsageMessage("Usage : ./cube-builder --help ");
  google::ParseCommandLineFlags(&argc, &argv, true);
  google::InitGoogleLogging(argv[0]);
  google::SetLogDestination(google::GLOG_INFO, "./log_");
  google::SetStderrLogging(google::GLOG_WARNING);
  LOG(INFO) << "start";
  string last_version = std::to_string(FLAGS_last_version);
  string cur_version = std::to_string(FLAGS_cur_version);
  string depend_version = std::to_string(FLAGS_depend_version);
  // input/output paths and shard count are mandatory.
  if (FLAGS_input_path == "" || FLAGS_output_path == "" ||
      FLAGS_shard_num == -1) {
    LOG(ERROR) << "Param error! Usage: " << argv[0] << " --help";
    return -1;
  }
  if (FLAGS_job_mode == "base") {
    // Base build: when only building (no transfer), stamp the current unix
    // time as the version; a base build depends on itself.
    if (FLAGS_only_build) {
      time_t t;
      time(&t);
      cur_version = std::to_string(t);
      depend_version = cur_version;
    }
  } else if (FLAGS_job_mode == "delta") {
    // Delta build: must reference the previous and depended-on versions.
    if (FLAGS_last_version == 0 || FLAGS_depend_version == 0) {
      LOG(ERROR) << "Param error! need last_version and depend_version! Usage: "
                 << argv[0] << " --help";
      return -1;
    } else {
      if (FLAGS_only_build) {
        time_t t;
        time(&t);
        cur_version = std::to_string(t);
      }
    }
  } else {
    LOG(ERROR) << "Job mode error! Usage: " << argv[0] << " --help";
    return -1;
  }
  Job job;
  job.set_dict_name(FLAGS_dict_name);
  job.set_shard_num(FLAGS_shard_num);
  job.set_input_path(FLAGS_input_path);
  // Output directory carries the "<depend>_<cur>" version pair.
  job.set_output_path(FLAGS_output_path + "/" + depend_version + "_" +
                      cur_version);
  job.set_job_mode(FLAGS_job_mode);
  vector<string> files;
  getAllFiles(job.get_input_path(), &files);
  if (!checkDirectory(job.get_output_path())) {
    LOG(ERROR) << "create output_path path failed: "
               << job.get_output_path().c_str();
    return -1;
  }
  // One incremental builder per shard, writing "<dict>_part<i>".
  // NOTE(review): the builders are never deleted — reclaimed only at process
  // exit; acceptable for a one-shot tool, but worth confirming.
  vector<CROVLBuilderIncremental *> reduces;
  for (auto i = 0; i < job.get_shard_num(); i++) {
    string tar_path = job.get_output_path() + "/" + job.get_dict_name() +
                      "_part" + std::to_string(i) + ".tar";
    string build_path = job.get_output_path() + "/" + job.get_dict_name() +
                        "_part" + std::to_string(i);
    CROVLBuilderIncremental *_builder = new CROVLBuilderIncremental();
    if (!_builder->Init(IT_HASH,
                        MAX_BLOCK_SIZE,
                        job.get_job_mode().c_str(),
                        build_path.c_str(),
                        tar_path.c_str(),
                        job.get_dict_name().c_str(),
                        std::to_string(i),
                        std::to_string(0),
                        last_version,
                        cur_version,
                        depend_version,
                        FLAGS_master_address.c_str())) {
      LOG(ERROR) << "CROVLBuilderIncremental init failed " << build_path;
      return -1;
    }
    reduces.push_back(_builder);
  }
  // Map phase: route every record of every input file to its shard builder.
  for (auto file : files) {
    mapFileLocal(job, file, reduces);
    LOG(INFO) << "next file to reduce!";
  }
  // Finalize each shard: flush, tar the output, and write its md5.
  for (auto reduce : reduces) {
    reduce->done();
    reduce->archive();
    reduce->md5sum();
  }
  google::ShutdownGoogleLogging();
  return 0;
}
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "core/cube/cube-builder/include/cube-builder/seqfile_reader.h"
#include <arpa/inet.h>
#include "butil/logging.h"
#include "core/cube/cube-builder/include/cube-builder/vtext.h"
// Open the underlying raw reader; 0 on success, -1 on failure.
int SequenceFileRecordReader::open() {
  const int rc = _raw_reader->open();
  if (rc != 0) {
    return -1;
  }
  LOG(INFO) << "open sequence file ok! file:" << _path.c_str();
  return 0;
}
// Close the underlying raw reader; 0 on success, -1 on failure.
int SequenceFileRecordReader::close() {
  const int rc = _raw_reader->close();
  if (rc != 0) {
    return -1;
  }
  LOG(INFO) << "close sequence file ok! file:" << _path.c_str();
  return 0;
}
// Read the next record into *record.
// Returns 0 on success, 1 at end of file, -1 on a short read or a sync
// marker mismatch. Lengths on the wire are big-endian (hence ntohl).
int SequenceFileRecordReader::next(Record* record) {
  uint32_t record_len = 0;
  int64_t ret = _raw_reader->read(&record_len);
  if (ret == 0) {
    // read() returned 0 bytes: end of file, reported as 1.
    // (Original comment here was garbled mojibake.)
    return 1;
  } else if (ret != sizeof(record_len)) {
    LOG(ERROR) << "read sequence file:" << _path
               << " record(record_len) errno:" << ret
               << ", errmsg:" << _raw_reader->errno_to_str(ret);
    return -1;
  }
  record->record_len = static_cast<int>(ntohl(record_len));
  // A length of -1 marks a sync-marker block: verify the 16-byte marker
  // against the one read from the header, then re-read the real length.
  if (record->record_len == -1) {
    std::string marker;
    if ((ret = _raw_reader->read_buf(&marker, 16)) != 16) {
      LOG(ERROR) << "read sequence file:" << _path
                 << " record(marker) errno:" << ret
                 << ", errmsg:" << _raw_reader->errno_to_str(ret);
      return -1;
    }
    if (marker != record->sync_marker) {
      LOG(ERROR) << "read sequence file:" << _path
                 << " record(sync_marker) error!";
      return -1;
    }
    if ((ret = _raw_reader->read(&record->record_len)) !=
        sizeof(record->record_len)) {
      LOG(ERROR) << "read sequence file:" << _path
                 << " record(len) errno:" << ret
                 << ", errmsg:" << _raw_reader->errno_to_str(ret);
      return -1;
    }
    record->record_len = static_cast<int>(ntohl(record->record_len));
  }
  // Key length, then the key bytes themselves.
  uint32_t key_len = 0;
  if ((ret = _raw_reader->read(&key_len)) != sizeof(key_len)) {
    LOG(ERROR) << "read sequence file:" << _path
               << " record(key_len) errno:" << ret
               << ", errmsg:" << _raw_reader->errno_to_str(ret);
    return -1;
  }
  record->key_len = static_cast<int>(ntohl(key_len));
  if ((ret = _raw_reader->read_buf(&record->key, record->key_len)) !=
      record->key_len) {
    LOG(ERROR) << "read sequence file:" << _path
               << " record(key_len) errno:" << ret
               << ", errmsg:" << _raw_reader->errno_to_str(ret);
    return -1;
  }
  // The value occupies the remainder of the record after the key.
  if ((ret = _raw_reader->read_buf(&record->value,
                                   record->record_len - record->key_len)) !=
      (record->record_len - record->key_len)) {
    LOG(ERROR) << "read sequence file:" << _path
               << " record(value_len) errno:" << ret
               << ", errmsg:" << _raw_reader->errno_to_str(ret);
    return -1;
  }
  return 0;
}
// Parse the sequence-file header: 4-byte magic+version, key/value class
// names, compression flags, metadata key/value pairs, and the 16-byte sync
// marker used to delimit blocks in next(). Returns 0 on success, -1 on any
// short or malformed read.
int SequenceFileRecordReader::read_header() {
  LOG(INFO) << "start to read sequence file header:" << _path;
  char version[4];
  if (_raw_reader->read_buf(&version, 4) != 4) {
    LOG(ERROR) << "read sequence file header(version) error:" << _path;
    return -1;
  }
  // Byte 3 of the magic carries the format version number.
  _header.version = version[3];
  if (!VString::read_string(_raw_reader, &_header.key_class)) {
    LOG(ERROR) << "read sequence file header(key_class) error:" << _path;
    return -1;
  }
  if (!VString::read_string(_raw_reader, &_header.value_class)) {
    LOG(ERROR) << "read sequence file header(value_class) error:" << _path;
    return -1;
  }
  if (_raw_reader->read(&_header.is_compress) != sizeof(bool)) {
    LOG(ERROR) << "read sequence file header(is_compress) error:" << _path;
    return -1;
  }
  if (_raw_reader->read(&_header.is_block_compress) != sizeof(bool)) {
    LOG(ERROR) << "read sequence file header(is_block_compress) error:"
               << _path;
    return -1;
  }
  // Codec class name is only present when compression is enabled.
  if (_header.is_compress) {
    if (!VString::read_string(_raw_reader, &_header.compress_class)) {
      LOG(ERROR) << "read sequence file header(compress_class) error:" << _path;
      return -1;
    }
  }
  // Arbitrary user metadata: a count followed by key/value string pairs.
  int32_t meta_cnt = 0;
  if (_raw_reader->read(&meta_cnt) != sizeof(int32_t)) {
    LOG(ERROR) << "read sequence file header(meta_cnt) error:" << _path;
    return -1;
  }
  _header.metas.resize(meta_cnt);
  for (int32_t i = 0; i != meta_cnt; ++i) {
    if (!VString::read_string(_raw_reader, &_header.metas[i].key)) {
      LOG(ERROR) << "read sequence file header(meta_key) error:" << _path;
      return -1;
    }
    if (!VString::read_string(_raw_reader, &_header.metas[i].value)) {
      LOG(ERROR) << "read sequence file header(meta_value) error:" << _path;
      return -1;
    }
  }
  if (_raw_reader->read_buf(&_header.sync_marker, 16) != 16) {
    LOG(ERROR) << "read sequence file header(sync_marker) error:" << _path;
    return -1;
  }
  LOG(INFO) << "sync_marker:" << _header.sync_marker;
  LOG(INFO) << "read sequence file header ok:" << _path;
  return 0;
}
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "core/cube/cube-builder/include/cube-builder/util.h"
#include <dirent.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include "butil/logging.h"
// Recursively collect every non-hidden regular file under `path` into
// *files. Terminates the process if a directory cannot be opened (this
// build tool treats an unreadable input tree as fatal).
void getAllFiles(std::string path, std::vector<std::string> *files) {
  DIR *dir;
  struct dirent *ptr;
  if ((dir = opendir(path.c_str())) == NULL) {
    // Fix: typo "dri" -> "dir" in the error message.
    perror("Open dir error...");
    exit(1);
  }
  while ((ptr = readdir(dir)) != NULL) {
    if (strcmp(ptr->d_name, ".") == 0 || strcmp(ptr->d_name, "..") == 0) {
      continue;
    } else if (ptr->d_type == DT_REG) {  // regular file (was magic number 8)
      if (ptr->d_name[0] != '.') files->push_back(path + "/" + ptr->d_name);
    } else if (ptr->d_type == DT_LNK) {  // symlink: skipped (was 10)
      continue;
    } else if (ptr->d_type == DT_DIR) {  // directory: recurse (was 4)
      getAllFiles(path + "/" + ptr->d_name, files);
    }
    // NOTE(review): d_type may be DT_UNKNOWN on some filesystems; such
    // entries are silently skipped, matching the original behavior.
  }
  closedir(dir);
}
// Render each byte of `input` as two uppercase hexadecimal digits.
std::string string_to_hex(const std::string &input) {
  static const char *const kDigits = "0123456789ABCDEF";
  std::string result;
  result.reserve(input.size() * 2);
  for (std::string::size_type pos = 0; pos < input.size(); ++pos) {
    const unsigned char byte = static_cast<unsigned char>(input[pos]);
    result.push_back(kDigits[byte / 16]);
    result.push_back(kDigits[byte % 16]);
  }
  return result;
}
// Ensure `folder` exists: return true if it is already present, otherwise
// attempt to create it (mode u=rwx,g=rwx,o=rx) and return whether mkdir
// succeeded.
// NOTE(review): mkdir() creates a single path component only; intermediate
// directories must already exist — confirm callers always pass one level.
bool checkDirectory(const std::string folder) {
  LOG(INFO) << "check dir:" << folder;
  if (access(folder.c_str(), F_OK) == 0) {
    return true;  // was `return 1;` — use a bool literal in a bool function
  }
  LOG(WARNING) << "no dir will mkdir:" << folder;
  return (mkdir(folder.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH) == 0);
}
void CmdTarfiles(const std::string folder) {
std::string cmd = "cd " + folder + " && tar -cvPf " + folder + ".tar .";
LOG(INFO) << "tar file cmd:" << cmd;
system(cmd.c_str());
}
void CmdMd5sum(const std::string folder) {
std::string cmd = "md5sum " + folder + ".tar > " + folder + ".tar.md5";
LOG(INFO) << "md5sum file cmd:" << cmd;
system(cmd.c_str());
}
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <string>
#include "core/cube/cube-server/control.pb.h"
#include "butil/third_party/rapidjson/document.h"
#include "butil/third_party/rapidjson/prettywriter.h"
#include "butil/third_party/rapidjson/stringbuffer.h"
namespace rec {
namespace mcube {
// HTTP control endpoint of the cube server.
// Parses a JSON command object ({"cmd": "<name>", ...}) from the request
// body and dispatches it to the matching handler. Mutating commands are
// serialized through _cmd_mutex; "status" is served without the lock.
class Control : public ControlService {
 public:
  Control();
  virtual ~Control();
  // brpc service entry point: handles one control command request.
  virtual void cmd(::google::protobuf::RpcController* controller,
                   const ::rec::mcube::HttpRequest* request,
                   ::rec::mcube::HttpResponse* response,
                   ::google::protobuf::Closure* done);
 private:
  // Fill `res` with the current framework status.
  int handle_status(const BUTIL_RAPIDJSON_NAMESPACE::Document& cmd,
                    BUTIL_RAPIDJSON_NAMESPACE::Document* res);
  // Synchronously reload the base dictionary from version path `v_path`.
  int handle_reload_base(const BUTIL_RAPIDJSON_NAMESPACE::Document& cmd,
                         const std::string& v_path);
  // Synchronously apply a patch from version path `v_path`.
  int handle_reload_patch(const BUTIL_RAPIDJSON_NAMESPACE::Document& cmd,
                          const std::string& v_path);
  // Load a base dictionary into the background slot.
  int handle_bg_load_base(const BUTIL_RAPIDJSON_NAMESPACE::Document& cmd,
                          const std::string& v_path);
  // Load a patch into the background slot.
  int handle_bg_load_patch(const BUTIL_RAPIDJSON_NAMESPACE::Document& cmd,
                           const std::string& v_path);
  // Unload the background dictionary.
  int handle_bg_unload(const BUTIL_RAPIDJSON_NAMESPACE::Document& cmd);
  // Swap foreground and background dictionaries.
  int handle_bg_switch(const BUTIL_RAPIDJSON_NAMESPACE::Document& cmd);
  // Enable the dictionary version named in cmd["version"].
  int handle_enable(const BUTIL_RAPIDJSON_NAMESPACE::Document& cmd);
  // Guards all mutating commands above (try_lock in cmd()).
  std::mutex _cmd_mutex;
};  // class Control
}  // namespace mcube
}  // namespace rec
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <stdint.h>
#include <atomic>
#include <memory>
#include <string>
#include <vector>
#include "core/cube/cube-server/include/cube/rw_lock.h"
#include "core/cube/cube-server/include/cube/slim_hash_map.h"
#include "core/cube/cube-server/include/cube/virtual_dict.h"
namespace rec {
namespace mcube {
// A single key→value dictionary shard backed by an index (slim_hash_map)
// plus data blocks that may live in memory or be mmap-ed from disk.
class Dict : public VirtualDict {
 public:
  Dict();
  virtual ~Dict();
  // Record the base dictionary this instance layers on top of.
  // NOTE(review): presumably consulted when a key misses in this dict's
  // own data (patch-over-base lookup) — confirm in the implementation.
  virtual void set_base_dict(const VirtualDict* dict) {
    _base_dict = static_cast<const Dict*>(dict);
  }
  // Load index and data from `dict_path`; `in_mem` selects in-memory vs.
  // mmap data loading, `v_path` identifies the version being loaded.
  int load(const std::string& dict_path,
           bool in_mem,
           const std::string& v_path);
  // Release all index/data resources.
  int destroy();
  // Look up `key`; on hit, copy the value into `buff` and set *buff_size.
  bool seek(uint64_t key, char* buff, uint64_t* buff_size);
  const std::string& version();  // no lock, used by framework seek
  // Version string read under the reader/writer lock.
  std::string guard_version();
  // Reference counting of in-flight seeks (used to delay recycling).
  void atom_inc_seek_num();
  void atom_dec_seek_num();
  uint32_t atom_seek_num();
 private:
  // Load the key→offset index from `dict_path`.
  int load_index(const std::string& dict_path, const std::string& v_path);
  // Load data blocks fully into memory.
  int load_data(const std::string& dict_path, const std::string& v_path);
  // Map data blocks from disk instead of copying them into memory.
  int load_data_mmap(const std::string& dict_path, const std::string& v_path);
  void set_version(const std::string& v_path);
 private:
  // One contiguous chunk of value data; `fd` is valid when mmap-ed.
  struct DataBlock {
    DataBlock() : size(0), fd(-1) {}
    std::shared_ptr<char> s_data;
    uint32_t size;
    int fd;
  };
 private:
  slim_hash_map<uint64_t, uint64_t> _slim_table;  // key -> data offset
  std::vector<DataBlock> _block_set;
  std::atomic<uint32_t> _seek_num;  // in-flight seek count
  const Dict* _base_dict;           // optional base layer, not owned
  std::string _version;
  RWLock _rw_lock;
};  // class Dict
typedef std::shared_ptr<Dict> DictPtr;
}  // namespace mcube
}  // namespace rec
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <atomic>
#include <string>
#include <vector>
#include "core/cube/cube-server/include/cube/dict.h"
#include "core/cube/cube-server/include/cube/virtual_dict.h"
namespace rec {
namespace mcube {
// A dictionary composed of `dict_split` Dict shards, presenting the same
// VirtualDict interface as a single dictionary.
// NOTE(review): how keys are routed to shards is not visible here —
// confirm the hashing/partition scheme in the implementation file.
class DictSet : public VirtualDict {
 public:
  explicit DictSet(int dict_split);
  virtual ~DictSet();
  // Load each shard from its entry in `dict_path` (one path per shard).
  virtual int load(const std::vector<std::string>& dict_path,
                   bool in_mem,
                   const std::string& v_path);
  virtual int destroy();
  virtual const std::string& version();
  virtual std::string guard_version();
  virtual void set_base_dict(const VirtualDict* dict);
  // Look up `key` in the owning shard.
  virtual bool seek(uint64_t key, char* buff, uint64_t* buff_size);
  // Reference counting of in-flight seeks (used to delay recycling).
  virtual void atom_inc_seek_num();
  virtual void atom_dec_seek_num();
  virtual uint32_t atom_seek_num();
 private:
  std::atomic<uint32_t> _seek_num{0};
  std::vector<DictPtr> _dict_set;  // one Dict per shard
  int _dict_split{0};              // number of shards
  std::string _version{""};
  RWLock _rw_lock;
};  // DictSet
}  // namespace mcube
}  // namespace rec
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <atomic>
#include <string>
#include <vector>
#include "butil/third_party/rapidjson/document.h"
#include "butil/third_party/rapidjson/prettywriter.h"
#include "butil/third_party/rapidjson/stringbuffer.h"
#include "core/cube/cube-server/cube.pb.h"
#include "core/cube/cube-server/include/cube/rw_lock.h"
#include "core/cube/cube-server/include/cube/virtual_dict.h"
namespace rec {
namespace mcube {
// Coarse framework state: serving normally vs. loading a dictionary.
struct Status {
  enum StatusCode { F_RUNNING = 0, F_LOADING };
};
// Singleton owner of a double-buffered dictionary pair: _dict[_dict_idx]
// serves seeks (foreground) while _dict[1 - _dict_idx] can be loaded or
// swapped in the background. Foreground access is guarded by _rw_lock,
// background access by _bg_rw_lock.
class Framework {
 public:
  static Framework* instance();
 public:
  ~Framework();
  // Initialize with `dict_split` shards; `in_mem` selects in-memory data.
  int init(uint32_t dict_split, bool in_mem);
  int destroy();
  // Write current status into `res` as JSON.
  int status(BUTIL_RAPIDJSON_NAMESPACE::Document* res);
  // Serve one key-lookup request against the foreground dictionary.
  int seek(const DictRequest* req, DictResponse* res);
  // Synchronously reload the foreground dictionary from `v_path`.
  int reload(const std::string& v_path);
  // Synchronously apply a patch from `v_path`.
  int patch(const std::string& v_path);
  // load dict base
  int bg_load_base(const std::string& v_path);
  // load dict patch
  int bg_load_patch(const std::string& v_path);
  // Drop the background dictionary.
  int bg_unload();
  // Promote the background dictionary to foreground.
  int bg_switch();
  // Activate the dictionary identified by `version`.
  int enable(const std::string& version);
 private:
  void init_dict(uint32_t dict_split);
  // Create a Dict or DictSet depending on configuration.
  VirtualDict* create_dict();
 private:
  // Return the foreground dict with its seek count bumped; the caller is
  // responsible for the matching atom_dec_seek_num().
  VirtualDict* get_cur_dict() {
    _rw_lock.r_lock();
    VirtualDict* dict = _dict[_dict_idx];
    dict->atom_inc_seek_num();
    _rw_lock.unlock();
    return dict;
  }
  std::string get_cur_version() {
    _rw_lock.r_lock();
    VirtualDict* dict = _dict[_dict_idx];
    std::string version = dict->version();
    _rw_lock.unlock();
    return version;
  }
  // NOTE(review): reads the background slot without taking _bg_rw_lock,
  // unlike get_bg_version()/set_bg_dict() — confirm callers hold the lock
  // or that this race is acceptable.
  VirtualDict* get_bg_dict() const { return _dict[1 - _dict_idx]; }
  std::string get_bg_version() {
    _bg_rw_lock.r_lock();
    VirtualDict* dict = _dict[1 - _dict_idx];
    std::string version = "";
    if (dict) {
      version = dict->guard_version();
    }
    _bg_rw_lock.unlock();
    return version;
  }
  void set_bg_dict(VirtualDict* dict) {
    _bg_rw_lock.w_lock();
    _dict[1 - _dict_idx] = dict;
    _bg_rw_lock.unlock();
  }
  // Hand `dict` to the Recycle module once no seeks reference it.
  void release(VirtualDict* dict);
 private:
  Framework() : _dict_idx(0) {}
  Framework(const Framework&) {}  // non-copyable (singleton)
 private:
  VirtualDict* _dict[2]{nullptr, nullptr};  // [foreground, background] pair
  int _dict_idx{0};                         // index of the foreground dict
  std::string _dict_path{""};
  bool _in_mem{true};
  std::atomic_int _status;  // Status::StatusCode
  RWLock _rw_lock;          // guards the foreground slot
  RWLock _bg_rw_lock;       // guards the background slot
  uint32_t _max_val_size{0};
  uint32_t _dict_split{0};
  std::vector<std::string> _dict_set_path;
};  // class Framework
}  // namespace mcube
}  // namespace rec
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <pthread.h>
#include <queue>
#include "core/cube/cube-server/include/cube/dict.h"
namespace rec {
namespace mcube {
// Background reclamation of retired dictionaries: a dedicated thread
// drains _recycle_list and frees dictionaries once they are safe to drop.
class Recycle {
 public:
  static Recycle* get_instance();
 public:
  ~Recycle();
  /**
   * @brief init recycle module (starts the recycle thread)
   */
  int init();
  /**
   * @brief destroy recycle module and wait recycle quit
   */
  int destroy();
  /**
   * @brief send dict to recycle module; ownership transfers to Recycle
   */
  void recycle(VirtualDict* dict);
 private:
  // Thread entry point draining the recycle list.
  static void* recycle_func(void*);
  Recycle();
  /**
   * @brief lock recycle list
   */
  void lock();
  /**
   * @brief unlock recycle list
   */
  void unlock();
 private:
  pthread_t _recycle_thread;
  pthread_mutex_t _recycle_mutex;  // guards _recycle_list
  std::queue<VirtualDict*> _recycle_list;
  bool _running;  // cleared to stop the recycle thread
};  // class Recycle
}  // namespace mcube
}  // namespace rec
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "core/cube/cube-server/cube.pb.h"
namespace rec {
namespace mcube {
// Data-plane RPC service of the cube server: answers key-lookup (seek)
// requests against the currently active dictionary.
class Server : public DictService {
 public:
  Server();
  virtual ~Server();
  // brpc service entry point for one batch of dictionary lookups.
  virtual void seek(::google::protobuf::RpcController* controller,
                    const ::rec::mcube::DictRequest* request,
                    ::rec::mcube::DictResponse* response,
                    ::google::protobuf::Closure* done);
};
}  // namespace mcube
}  // namespace rec
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <string>
#include <vector>
#include "core/cube/cube-server/include/cube/error.h"
namespace rec {
namespace mcube {
// Abstract dictionary interface shared by Dict (single shard) and DictSet
// (multiple shards). The two load() overloads default to E_NOT_IMPL so a
// subclass only implements the variant it supports.
class VirtualDict {
 public:
  VirtualDict() {}
  virtual ~VirtualDict() {}
  // Single-path load (implemented by Dict).
  virtual int load(const std::string& /*dict_path*/,
                   bool /*in_mem*/,
                   const std::string& /*v_path*/) {
    return E_NOT_IMPL;
  }
  // Multi-path load, one path per shard (implemented by DictSet).
  virtual int load(const std::vector<std::string>& /*dict_path*/,
                   bool /*in_mem*/,
                   const std::string& /*v_path*/) {
    return E_NOT_IMPL;
  }
  virtual int destroy() { return E_NOT_IMPL; }
  // Current version string; subclasses override with the real version.
  virtual const std::string& version() {
    static std::string UNKNOWN_VERSION = "UNKNOWN";
    return UNKNOWN_VERSION;
  }
  // Version string safe to read concurrently (lock-guarded in subclasses).
  virtual std::string guard_version() {
    static std::string UNKNOWN_VERSION = "UNKNOWN";
    return UNKNOWN_VERSION;
  }
  virtual void set_base_dict(const VirtualDict* dict) = 0;
  // Look up `key`; on hit, copy the value into `buff` and set *buff_size.
  virtual bool seek(uint64_t key, char* buff, uint64_t* buff_size) = 0;
  // Reference counting of in-flight seeks (used to delay recycling).
  virtual void atom_inc_seek_num() = 0;
  virtual void atom_dec_seek_num() = 0;
  virtual uint32_t atom_seek_num() = 0;
};  // class VirtualDict
}  // namespace mcube
}  // namespace rec
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <brpc/server.h>
#include "core/cube/cube-server/include/cube/control.h"
#include "core/cube/cube-server/include/cube/framework.h"
namespace rec {
namespace mcube {
using ::rec::mcube::HttpRequest;
using ::rec::mcube::HttpResponse;
using ::google::protobuf::RpcController;
using ::google::protobuf::Closure;
using ::brpc::HttpHeader;
using ::brpc::URI;
using ::brpc::Controller;
using ::brpc::ClosureGuard;
using BUTIL_RAPIDJSON_NAMESPACE::Document;
// Serialize a rapidjson value into a pretty-printed JSON string.
std::string rapidjson_value_to_string(
    const BUTIL_RAPIDJSON_NAMESPACE::Value& value) {
  typedef BUTIL_RAPIDJSON_NAMESPACE::StringBuffer Buffer;
  Buffer out;
  BUTIL_RAPIDJSON_NAMESPACE::PrettyWriter<Buffer> pretty_writer(out);
  value.Accept(pretty_writer);
  return std::string(out.GetString());
}
Control::Control() {}   // no state to initialize beyond members' defaults
Control::~Control() {}  // nothing to release
// Handle one HTTP control request.
// The request attachment is a JSON object: {"cmd": "<name>", ...} with an
// optional "version_path". "status" is served without locking; every other
// command must win a non-blocking try_lock on _cmd_mutex, so a command
// already in flight makes concurrent mutating requests fail with 500.
// Responds 200 on success, 400 on malformed input, 500 on failure.
void Control::cmd(::google::protobuf::RpcController* cntl_base,
                  const ::rec::mcube::HttpRequest* /*request*/,
                  ::rec::mcube::HttpResponse* /*response*/,
                  ::google::protobuf::Closure* done) {
  ClosureGuard done_guard(done);  // guarantees done->Run() on every path
  Controller* cntl = static_cast<Controller*>(cntl_base);
  std::string cmd_str = cntl->request_attachment().to_string();
  Document cmd;
  cmd.Parse(cmd_str.c_str());
  LOG(INFO) << "HANDLE CMD: " << cmd_str;
  if (!cmd.IsObject()) {
    LOG(ERROR) << "parse command failed";
    cntl->http_response().set_status_code(brpc::HTTP_STATUS_BAD_REQUEST);
    return;
  }
  if (!cmd.HasMember("cmd") || !cmd["cmd"].IsString()) {
    cntl->http_response().set_status_code(brpc::HTTP_STATUS_BAD_REQUEST);
    return;
  }
  std::string cmd_name = cmd["cmd"].GetString();
  // Optional dictionary version path used by the (re)load commands.
  std::string version_path = "";
  if (cmd.HasMember("version_path") && cmd["version_path"].IsString()) {
    version_path = cmd["version_path"].GetString();
  }
  int ret = 0;
  // NOTE(review): `response` is only populated by handle_status; for every
  // other command it serializes as JSON null in the attachment below —
  // confirm clients tolerate that.
  Document response;
  if (cmd_name.compare("status") == 0) {
    ret = handle_status(cmd, &response);
  } else if (_cmd_mutex.try_lock()) {
    if (cmd_name.compare("reload_base") == 0) {
      ret = handle_reload_base(cmd, version_path);
    } else if (cmd_name.compare("reload_patch") == 0) {
      ret = handle_reload_patch(cmd, version_path);
    } else if (cmd_name.compare("bg_load_base") == 0) {
      ret = handle_bg_load_base(cmd, version_path);
    } else if (cmd_name.compare("bg_load_patch") == 0) {
      ret = handle_bg_load_patch(cmd, version_path);
    } else if (cmd_name.compare("bg_unload") == 0) {
      ret = handle_bg_unload(cmd);
    } else if (cmd_name.compare("bg_switch") == 0) {
      ret = handle_bg_switch(cmd);
    } else if (cmd_name.compare("enable") == 0) {
      ret = handle_enable(cmd);
    } else {
      ret = -1;
      LOG(ERROR) << "unknown cmd: " << cmd_name;
    }
    _cmd_mutex.unlock();
  } else {
    // Another mutating command is in progress; reject instead of queueing.
    LOG(ERROR) << "try to get cmd mutex failed cmd: " << cmd_name;
    ret = -1;
  }
  cntl->response_attachment().append(rapidjson_value_to_string(response));
  if (ret == 0) {
    cntl->http_response().set_status_code(brpc::HTTP_STATUS_OK);
  } else {
    cntl->http_response().set_status_code(
        brpc::HTTP_STATUS_INTERNAL_SERVER_ERROR);
  }
  LOG(INFO) << "CMD DONE: " << cmd_str;
  return;
}
int Control::handle_status(const Document& /*cmd*/, Document* res) {
Framework* framework = Framework::instance();
return framework->status(res);
}
int Control::handle_reload_patch(const Document& /*cmd*/,
const std::string& v_path) {
Framework* framework = Framework::instance();
return framework->patch(v_path);
}
int Control::handle_reload_base(const Document& /*cmd*/,
const std::string& v_path) {
Framework* framework = Framework::instance();
return framework->reload(v_path);
}
int Control::handle_bg_load_patch(const Document& /*cmd*/,
const std::string& v_path) {
Framework* framework = Framework::instance();
return framework->bg_load_patch(v_path);
}
int Control::handle_bg_load_base(const Document& /*cmd*/,
const std::string& v_path) {
Framework* framework = Framework::instance();
return framework->bg_load_base(v_path);
}
int Control::handle_bg_unload(const Document& /*cmd*/) {
Framework* framework = Framework::instance();
return framework->bg_unload();
}
int Control::handle_bg_switch(const Document& /*cmd*/) {
Framework* framework = Framework::instance();
return framework->bg_switch();
}
int Control::handle_enable(const Document& cmd) {
if (!cmd.HasMember("version") || !cmd["version"].IsString()) {
return -1;
}
Framework* framework = Framework::instance();
return framework->enable(cmd["version"].GetString());
}
} // namespace mcube
} // namespace rec
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
logex @ 1e897307
此差异已折叠。
docopt-go @ ee0de3bc
此差异已折叠。
rfw @ 6f0a6f32
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
pybind11 @ 9a19306f
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册