提交 eccbef52 编写于 作者: D Dang Yifei 提交者: GitHub

Merge pull request #33 from Badangel/cube-builder

Cube builder init
......@@ -60,6 +60,7 @@ if (NOT DEFINED WITH_MKLDNN)
endif()
endif()
include(external/jsoncpp)
include(external/leveldb)
include(external/rocksdb)
include(external/zlib)
......@@ -83,6 +84,7 @@ include_directories(${PADDLE_SERVING_SOURCE_DIR})
include_directories(${PADDLE_SERVING_BINARY_DIR})
set(EXTERNAL_LIBS
jsoncpp
gflags
rocksdb
glog
......
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
INCLUDE(ExternalProject)
SET(JSONCPP_SOURCES_DIR ${THIRD_PARTY_PATH}/jsoncpp)
SET(JSONCPP_INSTALL_DIR ${THIRD_PARTY_PATH}/install/jsoncpp)
SET(JSONCPP_INCLUDE_DIR "${JSONCPP_INSTALL_DIR}/include" CACHE PATH "jsoncpp include directory." FORCE)
SET(JSONCPP_LIBRARIES "${JSONCPP_INSTALL_DIR}/lib" CACHE FILEPATH "jsoncpp library." FORCE)
INCLUDE_DIRECTORIES(${JSONCPP_INCLUDE_DIR})
ExternalProject_Add(
extern_jsoncpp
${EXTERNAL_PROJECT_LOG_ARGS}
PREFIX ${JSONCPP_SOURCES_DIR}
GIT_REPOSITORY "https://github.com/open-source-parsers/jsoncpp"
GIT_TAG 1.8.4
UPDATE_COMMAND ""
CMAKE_ARGS -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}
-DCMAKE_C_COMPILER=${CMAKE_C_COMPILER}
-DCMAKE_CXX_FLAGS=${CMAKE_CXX_FLAGS}
-DCMAKE_CXX_FLAGS_RELEASE=${CMAKE_CXX_FLAGS_RELEASE}
-DCMAKE_CXX_FLAGS_DEBUG=${CMAKE_CXX_FLAGS_DEBUG}
-DCMAKE_C_FLAGS=${CMAKE_C_FLAGS}
-DCMAKE_C_FLAGS_DEBUG=${CMAKE_C_FLAGS_DEBUG}
-DCMAKE_C_FLAGS_RELEASE=${CMAKE_C_FLAGS_RELEASE}
-DCMAKE_INSTALL_PREFIX=${JSONCPP_INSTALL_DIR}
-DBUILD_TESTING=OFF
-DCMAKE_BUILD_TYPE=${THIRD_PARTY_BUILD_TYPE}
${EXTERNAL_OPTIONAL_ARGS}
CMAKE_CACHE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=${JSONCPP_INSTALL_DIR}
-DCMAKE_POSITION_INDEPENDENT_CODE:BOOL=ON
-DCMAKE_BUILD_TYPE:STRING=${THIRD_PARTY_BUILD_TYPE}
)
ADD_LIBRARY(jsoncpp STATIC IMPORTED GLOBAL)
SET_PROPERTY(TARGET jsoncpp PROPERTY IMPORTED_LOCATION ${JSONCPP_LIBRARIES})
ADD_DEPENDENCIES(jsoncpp extern_jsoncpp)
LIST(APPEND external_project_dependencies jsoncpp)
# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
......@@ -14,3 +14,4 @@
add_subdirectory(cube-server)
add_subdirectory(cube-api)
add_subdirectory(cube-builder)
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
project(cube-builder)
set(CMAKE_CXX_STANDARD 11)
add_compile_options(-std=c++11)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
include_directories(SYSTEM ${CMAKE_CURRENT_LIST_DIR}/include)
include_directories(SYSTEM ${CMAKE_CURRENT_BINARY_DIR}/../)
find_path(JSONCPP_INCLUDE_DIR json/json.h)
set(JSONCPP_LIBRARY ${JSONCPP_INCLUDE_DIR}/../lib/libjsoncpp.a)
message(STATUS "jsoncpp include: ${JSONCPP_INCLUDE_DIR}")
include_directories(${JSONCPP_INCLUDE_DIR})
find_library(CURL_LIB NAMES curl)
if (NOT CURL_LIB)
message(FATAL_ERROR "Fail to find curl")
endif()
add_executable(cube-builder src/main.cpp include/cube-builder/util.h src/util.cpp src/builder_job.cpp include/cube-builder/builder_job.h include/cube-builder/define.h src/seqfile_reader.cpp include/cube-builder/seqfile_reader.h include/cube-builder/raw_reader.h include/cube-builder/vtext.h src/crovl_builder_increment.cpp include/cube-builder/crovl_builder_increment.h src/curl_simple.cpp include/cube-builder/curl_simple.h)
set(DYNAMIC_LIB
gflags
${JSONCPP_LIBRARY}
-lssl
-lcrypto
${CURL_LIB}
)
target_link_libraries(cube-builder ${DYNAMIC_LIB})
# install
install(TARGETS cube-builder RUNTIME DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/bin)
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include "cube-builder/crovl_builder_increment.h"
#include "cube-builder/define.h"
using std::string;
using std::vector;
class Job {
public:
void set_shard_num(int num);
int get_shard_num();
void set_input_path(string path);
string get_input_path();
void set_output_path(string path);
string get_output_path();
void set_job_mode(mode mmode);
mode get_job_mode();
void set_dict_name(string name);
string get_dict_name();
private:
int shard_num;
string input_path;
string output_path;
mode job_mode;
string dict_name;
};
void mapFileLocal(Job job,
string file,
vector<CROVLBuilderIncremental*> reduces);
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <string>
#include <vector>
class CROVLBuilderIncremental {
public:
static const uint32_t MAX_DATA_LEN = 67108860;
static const uint32_t MAX_DATA_FILE_LEN = 1073741824;
static const uint32_t MAX_DATA_DIR_LEN = 512;
static const uint32_t MAX_FIXDATA_LEN = 504;
static const uint32_t INDEX_BUFFER = 4194304;
int _index_type;
uint32_t _data_file_len;
uint64_t _data_file;
uint32_t _data_buf_len;
uint32_t _index_buf_len;
uint32_t _index_file_num;
uint64_t _index_file_len;
uint64_t _count;
uint64_t _cur_count;
bool _fix;
uint32_t _fix_len;
uint32_t _rec_size;
uint64_t* _index;
char* _data;
std::vector<uint32_t> _data_file_list;
std::vector<uint64_t> _index_file_list;
char _data_dir[MAX_DATA_DIR_LEN + 1];
char _data_real_dir[MAX_DATA_DIR_LEN + 1];
char _last_data_dir[MAX_DATA_DIR_LEN + 1];
char _last_data_tmp_dir[MAX_DATA_DIR_LEN + 1];
uint64_t _inner_sort_size;
uint64_t _memory_quota;
bool flush_data();
bool FlushIndex();
bool Clear();
std::string _mode;
std::string _dict_name;
std::string _shard_id;
std::string _split_id;
std::string _last_version;
std::string _cur_version;
std::string _depend_version;
std::string _master_address;
std::string _id;
std::string _key;
std::string _extra;
public:
CROVLBuilderIncremental();
~CROVLBuilderIncremental();
bool Init(int index_type,
uint32_t data_file_len,
const char* mode,
const char* data_dir,
const char* data_real_dir,
const char* dict_name,
const std::string& shard_id,
const std::string& split_id,
const std::string& last_version,
const std::string& cur_version,
const std::string& depend_version,
const std::string& master_address,
const std::string& id = "",
const std::string& key = "",
const std::string& extra = "",
bool bFix = false,
uint32_t nFixLen = 8);
int add(uint64_t nKey, uint32_t nLen, const char* pData);
int add(uint64_t nKey, uint64_t nValue);
int add(uint64_t nKey, const char* pData);
bool done();
void archive();
void md5sum();
bool read_last_meta_from_transfer();
bool write_cur_meta_to_transfer();
bool read_last_meta_from_local();
bool write_cur_meta_to_local();
};
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <curl/curl.h>
#include <stdio.h>
#include <map>
#include <string>
#include "butil/logging.h"
class CurlSimple {
public:
CurlSimple();
~CurlSimple();
/* *
* @method curl_get
* @para:
* curl_handle, the pointer of CURL,
* url , string, input url string with get parameters
* @return:
* void
* */
std::string curl_get(const char *url);
/* *
* @method curl_post
* @para:
* curl_handle, the pointer of CURL,
* url , the input url string without post parameters
* para_map, std::map<std::string, std::string> the input post parameters
* @return:
* void
* */
std::string curl_post(const char *url,
const std::map<std::string, std::string> &para_map);
private:
/* *
* @method write_callback
* @para:
* contents, the pointer response data, it will cast to information you
* need
* size * nmemb is the memory size of contents
* userp, the poinser user return info
* @return:
* size_t must return size * nmemb, or it was failed
* */
static size_t write_callback(void *contents,
size_t size,
size_t nmemb,
void *userp);
private:
CURL *_p_curl;
};
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <iostream>
#include <string>
#include <vector>
typedef std::string mode;
enum IndexType {
IT_HASH, ///< hash index
IT_SORT, ///< sort index
};
const int MAX_BLOCK_SIZE = 1024 * 1024 * 1024;
struct Meta {
std::string key;
std::string value;
};
struct Header {
int version;
std::string key_class;
std::string value_class;
bool is_compress;
bool is_block_compress;
std::string compress_class;
std::vector<Meta> metas;
std::string sync_marker;
static const int s_sync_hash_size = 16;
};
struct Record {
explicit Record(const Header& header) : sync_marker(header.sync_marker) {}
const std::string& sync_marker;
int record_len;
int key_len;
std::string key;
std::string value;
};
class RecordReader {
public:
RecordReader() {}
virtual ~RecordReader() {}
virtual int open() = 0;
virtual int next(Record* record) = 0;
virtual int close() = 0;
}; // class RecordReader
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <fstream>
#include <string>
#include "butil/logging.h"
class RawReader {
public:
RawReader() {}
virtual ~RawReader() {}
virtual int open() = 0;
virtual int close() = 0;
virtual int64_t read(int32_t* v) = 0;
virtual int64_t read(uint32_t* v) = 0;
virtual int64_t read(bool* v) = 0;
virtual int64_t read(char* v) = 0;
virtual int64_t read_buf(std::string* s, size_t len) = 0;
virtual int64_t read_buf(void* v, size_t len) = 0;
virtual const char* errno_to_str(int err) = 0;
private:
};
class FileRawReader : public RawReader {
public:
explicit FileRawReader(const std::string& path) : _path(path) {}
virtual ~FileRawReader() {}
virtual int open() {
_reader.open(_path.c_str(), std::ifstream::binary);
if (!_reader.is_open()) {
return -1;
}
LOG(INFO) << "raw open sequence file ok! file:" << _path.c_str();
return 0;
}
virtual int close() {
_reader.close();
LOG(INFO) << "raw close sequence file ok! file:" << _path.c_str();
return 0;
}
virtual int64_t read(int32_t* v) {
_reader.read(reinterpret_cast<char*>(v), sizeof(int32_t));
if (_reader.good())
return sizeof(int32_t);
else if (_reader.eof())
return 0;
else
return -1;
}
virtual int64_t read(uint32_t* v) {
_reader.read(reinterpret_cast<char*>(v), sizeof(uint32_t));
if (_reader.good())
return sizeof(uint32_t);
else if (_reader.eof())
return 0;
else
return -1;
}
virtual int64_t read(bool* v) {
_reader.read(reinterpret_cast<char*>(v), sizeof(bool));
if (_reader.good())
return sizeof(bool);
else if (_reader.eof())
return 0;
else
return -1;
}
virtual int64_t read(char* v) {
_reader.read(reinterpret_cast<char*>(v), sizeof(char));
if (_reader.good())
return sizeof(char);
else if (_reader.eof())
return 0;
else
return -1;
}
virtual int64_t read_buf(std::string* s, size_t len) {
s->resize(len);
std::string& tmp = *s;
_reader.read(reinterpret_cast<char*>(&tmp[0]), len);
if (_reader.good())
return len;
else if (_reader.eof())
return 0;
else
return -1;
}
virtual int64_t read_buf(void* v, size_t len) {
_reader.read(reinterpret_cast<char*>(v), len);
if (_reader.good())
return len;
else if (_reader.eof())
return 0;
else
return -1;
}
virtual const char* errno_to_str(int err) {
switch (err) {
case -1:
return "read seqfile error";
}
return "default error";
}
private:
std::string _path{""};
std::ifstream _reader;
};
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <memory>
#include <string>
#include "cube-builder/define.h"
#include "cube-builder/raw_reader.h"
class SequenceFileRecordReader : public RecordReader {
public:
SequenceFileRecordReader() {}
explicit SequenceFileRecordReader(const std::string& path) {
_path = path;
_raw_reader = new FileRawReader(_path);
}
virtual ~SequenceFileRecordReader() {
if (_raw_reader != nullptr) {
delete _raw_reader;
}
}
virtual int open();
virtual int close();
virtual int next(Record* record);
const Header& get_header() { return _header; }
int read_header();
private:
std::string _path{""};
RawReader* _raw_reader{nullptr};
Header _header;
};
typedef std::shared_ptr<SequenceFileRecordReader> SequenceFileRecordReaderPtr;
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <iostream>
#include <string>
#include <vector>
#define PATH_DELIMITER '\\'
void getAllFiles(std::string path, std::vector<std::string> *files);
std::string string_to_hex(const std::string &input);
bool checkDirectory(const std::string folder);
void CmdTarfiles(const std::string folder);
void CmdMd5sum(const std::string folder);
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <string>
#include "cube-builder/define.h"
#include "cube-builder/raw_reader.h"
class VInt {
public:
static int32_t decode_vint_size(const char &value) {
if (value >= -112) {
return 1;
} else if (value < -120) {
return -119 - value;
}
return -111 - value;
}
static bool is_negative_vint(const char &value) {
return value < -120 || (value >= -112 && value < 0);
}
static bool read_vint(RawReader *reader, int32_t *vint) {
char first_byte;
if (reader->read(&first_byte) <= 0) {
return false;
}
int32_t len = decode_vint_size(first_byte);
if (len == 1) {
*vint = first_byte;
return true;
}
char ch;
int32_t bitlen = 0;
int32_t i = 0, lch;
for (int idx = len - 2; idx >= 0; idx--) {
if (reader->read(&ch) <= 0) {
return false;
}
bitlen = 8 * idx;
lch = ch;
i = i | ((lch << bitlen) & (0xFFL << bitlen));
}
*vint = (is_negative_vint(first_byte) ? (i ^ (int32_t)(-1)) : i);
return true;
}
};
class VString {
public:
static const char *encode(std::string str) { return encode(str, true); }
static const char *encode(std::string str, bool /*replace*/) {
// todo
return str.c_str();
}
static std::string decode(char *bytes) { return decode(bytes, true); }
static std::string decode(char *bytes, bool /*replace*/) {
// todo
return std::string(bytes);
}
// todo
static bool read_string(RawReader *reader, std::string *str) {
int length;
if (!VInt::read_vint(reader, &length)) {
return false;
}
if (reader->read_buf(str, length) != length) {
return false;
}
return true;
}
};
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "cube-builder/builder_job.h"
#include <stdio.h>
#include <iostream>
#include "butil/logging.h"
#include "cube-builder/seqfile_reader.h"
#include "cube-builder/util.h"
using std::string;
void Job::set_shard_num(int num) { shard_num = num; }
int Job::get_shard_num() { return shard_num; }
void Job::set_input_path(string path) { input_path = path; }
string Job::get_input_path() { return input_path; }
void Job::set_output_path(string path) { output_path = path; }
string Job::get_output_path() { return output_path; }
void Job::set_job_mode(mode mmode) { job_mode = mmode; }
mode Job::get_job_mode() { return job_mode; }
void Job::set_dict_name(string name) { dict_name = name; }
string Job::get_dict_name() { return dict_name; }
void mapFileLocal(Job job,
string file,
vector<CROVLBuilderIncremental *> reduces) {
SequenceFileRecordReader reader(file.c_str());
if (reader.open() != 0) {
LOG(ERROR) << "open file failed! " << file;
return;
}
if (reader.read_header() != 0) {
LOG(ERROR) << "read header error! " << file;
reader.close();
return;
}
Record record(reader.get_header());
int total_count = 0;
while (reader.next(&record) == 0) {
uint64_t key =
*reinterpret_cast<uint64_t *>(const_cast<char *>(record.key.data()));
total_count++;
int part = key % job.get_shard_num();
int64_t value_length = record.record_len - record.key_len;
reduces[part]->add(key, value_length, record.value.c_str());
}
if (reader.close() != 0) {
LOG(ERROR) << "close file failed! " << file;
return;
}
}
此差异已折叠。
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "cube-builder/curl_simple.h"
#include <json/json.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <iostream>
#include <map>
#include <string>
const size_t BUFFER_SIZE = 9096;
CurlSimple::CurlSimple() { _p_curl = curl_easy_init(); }
CurlSimple::~CurlSimple() { curl_easy_cleanup(_p_curl); }
/* *
* @method write_callback
* @para:
* contents, the pointer response data, it will cast to information you
* need
* size * nmemb is the memory size of contents
* userp, the poinser user return info
* @return:
* size_t must return size * nmemb, or it was failed
* */
size_t CurlSimple::write_callback(void *contents,
size_t size,
size_t nmemb,
void *userp) {
size_t realsize = size * nmemb;
snprintf(static_cast<char *>(userp),
BUFFER_SIZE,
"%s",
static_cast<char *>(contents));
return realsize;
} // end write_callback
/* *
* @method curl_get
* @para:
* _p_curl, the pointer of CURL,
* url , string, input url string with get parameters
* @return:
* void
* */
std::string CurlSimple::curl_get(const char *url) {
char buffer[BUFFER_SIZE];
CURLcode res;
/* specify URL to get */
curl_easy_setopt(_p_curl, CURLOPT_URL, url);
/* send all data to this function */
curl_easy_setopt(_p_curl, CURLOPT_WRITEFUNCTION, CurlSimple::write_callback);
/* we pass our 'chunk' struct to the callback function */
curl_easy_setopt(_p_curl, CURLOPT_WRITEDATA, static_cast<void *>(buffer));
/* some servers don't like requests that are made without a user-agent
field, so we provide one */
curl_easy_setopt(_p_curl, CURLOPT_USERAGENT, "libcurl-agent/1.0");
/* get it! */
res = curl_easy_perform(_p_curl);
/* check for errors */
if (res != CURLE_OK) {
LOG(ERROR) << "curl_easy_perform() failed: " << curl_easy_strerror(res);
return "";
} else {
/*
* Now, our chunk.memory points to a memory block that is chunk.size
* bytes big and contains the remote file.
*
* Do something nice with it!
*/
return buffer;
}
} // end curl_get
/* *
* @method curl_post
* @para:
* _p_curl, the pointer of CURL,
* url , the input url string without post parameters
* para_map, std::map<std::string, std::string> the input post parameters
* @return:
* void
* */
std::string CurlSimple::curl_post(
const char *url, const std::map<std::string, std::string> &para_map) {
char buffer[BUFFER_SIZE];
CURLcode res;
std::string para_url = "";
std::map<std::string, std::string>::const_iterator para_iterator;
bool is_first = true;
for (para_iterator = para_map.begin(); para_iterator != para_map.end();
para_iterator++) {
if (is_first) {
is_first = false;
} else {
para_url.append("&");
}
std::string key = para_iterator->first;
std::string value = para_iterator->second;
para_url.append(key);
para_url.append("=");
para_url.append(value);
}
LOG(INFO) << "para_url=" << para_url.c_str() << " size:" << para_url.size();
/* specify URL to get */
curl_easy_setopt(_p_curl, CURLOPT_URL, url);
/* send all data to this function */
curl_easy_setopt(_p_curl, CURLOPT_WRITEFUNCTION, CurlSimple::write_callback);
/* send all data to this function */
curl_easy_setopt(_p_curl, CURLOPT_POSTFIELDS, para_url.c_str());
/* we pass our 'chunk' struct to the callback function */
curl_easy_setopt(
_p_curl, CURLOPT_WRITEDATA, reinterpret_cast<void *>(buffer));
/* some servers don't like requests that are made without a user-agent
field, so we provide one */
curl_easy_setopt(_p_curl, CURLOPT_USERAGENT, "libcurl-agent/1.0");
/* get it! */
int retry_num = 3;
bool is_succ = false;
for (int i = 0; i < retry_num; ++i) {
res = curl_easy_perform(_p_curl);
/* check for errors */
if (res != CURLE_OK) {
std::cerr << "curl_easy_perform() failed:" << curl_easy_strerror(res)
<< std::endl;
} else {
/*
* Now, our chunk.memory points to a memory block that is chunk.size
* bytes big and contains the remote file.
*
* Do something nice with it!
*/
is_succ = true;
break;
}
}
return is_succ ? buffer : "";
}
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <gflags/gflags.h>
#include <time.h>
#include <iostream>
#include <vector>
#include "butil/logging.h"
#include "cube-builder/builder_job.h"
#include "cube-builder/crovl_builder_increment.h"
#include "cube-builder/util.h"
DEFINE_string(dict_name, "", "dict name, no need");
DEFINE_string(input_path, "", "source data input path");
DEFINE_string(output_path, "", "source data input path");
DEFINE_string(job_mode, "base", "job mode base/delta default:base");
DEFINE_int32(last_version, 0, "last version, job mode delta need");
DEFINE_int32(cur_version, 0, "current version, no need");
DEFINE_int32(depend_version, 0, "depend version, job mode delta need");
DEFINE_int32(shard_num, -1, "shard num");
DEFINE_string(master_address, "", "master address, no need");
DEFINE_bool(only_build, true, "wheather build need transfer");
int main(int argc, char *argv[]) {
google::SetVersionString("1.0.0.0");
google::SetUsageMessage("Usage : ./cube-builder --help ");
google::ParseCommandLineFlags(&argc, &argv, true);
google::InitGoogleLogging(argv[0]);
google::SetLogDestination(google::GLOG_INFO, "./log_");
google::SetStderrLogging(google::GLOG_WARNING);
LOG(INFO) << "start";
string last_version = std::to_string(FLAGS_last_version);
string cur_version = std::to_string(FLAGS_cur_version);
string depend_version = std::to_string(FLAGS_depend_version);
if (FLAGS_input_path == "" || FLAGS_output_path == "" ||
FLAGS_shard_num == -1) {
LOG(ERROR) << "Param error! Usage: " << argv[0] << " --help";
return -1;
}
if (FLAGS_job_mode == "base") {
if (FLAGS_only_build) {
time_t t;
time(&t);
cur_version = std::to_string(t);
depend_version = cur_version;
}
} else if (FLAGS_job_mode == "delta") {
if (FLAGS_last_version == 0 || FLAGS_depend_version == 0) {
LOG(ERROR) << "Param error! need last_version and depend_version! Usage: "
<< argv[0] << " --help";
return -1;
} else {
if (FLAGS_only_build) {
time_t t;
time(&t);
cur_version = std::to_string(t);
}
}
} else {
LOG(ERROR) << "Job mode error! Usage: " << argv[0] << " --help";
return -1;
}
Job job;
job.set_dict_name(FLAGS_dict_name);
job.set_shard_num(FLAGS_shard_num);
job.set_input_path(FLAGS_input_path);
job.set_output_path(FLAGS_output_path + "/" + depend_version + "_" +
cur_version);
job.set_job_mode(FLAGS_job_mode);
vector<string> files;
getAllFiles(job.get_input_path(), &files);
if (!checkDirectory(job.get_output_path())) {
LOG(ERROR) << "create output_path path failed: "
<< job.get_output_path().c_str();
return -1;
}
vector<CROVLBuilderIncremental *> reduces;
for (auto i = 0; i < job.get_shard_num(); i++) {
string tar_path = job.get_output_path() + "/" + job.get_dict_name() +
"_part" + std::to_string(i) + ".tar";
string build_path = job.get_output_path() + "/" + job.get_dict_name() +
"_part" + std::to_string(i);
CROVLBuilderIncremental *_builder = new CROVLBuilderIncremental();
if (!_builder->Init(IT_HASH,
MAX_BLOCK_SIZE,
job.get_job_mode().c_str(),
build_path.c_str(),
tar_path.c_str(),
job.get_dict_name().c_str(),
std::to_string(i),
std::to_string(0),
last_version,
cur_version,
depend_version,
FLAGS_master_address.c_str())) {
LOG(ERROR) << "CROVLBuilderIncremental init failed " << build_path;
return -1;
}
reduces.push_back(_builder);
}
for (auto file : files) {
mapFileLocal(job, file, reduces);
LOG(INFO) << "next file to reduce!";
}
for (auto reduce : reduces) {
reduce->done();
reduce->archive();
reduce->md5sum();
}
google::ShutdownGoogleLogging();
return 0;
}
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "cube-builder/seqfile_reader.h"
#include <arpa/inet.h>
#include "butil/logging.h"
#include "cube-builder/vtext.h"
int SequenceFileRecordReader::open() {
if (_raw_reader->open() != 0) {
return -1;
}
LOG(INFO) << "open sequence file ok! file:" << _path.c_str();
return 0;
}
int SequenceFileRecordReader::close() {
if (_raw_reader->close() != 0) {
return -1;
}
LOG(INFO) << "close sequence file ok! file:" << _path.c_str();
return 0;
}
int SequenceFileRecordReader::next(Record* record) {
uint32_t record_len = 0;
int64_t ret = _raw_reader->read(&record_len);
if (ret == 0) {
return 1; // ?????1???????????????
} else if (ret != sizeof(record_len)) {
LOG(ERROR) << "read sequence file:" << _path
<< " record(record_len) errno:" << ret
<< ", errmsg:" << _raw_reader->errno_to_str(ret);
return -1;
}
record->record_len = static_cast<int>(ntohl(record_len));
// got marker
if (record->record_len == -1) {
std::string marker;
if ((ret = _raw_reader->read_buf(&marker, 16)) != 16) {
LOG(ERROR) << "read sequence file:" << _path
<< " record(marker) errno:" << ret
<< ", errmsg:" << _raw_reader->errno_to_str(ret);
return -1;
}
if (marker != record->sync_marker) {
LOG(ERROR) << "read sequence file:" << _path
<< " record(sync_marker) error!";
return -1;
}
if ((ret = _raw_reader->read(&record->record_len)) !=
sizeof(record->record_len)) {
LOG(ERROR) << "read sequence file:" << _path
<< " record(len) errno:" << ret
<< ", errmsg:" << _raw_reader->errno_to_str(ret);
return -1;
}
record->record_len = static_cast<int>(ntohl(record->record_len));
}
uint32_t key_len = 0;
if ((ret = _raw_reader->read(&key_len)) != sizeof(key_len)) {
LOG(ERROR) << "read sequence file:" << _path
<< " record(key_len) errno:" << ret
<< ", errmsg:" << _raw_reader->errno_to_str(ret);
return -1;
}
record->key_len = static_cast<int>(ntohl(key_len));
if ((ret = _raw_reader->read_buf(&record->key, record->key_len)) !=
record->key_len) {
LOG(ERROR) << "read sequence file:" << _path
<< " record(key_len) errno:" << ret
<< ", errmsg:" << _raw_reader->errno_to_str(ret);
return -1;
}
if ((ret = _raw_reader->read_buf(&record->value,
record->record_len - record->key_len)) !=
(record->record_len - record->key_len)) {
LOG(ERROR) << "read sequence file:" << _path
<< " record(value_len) errno:" << ret
<< ", errmsg:" << _raw_reader->errno_to_str(ret);
return -1;
}
return 0;
}
int SequenceFileRecordReader::read_header() {
LOG(INFO) << "start to read sequence file header:" << _path;
char version[4];
if (_raw_reader->read_buf(&version, 4) != 4) {
LOG(ERROR) << "read sequence file header(version) error:" << _path;
return -1;
}
_header.version = version[3];
if (!VString::read_string(_raw_reader, &_header.key_class)) {
LOG(ERROR) << "read sequence file header(key_class) error:" << _path;
return -1;
}
if (!VString::read_string(_raw_reader, &_header.value_class)) {
LOG(ERROR) << "read sequence file header(value_class) error:" << _path;
return -1;
}
if (_raw_reader->read(&_header.is_compress) != sizeof(bool)) {
LOG(ERROR) << "read sequence file header(is_compress) error:" << _path;
return -1;
}
if (_raw_reader->read(&_header.is_block_compress) != sizeof(bool)) {
LOG(ERROR) << "read sequence file header(is_block_compress) error:"
<< _path;
return -1;
}
if (_header.is_compress) {
if (!VString::read_string(_raw_reader, &_header.compress_class)) {
LOG(ERROR) << "read sequence file header(compress_class) error:" << _path;
return -1;
}
}
int32_t meta_cnt = 0;
if (_raw_reader->read(&meta_cnt) != sizeof(int32_t)) {
LOG(ERROR) << "read sequence file header(meta_cnt) error:" << _path;
return -1;
}
_header.metas.resize(meta_cnt);
for (int32_t i = 0; i != meta_cnt; ++i) {
if (!VString::read_string(_raw_reader, &_header.metas[i].key)) {
LOG(ERROR) << "read sequence file header(meta_key) error:" << _path;
return -1;
}
if (!VString::read_string(_raw_reader, &_header.metas[i].value)) {
LOG(ERROR) << "read sequence file header(meta_value) error:" << _path;
return -1;
}
}
if (_raw_reader->read_buf(&_header.sync_marker, 16) != 16) {
LOG(ERROR) << "read sequence file header(sync_marker) error:" << _path;
return -1;
}
LOG(INFO) << "sync_marker:" << _header.sync_marker;
LOG(INFO) << "read sequence file header ok:" << _path;
return 0;
}
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "cube-builder/util.h"
#include <dirent.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include "butil/logging.h"
void getAllFiles(std::string path, std::vector<std::string> *files) {
DIR *dir;
struct dirent *ptr;
if ((dir = opendir(path.c_str())) == NULL) {
perror("Open dri error...");
exit(1);
}
while ((ptr = readdir(dir)) != NULL) {
if (strcmp(ptr->d_name, ".") == 0 || strcmp(ptr->d_name, "..") == 0) {
continue;
} else if ((ptr->d_type) == 8) { // file
if (ptr->d_name[0] != '.') files->push_back(path + "/" + ptr->d_name);
} else if (ptr->d_type == 10) { // link file
continue;
} else if (ptr->d_type == 4) {
getAllFiles(path + "/" + ptr->d_name, files);
}
}
closedir(dir);
}
std::string string_to_hex(const std::string &input) {
static const char *const lut = "0123456789ABCDEF";
size_t len = input.length();
std::string output;
output.reserve(2 * len);
for (size_t i = 0; i < len; ++i) {
const unsigned char c = input[i];
output.push_back(lut[c >> 4]);
output.push_back(lut[c & 15]);
}
return output;
}
bool checkDirectory(const std::string folder) {
LOG(INFO) << "check dir:" << folder;
if (access(folder.c_str(), F_OK) == 0) {
return 1;
}
LOG(WARNING) << "no dir will mkdir:" << folder;
return (mkdir(folder.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH) == 0);
}
void CmdTarfiles(const std::string folder) {
std::string cmd = "cd " + folder + " && tar -cvPf " + folder + ".tar .";
LOG(INFO) << "tar file cmd:" << cmd;
system(cmd.c_str());
}
void CmdMd5sum(const std::string folder) {
std::string cmd = "md5sum " + folder + ".tar > " + folder + ".tar.md5";
LOG(INFO) << "md5sum file cmd:" << cmd;
system(cmd.c_str());
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册