diff --git a/CMakeLists.txt b/CMakeLists.txt index 8542345a8ed3b86fbe58f35eac5ba7719d807c40..3a96666de6283955363fa85d13b2e58d46a61011 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -60,6 +60,7 @@ if (NOT DEFINED WITH_MKLDNN) endif() endif() +include(external/jsoncpp) include(external/leveldb) include(external/rocksdb) include(external/zlib) @@ -83,6 +84,7 @@ include_directories(${PADDLE_SERVING_SOURCE_DIR}) include_directories(${PADDLE_SERVING_BINARY_DIR}) set(EXTERNAL_LIBS + jsoncpp gflags rocksdb glog diff --git a/cmake/external/jsoncpp.cmake b/cmake/external/jsoncpp.cmake new file mode 100644 index 0000000000000000000000000000000000000000..0f0136d606c0cdae344fa33705f5fd552b41e4a2 --- /dev/null +++ b/cmake/external/jsoncpp.cmake @@ -0,0 +1,53 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +INCLUDE(ExternalProject) + +SET(JSONCPP_SOURCES_DIR ${THIRD_PARTY_PATH}/jsoncpp) +SET(JSONCPP_INSTALL_DIR ${THIRD_PARTY_PATH}/install/jsoncpp) +SET(JSONCPP_INCLUDE_DIR "${JSONCPP_INSTALL_DIR}/include" CACHE PATH "jsoncpp include directory." FORCE) +SET(JSONCPP_LIBRARIES "${JSONCPP_INSTALL_DIR}/lib" CACHE FILEPATH "jsoncpp library." FORCE) +INCLUDE_DIRECTORIES(${JSONCPP_INCLUDE_DIR}) + +ExternalProject_Add( + extern_jsoncpp + ${EXTERNAL_PROJECT_LOG_ARGS} + PREFIX ${JSONCPP_SOURCES_DIR} + GIT_REPOSITORY "https://github.com/open-source-parsers/jsoncpp" + GIT_TAG 1.8.4 + UPDATE_COMMAND "" + CMAKE_ARGS -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER} + -DCMAKE_C_COMPILER=${CMAKE_C_COMPILER} + -DCMAKE_CXX_FLAGS=${CMAKE_CXX_FLAGS} + -DCMAKE_CXX_FLAGS_RELEASE=${CMAKE_CXX_FLAGS_RELEASE} + -DCMAKE_CXX_FLAGS_DEBUG=${CMAKE_CXX_FLAGS_DEBUG} + -DCMAKE_C_FLAGS=${CMAKE_C_FLAGS} + -DCMAKE_C_FLAGS_DEBUG=${CMAKE_C_FLAGS_DEBUG} + -DCMAKE_C_FLAGS_RELEASE=${CMAKE_C_FLAGS_RELEASE} + -DCMAKE_INSTALL_PREFIX=${JSONCPP_INSTALL_DIR} + -DBUILD_TESTING=OFF + -DCMAKE_BUILD_TYPE=${THIRD_PARTY_BUILD_TYPE} + ${EXTERNAL_OPTIONAL_ARGS} + CMAKE_CACHE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=${JSONCPP_INSTALL_DIR} + -DCMAKE_POSITION_INDEPENDENT_CODE:BOOL=ON + -DCMAKE_BUILD_TYPE:STRING=${THIRD_PARTY_BUILD_TYPE} +) + + +ADD_LIBRARY(jsoncpp STATIC IMPORTED GLOBAL) +SET_PROPERTY(TARGET jsoncpp PROPERTY IMPORTED_LOCATION ${JSONCPP_LIBRARIES}) +ADD_DEPENDENCIES(jsoncpp extern_jsoncpp) + +LIST(APPEND external_project_dependencies jsoncpp) + diff --git a/cube/CMakeLists.txt b/cube/CMakeLists.txt index a3b3573dc38f7910735463292b13120e0647d80a..55fd966cf553e013ecbdf50eddc9766471a33075 100644 --- a/cube/CMakeLists.txt +++ b/cube/CMakeLists.txt @@ -1,4 +1,4 @@ -# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,3 +14,4 @@ add_subdirectory(cube-server) add_subdirectory(cube-api) +add_subdirectory(cube-builder) diff --git a/cube/cube-builder/CMakeLists.txt b/cube/cube-builder/CMakeLists.txt new file mode 100755 index 0000000000000000000000000000000000000000..84fe7674dffd168784655a87e1103095077331df --- /dev/null +++ b/cube/cube-builder/CMakeLists.txt @@ -0,0 +1,49 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License + +project(cube-builder) + +set(CMAKE_CXX_STANDARD 11) + +add_compile_options(-std=c++11) +set(CMAKE_CXX_STANDARD_REQUIRED ON) + +include_directories(SYSTEM ${CMAKE_CURRENT_LIST_DIR}/include) +include_directories(SYSTEM ${CMAKE_CURRENT_BINARY_DIR}/../) + +find_path(JSONCPP_INCLUDE_DIR json/json.h) +set(JSONCPP_LIBRARY ${JSONCPP_INCLUDE_DIR}/../lib/libjsoncpp.a) +message(STATUS "jsoncpp include: ${JSONCPP_INCLUDE_DIR}") +include_directories(${JSONCPP_INCLUDE_DIR}) + + +find_library(CURL_LIB NAMES curl) +if (NOT CURL_LIB) + message(FATAL_ERROR "Fail to find curl") +endif() + +add_executable(cube-builder src/main.cpp include/cube-builder/util.h src/util.cpp src/builder_job.cpp include/cube-builder/builder_job.h include/cube-builder/define.h src/seqfile_reader.cpp include/cube-builder/seqfile_reader.h include/cube-builder/raw_reader.h include/cube-builder/vtext.h src/crovl_builder_increment.cpp include/cube-builder/crovl_builder_increment.h src/curl_simple.cpp include/cube-builder/curl_simple.h) + +set(DYNAMIC_LIB + gflags + ${JSONCPP_LIBRARY} + -lssl + -lcrypto + ${CURL_LIB} + ) + +target_link_libraries(cube-builder ${DYNAMIC_LIB}) + +# install +install(TARGETS cube-builder RUNTIME DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/bin) diff --git a/cube/cube-builder/include/cube-builder/builder_job.h b/cube/cube-builder/include/cube-builder/builder_job.h new file mode 100644 index 0000000000000000000000000000000000000000..55c6df42ca16499b180ddb0eed7267592acbb1b8 --- /dev/null +++ b/cube/cube-builder/include/cube-builder/builder_job.h @@ -0,0 +1,57 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include "cube-builder/crovl_builder_increment.h" +#include "cube-builder/define.h" +using std::string; +using std::vector; + +class Job { + public: + void set_shard_num(int num); + + int get_shard_num(); + + void set_input_path(string path); + + string get_input_path(); + + void set_output_path(string path); + + string get_output_path(); + + void set_job_mode(mode mmode); + + mode get_job_mode(); + + void set_dict_name(string name); + + string get_dict_name(); + + private: + int shard_num; + string input_path; + string output_path; + mode job_mode; + string dict_name; +}; + +void mapFileLocal(Job job, + string file, + vector reduces); diff --git a/cube/cube-builder/include/cube-builder/crovl_builder_increment.h b/cube/cube-builder/include/cube-builder/crovl_builder_increment.h new file mode 100644 index 0000000000000000000000000000000000000000..ac0c0418c459632e0451abcee07f5281272a44e8 --- /dev/null +++ b/cube/cube-builder/include/cube-builder/crovl_builder_increment.h @@ -0,0 +1,113 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +class CROVLBuilderIncremental { + public: + static const uint32_t MAX_DATA_LEN = 67108860; + static const uint32_t MAX_DATA_FILE_LEN = 1073741824; + static const uint32_t MAX_DATA_DIR_LEN = 512; + static const uint32_t MAX_FIXDATA_LEN = 504; + + static const uint32_t INDEX_BUFFER = 4194304; + int _index_type; + uint32_t _data_file_len; + uint64_t _data_file; + uint32_t _data_buf_len; + uint32_t _index_buf_len; + uint32_t _index_file_num; + uint64_t _index_file_len; + uint64_t _count; + uint64_t _cur_count; + bool _fix; + uint32_t _fix_len; + uint32_t _rec_size; + + uint64_t* _index; + char* _data; + std::vector _data_file_list; + std::vector _index_file_list; + + char _data_dir[MAX_DATA_DIR_LEN + 1]; + char _data_real_dir[MAX_DATA_DIR_LEN + 1]; + + char _last_data_dir[MAX_DATA_DIR_LEN + 1]; + char _last_data_tmp_dir[MAX_DATA_DIR_LEN + 1]; + + uint64_t _inner_sort_size; + uint64_t _memory_quota; + + bool flush_data(); + bool FlushIndex(); + bool Clear(); + + std::string _mode; + std::string _dict_name; + std::string _shard_id; + std::string _split_id; + std::string _last_version; + std::string _cur_version; + std::string _depend_version; + std::string _master_address; + std::string _id; + std::string _key; + std::string _extra; + + public: + CROVLBuilderIncremental(); + + ~CROVLBuilderIncremental(); + + bool Init(int index_type, + uint32_t data_file_len, + const char* mode, + const char* data_dir, + const char* data_real_dir, + const char* dict_name, + const std::string& shard_id, + const std::string& split_id, + const std::string& last_version, + const std::string& cur_version, + const std::string& depend_version, + const std::string& master_address, + const std::string& id = "", + const std::string& key = "", + const std::string& extra = "", + bool bFix = false, + uint32_t nFixLen = 8); + + int add(uint64_t nKey, uint32_t nLen, const char* pData); + + int add(uint64_t nKey, uint64_t nValue); + + int add(uint64_t nKey, const char* pData); + + bool done(); + + void archive(); + + void md5sum(); + + bool read_last_meta_from_transfer(); + + bool write_cur_meta_to_transfer(); + + bool read_last_meta_from_local(); + + bool write_cur_meta_to_local(); +}; diff --git a/cube/cube-builder/include/cube-builder/curl_simple.h b/cube/cube-builder/include/cube-builder/curl_simple.h new file mode 100644 index 0000000000000000000000000000000000000000..7b60470449655dde72ccdebf6692df8fea58fb69 --- /dev/null +++ b/cube/cube-builder/include/cube-builder/curl_simple.h @@ -0,0 +1,69 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include "butil/logging.h" + +class CurlSimple { + public: + CurlSimple(); + + ~CurlSimple(); + + /* * + * @method curl_get + * @para: + * curl_handle, the pointer of CURL, + * url , string, input url string with get parameters + * @return: + * void + * */ + std::string curl_get(const char *url); + + /* * + * @method curl_post + * @para: + * curl_handle, the pointer of CURL, + * url , the input url string without post parameters + * para_map, std::map the input post parameters + * @return: + * void + * */ + std::string curl_post(const char *url, + const std::map ¶_map); + + private: + /* * + * @method write_callback + * @para: + * contents, the pointer response data, it will cast to information you + * need + * size * nmemb is the memory size of contents + * userp, the poinser user return info + * @return: + * size_t must return size * nmemb, or it was failed + * */ + static size_t write_callback(void *contents, + size_t size, + size_t nmemb, + void *userp); + + private: + CURL *_p_curl; +}; diff --git a/cube/cube-builder/include/cube-builder/define.h b/cube/cube-builder/include/cube-builder/define.h new file mode 100644 index 0000000000000000000000000000000000000000..00142142652d56e5aa2895b5b1aec516938ee7d7 --- /dev/null +++ b/cube/cube-builder/include/cube-builder/define.h @@ -0,0 +1,63 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +typedef std::string mode; + +enum IndexType { + IT_HASH, ///< hash index + IT_SORT, ///< sort index +}; + +const int MAX_BLOCK_SIZE = 1024 * 1024 * 1024; + +struct Meta { + std::string key; + std::string value; +}; + +struct Header { + int version; + std::string key_class; + std::string value_class; + bool is_compress; + bool is_block_compress; + std::string compress_class; + std::vector metas; + std::string sync_marker; + static const int s_sync_hash_size = 16; +}; + +struct Record { + explicit Record(const Header& header) : sync_marker(header.sync_marker) {} + const std::string& sync_marker; + int record_len; + int key_len; + std::string key; + std::string value; +}; + +class RecordReader { + public: + RecordReader() {} + virtual ~RecordReader() {} + virtual int open() = 0; + + virtual int next(Record* record) = 0; + virtual int close() = 0; +}; // class RecordReader diff --git a/cube/cube-builder/include/cube-builder/raw_reader.h b/cube/cube-builder/include/cube-builder/raw_reader.h new file mode 100644 index 0000000000000000000000000000000000000000..acd4340d2c2df8ca529bfaff90ddf73bb966d163 --- /dev/null +++ b/cube/cube-builder/include/cube-builder/raw_reader.h @@ -0,0 +1,124 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include "butil/logging.h" + +class RawReader { + public: + RawReader() {} + + virtual ~RawReader() {} + + virtual int open() = 0; + virtual int close() = 0; + virtual int64_t read(int32_t* v) = 0; + virtual int64_t read(uint32_t* v) = 0; + virtual int64_t read(bool* v) = 0; + virtual int64_t read(char* v) = 0; + virtual int64_t read_buf(std::string* s, size_t len) = 0; + virtual int64_t read_buf(void* v, size_t len) = 0; + virtual const char* errno_to_str(int err) = 0; + + private: +}; + +class FileRawReader : public RawReader { + public: + explicit FileRawReader(const std::string& path) : _path(path) {} + virtual ~FileRawReader() {} + virtual int open() { + _reader.open(_path.c_str(), std::ifstream::binary); + if (!_reader.is_open()) { + return -1; + } + LOG(INFO) << "raw open sequence file ok! file:" << _path.c_str(); + return 0; + } + virtual int close() { + _reader.close(); + LOG(INFO) << "raw close sequence file ok! file:" << _path.c_str(); + return 0; + } + virtual int64_t read(int32_t* v) { + _reader.read(reinterpret_cast(v), sizeof(int32_t)); + if (_reader.good()) + return sizeof(int32_t); + else if (_reader.eof()) + return 0; + else + return -1; + } + virtual int64_t read(uint32_t* v) { + _reader.read(reinterpret_cast(v), sizeof(uint32_t)); + if (_reader.good()) + return sizeof(uint32_t); + else if (_reader.eof()) + return 0; + else + return -1; + } + virtual int64_t read(bool* v) { + _reader.read(reinterpret_cast(v), sizeof(bool)); + if (_reader.good()) + return sizeof(bool); + else if (_reader.eof()) + return 0; + else + return -1; + } + virtual int64_t read(char* v) { + _reader.read(reinterpret_cast(v), sizeof(char)); + if (_reader.good()) + return sizeof(char); + else if (_reader.eof()) + return 0; + else + return -1; + } + virtual int64_t read_buf(std::string* s, size_t len) { + s->resize(len); + std::string& tmp = *s; + _reader.read(reinterpret_cast(&tmp[0]), len); + if (_reader.good()) + return len; + else if (_reader.eof()) + return 0; + else + return -1; + } + virtual int64_t read_buf(void* v, size_t len) { + _reader.read(reinterpret_cast(v), len); + if (_reader.good()) + return len; + else if (_reader.eof()) + return 0; + else + return -1; + } + virtual const char* errno_to_str(int err) { + switch (err) { + case -1: + return "read seqfile error"; + } + return "default error"; + } + + private: + std::string _path{""}; + std::ifstream _reader; +}; diff --git a/cube/cube-builder/include/cube-builder/seqfile_reader.h b/cube/cube-builder/include/cube-builder/seqfile_reader.h new file mode 100644 index 0000000000000000000000000000000000000000..abe36a66f9288496a11d587b1e56578b7681f157 --- /dev/null +++ b/cube/cube-builder/include/cube-builder/seqfile_reader.h @@ -0,0 +1,45 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include "cube-builder/define.h" +#include "cube-builder/raw_reader.h" +class SequenceFileRecordReader : public RecordReader { + public: + SequenceFileRecordReader() {} + explicit SequenceFileRecordReader(const std::string& path) { + _path = path; + _raw_reader = new FileRawReader(_path); + } + virtual ~SequenceFileRecordReader() { + if (_raw_reader != nullptr) { + delete _raw_reader; + } + } + virtual int open(); + virtual int close(); + virtual int next(Record* record); + const Header& get_header() { return _header; } + int read_header(); + + private: + std::string _path{""}; + RawReader* _raw_reader{nullptr}; + Header _header; +}; + +typedef std::shared_ptr SequenceFileRecordReaderPtr; diff --git a/cube/cube-builder/include/cube-builder/util.h b/cube/cube-builder/include/cube-builder/util.h new file mode 100644 index 0000000000000000000000000000000000000000..2b743e5d508c235c87dd04bef1bad8deb24f5573 --- /dev/null +++ b/cube/cube-builder/include/cube-builder/util.h @@ -0,0 +1,31 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include + +#define PATH_DELIMITER '\\' + +void getAllFiles(std::string path, std::vector *files); + +std::string string_to_hex(const std::string &input); + +bool checkDirectory(const std::string folder); + +void CmdTarfiles(const std::string folder); + +void CmdMd5sum(const std::string folder); diff --git a/cube/cube-builder/include/cube-builder/vtext.h b/cube/cube-builder/include/cube-builder/vtext.h new file mode 100644 index 0000000000000000000000000000000000000000..51c3fa3ce2479e15b55afd3b5195dec58ff876d7 --- /dev/null +++ b/cube/cube-builder/include/cube-builder/vtext.h @@ -0,0 +1,87 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include "cube-builder/define.h" +#include "cube-builder/raw_reader.h" + +class VInt { + public: + static int32_t decode_vint_size(const char &value) { + if (value >= -112) { + return 1; + } else if (value < -120) { + return -119 - value; + } + return -111 - value; + } + static bool is_negative_vint(const char &value) { + return value < -120 || (value >= -112 && value < 0); + } + static bool read_vint(RawReader *reader, int32_t *vint) { + char first_byte; + if (reader->read(&first_byte) <= 0) { + return false; + } + int32_t len = decode_vint_size(first_byte); + if (len == 1) { + *vint = first_byte; + return true; + } + char ch; + int32_t bitlen = 0; + int32_t i = 0, lch; + for (int idx = len - 2; idx >= 0; idx--) { + if (reader->read(&ch) <= 0) { + return false; + } + bitlen = 8 * idx; + lch = ch; + i = i | ((lch << bitlen) & (0xFFL << bitlen)); + } + *vint = (is_negative_vint(first_byte) ? (i ^ (int32_t)(-1)) : i); + return true; + } +}; + +class VString { + public: + static const char *encode(std::string str) { return encode(str, true); } + + static const char *encode(std::string str, bool /*replace*/) { + // todo + return str.c_str(); + } + + static std::string decode(char *bytes) { return decode(bytes, true); } + + static std::string decode(char *bytes, bool /*replace*/) { + // todo + return std::string(bytes); + } + + // todo + static bool read_string(RawReader *reader, std::string *str) { + int length; + if (!VInt::read_vint(reader, &length)) { + return false; + } + if (reader->read_buf(str, length) != length) { + return false; + } + return true; + } +}; diff --git a/cube/cube-builder/src/builder_job.cpp b/cube/cube-builder/src/builder_job.cpp new file mode 100644 index 0000000000000000000000000000000000000000..97edaf4f9c8f5417888f72bccab462bfed2588c9 --- /dev/null +++ b/cube/cube-builder/src/builder_job.cpp @@ -0,0 +1,75 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "cube-builder/builder_job.h" +#include +#include +#include "butil/logging.h" +#include "cube-builder/seqfile_reader.h" +#include "cube-builder/util.h" +using std::string; +void Job::set_shard_num(int num) { shard_num = num; } + +int Job::get_shard_num() { return shard_num; } + +void Job::set_input_path(string path) { input_path = path; } + +string Job::get_input_path() { return input_path; } + +void Job::set_output_path(string path) { output_path = path; } + +string Job::get_output_path() { return output_path; } + +void Job::set_job_mode(mode mmode) { job_mode = mmode; } + +mode Job::get_job_mode() { return job_mode; } + +void Job::set_dict_name(string name) { dict_name = name; } + +string Job::get_dict_name() { return dict_name; } + +void mapFileLocal(Job job, + string file, + vector reduces) { + SequenceFileRecordReader reader(file.c_str()); + + if (reader.open() != 0) { + LOG(ERROR) << "open file failed! " << file; + return; + } + if (reader.read_header() != 0) { + LOG(ERROR) << "read header error! " << file; + reader.close(); + return; + } + + Record record(reader.get_header()); + int total_count = 0; + + while (reader.next(&record) == 0) { + uint64_t key = + *reinterpret_cast(const_cast(record.key.data())); + + total_count++; + int part = key % job.get_shard_num(); + int64_t value_length = record.record_len - record.key_len; + + reduces[part]->add(key, value_length, record.value.c_str()); + } + + if (reader.close() != 0) { + LOG(ERROR) << "close file failed! " << file; + return; + } +} diff --git a/cube/cube-builder/src/crovl_builder_increment.cpp b/cube/cube-builder/src/crovl_builder_increment.cpp new file mode 100644 index 0000000000000000000000000000000000000000..69732071ceee8d717682880229f56fa033d4165d --- /dev/null +++ b/cube/cube-builder/src/crovl_builder_increment.cpp @@ -0,0 +1,847 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "cube-builder/crovl_builder_increment.h" +#include +#include +#include +#include +#include +#include +#include "boost/lexical_cast.hpp" +#include "cube-builder/curl_simple.h" +#include "cube-builder/define.h" +#include "cube-builder/util.h" +#include "json/json.h" + +static const uint64_t MB = 1048576; +// CSpinLock g_sLock; + +CROVLBuilderIncremental::CROVLBuilderIncremental() { + _index = NULL; + _data = NULL; + // _index_file = NULL; + _inner_sort_size = 32 * MB; + _memory_quota = 4 * 1024 * MB; +} + +bool CROVLBuilderIncremental::Clear() { + if (_index != NULL) { + delete[] _index; + _index = NULL; + } + + if (_data != NULL) { + delete[] _data; + _data = NULL; + } + + _data_file_list.clear(); + + return true; +} + +CROVLBuilderIncremental::~CROVLBuilderIncremental() { Clear(); } + +bool CROVLBuilderIncremental::Init(int index_type, + uint32_t data_file_len, + const char *mode, + const char *data_dir, + const char *data_real_dir, + const char *dict_name, + const std::string &shard_id, + const std::string &split_id, + const std::string &last_version, + const std::string &cur_version, + const std::string &depend_version, + const std::string &master_address, + const std::string &id, + const std::string &key, + const std::string &extra, + bool bFix, + uint32_t nFixLen) { + if (index_type != IT_HASH) { + LOG(ERROR) << "Error: incremental_build not support IT_SORT"; + return false; + } + + if (index_type != IT_HASH && index_type != IT_SORT) { + LOG(ERROR) << "Error: invalid index type"; + return false; + } + + if (index_type == IT_HASH && bFix) { + LOG(ERROR) << "Error: can not set fix when index type is IT_HASH"; + return false; + } + + if (nFixLen > MAX_FIXDATA_LEN) { + LOG(ERROR) << "Error: fix length too large"; + return false; + } + + if (data_file_len < MAX_DATA_LEN + 4 || data_file_len > MAX_DATA_FILE_LEN) { + LOG(ERROR) << "Error: invalid data file length" << data_file_len; + return false; + } + + if (strlen(data_dir) > MAX_DATA_DIR_LEN) { + LOG(ERROR) << "Error: data directory too long :" << data_dir; + return false; + } + + Clear(); + + strncpy(_data_dir, data_dir, sizeof(_data_dir)); + strncpy(_data_real_dir, data_real_dir, sizeof(_data_real_dir)); + + _index_type = index_type; + _data_file_len = data_file_len; + _data_file = 0; + _data_buf_len = 0; + _index_buf_len = 0; + _index_file_num = 0; // incremental_build for index + _count = 0; + _cur_count = 0; + _fix = bFix; + _fix_len = nFixLen; + _index_file_len = 0; + + _dict_name = dict_name; + _shard_id = shard_id; + _split_id = split_id; + _last_version = last_version; + _cur_version = cur_version; + _depend_version = depend_version; + _master_address = master_address; + _id = id; + _key = key; + _extra = extra; + + if (_fix) { + _rec_size = nFixLen + sizeof(uint64_t); + } else { + _rec_size = 2 * sizeof(uint64_t); + } + + _index = new uint64_t[INDEX_BUFFER]; + + if (!_fix) { + _data = new char[_data_file_len]; + } else if (_fix_len != 8) { + _data = new char[_rec_size * INDEX_BUFFER]; + } + + if (!checkDirectory(_data_dir)) { + LOG(ERROR) << "create _data_dir path failed: " << _data_dir; + return false; + } + + // read old info from data.n and index.n + _mode = mode; + LOG(INFO) << "mode: " << mode; + + if (strcmp(mode, "base") != 0) { + if (_master_address == "") { + if (!read_last_meta_from_local()) { + LOG(ERROR) << "read last meta from db error! "; + return false; + } + } else { + if (!read_last_meta_from_transfer()) { + LOG(ERROR) << "read last meta from transfer error! "; + return false; + } + } + } + LOG(INFO) << "incremental_build data file:" << _data_file + << ", count:" << _count << ", index file:" << _index_file_num; + return true; +} + +bool CROVLBuilderIncremental::flush_data() { + LOG(INFO) << "flush data" << _data_file; + if (_data_buf_len == 0) { + LOG(ERROR) << "_data_buf_len == 0"; + return true; + } + + char file[MAX_DATA_DIR_LEN * 2]; + snprintf(file, sizeof(file), "%s/data.%lu", _data_dir, _data_file); + FILE *fp; + if ((fp = fopen(file, "wb")) == NULL) { + LOG(ERROR) << "open file failed! " << file; + return false; + } + + if (fwrite(_data, 1, _data_buf_len, fp) != _data_buf_len) { + fclose(fp); + LOG(ERROR) << "write file:" << file << " error!"; + return false; + } + fclose(fp); + + _data_file_list.push_back(_data_buf_len); + _data_buf_len = 0; + ++_data_file; + return true; +} + +bool CROVLBuilderIncremental::FlushIndex() { + LOG(INFO) << "flush index:" << _index_file_num; + + if (_index_buf_len == 0) { + LOG(ERROR) << "_index_buf_len == 0"; + return true; + } + + char file[MAX_DATA_DIR_LEN * 2]; + snprintf(file, sizeof(file), "%s/index.%d", _data_dir, _index_file_num); + FILE *fp; + if ((fp = fopen(file, "wb")) == NULL) { + LOG(ERROR) << "open file failed! " << file; + return false; + } + + if (_fix && _fix_len != 8) { + if (fwrite(_data, _rec_size, _index_buf_len, fp) != _index_buf_len) { + LOG(ERROR) << "_index_buf_len error:" << _index_buf_len; + return false; + } + _index_buf_len = 0; + return true; + } + + if (fwrite(_index, sizeof(uint64_t), _index_buf_len, fp) != _index_buf_len) { + LOG(ERROR) << "write file:" << file << " error!"; + return false; + } + fclose(fp); + + _index_file_len += _index_buf_len * sizeof(uint64_t); + _index_buf_len = 0; + return true; +} + +int CROVLBuilderIncremental::add(uint64_t nKey, + uint32_t nLen, + const char *pData) { + uint64_t naddr; + uint32_t nTotalLen; + + if (nLen > MAX_DATA_LEN || _fix) { + return 0; + } + + nTotalLen = nLen + 4; + + do { + if (_data_buf_len + nTotalLen > _data_file_len) { + if (!flush_data()) { + break; + } + } + + *reinterpret_cast(_data + _data_buf_len) = nTotalLen; + memcpy(_data + _data_buf_len + 4, pData, nLen); + naddr = (_data_file << 32) + _data_buf_len; + _data_buf_len += nTotalLen; + + if (_index_buf_len + 2 > INDEX_BUFFER) { + if (!FlushIndex()) { + break; + } + } + + _index[_index_buf_len] = nKey; + _index[_index_buf_len + 1] = naddr; + _index_buf_len += 2; + ++_count; + ++_cur_count; + return 1; + } while (false); + + return -1; +} + +int CROVLBuilderIncremental::add(uint64_t nKey, uint64_t nValue) { + if (!_fix || _fix_len != 8) { + return 0; + } + + if (_index_buf_len + 2 > INDEX_BUFFER) { + if (!FlushIndex()) { + return -1; + } + } + + _index[_index_buf_len] = nKey; + _index[_index_buf_len + 1] = nValue; + _index_buf_len += 2; + ++_count; + ++_cur_count; + return 1; +} + +int CROVLBuilderIncremental::add(uint64_t nKey, const char *pData) { + if (!_fix) { + return 0; + } + + if (_fix && _fix_len == 8) { + return add(nKey, *reinterpret_cast(const_cast(pData))); + } + + if (_index_buf_len + 1 > INDEX_BUFFER) { + if (!FlushIndex()) { + return -1; + } + } + + *reinterpret_cast(_data + _index_buf_len * _rec_size) = nKey; + memcpy( + _data + (_index_buf_len * _rec_size + sizeof(uint64_t)), pData, _fix_len); + _index_buf_len++; + ++_count; + ++_cur_count; + return 1; +} + +bool CROVLBuilderIncremental::done() { + LOG(INFO) << "done"; + if (!_fix) { + if (!flush_data()) { + return false; + } + } + + if (!FlushIndex()) { + return false; + } + + if (!_fix) { + if (_data_file == 0) { + return false; + } + } + + char buffer[64]; + char file[MAX_DATA_DIR_LEN * 2]; + + // write data.n + snprintf(file, sizeof(file), "%s/data.n", _data_dir); + FILE *fp; + if ((fp = fopen(file, "wb")) == NULL) { + LOG(ERROR) << "open file failed! " << file; + return false; + } + + *reinterpret_cast(buffer) = _data_file; + if (fwrite(buffer, sizeof(uint32_t), 1, fp) != 1) { + fclose(fp); + return false; + } + + for (uint32_t i = 0; i < _data_file_list.size(); ++i) { + *reinterpret_cast(buffer) = _data_file_list[i]; + + if (fwrite(buffer, sizeof(uint32_t), 1, fp) != 1) { + fclose(fp); + return false; + } + } + fclose(fp); + + // write index.n + if (_index_type == IT_HASH) { + if (_count > (uint64_t)((uint32_t)-1)) { + return false; + } + + snprintf(file, sizeof(file), "%s/index.n", _data_dir); + + if ((fp = fopen(file, "wb")) == nullptr) { + LOG(ERROR) << "open file failed! " << file; + return false; + } + + *reinterpret_cast(buffer) = IT_HASH; + *reinterpret_cast(buffer + sizeof(uint32_t)) = (uint32_t)_count; + + if (fwrite(buffer, sizeof(uint32_t), 2, fp) != 2) { + fclose(fp); + return false; + } + + // fix empty data bug + // if the version no data, filter this version, index_file_num no add + // only can patch is null, not base + if (_cur_count > 0) { + *reinterpret_cast(buffer) = ++_index_file_num; + } else { + *reinterpret_cast(buffer) = _index_file_num; + } + + if (fwrite(buffer, sizeof(uint32_t), 1, fp) != 1) { + fclose(fp); + return false; + } + + if (_cur_count > 0) { + _index_file_list.push_back(_index_file_len); + } + for (uint32_t i = 0; i < _index_file_list.size(); ++i) { + *reinterpret_cast(buffer) = _index_file_list[i]; + + if (fwrite(buffer, sizeof(uint64_t), 1, fp) != 1) { + fclose(fp); + return false; + } + } + fclose(fp); + + if (_master_address == "") { + if (!write_cur_meta_to_local()) { + LOG(ERROR) << "write cur meta to local error"; + return false; + } + } else { + if (!write_cur_meta_to_transfer()) { + LOG(ERROR) << "write cur meta to db error master addr:" + << _master_address.c_str(); + return false; + } + } + + } else { + snprintf(file, sizeof(file), "%s/index.d", _data_dir); + LOG(ERROR) << "HASH is error!"; + } + + if (!Clear()) { + LOG(ERROR) << "clear error!"; + return false; + } + + return true; +} + +void CROVLBuilderIncremental::archive() { CmdTarfiles(_data_dir); } + +void CROVLBuilderIncremental::md5sum() { CmdMd5sum(_data_dir); } + +bool CROVLBuilderIncremental::read_last_meta_from_transfer() { + std::string url = "http://" + _master_address + "/dict/meta_info?name=" + + _dict_name + "&shard=" + _shard_id + "&split=" + _split_id + + "&version=" + _last_version + "&depend=" + _depend_version; + + LOG(INFO) << "name:" << _dict_name.c_str() << " shard:" << _shard_id.c_str() + << " split:" << _split_id.c_str() + << " last version:" << _last_version.c_str() + << " depend version:" << _depend_version.c_str(); + + CurlSimple cs; + std::string result = cs.curl_get(url.c_str()); + + if (result == "") { + LOG(ERROR) << "curl get error!"; + return false; + } + + LOG(INFO) << "curl result:" << result.c_str(); + Json::Reader reader(Json::Features::strictMode()); + Json::Value val; + + if (!reader.parse(result, val)) { + LOG(ERROR) << "parse result json error!"; + return false; + } + + if (!val.isObject()) { + LOG(ERROR) << "no valild json error!"; + return false; + } + + if (val["success"].isNull() || !val["success"].isString()) { + LOG(ERROR) << "parse field success error!"; + return false; + } + + std::string success = val["success"].asString(); + + if (success != "0") { + LOG(ERROR) << "parse field success error!" << success.c_str(); + return false; + } + + if (val["data"].isNull() || !val["data"].isObject()) { + LOG(ERROR) << "parse field data error!"; + return false; + } + + Json::Value data = val["data"]; + + if (data["meta"].isNull() || !data["meta"].isString()) { + LOG(ERROR) << "parse field meta error!"; + return false; + } + + Json::Reader meta_reader(Json::Features::strictMode()); + Json::Value meta; + + if (!meta_reader.parse(data["meta"].asString(), meta)) { + LOG(ERROR) << "parse meta json error!"; + return false; + } + + if (!meta.isObject()) { + LOG(ERROR) << "parse meta json error!"; + return false; + } + + if (meta["data_len_list"].isNull() || !meta["data_len_list"].isArray()) { + LOG(ERROR) << "parse data_len_list json error!"; + return false; + } + + Json::Value data_len_list = meta["data_len_list"]; + _data_file = data_len_list.size(); + + if (_data_file == 0) { + LOG(ERROR) << "data len list size is 0!"; + return false; + } + + for (int i = 0; i < static_cast(_data_file); ++i) { + LOG(INFO) << "data_len:" << data_len_list[i].asString().c_str(); + + try { + _data_file_list.push_back( + boost::lexical_cast(data_len_list[i].asString())); + } catch (boost::bad_lexical_cast &e) { + LOG(ERROR) << "bad lexical cast:" << e.what(); + return false; + } + } + + if (meta["index_len_list"].isNull() || !meta["index_len_list"].isArray()) { + LOG(ERROR) << "parse index_len_list json error!"; + return false; + } + + Json::Value index_len_list = meta["index_len_list"]; + _index_file_num = index_len_list.size(); + + if (_index_file_num == 0) { + LOG(ERROR) << "index len list size is 0!"; + return false; + } + + for (uint32_t i = 0; i < _index_file_num; ++i) { + LOG(INFO) << "index_len:" << index_len_list[i].asString().c_str(); + + try { + _index_file_list.push_back( + boost::lexical_cast(index_len_list[i].asString())); + } catch (boost::bad_lexical_cast &e) { + LOG(ERROR) << "bad lexical cast:" << e.what(); + return false; + } + } + + if (meta["index_total_count"].isNull() || + !meta["index_total_count"].isString()) { + LOG(ERROR) << "parse index_total_count json error!"; + return false; + } + + LOG(INFO) << "index_total_count:" + << meta["index_total_count"].asString().c_str(); + try { + _count = + boost::lexical_cast(meta["index_total_count"].asString()); + } catch (boost::bad_lexical_cast &e) { + LOG(ERROR) << "bad lexical cast:" << e.what(); + return false; + } + + return true; +} + +bool CROVLBuilderIncremental::write_cur_meta_to_transfer() { + CurlSimple cs; + std::string url = "http://" + _master_address + "/dict/meta_info/register"; + + std::map args; + args["name"] = _dict_name; + args["shard"] = _shard_id; + args["split"] = _split_id; + args["version"] = _cur_version; + args["depend"] = _depend_version; + + LOG(INFO) << "name:" << _dict_name.c_str() << " shard:" << _shard_id.c_str() + << " split:" << _split_id.c_str() + << " cur version:" << _cur_version.c_str() + << " depend version:" << _depend_version.c_str(); + + Json::Value root; + + try { + root["index_total_count"] = boost::lexical_cast(_count); + } catch (boost::bad_lexical_cast &e) { + LOG(ERROR) << "bad lexical cast:" << e.what(); + return false; + } + + Json::Value data; + + for (size_t i = 0; i < _data_file_list.size(); ++i) { + try { + data.append(boost::lexical_cast(_data_file_list[i])); + } catch (boost::bad_lexical_cast &e) { + LOG(ERROR) << "bad lexical cast:" << e.what(); + return false; + } + } + + root["data_len_list"] = data; + + Json::Value index; + + for (size_t i = 0; i < _index_file_list.size(); ++i) { + try { + index.append(boost::lexical_cast(_index_file_list[i])); + } catch (boost::bad_lexical_cast &e) { + LOG(ERROR) << "bad lexical cast:" << e.what(); + return false; + } + } + + root["index_len_list"] = index; + + Json::FastWriter writer; + // Json::StreamWriterBuilder writer; + std::string meta = writer.write(root); + + // std::string meta = Json::writeString(writer, root); + if (meta[meta.size() - 1] == '\n') { + meta.erase(meta.size() - 1, 1); + } + + LOG(INFO) << "meta:" << meta.c_str() << " size:" << meta.size(); + args["meta"] = meta; + std::string result = cs.curl_post(url.c_str(), args); + + if (result == "") { + LOG(ERROR) << "curl get error!"; + return false; + } + + LOG(INFO) << "curl result:" << result.c_str(); + + Json::Reader reader(Json::Features::strictMode()); + Json::Value val; + + if (!reader.parse(result, val)) { + LOG(ERROR) << "parse result json error!"; + return false; + } + + if (!val.isObject()) { + LOG(ERROR) << "no valild json error!"; + return false; + } + + if (val["success"].isNull() || !val["success"].isString()) { + LOG(ERROR) << "parse field success error!"; + return false; + } + + std::string success = val["success"].asString(); + + if (success != "0") { + LOG(ERROR) << "parse field success error!" << success.c_str(); + return false; + } + + LOG(INFO) << "curl post ok"; + return true; +} + +bool CROVLBuilderIncremental::read_last_meta_from_local() { + char file[MAX_DATA_DIR_LEN * 2]; + + snprintf(file, + sizeof(file), + "%s/../../meta_info/%s_%s_%s_%s.json", + _data_dir, + _last_version.c_str(), + _depend_version.c_str(), + _shard_id.c_str(), + _split_id.c_str()); + + // read from local meta file + std::string input_meta; + Json::Reader meta_reader(Json::Features::strictMode()); + Json::Value meta; + + std::ifstream ifs; + ifs.open(file); + if (!ifs) { + LOG(ERROR) << "open file failed! " << file; + return false; + } + + if (!meta_reader.parse(ifs, meta)) { + LOG(ERROR) << "parse meta json error!"; + return false; + } + ifs.close(); + + if (!meta.isObject()) { + LOG(ERROR) << "parse meta json error!"; + return false; + } + + if (meta["data_len_list"].isNull() || !meta["data_len_list"].isArray()) { + LOG(ERROR) << "parse data_len_list json error !"; + return false; + } + + Json::Value data_len_list = meta["data_len_list"]; + _data_file = data_len_list.size(); + + if (_data_file == 0) { + LOG(ERROR) << "data len list size is 0 !"; + return false; + } + + for (int i = 0; i < static_cast(_data_file); ++i) { + LOG(INFO) << "data_len:" << data_len_list[i].asString().c_str(); + + try { + _data_file_list.push_back( + boost::lexical_cast(data_len_list[i].asString())); + } catch (boost::bad_lexical_cast &e) { + LOG(ERROR) << "bad lexical cast:" << e.what(); + return false; + } + } + + if (meta["index_len_list"].isNull() || !meta["index_len_list"].isArray()) { + LOG(ERROR) << "parse index_len_list json error!"; + return false; + } + + Json::Value index_len_list = meta["index_len_list"]; + _index_file_num = index_len_list.size(); + + if (_index_file_num == 0) { + LOG(ERROR) << "index len list size is 0!"; + return false; + } + + for (uint32_t i = 0; i < _index_file_num; ++i) { + LOG(INFO) << "index_len:" << index_len_list[i].asString().c_str(); + + try { + _index_file_list.push_back( + boost::lexical_cast(index_len_list[i].asString())); + } catch (boost::bad_lexical_cast &e) { + LOG(ERROR) << "bad lexical cast:" << e.what(); + return false; + } + } + + if (meta["index_total_count"].isNull() || + !meta["index_total_count"].isString()) { + LOG(ERROR) << "parse index_total_count json error!"; + return false; + } + + LOG(INFO) << "index_total_count:" + << meta["index_total_count"].asString().c_str(); + try { + _count = + boost::lexical_cast(meta["index_total_count"].asString()); + } catch (boost::bad_lexical_cast &e) { + LOG(ERROR) << "bad lexical cast:" << e.what(); + return false; + } + + return true; +} + +bool CROVLBuilderIncremental::write_cur_meta_to_local() { + char file[MAX_DATA_DIR_LEN * 2]; + std::string meta_path = _data_dir; + meta_path = meta_path + "/../../meta_info"; + if (!checkDirectory(meta_path)) { + LOG(ERROR) << "create meta_path path failed:" << meta_path.c_str(); + return false; + } + snprintf(file, + sizeof(file), + "%s/%s_%s_%s_%s.json", + meta_path.c_str(), + _cur_version.c_str(), + _depend_version.c_str(), + _shard_id.c_str(), + _split_id.c_str()); + + Json::Value meta; + + try { + meta["index_total_count"] = boost::lexical_cast(_count); + } catch (boost::bad_lexical_cast &e) { + LOG(ERROR) << "bad lexical cast:" << e.what(); + return false; + } + + Json::Value data; + + for (size_t i = 0; i < _data_file_list.size(); ++i) { + try { + data.append(boost::lexical_cast(_data_file_list[i])); + } catch (boost::bad_lexical_cast &e) { + LOG(ERROR) << "bad lexical cast:" << e.what(); + return false; + } + } + + meta["data_len_list"] = data; + + Json::Value index; + + for (size_t i = 0; i < _index_file_list.size(); ++i) { + try { + index.append(boost::lexical_cast(_index_file_list[i])); + } catch (boost::bad_lexical_cast &e) { + LOG(ERROR) << "bad lexical cast:" << e.what(); + return false; + } + } + + meta["index_len_list"] = index; + + std::ofstream ofs; + ofs.open(file); + if (!ofs) { + LOG(ERROR) << "open file failed!" << file; + return false; + } + ofs << meta.toStyledString(); + ofs.close(); + return true; +} diff --git a/cube/cube-builder/src/curl_simple.cpp b/cube/cube-builder/src/curl_simple.cpp new file mode 100644 index 0000000000000000000000000000000000000000..f95902f180bbd35cda5cf6e3b9ed5a352136185b --- /dev/null +++ b/cube/cube-builder/src/curl_simple.cpp @@ -0,0 +1,159 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "cube-builder/curl_simple.h" +#include +#include +#include +#include +#include +#include +#include + +const size_t BUFFER_SIZE = 9096; + +CurlSimple::CurlSimple() { _p_curl = curl_easy_init(); } + +CurlSimple::~CurlSimple() { curl_easy_cleanup(_p_curl); } + +/* * + * @method write_callback + * @para: + * contents, the pointer response data, it will cast to information you + * need + * size * nmemb is the memory size of contents + * userp, the poinser user return info + * @return: + * size_t must return size * nmemb, or it was failed + * */ +size_t CurlSimple::write_callback(void *contents, + size_t size, + size_t nmemb, + void *userp) { + size_t realsize = size * nmemb; + snprintf(static_cast(userp), + BUFFER_SIZE, + "%s", + static_cast(contents)); + return realsize; +} // end write_callback + /* * + * @method curl_get + * @para: + * _p_curl, the pointer of CURL, + * url , string, input url string with get parameters + * @return: + * void + * */ +std::string CurlSimple::curl_get(const char *url) { + char buffer[BUFFER_SIZE]; + CURLcode res; + /* specify URL to get */ + curl_easy_setopt(_p_curl, CURLOPT_URL, url); + /* send all data to this function */ + curl_easy_setopt(_p_curl, CURLOPT_WRITEFUNCTION, CurlSimple::write_callback); + /* we pass our 'chunk' struct to the callback function */ + curl_easy_setopt(_p_curl, CURLOPT_WRITEDATA, static_cast(buffer)); + /* some servers don't like requests that are made without a user-agent + field, so we provide one */ + curl_easy_setopt(_p_curl, CURLOPT_USERAGENT, "libcurl-agent/1.0"); + /* get it! */ + res = curl_easy_perform(_p_curl); + + /* check for errors */ + if (res != CURLE_OK) { + LOG(ERROR) << "curl_easy_perform() failed: " << curl_easy_strerror(res); + return ""; + } else { + /* + * Now, our chunk.memory points to a memory block that is chunk.size + * bytes big and contains the remote file. + * + * Do something nice with it! + */ + return buffer; + } +} // end curl_get + /* * + * @method curl_post + * @para: + * _p_curl, the pointer of CURL, + * url , the input url string without post parameters + * para_map, std::map the input post parameters + * @return: + * void + * */ +std::string CurlSimple::curl_post( + const char *url, const std::map ¶_map) { + char buffer[BUFFER_SIZE]; + CURLcode res; + std::string para_url = ""; + std::map::const_iterator para_iterator; + bool is_first = true; + + for (para_iterator = para_map.begin(); para_iterator != para_map.end(); + para_iterator++) { + if (is_first) { + is_first = false; + } else { + para_url.append("&"); + } + + std::string key = para_iterator->first; + std::string value = para_iterator->second; + para_url.append(key); + para_url.append("="); + para_url.append(value); + } + + LOG(INFO) << "para_url=" << para_url.c_str() << " size:" << para_url.size(); + + /* specify URL to get */ + curl_easy_setopt(_p_curl, CURLOPT_URL, url); + /* send all data to this function */ + curl_easy_setopt(_p_curl, CURLOPT_WRITEFUNCTION, CurlSimple::write_callback); + + /* send all data to this function */ + curl_easy_setopt(_p_curl, CURLOPT_POSTFIELDS, para_url.c_str()); + /* we pass our 'chunk' struct to the callback function */ + curl_easy_setopt( + _p_curl, CURLOPT_WRITEDATA, reinterpret_cast(buffer)); + /* some servers don't like requests that are made without a user-agent + field, so we provide one */ + curl_easy_setopt(_p_curl, CURLOPT_USERAGENT, "libcurl-agent/1.0"); + /* get it! */ + int retry_num = 3; + bool is_succ = false; + + for (int i = 0; i < retry_num; ++i) { + res = curl_easy_perform(_p_curl); + + /* check for errors */ + if (res != CURLE_OK) { + std::cerr << "curl_easy_perform() failed:" << curl_easy_strerror(res) + << std::endl; + } else { + /* + * Now, our chunk.memory points to a memory block that is chunk.size + * bytes big and contains the remote file. + * + * Do something nice with it! + */ + is_succ = true; + break; + } + } + + return is_succ ? buffer : ""; +} diff --git a/cube/cube-builder/src/main.cpp b/cube/cube-builder/src/main.cpp new file mode 100644 index 0000000000000000000000000000000000000000..7ea428919ad063ffc63e908d203bb9c302e53021 --- /dev/null +++ b/cube/cube-builder/src/main.cpp @@ -0,0 +1,135 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include "butil/logging.h" +#include "cube-builder/builder_job.h" +#include "cube-builder/crovl_builder_increment.h" +#include "cube-builder/util.h" + +DEFINE_string(dict_name, "", "dict name, no need"); +DEFINE_string(input_path, "", "source data input path"); +DEFINE_string(output_path, "", "source data input path"); + +DEFINE_string(job_mode, "base", "job mode base/delta default:base"); +DEFINE_int32(last_version, 0, "last version, job mode delta need"); +DEFINE_int32(cur_version, 0, "current version, no need"); +DEFINE_int32(depend_version, 0, "depend version, job mode delta need"); +DEFINE_int32(shard_num, -1, "shard num"); + +DEFINE_string(master_address, "", "master address, no need"); +DEFINE_bool(only_build, true, "wheather build need transfer"); + +int main(int argc, char *argv[]) { + google::SetVersionString("1.0.0.0"); + google::SetUsageMessage("Usage : ./cube-builder --help "); + google::ParseCommandLineFlags(&argc, &argv, true); + google::InitGoogleLogging(argv[0]); + google::SetLogDestination(google::GLOG_INFO, "./log_"); + google::SetStderrLogging(google::GLOG_WARNING); + + LOG(INFO) << "start"; + + string last_version = std::to_string(FLAGS_last_version); + string cur_version = std::to_string(FLAGS_cur_version); + string depend_version = std::to_string(FLAGS_depend_version); + + if (FLAGS_input_path == "" || FLAGS_output_path == "" || + FLAGS_shard_num == -1) { + LOG(ERROR) << "Param error! Usage: " << argv[0] << " --help"; + return -1; + } + if (FLAGS_job_mode == "base") { + if (FLAGS_only_build) { + time_t t; + time(&t); + cur_version = std::to_string(t); + depend_version = cur_version; + } + + } else if (FLAGS_job_mode == "delta") { + if (FLAGS_last_version == 0 || FLAGS_depend_version == 0) { + LOG(ERROR) << "Param error! need last_version and depend_version! Usage: " + << argv[0] << " --help"; + return -1; + } else { + if (FLAGS_only_build) { + time_t t; + time(&t); + cur_version = std::to_string(t); + } + } + } else { + LOG(ERROR) << "Job mode error! Usage: " << argv[0] << " --help"; + return -1; + } + + Job job; + job.set_dict_name(FLAGS_dict_name); + job.set_shard_num(FLAGS_shard_num); + job.set_input_path(FLAGS_input_path); + job.set_output_path(FLAGS_output_path + "/" + depend_version + "_" + + cur_version); + job.set_job_mode(FLAGS_job_mode); + + vector files; + getAllFiles(job.get_input_path(), &files); + + if (!checkDirectory(job.get_output_path())) { + LOG(ERROR) << "create output_path path failed: " + << job.get_output_path().c_str(); + return -1; + } + + vector reduces; + for (auto i = 0; i < job.get_shard_num(); i++) { + string tar_path = job.get_output_path() + "/" + job.get_dict_name() + + "_part" + std::to_string(i) + ".tar"; + string build_path = job.get_output_path() + "/" + job.get_dict_name() + + "_part" + std::to_string(i); + + CROVLBuilderIncremental *_builder = new CROVLBuilderIncremental(); + if (!_builder->Init(IT_HASH, + MAX_BLOCK_SIZE, + job.get_job_mode().c_str(), + build_path.c_str(), + tar_path.c_str(), + job.get_dict_name().c_str(), + std::to_string(i), + std::to_string(0), + last_version, + cur_version, + depend_version, + FLAGS_master_address.c_str())) { + LOG(ERROR) << "CROVLBuilderIncremental init failed " << build_path; + return -1; + } + reduces.push_back(_builder); + } + + for (auto file : files) { + mapFileLocal(job, file, reduces); + LOG(INFO) << "next file to reduce!"; + } + for (auto reduce : reduces) { + reduce->done(); + reduce->archive(); + reduce->md5sum(); + } + google::ShutdownGoogleLogging(); + return 0; +} diff --git a/cube/cube-builder/src/seqfile_reader.cpp b/cube/cube-builder/src/seqfile_reader.cpp new file mode 100644 index 0000000000000000000000000000000000000000..378ce5e006a8d40ab94dac9c625cb7b6cc636858 --- /dev/null +++ b/cube/cube-builder/src/seqfile_reader.cpp @@ -0,0 +1,150 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "cube-builder/seqfile_reader.h" +#include +#include "butil/logging.h" +#include "cube-builder/vtext.h" + +int SequenceFileRecordReader::open() { + if (_raw_reader->open() != 0) { + return -1; + } + LOG(INFO) << "open sequence file ok! file:" << _path.c_str(); + return 0; +} +int SequenceFileRecordReader::close() { + if (_raw_reader->close() != 0) { + return -1; + } + LOG(INFO) << "close sequence file ok! file:" << _path.c_str(); + return 0; +} +int SequenceFileRecordReader::next(Record* record) { + uint32_t record_len = 0; + int64_t ret = _raw_reader->read(&record_len); + if (ret == 0) { + return 1; // ?????1??????????????? + } else if (ret != sizeof(record_len)) { + LOG(ERROR) << "read sequence file:" << _path + << " record(record_len) errno:" << ret + << ", errmsg:" << _raw_reader->errno_to_str(ret); + return -1; + } + record->record_len = static_cast(ntohl(record_len)); + // got marker + if (record->record_len == -1) { + std::string marker; + if ((ret = _raw_reader->read_buf(&marker, 16)) != 16) { + LOG(ERROR) << "read sequence file:" << _path + << " record(marker) errno:" << ret + << ", errmsg:" << _raw_reader->errno_to_str(ret); + return -1; + } + if (marker != record->sync_marker) { + LOG(ERROR) << "read sequence file:" << _path + << " record(sync_marker) error!"; + return -1; + } + if ((ret = _raw_reader->read(&record->record_len)) != + sizeof(record->record_len)) { + LOG(ERROR) << "read sequence file:" << _path + << " record(len) errno:" << ret + << ", errmsg:" << _raw_reader->errno_to_str(ret); + return -1; + } + record->record_len = static_cast(ntohl(record->record_len)); + } + uint32_t key_len = 0; + if ((ret = _raw_reader->read(&key_len)) != sizeof(key_len)) { + LOG(ERROR) << "read sequence file:" << _path + << " record(key_len) errno:" << ret + << ", errmsg:" << _raw_reader->errno_to_str(ret); + return -1; + } + record->key_len = static_cast(ntohl(key_len)); + if ((ret = _raw_reader->read_buf(&record->key, record->key_len)) != + record->key_len) { + LOG(ERROR) << "read sequence file:" << _path + << " record(key_len) errno:" << ret + << ", errmsg:" << _raw_reader->errno_to_str(ret); + return -1; + } + if ((ret = _raw_reader->read_buf(&record->value, + record->record_len - record->key_len)) != + (record->record_len - record->key_len)) { + LOG(ERROR) << "read sequence file:" << _path + << " record(value_len) errno:" << ret + << ", errmsg:" << _raw_reader->errno_to_str(ret); + return -1; + } + return 0; +} + +int SequenceFileRecordReader::read_header() { + LOG(INFO) << "start to read sequence file header:" << _path; + char version[4]; + if (_raw_reader->read_buf(&version, 4) != 4) { + LOG(ERROR) << "read sequence file header(version) error:" << _path; + return -1; + } + _header.version = version[3]; + if (!VString::read_string(_raw_reader, &_header.key_class)) { + LOG(ERROR) << "read sequence file header(key_class) error:" << _path; + return -1; + } + if (!VString::read_string(_raw_reader, &_header.value_class)) { + LOG(ERROR) << "read sequence file header(value_class) error:" << _path; + return -1; + } + if (_raw_reader->read(&_header.is_compress) != sizeof(bool)) { + LOG(ERROR) << "read sequence file header(is_compress) error:" << _path; + return -1; + } + if (_raw_reader->read(&_header.is_block_compress) != sizeof(bool)) { + LOG(ERROR) << "read sequence file header(is_block_compress) error:" + << _path; + return -1; + } + if (_header.is_compress) { + if (!VString::read_string(_raw_reader, &_header.compress_class)) { + LOG(ERROR) << "read sequence file header(compress_class) error:" << _path; + return -1; + } + } + int32_t meta_cnt = 0; + if (_raw_reader->read(&meta_cnt) != sizeof(int32_t)) { + LOG(ERROR) << "read sequence file header(meta_cnt) error:" << _path; + return -1; + } + _header.metas.resize(meta_cnt); + for (int32_t i = 0; i != meta_cnt; ++i) { + if (!VString::read_string(_raw_reader, &_header.metas[i].key)) { + LOG(ERROR) << "read sequence file header(meta_key) error:" << _path; + return -1; + } + if (!VString::read_string(_raw_reader, &_header.metas[i].value)) { + LOG(ERROR) << "read sequence file header(meta_value) error:" << _path; + return -1; + } + } + if (_raw_reader->read_buf(&_header.sync_marker, 16) != 16) { + LOG(ERROR) << "read sequence file header(sync_marker) error:" << _path; + return -1; + } + + LOG(INFO) << "sync_marker:" << _header.sync_marker; + LOG(INFO) << "read sequence file header ok:" << _path; + return 0; +} diff --git a/cube/cube-builder/src/util.cpp b/cube/cube-builder/src/util.cpp new file mode 100644 index 0000000000000000000000000000000000000000..8524a4c7527e68b3e67141535bffcf4dae330c9e --- /dev/null +++ b/cube/cube-builder/src/util.cpp @@ -0,0 +1,77 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "cube-builder/util.h" +#include +#include +#include +#include +#include +#include +#include "butil/logging.h" + +void getAllFiles(std::string path, std::vector *files) { + DIR *dir; + struct dirent *ptr; + if ((dir = opendir(path.c_str())) == NULL) { + perror("Open dri error..."); + exit(1); + } + while ((ptr = readdir(dir)) != NULL) { + if (strcmp(ptr->d_name, ".") == 0 || strcmp(ptr->d_name, "..") == 0) { + continue; + } else if ((ptr->d_type) == 8) { // file + if (ptr->d_name[0] != '.') files->push_back(path + "/" + ptr->d_name); + } else if (ptr->d_type == 10) { // link file + continue; + } else if (ptr->d_type == 4) { + getAllFiles(path + "/" + ptr->d_name, files); + } + } + closedir(dir); +} + +std::string string_to_hex(const std::string &input) { + static const char *const lut = "0123456789ABCDEF"; + size_t len = input.length(); + std::string output; + output.reserve(2 * len); + for (size_t i = 0; i < len; ++i) { + const unsigned char c = input[i]; + output.push_back(lut[c >> 4]); + output.push_back(lut[c & 15]); + } + return output; +} + +bool checkDirectory(const std::string folder) { + LOG(INFO) << "check dir:" << folder; + if (access(folder.c_str(), F_OK) == 0) { + return 1; + } + LOG(WARNING) << "no dir will mkdir:" << folder; + return (mkdir(folder.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH) == 0); +} + +void CmdTarfiles(const std::string folder) { + std::string cmd = "cd " + folder + " && tar -cvPf " + folder + ".tar ."; + LOG(INFO) << "tar file cmd:" << cmd; + system(cmd.c_str()); +} + +void CmdMd5sum(const std::string folder) { + std::string cmd = "md5sum " + folder + ".tar > " + folder + ".tar.md5"; + LOG(INFO) << "md5sum file cmd:" << cmd; + system(cmd.c_str()); +}