提交 628c8786 编写于 作者: W wangguibao

Merge branch 'develop' into ctr_model_serving

......@@ -60,6 +60,7 @@ if (NOT DEFINED WITH_MKLDNN)
endif()
endif()
include(external/jsoncpp)
include(external/leveldb)
include(external/rocksdb)
include(external/zlib)
......@@ -83,6 +84,7 @@ include_directories(${PADDLE_SERVING_SOURCE_DIR})
include_directories(${PADDLE_SERVING_BINARY_DIR})
set(EXTERNAL_LIBS
jsoncpp
gflags
rocksdb
glog
......
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
INCLUDE(ExternalProject)
SET(JSONCPP_SOURCES_DIR ${THIRD_PARTY_PATH}/jsoncpp)
SET(JSONCPP_INSTALL_DIR ${THIRD_PARTY_PATH}/install/jsoncpp)
SET(JSONCPP_INCLUDE_DIR "${JSONCPP_INSTALL_DIR}/include" CACHE PATH "jsoncpp include directory." FORCE)
SET(JSONCPP_LIBRARIES "${JSONCPP_INSTALL_DIR}/lib" CACHE FILEPATH "jsoncpp library." FORCE)
INCLUDE_DIRECTORIES(${JSONCPP_INCLUDE_DIR})
ExternalProject_Add(
extern_jsoncpp
${EXTERNAL_PROJECT_LOG_ARGS}
PREFIX ${JSONCPP_SOURCES_DIR}
GIT_REPOSITORY "https://github.com/open-source-parsers/jsoncpp"
GIT_TAG 1.8.4
UPDATE_COMMAND ""
CMAKE_ARGS -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}
-DCMAKE_C_COMPILER=${CMAKE_C_COMPILER}
-DCMAKE_CXX_FLAGS=${CMAKE_CXX_FLAGS}
-DCMAKE_CXX_FLAGS_RELEASE=${CMAKE_CXX_FLAGS_RELEASE}
-DCMAKE_CXX_FLAGS_DEBUG=${CMAKE_CXX_FLAGS_DEBUG}
-DCMAKE_C_FLAGS=${CMAKE_C_FLAGS}
-DCMAKE_C_FLAGS_DEBUG=${CMAKE_C_FLAGS_DEBUG}
-DCMAKE_C_FLAGS_RELEASE=${CMAKE_C_FLAGS_RELEASE}
-DCMAKE_INSTALL_PREFIX=${JSONCPP_INSTALL_DIR}
-DBUILD_TESTING=OFF
-DCMAKE_BUILD_TYPE=${THIRD_PARTY_BUILD_TYPE}
${EXTERNAL_OPTIONAL_ARGS}
CMAKE_CACHE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=${JSONCPP_INSTALL_DIR}
-DCMAKE_POSITION_INDEPENDENT_CODE:BOOL=ON
-DCMAKE_BUILD_TYPE:STRING=${THIRD_PARTY_BUILD_TYPE}
)
ADD_LIBRARY(jsoncpp STATIC IMPORTED GLOBAL)
SET_PROPERTY(TARGET jsoncpp PROPERTY IMPORTED_LOCATION ${JSONCPP_LIBRARIES})
ADD_DEPENDENCIES(jsoncpp extern_jsoncpp)
LIST(APPEND external_project_dependencies jsoncpp)
# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
......@@ -14,3 +14,4 @@
add_subdirectory(cube-server)
add_subdirectory(cube-api)
add_subdirectory(cube-builder)
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
project(cube-builder)
set(CMAKE_CXX_STANDARD 11)
add_compile_options(-std=c++11)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
include_directories(SYSTEM ${CMAKE_CURRENT_LIST_DIR}/include)
include_directories(SYSTEM ${CMAKE_CURRENT_BINARY_DIR}/../)
find_path(JSONCPP_INCLUDE_DIR json/json.h)
set(JSONCPP_LIBRARY ${JSONCPP_INCLUDE_DIR}/../lib/libjsoncpp.a)
message(STATUS "jsoncpp include: ${JSONCPP_INCLUDE_DIR}")
include_directories(${JSONCPP_INCLUDE_DIR})
find_library(CURL_LIB NAMES curl)
if (NOT CURL_LIB)
message(FATAL_ERROR "Fail to find curl")
endif()
add_executable(cube-builder src/main.cpp include/cube-builder/util.h src/util.cpp src/builder_job.cpp include/cube-builder/builder_job.h include/cube-builder/define.h src/seqfile_reader.cpp include/cube-builder/seqfile_reader.h include/cube-builder/raw_reader.h include/cube-builder/vtext.h src/crovl_builder_increment.cpp include/cube-builder/crovl_builder_increment.h src/curl_simple.cpp include/cube-builder/curl_simple.h)
set(DYNAMIC_LIB
gflags
${JSONCPP_LIBRARY}
-lssl
-lcrypto
${CURL_LIB}
)
target_link_libraries(cube-builder ${DYNAMIC_LIB})
# install
install(TARGETS cube-builder RUNTIME DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/bin)
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include "cube-builder/crovl_builder_increment.h"
#include "cube-builder/define.h"
using std::string;
using std::vector;
class Job {
public:
void set_shard_num(int num);
int get_shard_num();
void set_input_path(string path);
string get_input_path();
void set_output_path(string path);
string get_output_path();
void set_job_mode(mode mmode);
mode get_job_mode();
void set_dict_name(string name);
string get_dict_name();
private:
int shard_num;
string input_path;
string output_path;
mode job_mode;
string dict_name;
};
void mapFileLocal(Job job,
string file,
vector<CROVLBuilderIncremental*> reduces);
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <string>
#include <vector>
class CROVLBuilderIncremental {
public:
static const uint32_t MAX_DATA_LEN = 67108860;
static const uint32_t MAX_DATA_FILE_LEN = 1073741824;
static const uint32_t MAX_DATA_DIR_LEN = 512;
static const uint32_t MAX_FIXDATA_LEN = 504;
static const uint32_t INDEX_BUFFER = 4194304;
int _index_type;
uint32_t _data_file_len;
uint64_t _data_file;
uint32_t _data_buf_len;
uint32_t _index_buf_len;
uint32_t _index_file_num;
uint64_t _index_file_len;
uint64_t _count;
uint64_t _cur_count;
bool _fix;
uint32_t _fix_len;
uint32_t _rec_size;
uint64_t* _index;
char* _data;
std::vector<uint32_t> _data_file_list;
std::vector<uint64_t> _index_file_list;
char _data_dir[MAX_DATA_DIR_LEN + 1];
char _data_real_dir[MAX_DATA_DIR_LEN + 1];
char _last_data_dir[MAX_DATA_DIR_LEN + 1];
char _last_data_tmp_dir[MAX_DATA_DIR_LEN + 1];
uint64_t _inner_sort_size;
uint64_t _memory_quota;
bool flush_data();
bool FlushIndex();
bool Clear();
std::string _mode;
std::string _dict_name;
std::string _shard_id;
std::string _split_id;
std::string _last_version;
std::string _cur_version;
std::string _depend_version;
std::string _master_address;
std::string _id;
std::string _key;
std::string _extra;
public:
CROVLBuilderIncremental();
~CROVLBuilderIncremental();
bool Init(int index_type,
uint32_t data_file_len,
const char* mode,
const char* data_dir,
const char* data_real_dir,
const char* dict_name,
const std::string& shard_id,
const std::string& split_id,
const std::string& last_version,
const std::string& cur_version,
const std::string& depend_version,
const std::string& master_address,
const std::string& id = "",
const std::string& key = "",
const std::string& extra = "",
bool bFix = false,
uint32_t nFixLen = 8);
int add(uint64_t nKey, uint32_t nLen, const char* pData);
int add(uint64_t nKey, uint64_t nValue);
int add(uint64_t nKey, const char* pData);
bool done();
void archive();
void md5sum();
bool read_last_meta_from_transfer();
bool write_cur_meta_to_transfer();
bool read_last_meta_from_local();
bool write_cur_meta_to_local();
};
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <curl/curl.h>
#include <stdio.h>
#include <map>
#include <string>
#include "butil/logging.h"
class CurlSimple {
public:
CurlSimple();
~CurlSimple();
/* *
* @method curl_get
* @para:
* curl_handle, the pointer of CURL,
* url , string, input url string with get parameters
* @return:
* void
* */
std::string curl_get(const char *url);
/* *
* @method curl_post
* @para:
* curl_handle, the pointer of CURL,
* url , the input url string without post parameters
* para_map, std::map<std::string, std::string> the input post parameters
* @return:
* void
* */
std::string curl_post(const char *url,
const std::map<std::string, std::string> &para_map);
private:
/* *
* @method write_callback
* @para:
* contents, the pointer response data, it will cast to information you
* need
* size * nmemb is the memory size of contents
* userp, the poinser user return info
* @return:
* size_t must return size * nmemb, or it was failed
* */
static size_t write_callback(void *contents,
size_t size,
size_t nmemb,
void *userp);
private:
CURL *_p_curl;
};
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <iostream>
#include <string>
#include <vector>
typedef std::string mode;
enum IndexType {
IT_HASH, ///< hash index
IT_SORT, ///< sort index
};
const int MAX_BLOCK_SIZE = 1024 * 1024 * 1024;
struct Meta {
std::string key;
std::string value;
};
struct Header {
int version;
std::string key_class;
std::string value_class;
bool is_compress;
bool is_block_compress;
std::string compress_class;
std::vector<Meta> metas;
std::string sync_marker;
static const int s_sync_hash_size = 16;
};
struct Record {
explicit Record(const Header& header) : sync_marker(header.sync_marker) {}
const std::string& sync_marker;
int record_len;
int key_len;
std::string key;
std::string value;
};
class RecordReader {
public:
RecordReader() {}
virtual ~RecordReader() {}
virtual int open() = 0;
virtual int next(Record* record) = 0;
virtual int close() = 0;
}; // class RecordReader
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <fstream>
#include <string>
#include "butil/logging.h"
class RawReader {
public:
RawReader() {}
virtual ~RawReader() {}
virtual int open() = 0;
virtual int close() = 0;
virtual int64_t read(int32_t* v) = 0;
virtual int64_t read(uint32_t* v) = 0;
virtual int64_t read(bool* v) = 0;
virtual int64_t read(char* v) = 0;
virtual int64_t read_buf(std::string* s, size_t len) = 0;
virtual int64_t read_buf(void* v, size_t len) = 0;
virtual const char* errno_to_str(int err) = 0;
private:
};
class FileRawReader : public RawReader {
public:
explicit FileRawReader(const std::string& path) : _path(path) {}
virtual ~FileRawReader() {}
virtual int open() {
_reader.open(_path.c_str(), std::ifstream::binary);
if (!_reader.is_open()) {
return -1;
}
LOG(INFO) << "raw open sequence file ok! file:" << _path.c_str();
return 0;
}
virtual int close() {
_reader.close();
LOG(INFO) << "raw close sequence file ok! file:" << _path.c_str();
return 0;
}
virtual int64_t read(int32_t* v) {
_reader.read(reinterpret_cast<char*>(v), sizeof(int32_t));
if (_reader.good())
return sizeof(int32_t);
else if (_reader.eof())
return 0;
else
return -1;
}
virtual int64_t read(uint32_t* v) {
_reader.read(reinterpret_cast<char*>(v), sizeof(uint32_t));
if (_reader.good())
return sizeof(uint32_t);
else if (_reader.eof())
return 0;
else
return -1;
}
virtual int64_t read(bool* v) {
_reader.read(reinterpret_cast<char*>(v), sizeof(bool));
if (_reader.good())
return sizeof(bool);
else if (_reader.eof())
return 0;
else
return -1;
}
virtual int64_t read(char* v) {
_reader.read(reinterpret_cast<char*>(v), sizeof(char));
if (_reader.good())
return sizeof(char);
else if (_reader.eof())
return 0;
else
return -1;
}
virtual int64_t read_buf(std::string* s, size_t len) {
s->resize(len);
std::string& tmp = *s;
_reader.read(reinterpret_cast<char*>(&tmp[0]), len);
if (_reader.good())
return len;
else if (_reader.eof())
return 0;
else
return -1;
}
virtual int64_t read_buf(void* v, size_t len) {
_reader.read(reinterpret_cast<char*>(v), len);
if (_reader.good())
return len;
else if (_reader.eof())
return 0;
else
return -1;
}
virtual const char* errno_to_str(int err) {
switch (err) {
case -1:
return "read seqfile error";
}
return "default error";
}
private:
std::string _path{""};
std::ifstream _reader;
};
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <memory>
#include <string>
#include "cube-builder/define.h"
#include "cube-builder/raw_reader.h"
class SequenceFileRecordReader : public RecordReader {
public:
SequenceFileRecordReader() {}
explicit SequenceFileRecordReader(const std::string& path) {
_path = path;
_raw_reader = new FileRawReader(_path);
}
virtual ~SequenceFileRecordReader() {
if (_raw_reader != nullptr) {
delete _raw_reader;
}
}
virtual int open();
virtual int close();
virtual int next(Record* record);
const Header& get_header() { return _header; }
int read_header();
private:
std::string _path{""};
RawReader* _raw_reader{nullptr};
Header _header;
};
typedef std::shared_ptr<SequenceFileRecordReader> SequenceFileRecordReaderPtr;
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <iostream>
#include <string>
#include <vector>
#define PATH_DELIMITER '\\'
void getAllFiles(std::string path, std::vector<std::string> *files);
std::string string_to_hex(const std::string &input);
bool checkDirectory(const std::string folder);
void CmdTarfiles(const std::string folder);
void CmdMd5sum(const std::string folder);
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <string>
#include "cube-builder/define.h"
#include "cube-builder/raw_reader.h"
class VInt {
public:
static int32_t decode_vint_size(const char &value) {
if (value >= -112) {
return 1;
} else if (value < -120) {
return -119 - value;
}
return -111 - value;
}
static bool is_negative_vint(const char &value) {
return value < -120 || (value >= -112 && value < 0);
}
static bool read_vint(RawReader *reader, int32_t *vint) {
char first_byte;
if (reader->read(&first_byte) <= 0) {
return false;
}
int32_t len = decode_vint_size(first_byte);
if (len == 1) {
*vint = first_byte;
return true;
}
char ch;
int32_t bitlen = 0;
int32_t i = 0, lch;
for (int idx = len - 2; idx >= 0; idx--) {
if (reader->read(&ch) <= 0) {
return false;
}
bitlen = 8 * idx;
lch = ch;
i = i | ((lch << bitlen) & (0xFFL << bitlen));
}
*vint = (is_negative_vint(first_byte) ? (i ^ (int32_t)(-1)) : i);
return true;
}
};
class VString {
public:
static const char *encode(std::string str) { return encode(str, true); }
static const char *encode(std::string str, bool /*replace*/) {
// todo
return str.c_str();
}
static std::string decode(char *bytes) { return decode(bytes, true); }
static std::string decode(char *bytes, bool /*replace*/) {
// todo
return std::string(bytes);
}
// todo
static bool read_string(RawReader *reader, std::string *str) {
int length;
if (!VInt::read_vint(reader, &length)) {
return false;
}
if (reader->read_buf(str, length) != length) {
return false;
}
return true;
}
};
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "cube-builder/builder_job.h"
#include <stdio.h>
#include <iostream>
#include "butil/logging.h"
#include "cube-builder/seqfile_reader.h"
#include "cube-builder/util.h"
using std::string;
void Job::set_shard_num(int num) { shard_num = num; }
int Job::get_shard_num() { return shard_num; }
void Job::set_input_path(string path) { input_path = path; }
string Job::get_input_path() { return input_path; }
void Job::set_output_path(string path) { output_path = path; }
string Job::get_output_path() { return output_path; }
void Job::set_job_mode(mode mmode) { job_mode = mmode; }
mode Job::get_job_mode() { return job_mode; }
void Job::set_dict_name(string name) { dict_name = name; }
string Job::get_dict_name() { return dict_name; }
void mapFileLocal(Job job,
string file,
vector<CROVLBuilderIncremental *> reduces) {
SequenceFileRecordReader reader(file.c_str());
if (reader.open() != 0) {
LOG(ERROR) << "open file failed! " << file;
return;
}
if (reader.read_header() != 0) {
LOG(ERROR) << "read header error! " << file;
reader.close();
return;
}
Record record(reader.get_header());
int total_count = 0;
while (reader.next(&record) == 0) {
uint64_t key =
*reinterpret_cast<uint64_t *>(const_cast<char *>(record.key.data()));
total_count++;
int part = key % job.get_shard_num();
int64_t value_length = record.record_len - record.key_len;
reduces[part]->add(key, value_length, record.value.c_str());
}
if (reader.close() != 0) {
LOG(ERROR) << "close file failed! " << file;
return;
}
}
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "cube-builder/crovl_builder_increment.h"
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <fstream>
#include <iostream>
#include "boost/lexical_cast.hpp"
#include "cube-builder/curl_simple.h"
#include "cube-builder/define.h"
#include "cube-builder/util.h"
#include "json/json.h"
static const uint64_t MB = 1048576;
// CSpinLock g_sLock;
CROVLBuilderIncremental::CROVLBuilderIncremental() {
_index = NULL;
_data = NULL;
// _index_file = NULL;
_inner_sort_size = 32 * MB;
_memory_quota = 4 * 1024 * MB;
}
bool CROVLBuilderIncremental::Clear() {
if (_index != NULL) {
delete[] _index;
_index = NULL;
}
if (_data != NULL) {
delete[] _data;
_data = NULL;
}
_data_file_list.clear();
return true;
}
CROVLBuilderIncremental::~CROVLBuilderIncremental() { Clear(); }
bool CROVLBuilderIncremental::Init(int index_type,
uint32_t data_file_len,
const char *mode,
const char *data_dir,
const char *data_real_dir,
const char *dict_name,
const std::string &shard_id,
const std::string &split_id,
const std::string &last_version,
const std::string &cur_version,
const std::string &depend_version,
const std::string &master_address,
const std::string &id,
const std::string &key,
const std::string &extra,
bool bFix,
uint32_t nFixLen) {
if (index_type != IT_HASH) {
LOG(ERROR) << "Error: incremental_build not support IT_SORT";
return false;
}
if (index_type != IT_HASH && index_type != IT_SORT) {
LOG(ERROR) << "Error: invalid index type";
return false;
}
if (index_type == IT_HASH && bFix) {
LOG(ERROR) << "Error: can not set fix when index type is IT_HASH";
return false;
}
if (nFixLen > MAX_FIXDATA_LEN) {
LOG(ERROR) << "Error: fix length too large";
return false;
}
if (data_file_len < MAX_DATA_LEN + 4 || data_file_len > MAX_DATA_FILE_LEN) {
LOG(ERROR) << "Error: invalid data file length" << data_file_len;
return false;
}
if (strlen(data_dir) > MAX_DATA_DIR_LEN) {
LOG(ERROR) << "Error: data directory too long :" << data_dir;
return false;
}
Clear();
strncpy(_data_dir, data_dir, sizeof(_data_dir));
strncpy(_data_real_dir, data_real_dir, sizeof(_data_real_dir));
_index_type = index_type;
_data_file_len = data_file_len;
_data_file = 0;
_data_buf_len = 0;
_index_buf_len = 0;
_index_file_num = 0; // incremental_build for index
_count = 0;
_cur_count = 0;
_fix = bFix;
_fix_len = nFixLen;
_index_file_len = 0;
_dict_name = dict_name;
_shard_id = shard_id;
_split_id = split_id;
_last_version = last_version;
_cur_version = cur_version;
_depend_version = depend_version;
_master_address = master_address;
_id = id;
_key = key;
_extra = extra;
if (_fix) {
_rec_size = nFixLen + sizeof(uint64_t);
} else {
_rec_size = 2 * sizeof(uint64_t);
}
_index = new uint64_t[INDEX_BUFFER];
if (!_fix) {
_data = new char[_data_file_len];
} else if (_fix_len != 8) {
_data = new char[_rec_size * INDEX_BUFFER];
}
if (!checkDirectory(_data_dir)) {
LOG(ERROR) << "create _data_dir path failed: " << _data_dir;
return false;
}
// read old info from data.n and index.n
_mode = mode;
LOG(INFO) << "mode: " << mode;
if (strcmp(mode, "base") != 0) {
if (_master_address == "") {
if (!read_last_meta_from_local()) {
LOG(ERROR) << "read last meta from db error! ";
return false;
}
} else {
if (!read_last_meta_from_transfer()) {
LOG(ERROR) << "read last meta from transfer error! ";
return false;
}
}
}
LOG(INFO) << "incremental_build data file:" << _data_file
<< ", count:" << _count << ", index file:" << _index_file_num;
return true;
}
bool CROVLBuilderIncremental::flush_data() {
LOG(INFO) << "flush data" << _data_file;
if (_data_buf_len == 0) {
LOG(ERROR) << "_data_buf_len == 0";
return true;
}
char file[MAX_DATA_DIR_LEN * 2];
snprintf(file, sizeof(file), "%s/data.%lu", _data_dir, _data_file);
FILE *fp;
if ((fp = fopen(file, "wb")) == NULL) {
LOG(ERROR) << "open file failed! " << file;
return false;
}
if (fwrite(_data, 1, _data_buf_len, fp) != _data_buf_len) {
fclose(fp);
LOG(ERROR) << "write file:" << file << " error!";
return false;
}
fclose(fp);
_data_file_list.push_back(_data_buf_len);
_data_buf_len = 0;
++_data_file;
return true;
}
bool CROVLBuilderIncremental::FlushIndex() {
LOG(INFO) << "flush index:" << _index_file_num;
if (_index_buf_len == 0) {
LOG(ERROR) << "_index_buf_len == 0";
return true;
}
char file[MAX_DATA_DIR_LEN * 2];
snprintf(file, sizeof(file), "%s/index.%d", _data_dir, _index_file_num);
FILE *fp;
if ((fp = fopen(file, "wb")) == NULL) {
LOG(ERROR) << "open file failed! " << file;
return false;
}
if (_fix && _fix_len != 8) {
if (fwrite(_data, _rec_size, _index_buf_len, fp) != _index_buf_len) {
LOG(ERROR) << "_index_buf_len error:" << _index_buf_len;
return false;
}
_index_buf_len = 0;
return true;
}
if (fwrite(_index, sizeof(uint64_t), _index_buf_len, fp) != _index_buf_len) {
LOG(ERROR) << "write file:" << file << " error!";
return false;
}
fclose(fp);
_index_file_len += _index_buf_len * sizeof(uint64_t);
_index_buf_len = 0;
return true;
}
int CROVLBuilderIncremental::add(uint64_t nKey,
uint32_t nLen,
const char *pData) {
uint64_t naddr;
uint32_t nTotalLen;
if (nLen > MAX_DATA_LEN || _fix) {
return 0;
}
nTotalLen = nLen + 4;
do {
if (_data_buf_len + nTotalLen > _data_file_len) {
if (!flush_data()) {
break;
}
}
*reinterpret_cast<uint32_t *>(_data + _data_buf_len) = nTotalLen;
memcpy(_data + _data_buf_len + 4, pData, nLen);
naddr = (_data_file << 32) + _data_buf_len;
_data_buf_len += nTotalLen;
if (_index_buf_len + 2 > INDEX_BUFFER) {
if (!FlushIndex()) {
break;
}
}
_index[_index_buf_len] = nKey;
_index[_index_buf_len + 1] = naddr;
_index_buf_len += 2;
++_count;
++_cur_count;
return 1;
} while (false);
return -1;
}
int CROVLBuilderIncremental::add(uint64_t nKey, uint64_t nValue) {
if (!_fix || _fix_len != 8) {
return 0;
}
if (_index_buf_len + 2 > INDEX_BUFFER) {
if (!FlushIndex()) {
return -1;
}
}
_index[_index_buf_len] = nKey;
_index[_index_buf_len + 1] = nValue;
_index_buf_len += 2;
++_count;
++_cur_count;
return 1;
}
int CROVLBuilderIncremental::add(uint64_t nKey, const char *pData) {
if (!_fix) {
return 0;
}
if (_fix && _fix_len == 8) {
return add(nKey, *reinterpret_cast<uint64_t *>(const_cast<char *>(pData)));
}
if (_index_buf_len + 1 > INDEX_BUFFER) {
if (!FlushIndex()) {
return -1;
}
}
*reinterpret_cast<uint64_t *>(_data + _index_buf_len * _rec_size) = nKey;
memcpy(
_data + (_index_buf_len * _rec_size + sizeof(uint64_t)), pData, _fix_len);
_index_buf_len++;
++_count;
++_cur_count;
return 1;
}
bool CROVLBuilderIncremental::done() {
LOG(INFO) << "done";
if (!_fix) {
if (!flush_data()) {
return false;
}
}
if (!FlushIndex()) {
return false;
}
if (!_fix) {
if (_data_file == 0) {
return false;
}
}
char buffer[64];
char file[MAX_DATA_DIR_LEN * 2];
// write data.n
snprintf(file, sizeof(file), "%s/data.n", _data_dir);
FILE *fp;
if ((fp = fopen(file, "wb")) == NULL) {
LOG(ERROR) << "open file failed! " << file;
return false;
}
*reinterpret_cast<uint32_t *>(buffer) = _data_file;
if (fwrite(buffer, sizeof(uint32_t), 1, fp) != 1) {
fclose(fp);
return false;
}
for (uint32_t i = 0; i < _data_file_list.size(); ++i) {
*reinterpret_cast<uint32_t *>(buffer) = _data_file_list[i];
if (fwrite(buffer, sizeof(uint32_t), 1, fp) != 1) {
fclose(fp);
return false;
}
}
fclose(fp);
// write index.n
if (_index_type == IT_HASH) {
if (_count > (uint64_t)((uint32_t)-1)) {
return false;
}
snprintf(file, sizeof(file), "%s/index.n", _data_dir);
if ((fp = fopen(file, "wb")) == nullptr) {
LOG(ERROR) << "open file failed! " << file;
return false;
}
*reinterpret_cast<uint32_t *>(buffer) = IT_HASH;
*reinterpret_cast<uint32_t *>(buffer + sizeof(uint32_t)) = (uint32_t)_count;
if (fwrite(buffer, sizeof(uint32_t), 2, fp) != 2) {
fclose(fp);
return false;
}
// fix empty data bug
// if the version no data, filter this version, index_file_num no add
// only can patch is null, not base
if (_cur_count > 0) {
*reinterpret_cast<uint32_t *>(buffer) = ++_index_file_num;
} else {
*reinterpret_cast<uint32_t *>(buffer) = _index_file_num;
}
if (fwrite(buffer, sizeof(uint32_t), 1, fp) != 1) {
fclose(fp);
return false;
}
if (_cur_count > 0) {
_index_file_list.push_back(_index_file_len);
}
for (uint32_t i = 0; i < _index_file_list.size(); ++i) {
*reinterpret_cast<uint64_t *>(buffer) = _index_file_list[i];
if (fwrite(buffer, sizeof(uint64_t), 1, fp) != 1) {
fclose(fp);
return false;
}
}
fclose(fp);
if (_master_address == "") {
if (!write_cur_meta_to_local()) {
LOG(ERROR) << "write cur meta to local error";
return false;
}
} else {
if (!write_cur_meta_to_transfer()) {
LOG(ERROR) << "write cur meta to db error master addr:"
<< _master_address.c_str();
return false;
}
}
} else {
snprintf(file, sizeof(file), "%s/index.d", _data_dir);
LOG(ERROR) << "HASH is error!";
}
if (!Clear()) {
LOG(ERROR) << "clear error!";
return false;
}
return true;
}
void CROVLBuilderIncremental::archive() { CmdTarfiles(_data_dir); }
void CROVLBuilderIncremental::md5sum() { CmdMd5sum(_data_dir); }
bool CROVLBuilderIncremental::read_last_meta_from_transfer() {
std::string url = "http://" + _master_address + "/dict/meta_info?name=" +
_dict_name + "&shard=" + _shard_id + "&split=" + _split_id +
"&version=" + _last_version + "&depend=" + _depend_version;
LOG(INFO) << "name:" << _dict_name.c_str() << " shard:" << _shard_id.c_str()
<< " split:" << _split_id.c_str()
<< " last version:" << _last_version.c_str()
<< " depend version:" << _depend_version.c_str();
CurlSimple cs;
std::string result = cs.curl_get(url.c_str());
if (result == "") {
LOG(ERROR) << "curl get error!";
return false;
}
LOG(INFO) << "curl result:" << result.c_str();
Json::Reader reader(Json::Features::strictMode());
Json::Value val;
if (!reader.parse(result, val)) {
LOG(ERROR) << "parse result json error!";
return false;
}
if (!val.isObject()) {
LOG(ERROR) << "no valild json error!";
return false;
}
if (val["success"].isNull() || !val["success"].isString()) {
LOG(ERROR) << "parse field success error!";
return false;
}
std::string success = val["success"].asString();
if (success != "0") {
LOG(ERROR) << "parse field success error!" << success.c_str();
return false;
}
if (val["data"].isNull() || !val["data"].isObject()) {
LOG(ERROR) << "parse field data error!";
return false;
}
Json::Value data = val["data"];
if (data["meta"].isNull() || !data["meta"].isString()) {
LOG(ERROR) << "parse field meta error!";
return false;
}
Json::Reader meta_reader(Json::Features::strictMode());
Json::Value meta;
if (!meta_reader.parse(data["meta"].asString(), meta)) {
LOG(ERROR) << "parse meta json error!";
return false;
}
if (!meta.isObject()) {
LOG(ERROR) << "parse meta json error!";
return false;
}
if (meta["data_len_list"].isNull() || !meta["data_len_list"].isArray()) {
LOG(ERROR) << "parse data_len_list json error!";
return false;
}
Json::Value data_len_list = meta["data_len_list"];
_data_file = data_len_list.size();
if (_data_file == 0) {
LOG(ERROR) << "data len list size is 0!";
return false;
}
for (int i = 0; i < static_cast<int>(_data_file); ++i) {
LOG(INFO) << "data_len:" << data_len_list[i].asString().c_str();
try {
_data_file_list.push_back(
boost::lexical_cast<uint32_t>(data_len_list[i].asString()));
} catch (boost::bad_lexical_cast &e) {
LOG(ERROR) << "bad lexical cast:" << e.what();
return false;
}
}
if (meta["index_len_list"].isNull() || !meta["index_len_list"].isArray()) {
LOG(ERROR) << "parse index_len_list json error!";
return false;
}
Json::Value index_len_list = meta["index_len_list"];
_index_file_num = index_len_list.size();
if (_index_file_num == 0) {
LOG(ERROR) << "index len list size is 0!";
return false;
}
for (uint32_t i = 0; i < _index_file_num; ++i) {
LOG(INFO) << "index_len:" << index_len_list[i].asString().c_str();
try {
_index_file_list.push_back(
boost::lexical_cast<uint64_t>(index_len_list[i].asString()));
} catch (boost::bad_lexical_cast &e) {
LOG(ERROR) << "bad lexical cast:" << e.what();
return false;
}
}
if (meta["index_total_count"].isNull() ||
!meta["index_total_count"].isString()) {
LOG(ERROR) << "parse index_total_count json error!";
return false;
}
LOG(INFO) << "index_total_count:"
<< meta["index_total_count"].asString().c_str();
try {
_count =
boost::lexical_cast<uint64_t>(meta["index_total_count"].asString());
} catch (boost::bad_lexical_cast &e) {
LOG(ERROR) << "bad lexical cast:" << e.what();
return false;
}
return true;
}
bool CROVLBuilderIncremental::write_cur_meta_to_transfer() {
CurlSimple cs;
std::string url = "http://" + _master_address + "/dict/meta_info/register";
std::map<std::string, std::string> args;
args["name"] = _dict_name;
args["shard"] = _shard_id;
args["split"] = _split_id;
args["version"] = _cur_version;
args["depend"] = _depend_version;
LOG(INFO) << "name:" << _dict_name.c_str() << " shard:" << _shard_id.c_str()
<< " split:" << _split_id.c_str()
<< " cur version:" << _cur_version.c_str()
<< " depend version:" << _depend_version.c_str();
Json::Value root;
try {
root["index_total_count"] = boost::lexical_cast<std::string>(_count);
} catch (boost::bad_lexical_cast &e) {
LOG(ERROR) << "bad lexical cast:" << e.what();
return false;
}
Json::Value data;
for (size_t i = 0; i < _data_file_list.size(); ++i) {
try {
data.append(boost::lexical_cast<std::string>(_data_file_list[i]));
} catch (boost::bad_lexical_cast &e) {
LOG(ERROR) << "bad lexical cast:" << e.what();
return false;
}
}
root["data_len_list"] = data;
Json::Value index;
for (size_t i = 0; i < _index_file_list.size(); ++i) {
try {
index.append(boost::lexical_cast<std::string>(_index_file_list[i]));
} catch (boost::bad_lexical_cast &e) {
LOG(ERROR) << "bad lexical cast:" << e.what();
return false;
}
}
root["index_len_list"] = index;
Json::FastWriter writer;
// Json::StreamWriterBuilder writer;
std::string meta = writer.write(root);
// std::string meta = Json::writeString(writer, root);
if (meta[meta.size() - 1] == '\n') {
meta.erase(meta.size() - 1, 1);
}
LOG(INFO) << "meta:" << meta.c_str() << " size:" << meta.size();
args["meta"] = meta;
std::string result = cs.curl_post(url.c_str(), args);
if (result == "") {
LOG(ERROR) << "curl get error!";
return false;
}
LOG(INFO) << "curl result:" << result.c_str();
Json::Reader reader(Json::Features::strictMode());
Json::Value val;
if (!reader.parse(result, val)) {
LOG(ERROR) << "parse result json error!";
return false;
}
if (!val.isObject()) {
LOG(ERROR) << "no valild json error!";
return false;
}
if (val["success"].isNull() || !val["success"].isString()) {
LOG(ERROR) << "parse field success error!";
return false;
}
std::string success = val["success"].asString();
if (success != "0") {
LOG(ERROR) << "parse field success error!" << success.c_str();
return false;
}
LOG(INFO) << "curl post ok";
return true;
}
bool CROVLBuilderIncremental::read_last_meta_from_local() {
char file[MAX_DATA_DIR_LEN * 2];
snprintf(file,
sizeof(file),
"%s/../../meta_info/%s_%s_%s_%s.json",
_data_dir,
_last_version.c_str(),
_depend_version.c_str(),
_shard_id.c_str(),
_split_id.c_str());
// read from local meta file
std::string input_meta;
Json::Reader meta_reader(Json::Features::strictMode());
Json::Value meta;
std::ifstream ifs;
ifs.open(file);
if (!ifs) {
LOG(ERROR) << "open file failed! " << file;
return false;
}
if (!meta_reader.parse(ifs, meta)) {
LOG(ERROR) << "parse meta json error!";
return false;
}
ifs.close();
if (!meta.isObject()) {
LOG(ERROR) << "parse meta json error!";
return false;
}
if (meta["data_len_list"].isNull() || !meta["data_len_list"].isArray()) {
LOG(ERROR) << "parse data_len_list json error !";
return false;
}
Json::Value data_len_list = meta["data_len_list"];
_data_file = data_len_list.size();
if (_data_file == 0) {
LOG(ERROR) << "data len list size is 0 !";
return false;
}
for (int i = 0; i < static_cast<int>(_data_file); ++i) {
LOG(INFO) << "data_len:" << data_len_list[i].asString().c_str();
try {
_data_file_list.push_back(
boost::lexical_cast<uint32_t>(data_len_list[i].asString()));
} catch (boost::bad_lexical_cast &e) {
LOG(ERROR) << "bad lexical cast:" << e.what();
return false;
}
}
if (meta["index_len_list"].isNull() || !meta["index_len_list"].isArray()) {
LOG(ERROR) << "parse index_len_list json error!";
return false;
}
Json::Value index_len_list = meta["index_len_list"];
_index_file_num = index_len_list.size();
if (_index_file_num == 0) {
LOG(ERROR) << "index len list size is 0!";
return false;
}
for (uint32_t i = 0; i < _index_file_num; ++i) {
LOG(INFO) << "index_len:" << index_len_list[i].asString().c_str();
try {
_index_file_list.push_back(
boost::lexical_cast<uint64_t>(index_len_list[i].asString()));
} catch (boost::bad_lexical_cast &e) {
LOG(ERROR) << "bad lexical cast:" << e.what();
return false;
}
}
if (meta["index_total_count"].isNull() ||
!meta["index_total_count"].isString()) {
LOG(ERROR) << "parse index_total_count json error!";
return false;
}
LOG(INFO) << "index_total_count:"
<< meta["index_total_count"].asString().c_str();
try {
_count =
boost::lexical_cast<uint64_t>(meta["index_total_count"].asString());
} catch (boost::bad_lexical_cast &e) {
LOG(ERROR) << "bad lexical cast:" << e.what();
return false;
}
return true;
}
bool CROVLBuilderIncremental::write_cur_meta_to_local() {
char file[MAX_DATA_DIR_LEN * 2];
std::string meta_path = _data_dir;
meta_path = meta_path + "/../../meta_info";
if (!checkDirectory(meta_path)) {
LOG(ERROR) << "create meta_path path failed:" << meta_path.c_str();
return false;
}
snprintf(file,
sizeof(file),
"%s/%s_%s_%s_%s.json",
meta_path.c_str(),
_cur_version.c_str(),
_depend_version.c_str(),
_shard_id.c_str(),
_split_id.c_str());
Json::Value meta;
try {
meta["index_total_count"] = boost::lexical_cast<std::string>(_count);
} catch (boost::bad_lexical_cast &e) {
LOG(ERROR) << "bad lexical cast:" << e.what();
return false;
}
Json::Value data;
for (size_t i = 0; i < _data_file_list.size(); ++i) {
try {
data.append(boost::lexical_cast<std::string>(_data_file_list[i]));
} catch (boost::bad_lexical_cast &e) {
LOG(ERROR) << "bad lexical cast:" << e.what();
return false;
}
}
meta["data_len_list"] = data;
Json::Value index;
for (size_t i = 0; i < _index_file_list.size(); ++i) {
try {
index.append(boost::lexical_cast<std::string>(_index_file_list[i]));
} catch (boost::bad_lexical_cast &e) {
LOG(ERROR) << "bad lexical cast:" << e.what();
return false;
}
}
meta["index_len_list"] = index;
std::ofstream ofs;
ofs.open(file);
if (!ofs) {
LOG(ERROR) << "open file failed!" << file;
return false;
}
ofs << meta.toStyledString();
ofs.close();
return true;
}
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "cube-builder/curl_simple.h"
#include <json/json.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <iostream>
#include <map>
#include <string>
const size_t BUFFER_SIZE = 9096;
CurlSimple::CurlSimple() { _p_curl = curl_easy_init(); }
CurlSimple::~CurlSimple() { curl_easy_cleanup(_p_curl); }
/* *
* @method write_callback
* @para:
* contents, the pointer response data, it will cast to information you
* need
* size * nmemb is the memory size of contents
* userp, the poinser user return info
* @return:
* size_t must return size * nmemb, or it was failed
* */
size_t CurlSimple::write_callback(void *contents,
size_t size,
size_t nmemb,
void *userp) {
size_t realsize = size * nmemb;
snprintf(static_cast<char *>(userp),
BUFFER_SIZE,
"%s",
static_cast<char *>(contents));
return realsize;
} // end write_callback
/* *
* @method curl_get
* @para:
* _p_curl, the pointer of CURL,
* url , string, input url string with get parameters
* @return:
* void
* */
std::string CurlSimple::curl_get(const char *url) {
char buffer[BUFFER_SIZE];
CURLcode res;
/* specify URL to get */
curl_easy_setopt(_p_curl, CURLOPT_URL, url);
/* send all data to this function */
curl_easy_setopt(_p_curl, CURLOPT_WRITEFUNCTION, CurlSimple::write_callback);
/* we pass our 'chunk' struct to the callback function */
curl_easy_setopt(_p_curl, CURLOPT_WRITEDATA, static_cast<void *>(buffer));
/* some servers don't like requests that are made without a user-agent
field, so we provide one */
curl_easy_setopt(_p_curl, CURLOPT_USERAGENT, "libcurl-agent/1.0");
/* get it! */
res = curl_easy_perform(_p_curl);
/* check for errors */
if (res != CURLE_OK) {
LOG(ERROR) << "curl_easy_perform() failed: " << curl_easy_strerror(res);
return "";
} else {
/*
* Now, our chunk.memory points to a memory block that is chunk.size
* bytes big and contains the remote file.
*
* Do something nice with it!
*/
return buffer;
}
} // end curl_get
/* *
* @method curl_post
* @para:
* _p_curl, the pointer of CURL,
* url , the input url string without post parameters
* para_map, std::map<std::string, std::string> the input post parameters
* @return:
* void
* */
std::string CurlSimple::curl_post(
const char *url, const std::map<std::string, std::string> &para_map) {
char buffer[BUFFER_SIZE];
CURLcode res;
std::string para_url = "";
std::map<std::string, std::string>::const_iterator para_iterator;
bool is_first = true;
for (para_iterator = para_map.begin(); para_iterator != para_map.end();
para_iterator++) {
if (is_first) {
is_first = false;
} else {
para_url.append("&");
}
std::string key = para_iterator->first;
std::string value = para_iterator->second;
para_url.append(key);
para_url.append("=");
para_url.append(value);
}
LOG(INFO) << "para_url=" << para_url.c_str() << " size:" << para_url.size();
/* specify URL to get */
curl_easy_setopt(_p_curl, CURLOPT_URL, url);
/* send all data to this function */
curl_easy_setopt(_p_curl, CURLOPT_WRITEFUNCTION, CurlSimple::write_callback);
/* send all data to this function */
curl_easy_setopt(_p_curl, CURLOPT_POSTFIELDS, para_url.c_str());
/* we pass our 'chunk' struct to the callback function */
curl_easy_setopt(
_p_curl, CURLOPT_WRITEDATA, reinterpret_cast<void *>(buffer));
/* some servers don't like requests that are made without a user-agent
field, so we provide one */
curl_easy_setopt(_p_curl, CURLOPT_USERAGENT, "libcurl-agent/1.0");
/* get it! */
int retry_num = 3;
bool is_succ = false;
for (int i = 0; i < retry_num; ++i) {
res = curl_easy_perform(_p_curl);
/* check for errors */
if (res != CURLE_OK) {
std::cerr << "curl_easy_perform() failed:" << curl_easy_strerror(res)
<< std::endl;
} else {
/*
* Now, our chunk.memory points to a memory block that is chunk.size
* bytes big and contains the remote file.
*
* Do something nice with it!
*/
is_succ = true;
break;
}
}
return is_succ ? buffer : "";
}
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <gflags/gflags.h>
#include <time.h>
#include <iostream>
#include <vector>
#include "butil/logging.h"
#include "cube-builder/builder_job.h"
#include "cube-builder/crovl_builder_increment.h"
#include "cube-builder/util.h"
DEFINE_string(dict_name, "", "dict name, no need");
DEFINE_string(input_path, "", "source data input path");
DEFINE_string(output_path, "", "source data input path");
DEFINE_string(job_mode, "base", "job mode base/delta default:base");
DEFINE_int32(last_version, 0, "last version, job mode delta need");
DEFINE_int32(cur_version, 0, "current version, no need");
DEFINE_int32(depend_version, 0, "depend version, job mode delta need");
DEFINE_int32(shard_num, -1, "shard num");
DEFINE_string(master_address, "", "master address, no need");
DEFINE_bool(only_build, true, "wheather build need transfer");
int main(int argc, char *argv[]) {
google::SetVersionString("1.0.0.0");
google::SetUsageMessage("Usage : ./cube-builder --help ");
google::ParseCommandLineFlags(&argc, &argv, true);
google::InitGoogleLogging(argv[0]);
google::SetLogDestination(google::GLOG_INFO, "./log_");
google::SetStderrLogging(google::GLOG_WARNING);
LOG(INFO) << "start";
string last_version = std::to_string(FLAGS_last_version);
string cur_version = std::to_string(FLAGS_cur_version);
string depend_version = std::to_string(FLAGS_depend_version);
if (FLAGS_input_path == "" || FLAGS_output_path == "" ||
FLAGS_shard_num == -1) {
LOG(ERROR) << "Param error! Usage: " << argv[0] << " --help";
return -1;
}
if (FLAGS_job_mode == "base") {
if (FLAGS_only_build) {
time_t t;
time(&t);
cur_version = std::to_string(t);
depend_version = cur_version;
}
} else if (FLAGS_job_mode == "delta") {
if (FLAGS_last_version == 0 || FLAGS_depend_version == 0) {
LOG(ERROR) << "Param error! need last_version and depend_version! Usage: "
<< argv[0] << " --help";
return -1;
} else {
if (FLAGS_only_build) {
time_t t;
time(&t);
cur_version = std::to_string(t);
}
}
} else {
LOG(ERROR) << "Job mode error! Usage: " << argv[0] << " --help";
return -1;
}
Job job;
job.set_dict_name(FLAGS_dict_name);
job.set_shard_num(FLAGS_shard_num);
job.set_input_path(FLAGS_input_path);
job.set_output_path(FLAGS_output_path + "/" + depend_version + "_" +
cur_version);
job.set_job_mode(FLAGS_job_mode);
vector<string> files;
getAllFiles(job.get_input_path(), &files);
if (!checkDirectory(job.get_output_path())) {
LOG(ERROR) << "create output_path path failed: "
<< job.get_output_path().c_str();
return -1;
}
vector<CROVLBuilderIncremental *> reduces;
for (auto i = 0; i < job.get_shard_num(); i++) {
string tar_path = job.get_output_path() + "/" + job.get_dict_name() +
"_part" + std::to_string(i) + ".tar";
string build_path = job.get_output_path() + "/" + job.get_dict_name() +
"_part" + std::to_string(i);
CROVLBuilderIncremental *_builder = new CROVLBuilderIncremental();
if (!_builder->Init(IT_HASH,
MAX_BLOCK_SIZE,
job.get_job_mode().c_str(),
build_path.c_str(),
tar_path.c_str(),
job.get_dict_name().c_str(),
std::to_string(i),
std::to_string(0),
last_version,
cur_version,
depend_version,
FLAGS_master_address.c_str())) {
LOG(ERROR) << "CROVLBuilderIncremental init failed " << build_path;
return -1;
}
reduces.push_back(_builder);
}
for (auto file : files) {
mapFileLocal(job, file, reduces);
LOG(INFO) << "next file to reduce!";
}
for (auto reduce : reduces) {
reduce->done();
reduce->archive();
reduce->md5sum();
}
google::ShutdownGoogleLogging();
return 0;
}
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "cube-builder/seqfile_reader.h"
#include <arpa/inet.h>
#include "butil/logging.h"
#include "cube-builder/vtext.h"
int SequenceFileRecordReader::open() {
if (_raw_reader->open() != 0) {
return -1;
}
LOG(INFO) << "open sequence file ok! file:" << _path.c_str();
return 0;
}
int SequenceFileRecordReader::close() {
if (_raw_reader->close() != 0) {
return -1;
}
LOG(INFO) << "close sequence file ok! file:" << _path.c_str();
return 0;
}
int SequenceFileRecordReader::next(Record* record) {
uint32_t record_len = 0;
int64_t ret = _raw_reader->read(&record_len);
if (ret == 0) {
return 1; // ?????1???????????????
} else if (ret != sizeof(record_len)) {
LOG(ERROR) << "read sequence file:" << _path
<< " record(record_len) errno:" << ret
<< ", errmsg:" << _raw_reader->errno_to_str(ret);
return -1;
}
record->record_len = static_cast<int>(ntohl(record_len));
// got marker
if (record->record_len == -1) {
std::string marker;
if ((ret = _raw_reader->read_buf(&marker, 16)) != 16) {
LOG(ERROR) << "read sequence file:" << _path
<< " record(marker) errno:" << ret
<< ", errmsg:" << _raw_reader->errno_to_str(ret);
return -1;
}
if (marker != record->sync_marker) {
LOG(ERROR) << "read sequence file:" << _path
<< " record(sync_marker) error!";
return -1;
}
if ((ret = _raw_reader->read(&record->record_len)) !=
sizeof(record->record_len)) {
LOG(ERROR) << "read sequence file:" << _path
<< " record(len) errno:" << ret
<< ", errmsg:" << _raw_reader->errno_to_str(ret);
return -1;
}
record->record_len = static_cast<int>(ntohl(record->record_len));
}
uint32_t key_len = 0;
if ((ret = _raw_reader->read(&key_len)) != sizeof(key_len)) {
LOG(ERROR) << "read sequence file:" << _path
<< " record(key_len) errno:" << ret
<< ", errmsg:" << _raw_reader->errno_to_str(ret);
return -1;
}
record->key_len = static_cast<int>(ntohl(key_len));
if ((ret = _raw_reader->read_buf(&record->key, record->key_len)) !=
record->key_len) {
LOG(ERROR) << "read sequence file:" << _path
<< " record(key_len) errno:" << ret
<< ", errmsg:" << _raw_reader->errno_to_str(ret);
return -1;
}
if ((ret = _raw_reader->read_buf(&record->value,
record->record_len - record->key_len)) !=
(record->record_len - record->key_len)) {
LOG(ERROR) << "read sequence file:" << _path
<< " record(value_len) errno:" << ret
<< ", errmsg:" << _raw_reader->errno_to_str(ret);
return -1;
}
return 0;
}
int SequenceFileRecordReader::read_header() {
LOG(INFO) << "start to read sequence file header:" << _path;
char version[4];
if (_raw_reader->read_buf(&version, 4) != 4) {
LOG(ERROR) << "read sequence file header(version) error:" << _path;
return -1;
}
_header.version = version[3];
if (!VString::read_string(_raw_reader, &_header.key_class)) {
LOG(ERROR) << "read sequence file header(key_class) error:" << _path;
return -1;
}
if (!VString::read_string(_raw_reader, &_header.value_class)) {
LOG(ERROR) << "read sequence file header(value_class) error:" << _path;
return -1;
}
if (_raw_reader->read(&_header.is_compress) != sizeof(bool)) {
LOG(ERROR) << "read sequence file header(is_compress) error:" << _path;
return -1;
}
if (_raw_reader->read(&_header.is_block_compress) != sizeof(bool)) {
LOG(ERROR) << "read sequence file header(is_block_compress) error:"
<< _path;
return -1;
}
if (_header.is_compress) {
if (!VString::read_string(_raw_reader, &_header.compress_class)) {
LOG(ERROR) << "read sequence file header(compress_class) error:" << _path;
return -1;
}
}
int32_t meta_cnt = 0;
if (_raw_reader->read(&meta_cnt) != sizeof(int32_t)) {
LOG(ERROR) << "read sequence file header(meta_cnt) error:" << _path;
return -1;
}
_header.metas.resize(meta_cnt);
for (int32_t i = 0; i != meta_cnt; ++i) {
if (!VString::read_string(_raw_reader, &_header.metas[i].key)) {
LOG(ERROR) << "read sequence file header(meta_key) error:" << _path;
return -1;
}
if (!VString::read_string(_raw_reader, &_header.metas[i].value)) {
LOG(ERROR) << "read sequence file header(meta_value) error:" << _path;
return -1;
}
}
if (_raw_reader->read_buf(&_header.sync_marker, 16) != 16) {
LOG(ERROR) << "read sequence file header(sync_marker) error:" << _path;
return -1;
}
LOG(INFO) << "sync_marker:" << _header.sync_marker;
LOG(INFO) << "read sequence file header ok:" << _path;
return 0;
}
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "cube-builder/util.h"
#include <dirent.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include "butil/logging.h"
void getAllFiles(std::string path, std::vector<std::string> *files) {
DIR *dir;
struct dirent *ptr;
if ((dir = opendir(path.c_str())) == NULL) {
perror("Open dri error...");
exit(1);
}
while ((ptr = readdir(dir)) != NULL) {
if (strcmp(ptr->d_name, ".") == 0 || strcmp(ptr->d_name, "..") == 0) {
continue;
} else if ((ptr->d_type) == 8) { // file
if (ptr->d_name[0] != '.') files->push_back(path + "/" + ptr->d_name);
} else if (ptr->d_type == 10) { // link file
continue;
} else if (ptr->d_type == 4) {
getAllFiles(path + "/" + ptr->d_name, files);
}
}
closedir(dir);
}
std::string string_to_hex(const std::string &input) {
static const char *const lut = "0123456789ABCDEF";
size_t len = input.length();
std::string output;
output.reserve(2 * len);
for (size_t i = 0; i < len; ++i) {
const unsigned char c = input[i];
output.push_back(lut[c >> 4]);
output.push_back(lut[c & 15]);
}
return output;
}
bool checkDirectory(const std::string folder) {
LOG(INFO) << "check dir:" << folder;
if (access(folder.c_str(), F_OK) == 0) {
return 1;
}
LOG(WARNING) << "no dir will mkdir:" << folder;
return (mkdir(folder.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH) == 0);
}
void CmdTarfiles(const std::string folder) {
std::string cmd = "cd " + folder + " && tar -cvPf " + folder + ".tar .";
LOG(INFO) << "tar file cmd:" << cmd;
system(cmd.c_str());
}
void CmdMd5sum(const std::string folder) {
std::string cmd = "md5sum " + folder + ".tar > " + folder + ".tar.md5";
LOG(INFO) << "md5sum file cmd:" << cmd;
system(cmd.c_str());
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册