diff --git a/core/cube/CMakeLists.txt b/core/cube/CMakeLists.txt index f9dc4d2c2508720f450b4aee3aba5dfdd7ccd43b..a61d2df92a92bc26fabd4a3cf87c6db1dc1cc3f0 100644 --- a/core/cube/CMakeLists.txt +++ b/core/cube/CMakeLists.txt @@ -11,10 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License - -#execute_process(COMMAND go env -w GO111MODULE=off) add_subdirectory(cube-server) add_subdirectory(cube-api) add_subdirectory(cube-builder) -#add_subdirectory(cube-transfer) -#add_subdirectory(cube-agent) +add_subdirectory(cube-transfer) +add_subdirectory(cube-agent) diff --git a/core/cube/cube-agent/CMakeLists.txt b/core/cube/cube-agent/CMakeLists.txt index 30158aa506e53ec8a37d10aef4f29bfcd5a60d06..701f0c8a55e3326e1327f3b1f68458f99c60143b 100644 --- a/core/cube/cube-agent/CMakeLists.txt +++ b/core/cube/cube-agent/CMakeLists.txt @@ -15,7 +15,6 @@ set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/cmake") project(cube-agent Go) - include(cmake/golang.cmake) ExternalGoProject_Add(agent-docopt-go github.com/docopt/docopt-go) diff --git a/core/cube/cube-builder/CMakeLists.txt b/core/cube/cube-builder/CMakeLists.txt index 65f77f4eb0ff16299d5ee54f192c2171ac5b956c..00278939b78235ba5f5b3042d347ad905ac3c8fe 100644 --- a/core/cube/cube-builder/CMakeLists.txt +++ b/core/cube/cube-builder/CMakeLists.txt @@ -22,7 +22,7 @@ include_directories(SYSTEM ${CMAKE_CURRENT_BINARY_DIR}/../) add_executable(cube-builder src/main.cpp include/cube-builder/util.h src/util.cpp src/builder_job.cpp include/cube-builder/builder_job.h include/cube-builder/define.h src/seqfile_reader.cpp include/cube-builder/seqfile_reader.h include/cube-builder/raw_reader.h include/cube-builder/vtext.h src/crovl_builder_increment.cpp include/cube-builder/crovl_builder_increment.h src/curl_simple.cpp include/cube-builder/curl_simple.h) -add_dependencies(cube-builder jsoncpp boost) +add_dependencies(cube-builder jsoncpp boost brpc) set(DYNAMIC_LIB gflags @@ -39,4 +39,8 @@ target_link_libraries(cube-builder ${DYNAMIC_LIB}) # install install(TARGETS cube-builder RUNTIME DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/bin) -install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/tool DESTINATION ${PADDLE_SERVING_INSTALL_DIR}) +install(FILES ${CMAKE_CURRENT_LIST_DIR}/tool/kvtool.py DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/tool) + +install(FILES ${CMAKE_CURRENT_LIST_DIR}/tool/kv_to_seqfile.py DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/tool) + +install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/tool/source DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/tool) diff --git a/core/cube/cube-server/include/cube/slim_hash_map.h b/core/cube/cube-server/include/cube/slim_hash_map.h index 761ce9214f628a824f257611c07b07dab2503a48..26e9cd8c5702810a3fcaa83eeaeac17cdae97ba1 100644 --- a/core/cube/cube-server/include/cube/slim_hash_map.h +++ b/core/cube/cube-server/include/cube/slim_hash_map.h @@ -212,7 +212,7 @@ class slim_hash_map { int copy_data_from(const slim_hash_map& rhs) { destroy(); - + LOG(INFO) << "start copy data, rhs info, mHashSize: " << rhs.m_nHashSize; if (rhs.m_nHashSize > 0) { m_hashTable = new (std::nothrow) uint32_t[rhs.m_nHashSize]; if (!m_hashTable) { @@ -231,7 +231,7 @@ class slim_hash_map { << sizeof(hash_node_t) * BLOCK_SIZE; return -1; } - + LOG(INFO) << "copy data, m_nBlockNum: " << m_nBlockNum << " , copy size:" << sizeof(hash_node_t) * BLOCK_SIZE; memcpy(m_blockAddr[m_nBlockNum], rhs.m_blockAddr[m_nBlockNum], sizeof(hash_node_t) * BLOCK_SIZE); @@ -265,11 +265,13 @@ class slim_hash_map { } size_type index = key % m_nHashSize; hash_node_t* node = get_node(m_hashTable[index]); - + int node_cnt = 0; while (node != NULL && node->data.first != key) { + LOG(INFO) << "node link get:" << node->data.first; + node_cnt++; node = get_node(node->next); } - + LOG(INFO) << "key: " << key << " , found count: " << node_cnt; if (node == NULL) { return end(); } @@ -390,7 +392,6 @@ class slim_hash_map { if (node != NULL) { return node->data.second; } - return add_node(index, key)->data.second; } void clear() { @@ -399,16 +400,16 @@ class slim_hash_map { m_nFreeEntries = 0; m_nSize = 0; } - bool load(const char* file) { + bool load(const char* file, uint32_t block_id) { // clear(); + // bias = 0 means base mode, bias = K means patch mode, and base dict has size K int size = sizeof(key_t) + sizeof(value_t); FILE* fp = fopen(file, "rb"); char* buf = reinterpret_cast(malloc(size * 100000)); - + LOG(INFO) << "current block id: " << block_id; if (fp == NULL || buf == NULL) { return false; } - size_t read_count; bool err = false; key_t key; @@ -423,6 +424,8 @@ class slim_hash_map { for (int i = 0; i < static_cast(read_count); ++i) { key = *(reinterpret_cast(buf + i * size)); value = *(reinterpret_cast(buf + i * size + sizeof(key_t))); + value = ((uint64_t)block_id << 32) | value; + LOG(INFO) << "slim map key: " << key << " , value: " << value; (*this)[key] = value; } } @@ -557,7 +560,6 @@ class slim_hash_map { } hash_node_t* add_node(uint32_t index, const key_type& key) { ++m_nSize; - if (m_nFreeEntries) { uint32_t addr = m_nFreeEntries; hash_node_t* node = get_node(addr); @@ -569,7 +571,7 @@ class slim_hash_map { } uint32_t block = ((m_nNextEntry & 0xFF800000) >> 23); - + //LOG(INFO) << "key: " << key << " here. index: " << index << " , m_nNextEntry: "<< m_nNextEntry << " , block:" << block<< ", m_nBlockNum:" << m_nBlockNum; if (block >= m_nBlockNum) { try { m_blockAddr[m_nBlockNum++] = new hash_node_t[BLOCK_SIZE]; @@ -581,7 +583,6 @@ class slim_hash_map { return NULL; } } - uint32_t addr = m_nNextEntry; ++m_nNextEntry; hash_node_t* node = get_node(addr); diff --git a/core/cube/cube-server/src/dict.cpp b/core/cube/cube-server/src/dict.cpp index 05f401115ab5e95f8b014bf30bda71d8a10a74cb..dd21d518e61bd199108032c2e382d76d3b8b55a7 100644 --- a/core/cube/cube-server/src/dict.cpp +++ b/core/cube/cube-server/src/dict.cpp @@ -51,13 +51,12 @@ int Dict::load(const std::string& dict_path, bool in_mem, const std::string& v_path) { TIME_FLAG(load_start); - int ret = load_index(dict_path, v_path); if (ret != E_OK) { LOG(WARNING) << "load index failed"; return ret; } - + LOG(INFO) << "load index in mem mode: " << in_mem ; if (in_mem) { ret = load_data(dict_path, v_path); if (ret != E_OK) { @@ -81,8 +80,11 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) { std::string index_n_path(dict_path); index_n_path.append(v_path); index_n_path.append("/index.n"); + + uint32_t cur_block_id = 0; + if (_base_dict) cur_block_id = _base_dict->_block_set.size(); LOG(INFO) << "index file path: " << index_n_path; - + //ERR HERE std::unique_ptr pf(fopen(index_n_path.c_str(), "rb"), &fclose); if (pf.get() == NULL) { @@ -150,12 +152,16 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) { return E_DATA_ERROR; } } else { + if (_slim_table.copy_data_from(_base_dict->_slim_table) != 0) { + LOG(ERROR) << "copy data from old index failed in patch mode"; + return E_DATA_ERROR; + } file_idx = 0; LOG(INFO) - << "index check file len failed in patch mode, set file_idx to 0"; + << "index check fail, direct copy"; } } - + LOG(INFO) << "resize slim table, new count: " << count/2; _slim_table.resize(count / 2); char file[1024]; @@ -167,6 +173,7 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) { dict_path.c_str(), v_path.c_str(), file_idx); + LOG(INFO) << "load file str: " << file; if (stat(file, &fstat) < 0) { if (errno == ENOENT) { LOG(WARNING) << "index." << file_idx << " not exist"; @@ -181,8 +188,8 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) { << (uint64_t)fstat.st_size; return E_DATA_ERROR; } - LOG(INFO) << "loading from index." << file_idx; - if (!_slim_table.load(file) || _slim_table.size() > count) { + LOG(INFO) << "loading from index." << file_idx << " . table size: " << _slim_table.size(); + if (!_slim_table.load(file, cur_block_id)) { return E_DATA_ERROR; } @@ -193,8 +200,15 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) { } int Dict::load_data(const std::string& dict_path, const std::string& v_path) { + std::vector block_size; + uint64_t total_data_size = 0; if (_base_dict) { _block_set = _base_dict->_block_set; + LOG(INFO)<< "load data base dict block set size: " << _block_set[0].size; + for (size_t i = 0; i < _block_set.size(); ++i) { + block_size.push_back(_block_set[i].size); + total_data_size += _block_set[i].size; + } } std::string data_n_path(dict_path); @@ -212,8 +226,6 @@ int Dict::load_data(const std::string& dict_path, const std::string& v_path) { return E_DATA_ERROR; } - std::vector block_size; - uint64_t total_data_size = 0; for (uint32_t i = 0; i < count; ++i) { uint32_t size = 0; if (fread(reinterpret_cast(&size), sizeof(uint32_t), 1, pf) != 1) { @@ -222,6 +234,7 @@ int Dict::load_data(const std::string& dict_path, const std::string& v_path) { return E_DATA_ERROR; } block_size.push_back(size); + LOG(INFO) << "new block size: " << size; total_data_size += size; } g_data_size << (total_data_size / 1024 / 1024); @@ -229,36 +242,35 @@ int Dict::load_data(const std::string& dict_path, const std::string& v_path) { pf = NULL; uint32_t old_size = _block_set.size(); + LOG(INFO) << "load data old size: " << old_size; for (size_t i = 0; i < old_size; ++i) { if (_block_set[i].size != block_size[i]) { old_size = 0; break; } } - _block_set.resize(count); + LOG(INFO) << "load data block set count: " << count << " , old size: " << old_size; + _block_set.resize(count + old_size); for (size_t i = old_size; i < _block_set.size(); ++i) { char data_path[1024]; LOG(INFO) << "load from data." << i; - snprintf( - data_path, 1024, "%s%s/data.%lu", dict_path.c_str(), v_path.c_str(), i); - + //snprintf( + // data_path, 1024, "%s%s/data.%lu", dict_path.c_str(), v_path.c_str(), i); + snprintf(data_path, 1024, "%s%s/data.%lu", dict_path.c_str(), v_path.c_str(), i - old_size); FILE* data_file = fopen(data_path, "rb"); if (data_file == NULL) { - LOG(WARNING) << "open data file [" << data_path << " failed"; + LOG(WARNING) << "open data file [" << data_path << " ]failed"; _block_set[i].s_data.reset(); _block_set[i].size = 0; continue; } - - _block_set[i].s_data.reset( - reinterpret_cast(malloc(block_size[i] * sizeof(char)))); + _block_set[i].s_data.reset(reinterpret_cast(malloc(block_size[i] * sizeof(char)))); if (_block_set[i].s_data.get() == NULL) { LOG(ERROR) << "malloc data failed"; fclose(data_file); return E_OOM; } _block_set[i].size = block_size[i]; - if (fread(reinterpret_cast(_block_set[i].s_data.get()), sizeof(char), _block_set[i].size, @@ -267,7 +279,10 @@ int Dict::load_data(const std::string& dict_path, const std::string& v_path) { fclose(data_file); return E_DATA_ERROR; } - + LOG(INFO) << "load new data to BlockSet succ"; + for (size_t ii = 0; ii < 20; ++ii) { + LOG(INFO) << "data ptr: " << (int)(_block_set[i].s_data.get()[ii]); + } fclose(data_file); } @@ -386,12 +401,11 @@ bool Dict::seek(uint64_t key, char* buff, uint64_t* buff_size) { uint64_t flag = it->second; uint32_t id = (uint32_t)(flag >> 32); uint64_t addr = (uint32_t)(flag); - + LOG(INFO) << "search key: " << id << " , addr: " << addr; if (_block_set.size() > id) { uint32_t block_size = _block_set[id].size; char* block_data = NULL; block_data = _block_set[id].s_data.get(); - if (block_data && addr + sizeof(uint32_t) <= block_size) { uint32_t len = *(reinterpret_cast(block_data + addr)); if (addr + len <= block_size && len >= sizeof(uint32_t)) { @@ -405,6 +419,7 @@ bool Dict::seek(uint64_t key, char* buff, uint64_t* buff_size) { << default_buffer_size; return false; } + LOG(INFO) << "seek key: " << key << " , addr: " << addr; memcpy(buff, (block_data + addr + sizeof(uint32_t)), len - sizeof(uint32_t)); diff --git a/core/cube/cube-transfer/CMakeLists.txt b/core/cube/cube-transfer/CMakeLists.txt index 78e47c5b840631a3092f3a799e2424d370444a2e..2e9d3dede03c5b27bcd0e24eaa6584df343c09e2 100644 --- a/core/cube/cube-transfer/CMakeLists.txt +++ b/core/cube/cube-transfer/CMakeLists.txt @@ -18,11 +18,9 @@ project(cube-transfer Go) include(cmake/golang.cmake) -ExternalGoProject_Add(rfw github.com/mipearson/rfw) -ExternalGoProject_Add(docopt-go github.com/docopt/docopt-go) -add_custom_target(logex - COMMAND env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} get github.com/Badangel/logex - DEPENDS rfw) +ExternalGoProject_Add(transfer-rfw github.com/mipearson/rfw) +ExternalGoProject_Add(transfer-docopt-go github.com/docopt/docopt-go) +ExternalGoProject_Add(transfer-logex github.com/Badangel/logex) add_subdirectory(src) install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/conf DESTINATION ${PADDLE_SERVING_INSTALL_DIR}) diff --git a/core/cube/cube-transfer/src/CMakeLists.txt b/core/cube/cube-transfer/src/CMakeLists.txt index 62d3f7ef7759a0d2a09eb4fe32a064694ece5408..b71278537a2ee03468019e7bd7e5ec4d786becf2 100644 --- a/core/cube/cube-transfer/src/CMakeLists.txt +++ b/core/cube/cube-transfer/src/CMakeLists.txt @@ -14,6 +14,6 @@ set(SOURCE_FILE cube-transfer.go) add_go_executable(cube-transfer ${SOURCE_FILE}) -add_dependencies(cube-transfer docopt-go) -add_dependencies(cube-transfer rfw) -add_dependencies(cube-transfer logex) +add_dependencies(cube-transfer transfer-docopt-go) +add_dependencies(cube-transfer transfer-rfw) +add_dependencies(cube-transfer transfer-logex) diff --git a/core/cube/cube-transfer/src/transfer/transfer.go b/core/cube/cube-transfer/src/transfer/transfer.go index 84ab7427333b3a639efd2e48df3dd248209924be..d29c29a496a62930a86ae1dcb44c02a4d32f1552 100644 --- a/core/cube/cube-transfer/src/transfer/transfer.go +++ b/core/cube/cube-transfer/src/transfer/transfer.go @@ -17,68 +17,56 @@ package transfer import ( "fmt" "github.com/Badangel/logex" - "os" - "time" "transfer/dict" ) func Start() { - go BackupTransfer() - logex.Notice(">>> starting server...") - addr := ":" + Port - err := startHttp(addr) - if err != nil { - logex.Fatalf("start http(addr=%v) failed: %v", addr, err) - os.Exit(255) - } - - logex.Notice(">>> start server succ") + BackupTransfer() } func BackupTransfer() { - for { - //trigger - version, err := TriggerStart(Dict.DonefileAddress) - if err != nil { - logex.Fatalf("[trigger err]trigger err:%v ", err) - fmt.Printf("[error]trigger err:%v \n", err) - break - } - logex.Noticef("[trigger] get version:%v \n", version) - if version.Id == 0 { - logex.Noticef("[sleep]no new version, sleep 5 min") - fmt.Printf("[sleep]no new version, wait 5 min\n") - time.Sleep(5 * time.Minute) - continue - } + //trigger + version, err := TriggerStart(Dict.DonefileAddress) + if err != nil { + logex.Fatalf("[trigger err]trigger err:%v ", err) + fmt.Printf("[error]trigger err:%v \n", err) + fmt.Print("transfer over!") + logex.Noticef("[transfer]status machine exit!") + return + } + logex.Noticef("[trigger] get version:%v \n", version) Dict.WaitVersionInfo = version - logex.Noticef("[trigger finish] WaitVersionInfo version:%v \n", Dict.WaitVersionInfo) - WriteWaitVersionInfoToFile() + logex.Noticef("[trigger finish] WaitVersionInfo version:%v \n", Dict.WaitVersionInfo) + WriteWaitVersionInfoToFile() - //builder - Dict.WaitVersionInfo.Status = dict.Dict_Status_Building - Dict.WaitVersionInfo.MetaInfos = make(map[int]string) - WriteWaitVersionInfoToFile() - if err = BuilderStart(Dict.WaitVersionInfo); err != nil { - logex.Fatalf("builder err:%v \n", err) - } + //builder + Dict.WaitVersionInfo.Status = dict.Dict_Status_Building + Dict.WaitVersionInfo.MetaInfos = make(map[int]string) + WriteWaitVersionInfoToFile() + if err = BuilderStart(Dict.WaitVersionInfo); err != nil { + logex.Fatalf("builder err:%v \n", err) + } - if Dict.WaitVersionInfo.Mode == dict.BASE { - var newCurrentVersion []dict.DictVersionInfo - Dict.CurrentVersionInfo = newCurrentVersion - WriteCurrentVersionInfoToFile() - } - logex.Noticef("[builder finish] WaitVersionInfo version:%v \n", Dict.WaitVersionInfo) + if Dict.WaitVersionInfo.Mode == dict.BASE { + var newCurrentVersion []dict.DictVersionInfo + Dict.CurrentVersionInfo = newCurrentVersion + WriteCurrentVersionInfoToFile() + } + if Dict.WaitVersionInfo.Mode == dict.DELTA { + var newCurrentVersion []dict.DictVersionInfo + Dict.CurrentVersionInfo = newCurrentVersion + WriteCurrentVersionInfoToFile() + } + logex.Noticef("[builder finish] WaitVersionInfo version:%v \n", Dict.WaitVersionInfo) - //deployer - Dict.WaitVersionInfo.Status = dict.Dict_Status_Deploying - WriteWaitVersionInfoToFile() - if err = DeployStart(Dict.WaitVersionInfo); err != nil { - logex.Fatalf("deploy err:%v \n", err) - } - logex.Noticef("[deploy finish]current version: %v\n",Dict.CurrentVersionInfo) + //deployer + Dict.WaitVersionInfo.Status = dict.Dict_Status_Deploying + WriteWaitVersionInfoToFile() + if err = DeployStart(Dict.WaitVersionInfo); err != nil { + logex.Fatalf("deploy err:%v \n", err) } + logex.Noticef("[deploy finish]current version: %v\n",Dict.CurrentVersionInfo) fmt.Print("transfer over!") logex.Noticef("[transfer]status machine exit!") } diff --git a/core/cube/cube-transfer/src/transfer/trigger.go b/core/cube/cube-transfer/src/transfer/trigger.go index b3696dc58b7ca33de307cbe7ea2d4509d269753c..768f7218c036d7d948c046a6763d11c68ce9a306 100644 --- a/core/cube/cube-transfer/src/transfer/trigger.go +++ b/core/cube/cube-transfer/src/transfer/trigger.go @@ -38,18 +38,19 @@ func GetDoneFileInfo(addr string) (version dict.DictVersionInfo, err error) { Wget(addr, donefileAddr) addr = donefileAddr } - - baseDonefile := addr + "/base.txt" - fmt.Printf("[trigrer]donefile path:%v \n", baseDonefile) - logex.Noticef("[trigrer]base donefile path:%v", baseDonefile) - contents, err := ioutil.ReadFile(baseDonefile) VersionLen := len(Dict.CurrentVersionInfo) version.DictName = Dict.DictName - if err != nil { - fmt.Printf("[trigrer]read files err:%v \n", err) - logex.Fatalf("[trigrer]read files err:%v ", err) + fmt.Printf("get into mode check here\n") + if Dict.DictMode == dict.BASE_ONLY { + baseDonefile := addr + "/base.txt" + fmt.Printf("[trigrer]donefile path:%v \n", baseDonefile) + logex.Noticef("[trigrer]base donefile path:%v", baseDonefile) + contents, err_0 := ioutil.ReadFile(baseDonefile) + if err_0 != nil { + fmt.Printf("[trigrer]read files err:%v \n", err_0) + logex.Fatalf("[trigrer]read files err:%v ", err_0) return - } else { + } else { contentss := string(contents) lines := strings.Split(contentss, "\n") index := len(lines) - 1 @@ -80,19 +81,21 @@ func GetDoneFileInfo(addr string) (version dict.DictVersionInfo, err error) { version.Mode = dict.BASE return } - } - if Dict.DictMode == dict.BASR_DELTA && VersionLen > 0 { + } + } + if Dict.DictMode == dict.BASR_DELTA { patchDonefile := addr + "/patch.txt" fmt.Printf("[trigrer]patchDonefile path:%v \n", patchDonefile) logex.Noticef("[trigrer]patch donefile path:%v", patchDonefile) - contents, err = ioutil.ReadFile(patchDonefile) - if err != nil { - fmt.Printf("read files err:%v \n", err) + contents, err_0 := ioutil.ReadFile(patchDonefile) + if err_0 != nil { + fmt.Printf("[trigrer]read files err:%v \n", err_0) + logex.Fatalf("[trigrer]read files err:%v ", err_0) return } else { contentss := string(contents) lines := strings.Split(contentss, "\n") - + fmt.Printf("[trigger]get patch lines here\n") for index := 0; index < len(lines)-1; index++ { if len(lines[index]) < 3 { logex.Noticef("[trigrer]get patch donfile info error") @@ -106,14 +109,15 @@ func GetDoneFileInfo(addr string) (version dict.DictVersionInfo, err error) { logex.Noticef("[trigrer]donfile info:%v", donefileInfo) newId, _ := strconv.Atoi(donefileInfo.Id) newKey, _ := strconv.Atoi(donefileInfo.Key) - if newId > Dict.CurrentVersionInfo[VersionLen-1].Id && newKey == Dict.CurrentVersionInfo[VersionLen-1].Key { + fmt.Printf("[trigger]read patch id: %d, key: %d\n", newId, newKey) + if VersionLen == 0 || newId > Dict.CurrentVersionInfo[VersionLen-1].Id { version.Id = newId version.Key, _ = strconv.Atoi(donefileInfo.Key) version.Input = donefileInfo.Input deployVersion := int(time.Now().Unix()) version.CreateTime = deployVersion version.Version = deployVersion - version.Depend = Dict.CurrentVersionInfo[VersionLen-1].Depend + version.Depend = deployVersion version.Mode = dict.DELTA return } diff --git a/core/cube/cube-transfer/src/transfer/util.go b/core/cube/cube-transfer/src/transfer/util.go index f3c1834319ab2752a2338cda737855854cf73356..8f9e5c545f35e504f248466e84f8a7d368b80db8 100644 --- a/core/cube/cube-transfer/src/transfer/util.go +++ b/core/cube/cube-transfer/src/transfer/util.go @@ -96,7 +96,8 @@ func ExeCommad(files string, params []string) (err error) { func Wget(ftpPath string, downPath string) { var params []string - params = append(params, "-P") + params = append(params, "--limit-rate=100m") + params = append(params, "-P") params = append(params, downPath) params = append(params, "-r") params = append(params, "-N") @@ -110,4 +111,4 @@ func Wget(ftpPath string, downPath string) { if err != nil { fmt.Printf("wget exe: %v\n", err) } -} \ No newline at end of file +} diff --git a/core/general-server/op/general_dist_kv_infer_op.cpp b/core/general-server/op/general_dist_kv_infer_op.cpp index 6cfd88788063a13d60f0f8ada29711760ecae174..2228ccb952b1a91a5e34f990ae4c186570b91f5d 100644 --- a/core/general-server/op/general_dist_kv_infer_op.cpp +++ b/core/general-server/op/general_dist_kv_infer_op.cpp @@ -37,7 +37,149 @@ using baidu::paddle_serving::predictor::general_model::Request; using baidu::paddle_serving::predictor::InferManager; using baidu::paddle_serving::predictor::PaddleGeneralModelConfig; -int GeneralDistKVInferOp::inference() { return 0; } +// DistKV Infer Op: seek cube and then call paddle inference +// op seq: general_reader-> dist_kv_infer -> general_response +int GeneralDistKVInferOp::inference() { + VLOG(2) << "Going to run inference"; + const std::vector pre_node_names = pre_names(); + if (pre_node_names.size() != 1) { + LOG(ERROR) << "This op(" << op_name() + << ") can only have one predecessor op, but received " + << pre_node_names.size(); + return -1; + } + const std::string pre_name = pre_node_names[0]; + + const GeneralBlob *input_blob = get_depend_argument(pre_name); + if (!input_blob) { + LOG(ERROR) << "input_blob is nullptr,error"; + return -1; + } + uint64_t log_id = input_blob->GetLogId(); + VLOG(2) << "(logid=" << log_id << ") Get precedent op name: " << pre_name; + + GeneralBlob *output_blob = mutable_data(); + if (!output_blob) { + LOG(ERROR) << "(logid=" << log_id << ") output_blob is nullptr,error"; + return -1; + } + output_blob->SetLogId(log_id); + + if (!input_blob) { + LOG(ERROR) << "(logid=" << log_id + << ") Failed mutable depended argument, op:" << pre_name; + return -1; + } + + const TensorVector *in = &input_blob->tensor_vector; + TensorVector *out = &output_blob->tensor_vector; + std::vector keys; + std::vector values; + int sparse_count = 0; // sparse inputs counts, sparse would seek cube + int dense_count = 0; // dense inputs counts, dense would directly call paddle infer + std::vector> dataptr_size_pairs; + size_t key_len = 0; + for (size_t i = 0; i < in->size(); ++i) { + if (in->at(i).dtype != paddle::PaddleDType::INT64) { + ++dense_count; + continue; + } + ++sparse_count; + size_t elem_num = 1; + for (size_t s = 0; s < in->at(i).shape.size(); ++s) { + elem_num *= in->at(i).shape[s]; + } + key_len += elem_num; + int64_t *data_ptr = static_cast(in->at(i).data.data()); + dataptr_size_pairs.push_back(std::make_pair(data_ptr, elem_num)); + } + keys.resize(key_len); + VLOG(3) << "(logid=" << log_id << ") cube number of keys to look up: " << key_len; + int key_idx = 0; + for (size_t i = 0; i < dataptr_size_pairs.size(); ++i) { + std::copy(dataptr_size_pairs[i].first, + dataptr_size_pairs[i].first + dataptr_size_pairs[i].second, + keys.begin() + key_idx); + key_idx += dataptr_size_pairs[i].second; + } + rec::mcube::CubeAPI *cube = rec::mcube::CubeAPI::instance(); + std::vector table_names = cube->get_table_names(); + if (table_names.size() == 0) { + LOG(ERROR) << "cube init error or cube config not given."; + return -1; + } + // gather keys and seek cube servers, put results in values + int ret = cube->seek(table_names[0], keys, &values); + VLOG(3) << "(logid=" << log_id << ") cube seek status: " << ret; + if (values.size() != keys.size() || values[0].buff.size() == 0) { + LOG(ERROR) << "cube value return null"; + } + // EMBEDDING_SIZE means the length of sparse vector, user can define length here. + size_t EMBEDDING_SIZE = values[0].buff.size() / sizeof(float); + TensorVector sparse_out; + sparse_out.resize(sparse_count); + TensorVector dense_out; + dense_out.resize(dense_count); + int cube_val_idx = 0; + int sparse_idx = 0; + int dense_idx = 0; + std::unordered_map in_out_map; + baidu::paddle_serving::predictor::Resource &resource = + baidu::paddle_serving::predictor::Resource::instance(); + std::shared_ptr model_config = resource.get_general_model_config().front(); + //copy data to tnsor + for (size_t i = 0; i < in->size(); ++i) { + if (in->at(i).dtype != paddle::PaddleDType::INT64) { + dense_out[dense_idx] = in->at(i); + ++dense_idx; + continue; + } + sparse_out[sparse_idx].lod.resize(in->at(i).lod.size()); + for (size_t x = 0; x < sparse_out[sparse_idx].lod.size(); ++x) { + sparse_out[sparse_idx].lod[x].resize(in->at(i).lod[x].size()); + std::copy(in->at(i).lod[x].begin(), + in->at(i).lod[x].end(), + sparse_out[sparse_idx].lod[x].begin()); + } + sparse_out[sparse_idx].dtype = paddle::PaddleDType::FLOAT32; + sparse_out[sparse_idx].shape.push_back(sparse_out[sparse_idx].lod[0].back()); + sparse_out[sparse_idx].shape.push_back(EMBEDDING_SIZE); + sparse_out[sparse_idx].name = model_config->_feed_name[i]; + sparse_out[sparse_idx].data.Resize(sparse_out[sparse_idx].lod[0].back() * + EMBEDDING_SIZE * sizeof(float)); + float *dst_ptr = static_cast(sparse_out[sparse_idx].data.data()); + for (int x = 0; x < sparse_out[sparse_idx].lod[0].back(); ++x) { + float *data_ptr = dst_ptr + x * EMBEDDING_SIZE; + memcpy(data_ptr, + values[cube_val_idx].buff.data(), + values[cube_val_idx].buff.size()); + cube_val_idx++; + } + ++sparse_idx; + } + VLOG(3) << "(logid=" << log_id << ") sparse tensor load success."; + TensorVector infer_in; + infer_in.insert(infer_in.end(), dense_out.begin(), dense_out.end()); + infer_in.insert(infer_in.end(), sparse_out.begin(), sparse_out.end()); + int batch_size = input_blob->_batch_size; + output_blob->_batch_size = batch_size; + Timer timeline; + int64_t start = timeline.TimeStampUS(); + timeline.Start(); + // call paddle inference here + if (InferManager::instance().infer( + engine_name().c_str(), &infer_in, out, batch_size)) { + LOG(ERROR) << "(logid=" << log_id << ") Failed do infer in fluid model: " << engine_name(); + return -1; + } + int64_t end = timeline.TimeStampUS(); + + CopyBlobInfo(input_blob, output_blob); + AddBlobInfo(output_blob, start); + AddBlobInfo(output_blob, end); + return 0; + +} DEFINE_OP(GeneralDistKVInferOp); } // namespace serving diff --git a/core/predictor/tools/CMakeLists.txt b/core/predictor/tools/CMakeLists.txt index 73e0d2a4b3a36681fbddd0b8789b394e89e792ff..c15ada04307c0fb546dc2cb7864d542fe8f2994f 100644 --- a/core/predictor/tools/CMakeLists.txt +++ b/core/predictor/tools/CMakeLists.txt @@ -2,3 +2,16 @@ set(seq_gen_src ${CMAKE_CURRENT_LIST_DIR}/seq_generator.cpp ${CMAKE_CURRENT_LIS LIST(APPEND seq_gen_src ${PROTO_SRCS}) add_executable(seq_generator ${seq_gen_src}) target_link_libraries(seq_generator protobuf -lpthread) + + +set(seq_reader_src ${CMAKE_CURRENT_LIST_DIR}/seq_reader.cpp ${CMAKE_CURRENT_LIST_DIR}/../../cube/cube-builder/src/seqfile_reader.cpp) +add_executable(seq_reader ${seq_reader_src}) +add_dependencies(seq_reader brpc) +install(TARGETS seq_reader + RUNTIME DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/bin + ARCHIVE DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/lib + LIBRARY DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/so + ) + +install(TARGETS seq_reader RUNTIME DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/tool) +install(TARGETS seq_generator RUNTIME DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/tool) diff --git a/core/predictor/tools/seq_reader.cpp b/core/predictor/tools/seq_reader.cpp new file mode 100644 index 0000000000000000000000000000000000000000..3bd271c03b8f172ad0eec067d54a928de0404bb3 --- /dev/null +++ b/core/predictor/tools/seq_reader.cpp @@ -0,0 +1,91 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include "core/cube/cube-builder/include/cube-builder/seqfile_reader.h" +std::string string_to_hex(const std::string& input) { + static const char* const lut = "0123456789ABCDEF"; + size_t len = input.length(); + + std::string output; + output.reserve(2 * len); + for (size_t i = 0; i < len; ++i) { + const unsigned char c = input[i]; + output.push_back(lut[c >> 4]); + output.push_back(lut[c & 15]); + } + return output; +} + +void printSeq(std::string file, int limit) { + SequenceFileRecordReader reader(file.c_str()); + + if (reader.open() != 0) { + std::cerr << "open file failed! " << file; + return; + } + if (reader.read_header() != 0) { + std::cerr << "read header error! " << file; + reader.close(); + return; + } + + Record record(reader.get_header()); + int total_count = 0; + + while (reader.next(&record) == 0) { + uint64_t key = + *reinterpret_cast(const_cast(record.key.data())); + + total_count++; + int64_t value_length = record.record_len - record.key_len; + std::cout << "key: " << key << " , value: " << string_to_hex(record.value.c_str()) << std::endl; + if (total_count >= limit) { + break; + } + } + + if (reader.close() != 0) { + std::cerr << "close file failed! " << file; + return; + } +} + +int main(int argc, char **argv) { + if (argc != 3 && argc != 2) { + std::cout << "Seq Reader Usage:" << std::endl; + std::cout << "get all keys: ./seq_reader $FILENAME " << std::endl; + std::cout << "get some keys: ./seq_reader $FILENAME $KEY_NUM" << std::endl; + return -1; + } + if (argc == 3 || argc == 2) { + const char* filename_str = argv[1]; + std::cout << "cstr filename is " << filename_str << std::endl; + std::string filename = filename_str; + std::cout << "filename is " << filename << std::endl; + if (argc == 3) { + const char* key_num_str = argv[2]; + int key_limit = std::stoi(key_num_str); + printSeq(filename, key_limit); + } else { + printSeq(filename, INT_MAX); + } + } + return 0; +} diff --git a/doc/COMPILE.md b/doc/COMPILE.md index 1e27b32e69a4579b14b2d91da3032fec6c52b82d..ea7f53b2ed1704777b611a58c3a8d971d48eb312 100644 --- a/doc/COMPILE.md +++ b/doc/COMPILE.md @@ -87,6 +87,7 @@ go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway@v1.15.2 go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger@v1.15.2 go get -u github.com/golang/protobuf/protoc-gen-go@v1.4.3 go get -u google.golang.org/grpc@v1.33.0 +go env -w GO111MODULE=auto ``` diff --git a/doc/COMPILE_CN.md b/doc/COMPILE_CN.md index 33d5dfa9786d034c88002daf379a29d2b394ee07..89178cee78746013915fc416b212b5a49f6762c2 100644 --- a/doc/COMPILE_CN.md +++ b/doc/COMPILE_CN.md @@ -86,6 +86,7 @@ go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway@v1.15.2 go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger@v1.15.2 go get -u github.com/golang/protobuf/protoc-gen-go@v1.4.3 go get -u google.golang.org/grpc@v1.33.0 +go env -w GO111MODULE=auto ``` diff --git a/doc/CUBE_TEST_CN.md b/doc/CUBE_TEST_CN.md new file mode 100644 index 0000000000000000000000000000000000000000..c9e8c23ca3be43390ffd959d83c456cf47722056 --- /dev/null +++ b/doc/CUBE_TEST_CN.md @@ -0,0 +1,61 @@ +## 如果获得稀疏参数索引Cube所需的模型输入 + +### 背景知识 + +推荐系统需要大规模稀疏参数索引来帮助分布式部署,可在`python/example/criteo_ctr_with_cube`或是[PaddleRec](https://github.com/paddlepaddle/paddlerec)了解推荐模型。 + +稀疏参数索引的模型格式是SequenceFile,源自Hadoop生态的键值对格式文件。 + +为了方便调试,我们给出了从特定格式的可读文本文件到SequenceFile格式文件的转换工具,以及SequenceFile格式文件与可阅读文字的转换。 + +用户在调试Cube服务功能时,可以自定义KV对生成SequenceFile格式文件来进行调试。 +用户在验证Cube的配送正确性时,可以转换SequenceFile格式文件至可读文字来进行比对验证。 + +### 预备知识 + +- 需要会编译Paddle Serving,参见[编译文档](./COMPILE.md) + +### 用法 + +在编译结束后的安装文件,可以得到 seq_reader 和 kv_to_seqfile.py。 + +#### 生成SequenceFile + +在`output/tool/`下,修改`output/tool/source/file.txt`,该文件每一行对应一个键值对,用冒号`:`区分key和value部分。 + +例如: +``` +1676869128226002114:48241 37064 91 -539 114 51 -122 269 229 -134 -282 +1657749292782759014:167 40 98 27 117 10 -29 15 74 67 -54 +``` +执行 +``` +python kv_to_seqfile.py +``` +即可生成`data`文件夹,我们看下它的结构 + +``` +. +├── 20210805095422 +│   └── base +│   └── feature +└── donefile + └── base.txt +``` +其中`20210805095422/base/feature` 就是SequenceFile格式文件,donefile保存在`donefile/base.txt`。 + +#### 查看SequenceFile + +我们使用`seq_reader`工具来解读SequenceFile格式文件。 +``` +./seq_reader 20210805095422/base/feature 10 # 阅读开头的10个KV对 +./seq_reader 20210805095422/base/feature # 阅读所有KV对 +``` + +结果 +``` +key: 1676869128226002114 , value: 343832343109333730363409093931092D35333909313134093531092D3132320932363909323239092D313334092D323832 +key: 1657749292782759014 , value: 3136370934300909393809323709313137093130092D3239093135093734093637092D3534 +``` + +其中value 我们目前都以16进制的形式打印。 diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index de1fe2843bd32a9cc9e2aa567c0ddddd7457c67c..589420ad45ae7f347c8e7b9b25c5cc0034830263 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -81,7 +81,6 @@ if (SERVER) if(WITH_LITE) set(VERSION_SUFFIX 2) endif() - add_custom_command( OUTPUT ${PADDLE_SERVING_BINARY_DIR}/.timestamp COMMAND cp -r diff --git a/python/examples/criteo_ctr_with_cube/README.md b/python/examples/criteo_ctr_with_cube/README.md new file mode 100755 index 0000000000000000000000000000000000000000..493b3d72c1fff9275c2a99cfee45efd4bef1af4c --- /dev/null +++ b/python/examples/criteo_ctr_with_cube/README.md @@ -0,0 +1,72 @@ +## Criteo CTR with Sparse Parameter Indexing Service + +([简体中文](./README_CN.md)|English) + +### Get Sample Dataset + +go to directory `python/examples/criteo_ctr_with_cube` +``` +sh get_data.sh +``` + +### Download Model and Sparse Parameter Sequence Files +``` +wget https://paddle-serving.bj.bcebos.com/unittest/ctr_cube_unittest.tar.gz +tar xf ctr_cube_unittest.tar.gz +mv models/ctr_client_conf ./ +mv models/ctr_serving_model_kv ./ +mv models/data ./cube/ +``` +the model will be in ./ctr_server_model_kv and ./ctr_client_config. + +### Start Sparse Parameter Indexing Service +``` +wget https://paddle-serving.bj.bcebos.com/others/cube_app.tar.gz +tar xf cube_app.tar.gz +mv cube_app/cube* ./cube/ +sh cube_prepare.sh & +``` + +Here, the sparse parameter is loaded by cube sparse parameter indexing service Cube. + +### Start RPC Predictor, the number of serving thread is 4(configurable in test_server.py) + +``` +python test_server.py ctr_serving_model_kv +``` + +### Run Prediction + +``` +python test_client.py ctr_client_conf/serving_client_conf.prototxt ./raw_data +``` + +### Benchmark + +CPU :Intel(R) Xeon(R) CPU 6148 @ 2.40GHz + +Model :[Criteo CTR](https://github.com/PaddlePaddle/Serving/blob/develop/python/examples/criteo_ctr_with_cube/network_conf.py) + +server core/thread num : 4/8 + +Run +``` +bash benchmark.sh +``` +1000 batches will be sent by every client + +| client thread num | prepro | client infer | op0 | op1 | op2 | postpro | avg_latency | qps | +| ------------------ | ------ | ------------ | ------ | ----- | ------ | ------- | ----- | ----- | +| 1 | 0.035 | 1.596 | 0.021 | 0.518 | 0.0024 | 0.0025 | 6.774 | 147.7 | +| 2 | 0.034 | 1.780 | 0.027 | 0.463 | 0.0020 | 0.0023 | 6.931 | 288.3 | +| 4 | 0.038 | 2.954 | 0.025 | 0.455 | 0.0019 | 0.0027 | 8.378 | 477.5 | +| 8 | 0.044 | 8.230 | 0.028 | 0.464 | 0.0023 | 0.0034 | 14.191 | 563.8 | +| 16 | 0.048 | 21.037 | 0.028 | 0.455 | 0.0025 | 0.0041 | 27.236 | 587.5 | + +the average latency of threads + +![avg cost](../../../doc/criteo-cube-benchmark-avgcost.png) + +The QPS is + +![qps](../../../doc/criteo-cube-benchmark-qps.png) diff --git a/python/examples/criteo_ctr_with_cube/README_CN.md b/python/examples/criteo_ctr_with_cube/README_CN.md new file mode 100644 index 0000000000000000000000000000000000000000..7a0eb43c203aafeb38b64d249954cdabf7bf7a38 --- /dev/null +++ b/python/examples/criteo_ctr_with_cube/README_CN.md @@ -0,0 +1,70 @@ +## 带稀疏参数索引服务的CTR预测服务 +(简体中文|[English](./README.md)) + +### 获取样例数据 +进入目录 `python/examples/criteo_ctr_with_cube` +``` +sh get_data.sh +``` + +### 下载模型和稀疏参数序列文件 +``` +wget https://paddle-serving.bj.bcebos.com/unittest/ctr_cube_unittest.tar.gz +tar xf ctr_cube_unittest.tar.gz +mv models/ctr_client_conf ./ +mv models/ctr_serving_model_kv ./ +mv models/data ./cube/ +``` +执行脚本后会在当前目录有ctr_server_model_kv和ctr_client_config文件夹。 + +### 启动稀疏参数索引服务 +``` +wget https://paddle-serving.bj.bcebos.com/others/cube_app.tar.gz +tar xf cube_app.tar.gz +mv cube_app/cube* ./cube/ +sh cube_prepare.sh & +``` + +此处,模型当中的稀疏参数会被存放在稀疏参数索引服务Cube当中。 + +### 启动RPC预测服务,服务端线程数为4(可在test_server.py配置) + +``` +python test_server.py ctr_serving_model_kv +``` + +### 执行预测 + +``` +python test_client.py ctr_client_conf/serving_client_conf.prototxt ./raw_data +``` + +### Benchmark + +设备 :Intel(R) Xeon(R) CPU 6148 @ 2.40GHz + +模型 :[Criteo CTR](https://github.com/PaddlePaddle/Serving/blob/develop/python/examples/criteo_ctr_with_cube/network_conf.py) + +server core/thread num : 4/8 + +执行 +``` +bash benchmark.sh +``` +客户端每个线程会发送1000个batch + +| client thread num | prepro | client infer | op0 | op1 | op2 | postpro | avg_latency | qps | +| ------------------ | ------ | ------------ | ------ | ----- | ------ | ------- | ----- | ----- | +| 1 | 0.035 | 1.596 | 0.021 | 0.518 | 0.0024 | 0.0025 | 6.774 | 147.7 | +| 2 | 0.034 | 1.780 | 0.027 | 0.463 | 0.0020 | 0.0023 | 6.931 | 288.3 | +| 4 | 0.038 | 2.954 | 0.025 | 0.455 | 0.0019 | 0.0027 | 8.378 | 477.5 | +| 8 | 0.044 | 8.230 | 0.028 | 0.464 | 0.0023 | 0.0034 | 14.191 | 563.8 | +| 16 | 0.048 | 21.037 | 0.028 | 0.455 | 0.0025 | 0.0041 | 27.236 | 587.5 | + +平均每个线程耗时图如下 + +![avg cost](../../../doc/criteo-cube-benchmark-avgcost.png) + +每个线程QPS耗时如下 + +![qps](../../../doc/criteo-cube-benchmark-qps.png) diff --git a/python/examples/criteo_ctr_with_cube/criteo_reader.py b/python/examples/criteo_ctr_with_cube/criteo_reader.py new file mode 100755 index 0000000000000000000000000000000000000000..2a80af78a9c2033bf246f703ca70a817ab786af3 --- /dev/null +++ b/python/examples/criteo_ctr_with_cube/criteo_reader.py @@ -0,0 +1,83 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# pylint: disable=doc-string-missing + +import sys +import paddle.fluid.incubate.data_generator as dg + + +class CriteoDataset(dg.MultiSlotDataGenerator): + def setup(self, sparse_feature_dim): + self.cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] + self.cont_max_ = [ + 20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50 + ] + self.cont_diff_ = [ + 20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50 + ] + self.hash_dim_ = sparse_feature_dim + # here, training data are lines with line_index < train_idx_ + self.train_idx_ = 41256555 + self.continuous_range_ = range(1, 14) + self.categorical_range_ = range(14, 40) + + def _process_line(self, line): + features = line.rstrip('\n').split('\t') + dense_feature = [] + sparse_feature = [] + for idx in self.continuous_range_: + if features[idx] == '': + dense_feature.append(0.0) + else: + dense_feature.append((float(features[idx]) - self.cont_min_[idx - 1]) / \ + self.cont_diff_[idx - 1]) + for idx in self.categorical_range_: + sparse_feature.append( + [hash(str(idx) + features[idx]) % self.hash_dim_]) + + return dense_feature, sparse_feature, [int(features[0])] + + def infer_reader(self, filelist, batch, buf_size): + def local_iter(): + for fname in filelist: + with open(fname.strip(), "r") as fin: + for line in fin: + dense_feature, sparse_feature, label = self._process_line( + line) + #yield dense_feature, sparse_feature, label + yield [dense_feature] + sparse_feature + [label] + + import paddle + batch_iter = paddle.batch( + paddle.reader.shuffle( + local_iter, buf_size=buf_size), + batch_size=batch) + return batch_iter + + def generate_sample(self, line): + def data_iter(): + dense_feature, sparse_feature, label = self._process_line(line) + feature_name = ["dense_input"] + for idx in self.categorical_range_: + feature_name.append("C" + str(idx - 13)) + feature_name.append("label") + yield zip(feature_name, [dense_feature] + sparse_feature + [label]) + + return data_iter + + +if __name__ == "__main__": + criteo_dataset = CriteoDataset() + criteo_dataset.setup(int(sys.argv[1])) + criteo_dataset.run_from_stdin() diff --git a/python/examples/criteo_ctr_with_cube/cube_prepare.sh b/python/examples/criteo_ctr_with_cube/cube_prepare.sh new file mode 100755 index 0000000000000000000000000000000000000000..773baba4d91b02b244e766cd8ebf899cc740dbbc --- /dev/null +++ b/python/examples/criteo_ctr_with_cube/cube_prepare.sh @@ -0,0 +1,20 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# pylint: disable=doc-string-missing +#! /bin/bash + +mkdir -p cube_model +mkdir -p cube/data +./cube/cube-builder -dict_name=test_dict -job_mode=base -last_version=0 -cur_version=0 -depend_version=0 -input_path=./cube_model -output_path=${PWD}/cube/data -shard_num=1 -only_build=false +cd cube && ./cube diff --git a/python/examples/criteo_ctr_with_cube/get_data.sh b/python/examples/criteo_ctr_with_cube/get_data.sh new file mode 100755 index 0000000000000000000000000000000000000000..1f244b3a4aa81488bb493825576ba30c4b3bba22 --- /dev/null +++ b/python/examples/criteo_ctr_with_cube/get_data.sh @@ -0,0 +1,2 @@ +wget --no-check-certificate https://paddle-serving.bj.bcebos.com/data/ctr_prediction/ctr_data.tar.gz +tar -zxvf ctr_data.tar.gz diff --git a/python/examples/criteo_ctr_with_cube/local_train.py b/python/examples/criteo_ctr_with_cube/local_train.py new file mode 100755 index 0000000000000000000000000000000000000000..555e2e929c170c24a3175a88144ff74356d82514 --- /dev/null +++ b/python/examples/criteo_ctr_with_cube/local_train.py @@ -0,0 +1,101 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# pylint: disable=doc-string-missing + +from __future__ import print_function + +from args import parse_args +import os +import paddle.fluid as fluid +import paddle +import sys +from network_conf import dnn_model + +dense_feature_dim = 13 + +paddle.enable_static() +def train(): + args = parse_args() + sparse_only = args.sparse_only + if not os.path.isdir(args.model_output_dir): + os.mkdir(args.model_output_dir) + dense_input = fluid.layers.data( + name="dense_input", shape=[dense_feature_dim], dtype='float32') + sparse_input_ids = [ + fluid.layers.data( + name="C" + str(i), shape=[1], lod_level=1, dtype="int64") + for i in range(1, 27) + ] + label = fluid.layers.data(name='label', shape=[1], dtype='int64') + + #nn_input = None if sparse_only else dense_input + nn_input = dense_input + predict_y, loss, auc_var, batch_auc_var, infer_vars = dnn_model( + nn_input, sparse_input_ids, label, args.embedding_size, + args.sparse_feature_dim) + + optimizer = fluid.optimizer.SGD(learning_rate=1e-4) + optimizer.minimize(loss) + + exe = fluid.Executor(fluid.CPUPlace()) + exe.run(fluid.default_startup_program()) + dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + dataset.set_use_var([dense_input] + sparse_input_ids + [label]) + + python_executable = "python3.6" + pipe_command = "{} criteo_reader.py {}".format(python_executable, + args.sparse_feature_dim) + + dataset.set_pipe_command(pipe_command) + dataset.set_batch_size(128) + thread_num = 10 + dataset.set_thread(thread_num) + + whole_filelist = [ + "raw_data/part-%d" % x for x in range(len(os.listdir("raw_data"))) + ] + + print(whole_filelist) + dataset.set_filelist(whole_filelist[:100]) + dataset.load_into_memory() + fluid.layers.Print(auc_var) + epochs = 1 + for i in range(epochs): + exe.train_from_dataset( + program=fluid.default_main_program(), dataset=dataset, debug=True) + print("epoch {} finished".format(i)) + + import paddle_serving_client.io as server_io + feed_var_dict = {} + feed_var_dict['dense_input'] = dense_input + for i, sparse in enumerate(sparse_input_ids): + feed_var_dict["embedding_{}.tmp_0".format(i)] = sparse + fetch_var_dict = {"prob": predict_y} + + feed_kv_dict = {} + feed_kv_dict['dense_input'] = dense_input + for i, emb in enumerate(infer_vars): + feed_kv_dict["embedding_{}.tmp_0".format(i)] = emb + fetch_var_dict = {"prob": predict_y} + + server_io.save_model("ctr_serving_model", "ctr_client_conf", feed_var_dict, + fetch_var_dict, fluid.default_main_program()) + + server_io.save_model("ctr_serving_model_kv", "ctr_client_conf_kv", + feed_kv_dict, fetch_var_dict, + fluid.default_main_program()) + + +if __name__ == '__main__': + train() diff --git a/python/examples/criteo_ctr_with_cube/network_conf.py b/python/examples/criteo_ctr_with_cube/network_conf.py new file mode 100755 index 0000000000000000000000000000000000000000..2975533a72ad21d6dd5896446fd06c1f9bdfe8b4 --- /dev/null +++ b/python/examples/criteo_ctr_with_cube/network_conf.py @@ -0,0 +1,77 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# pylint: disable=doc-string-missing + +import paddle.fluid as fluid +import math + + +def dnn_model(dense_input, sparse_inputs, label, embedding_size, + sparse_feature_dim): + def embedding_layer(input): + emb = fluid.layers.embedding( + input=input, + is_sparse=True, + is_distributed=False, + size=[sparse_feature_dim, embedding_size], + param_attr=fluid.ParamAttr( + name="SparseFeatFactors", + initializer=fluid.initializer.Uniform())) + x = fluid.layers.sequence_pool(input=emb, pool_type='sum') + return emb, x + + def mlp_input_tensor(emb_sums, dense_tensor): + #if isinstance(dense_tensor, fluid.Variable): + # return fluid.layers.concat(emb_sums, axis=1) + #else: + return fluid.layers.concat(emb_sums + [dense_tensor], axis=1) + + def mlp(mlp_input): + fc1 = fluid.layers.fc(input=mlp_input, + size=400, + act='relu', + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Normal( + scale=1 / math.sqrt(mlp_input.shape[1])))) + fc2 = fluid.layers.fc(input=fc1, + size=400, + act='relu', + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Normal( + scale=1 / math.sqrt(fc1.shape[1])))) + fc3 = fluid.layers.fc(input=fc2, + size=400, + act='relu', + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Normal( + scale=1 / math.sqrt(fc2.shape[1])))) + pre = fluid.layers.fc(input=fc3, + size=2, + act='softmax', + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Normal( + scale=1 / math.sqrt(fc3.shape[1])))) + return pre + + emb_pair_sums = list(map(embedding_layer, sparse_inputs)) + emb_sums = [x[1] for x in emb_pair_sums] + infer_vars = [x[0] for x in emb_pair_sums] + mlp_in = mlp_input_tensor(emb_sums, dense_input) + predict = mlp(mlp_in) + cost = fluid.layers.cross_entropy(input=predict, label=label) + avg_cost = fluid.layers.reduce_sum(cost) + accuracy = fluid.layers.accuracy(input=predict, label=label) + auc_var, batch_auc_var, auc_states = \ + fluid.layers.auc(input=predict, label=label, num_thresholds=2 ** 12, slide_steps=20) + return predict, avg_cost, auc_var, batch_auc_var, infer_vars diff --git a/python/examples/criteo_ctr_with_cube/test_client.py b/python/examples/criteo_ctr_with_cube/test_client.py new file mode 100755 index 0000000000000000000000000000000000000000..bef04807e9b5d5c2cdc316828ed6f960f0eeb0f8 --- /dev/null +++ b/python/examples/criteo_ctr_with_cube/test_client.py @@ -0,0 +1,56 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# pylint: disable=doc-string-missing + +from paddle_serving_client import Client +import sys +import os +import criteo as criteo +import time +from paddle_serving_client.metric import auc +import numpy as np +py_version = sys.version_info[0] + +client = Client() +client.load_client_config(sys.argv[1]) +client.connect(["127.0.0.1:9292"]) + +batch = 1 +buf_size = 100 +dataset = criteo.CriteoDataset() +dataset.setup(1000001) +test_filelists = ["{}/part-0".format(sys.argv[2])] +reader = dataset.infer_reader(test_filelists, batch, buf_size) +label_list = [] +prob_list = [] +start = time.time() +for ei in range(10000): + if py_version == 2: + data = reader().next() + else: + data = reader().__next__() + feed_dict = {} + feed_dict['dense_input'] = data[0][0] + for i in range(1, 27): + feed_dict["embedding_{}.tmp_0".format(i - 1)] = np.array(data[0][i]).reshape(-1) + feed_dict["embedding_{}.tmp_0.lod".format(i - 1)] = [0, len(data[0][i])] + fetch_map = client.predict(feed=feed_dict, fetch=["prob"]) + print(fetch_map) + prob_list.append(fetch_map['prob'][0][1]) + label_list.append(data[0][-1][0]) + +print(auc(label_list, prob_list)) +end = time.time() +print(end - start) + diff --git a/python/examples/criteo_ctr_with_cube/test_server.py b/python/examples/criteo_ctr_with_cube/test_server.py new file mode 100755 index 0000000000000000000000000000000000000000..479c602910b5afa52b35a66d00316f54905c0741 --- /dev/null +++ b/python/examples/criteo_ctr_with_cube/test_server.py @@ -0,0 +1,41 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# pylint: disable=doc-string-missing + +import os +import sys +from paddle_serving_server import OpMaker +from paddle_serving_server import OpSeqMaker +from paddle_serving_server import Server + +op_maker = OpMaker() +read_op = op_maker.create('general_reader') +general_dist_kv_infer_op = op_maker.create('general_dist_kv_infer') +response_op = op_maker.create('general_response') + +op_seq_maker = OpSeqMaker() +op_seq_maker.add_op(read_op) +op_seq_maker.add_op(general_dist_kv_infer_op) +op_seq_maker.add_op(response_op) + +server = Server() +server.set_op_sequence(op_seq_maker.get_op_sequence()) +server.set_num_threads(4) +server.load_model_config(sys.argv[1]) +server.prepare_server( + workdir="work_dir1", + port=9292, + device="cpu", + cube_conf="./cube/conf/cube.conf") +server.run_server() diff --git a/python/examples/pipeline/PaddleClas/DarkNet53/resnet50_web_service.py b/python/examples/pipeline/PaddleClas/DarkNet53/resnet50_web_service.py index c16966e3c4d9481b03344bc51abcd2a2090e5bb7..4fadfd7959ab548c2c88994a36604b2abb7db6d2 100644 --- a/python/examples/pipeline/PaddleClas/DarkNet53/resnet50_web_service.py +++ b/python/examples/pipeline/PaddleClas/DarkNet53/resnet50_web_service.py @@ -46,7 +46,7 @@ class ImagenetOp(Op): input_imgs = np.concatenate(imgs, axis=0) return {"image": input_imgs}, False, None, "" - def postprocess(self, input_dicts, fetch_dict, log_id): + def postprocess(self, input_dicts, fetch_dict, data_id, log_id): score_list = fetch_dict["prediction"] result = {"label": [], "prob": []} for score in score_list: diff --git a/python/examples/pipeline/PaddleClas/HRNet_W18_C/resnet50_web_service.py b/python/examples/pipeline/PaddleClas/HRNet_W18_C/resnet50_web_service.py index 6469657ac297c09f57f08c9cbafb806f62214fea..3e43ce8608e5e0edac1802910856be2ed6e6b635 100644 --- a/python/examples/pipeline/PaddleClas/HRNet_W18_C/resnet50_web_service.py +++ b/python/examples/pipeline/PaddleClas/HRNet_W18_C/resnet50_web_service.py @@ -49,7 +49,7 @@ class ImagenetOp(Op): input_imgs = np.concatenate(imgs, axis=0) return {"image": input_imgs}, False, None, "" - def postprocess(self, input_dicts, fetch_dict, log_id): + def postprocess(self, input_dicts, fetch_dict, data_id, log_id): score_list = fetch_dict["prediction"] result = {"label": [], "prob": []} for score in score_list: diff --git a/python/examples/pipeline/PaddleClas/MobileNetV1/resnet50_web_service.py b/python/examples/pipeline/PaddleClas/MobileNetV1/resnet50_web_service.py index 6469657ac297c09f57f08c9cbafb806f62214fea..3e43ce8608e5e0edac1802910856be2ed6e6b635 100644 --- a/python/examples/pipeline/PaddleClas/MobileNetV1/resnet50_web_service.py +++ b/python/examples/pipeline/PaddleClas/MobileNetV1/resnet50_web_service.py @@ -49,7 +49,7 @@ class ImagenetOp(Op): input_imgs = np.concatenate(imgs, axis=0) return {"image": input_imgs}, False, None, "" - def postprocess(self, input_dicts, fetch_dict, log_id): + def postprocess(self, input_dicts, fetch_dict, data_id, log_id): score_list = fetch_dict["prediction"] result = {"label": [], "prob": []} for score in score_list: diff --git a/python/examples/pipeline/PaddleClas/MobileNetV2/resnet50_web_service.py b/python/examples/pipeline/PaddleClas/MobileNetV2/resnet50_web_service.py index 6469657ac297c09f57f08c9cbafb806f62214fea..3e43ce8608e5e0edac1802910856be2ed6e6b635 100644 --- a/python/examples/pipeline/PaddleClas/MobileNetV2/resnet50_web_service.py +++ b/python/examples/pipeline/PaddleClas/MobileNetV2/resnet50_web_service.py @@ -49,7 +49,7 @@ class ImagenetOp(Op): input_imgs = np.concatenate(imgs, axis=0) return {"image": input_imgs}, False, None, "" - def postprocess(self, input_dicts, fetch_dict, log_id): + def postprocess(self, input_dicts, fetch_dict, data_id, log_id): score_list = fetch_dict["prediction"] result = {"label": [], "prob": []} for score in score_list: diff --git a/python/examples/pipeline/PaddleClas/MobileNetV3_large_x1_0/resnet50_web_service.py b/python/examples/pipeline/PaddleClas/MobileNetV3_large_x1_0/resnet50_web_service.py index 6469657ac297c09f57f08c9cbafb806f62214fea..3e43ce8608e5e0edac1802910856be2ed6e6b635 100644 --- a/python/examples/pipeline/PaddleClas/MobileNetV3_large_x1_0/resnet50_web_service.py +++ b/python/examples/pipeline/PaddleClas/MobileNetV3_large_x1_0/resnet50_web_service.py @@ -49,7 +49,7 @@ class ImagenetOp(Op): input_imgs = np.concatenate(imgs, axis=0) return {"image": input_imgs}, False, None, "" - def postprocess(self, input_dicts, fetch_dict, log_id): + def postprocess(self, input_dicts, fetch_dict, data_id, log_id): score_list = fetch_dict["prediction"] result = {"label": [], "prob": []} for score in score_list: diff --git a/python/examples/pipeline/PaddleClas/ResNeXt101_vd_64x4d/resnet50_web_service.py b/python/examples/pipeline/PaddleClas/ResNeXt101_vd_64x4d/resnet50_web_service.py index 6469657ac297c09f57f08c9cbafb806f62214fea..3e43ce8608e5e0edac1802910856be2ed6e6b635 100644 --- a/python/examples/pipeline/PaddleClas/ResNeXt101_vd_64x4d/resnet50_web_service.py +++ b/python/examples/pipeline/PaddleClas/ResNeXt101_vd_64x4d/resnet50_web_service.py @@ -49,7 +49,7 @@ class ImagenetOp(Op): input_imgs = np.concatenate(imgs, axis=0) return {"image": input_imgs}, False, None, "" - def postprocess(self, input_dicts, fetch_dict, log_id): + def postprocess(self, input_dicts, fetch_dict, data_id, log_id): score_list = fetch_dict["prediction"] result = {"label": [], "prob": []} for score in score_list: diff --git a/python/examples/pipeline/PaddleClas/ResNet50_vd/resnet50_web_service.py b/python/examples/pipeline/PaddleClas/ResNet50_vd/resnet50_web_service.py index 6469657ac297c09f57f08c9cbafb806f62214fea..3e43ce8608e5e0edac1802910856be2ed6e6b635 100644 --- a/python/examples/pipeline/PaddleClas/ResNet50_vd/resnet50_web_service.py +++ b/python/examples/pipeline/PaddleClas/ResNet50_vd/resnet50_web_service.py @@ -49,7 +49,7 @@ class ImagenetOp(Op): input_imgs = np.concatenate(imgs, axis=0) return {"image": input_imgs}, False, None, "" - def postprocess(self, input_dicts, fetch_dict, log_id): + def postprocess(self, input_dicts, fetch_dict, data_id, log_id): score_list = fetch_dict["prediction"] result = {"label": [], "prob": []} for score in score_list: diff --git a/python/examples/pipeline/PaddleClas/ResNet50_vd_FPGM/resnet50_web_service.py b/python/examples/pipeline/PaddleClas/ResNet50_vd_FPGM/resnet50_web_service.py index 668b119273df6ab351e5234badb98b41bef87c1e..b89c2cc74f4c57906ff871e1dde244d5b37098c4 100644 --- a/python/examples/pipeline/PaddleClas/ResNet50_vd_FPGM/resnet50_web_service.py +++ b/python/examples/pipeline/PaddleClas/ResNet50_vd_FPGM/resnet50_web_service.py @@ -49,7 +49,7 @@ class ImagenetOp(Op): input_imgs = np.concatenate(imgs, axis=0) return {"image": input_imgs}, False, None, "" - def postprocess(self, input_dicts, fetch_dict, log_id): + def postprocess(self, input_dicts, fetch_dict, data_id, log_id): score_list = fetch_dict["save_infer_model/scale_0.tmp_1"] result = {"label": [], "prob": []} for score in score_list: diff --git a/python/examples/pipeline/PaddleClas/ResNet50_vd_KL/resnet50_web_service.py b/python/examples/pipeline/PaddleClas/ResNet50_vd_KL/resnet50_web_service.py index 359ea1ed817c7117f64e68fd8a984aa0e7bf5f60..7aade27ea198afe1cbac7b775cfe3a6cbcb3b1df 100644 --- a/python/examples/pipeline/PaddleClas/ResNet50_vd_KL/resnet50_web_service.py +++ b/python/examples/pipeline/PaddleClas/ResNet50_vd_KL/resnet50_web_service.py @@ -49,7 +49,7 @@ class ImagenetOp(Op): input_imgs = np.concatenate(imgs, axis=0) return {"inputs": input_imgs}, False, None, "" - def postprocess(self, input_dicts, fetch_dict, log_id): + def postprocess(self, input_dicts, fetch_dict, data_id, log_id): score_list = fetch_dict["save_infer_model/scale_0.tmp_0"] result = {"label": [], "prob": []} for score in score_list: diff --git a/python/examples/pipeline/PaddleClas/ResNet50_vd_PACT/resnet50_web_service.py b/python/examples/pipeline/PaddleClas/ResNet50_vd_PACT/resnet50_web_service.py index 2c35ab72255fe2fabd9c83d7a3bd152b744bdd8e..2734521dda15fe1c491fc66c5536203888d00d23 100644 --- a/python/examples/pipeline/PaddleClas/ResNet50_vd_PACT/resnet50_web_service.py +++ b/python/examples/pipeline/PaddleClas/ResNet50_vd_PACT/resnet50_web_service.py @@ -49,7 +49,7 @@ class ImagenetOp(Op): input_imgs = np.concatenate(imgs, axis=0) return {"inputs": input_imgs}, False, None, "" - def postprocess(self, input_dicts, fetch_dict, log_id): + def postprocess(self, input_dicts, fetch_dict, data_id, log_id): score_list = fetch_dict["save_infer_model/scale_0.tmp_1"] result = {"label": [], "prob": []} for score in score_list: diff --git a/python/examples/pipeline/PaddleClas/ResNet_V2_50/resnet50_web_service.py b/python/examples/pipeline/PaddleClas/ResNet_V2_50/resnet50_web_service.py index 8b3815f7d7d1397ffd7618048a43a21b5b3123e0..6a7213b7abd0ddf892f64e81f96601205e5b249c 100644 --- a/python/examples/pipeline/PaddleClas/ResNet_V2_50/resnet50_web_service.py +++ b/python/examples/pipeline/PaddleClas/ResNet_V2_50/resnet50_web_service.py @@ -46,7 +46,7 @@ class ImagenetOp(Op): input_imgs = np.concatenate(imgs, axis=0) return {"image": input_imgs}, False, None, "" - def postprocess(self, input_dicts, fetch_dict, log_id): + def postprocess(self, input_dicts, fetch_dict, data_id, log_id): score_list = fetch_dict["score"] result = {"label": [], "prob": []} for score in score_list: diff --git a/python/examples/pipeline/PaddleClas/ShuffleNetV2_x1_0/resnet50_web_service.py b/python/examples/pipeline/PaddleClas/ShuffleNetV2_x1_0/resnet50_web_service.py index 6469657ac297c09f57f08c9cbafb806f62214fea..3e43ce8608e5e0edac1802910856be2ed6e6b635 100644 --- a/python/examples/pipeline/PaddleClas/ShuffleNetV2_x1_0/resnet50_web_service.py +++ b/python/examples/pipeline/PaddleClas/ShuffleNetV2_x1_0/resnet50_web_service.py @@ -49,7 +49,7 @@ class ImagenetOp(Op): input_imgs = np.concatenate(imgs, axis=0) return {"image": input_imgs}, False, None, "" - def postprocess(self, input_dicts, fetch_dict, log_id): + def postprocess(self, input_dicts, fetch_dict, data_id, log_id): score_list = fetch_dict["prediction"] result = {"label": [], "prob": []} for score in score_list: diff --git a/python/examples/pipeline/PaddleDetection/faster_rcnn/web_service.py b/python/examples/pipeline/PaddleDetection/faster_rcnn/web_service.py index 691d647befa8e1c583a53121e89ab5f2859f64b7..fa026000e399cf0246df4afa2a37005d40d53d70 100644 --- a/python/examples/pipeline/PaddleDetection/faster_rcnn/web_service.py +++ b/python/examples/pipeline/PaddleDetection/faster_rcnn/web_service.py @@ -19,6 +19,7 @@ import cv2 from paddle_serving_app.reader import * import base64 + class FasterRCNNOp(Op): def init_op(self): self.img_preprocess = Sequential([ @@ -38,22 +39,30 @@ class FasterRCNNOp(Op): im = cv2.imdecode(data, cv2.IMREAD_COLOR) im = self.img_preprocess(im) imgs.append({ - "image": im[np.newaxis,:], - "im_shape": np.array(list(im.shape[1:])).reshape(-1)[np.newaxis,:], - "scale_factor": np.array([1.0, 1.0]).reshape(-1)[np.newaxis,:], + "image": im[np.newaxis, :], + "im_shape": + np.array(list(im.shape[1:])).reshape(-1)[np.newaxis, :], + "scale_factor": np.array([1.0, 1.0]).reshape(-1)[np.newaxis, :], }) feed_dict = { - "image": np.concatenate([x["image"] for x in imgs], axis=0), - "im_shape": np.concatenate([x["im_shape"] for x in imgs], axis=0), - "scale_factor": np.concatenate([x["scale_factor"] for x in imgs], axis=0) + "image": np.concatenate( + [x["image"] for x in imgs], axis=0), + "im_shape": np.concatenate( + [x["im_shape"] for x in imgs], axis=0), + "scale_factor": np.concatenate( + [x["scale_factor"] for x in imgs], axis=0) } #for key in feed_dict.keys(): # print(key, feed_dict[key].shape) return feed_dict, False, None, "" - def postprocess(self, input_dicts, fetch_dict, log_id): + def postprocess(self, input_dicts, fetch_dict, data_id, log_id): #print(fetch_dict) - res_dict = {"bbox_result": str(self.img_postprocess(fetch_dict, visualize=False))} + res_dict = { + "bbox_result": + str(self.img_postprocess( + fetch_dict, visualize=False)) + } return res_dict, None, "" diff --git a/python/examples/pipeline/PaddleDetection/ppyolo_mbv3/web_service.py b/python/examples/pipeline/PaddleDetection/ppyolo_mbv3/web_service.py index 8611b0671862a887efd1705b3c1a922db906581d..1cfa0aee793d1a6fa22f109284c426b1e7676e0b 100644 --- a/python/examples/pipeline/PaddleDetection/ppyolo_mbv3/web_service.py +++ b/python/examples/pipeline/PaddleDetection/ppyolo_mbv3/web_service.py @@ -19,6 +19,7 @@ import cv2 from paddle_serving_app.reader import * import base64 + class PPYoloMbvOp(Op): def init_op(self): self.img_preprocess = Sequential([ @@ -38,23 +39,31 @@ class PPYoloMbvOp(Op): im = cv2.imdecode(data, cv2.IMREAD_COLOR) im = self.img_preprocess(im) imgs.append({ - "image": im[np.newaxis,:], - "im_shape": np.array(list(im.shape[1:])).reshape(-1)[np.newaxis,:], - "scale_factor": np.array([1.0, 1.0]).reshape(-1)[np.newaxis,:], + "image": im[np.newaxis, :], + "im_shape": + np.array(list(im.shape[1:])).reshape(-1)[np.newaxis, :], + "scale_factor": np.array([1.0, 1.0]).reshape(-1)[np.newaxis, :], }) feed_dict = { - "image": np.concatenate([x["image"] for x in imgs], axis=0), - "im_shape": np.concatenate([x["im_shape"] for x in imgs], axis=0), - "scale_factor": np.concatenate([x["scale_factor"] for x in imgs], axis=0) + "image": np.concatenate( + [x["image"] for x in imgs], axis=0), + "im_shape": np.concatenate( + [x["im_shape"] for x in imgs], axis=0), + "scale_factor": np.concatenate( + [x["scale_factor"] for x in imgs], axis=0) } for key in feed_dict.keys(): print(key, feed_dict[key].shape) return feed_dict, False, None, "" - def postprocess(self, input_dicts, fetch_dict, log_id): + def postprocess(self, input_dicts, fetch_dict, data_id, log_id): #print(fetch_dict) - res_dict = {"bbox_result": str(self.img_postprocess(fetch_dict, visualize=False))} + res_dict = { + "bbox_result": + str(self.img_postprocess( + fetch_dict, visualize=False)) + } return res_dict, None, "" diff --git a/python/examples/pipeline/PaddleDetection/yolov3/web_service.py b/python/examples/pipeline/PaddleDetection/yolov3/web_service.py index d28c22b9cc1060af29b6b31140911fc848bdec28..fa55f78067118184ae5b5541c1bc1fe36db617a0 100644 --- a/python/examples/pipeline/PaddleDetection/yolov3/web_service.py +++ b/python/examples/pipeline/PaddleDetection/yolov3/web_service.py @@ -19,6 +19,7 @@ import cv2 from paddle_serving_app.reader import * import base64 + class Yolov3Op(Op): def init_op(self): self.img_preprocess = Sequential([ @@ -38,22 +39,30 @@ class Yolov3Op(Op): im = cv2.imdecode(data, cv2.IMREAD_COLOR) im = self.img_preprocess(im) imgs.append({ - "image": im[np.newaxis,:], - "im_shape": np.array(list(im.shape[1:])).reshape(-1)[np.newaxis,:], - "scale_factor": np.array([1.0, 1.0]).reshape(-1)[np.newaxis,:], + "image": im[np.newaxis, :], + "im_shape": + np.array(list(im.shape[1:])).reshape(-1)[np.newaxis, :], + "scale_factor": np.array([1.0, 1.0]).reshape(-1)[np.newaxis, :], }) feed_dict = { - "image": np.concatenate([x["image"] for x in imgs], axis=0), - "im_shape": np.concatenate([x["im_shape"] for x in imgs], axis=0), - "scale_factor": np.concatenate([x["scale_factor"] for x in imgs], axis=0) + "image": np.concatenate( + [x["image"] for x in imgs], axis=0), + "im_shape": np.concatenate( + [x["im_shape"] for x in imgs], axis=0), + "scale_factor": np.concatenate( + [x["scale_factor"] for x in imgs], axis=0) } #for key in feed_dict.keys(): # print(key, feed_dict[key].shape) return feed_dict, False, None, "" - def postprocess(self, input_dicts, fetch_dict, log_id): + def postprocess(self, input_dicts, fetch_dict, data_id, log_id): #print(fetch_dict) - res_dict = {"bbox_result": str(self.img_postprocess(fetch_dict, visualize=False))} + res_dict = { + "bbox_result": + str(self.img_postprocess( + fetch_dict, visualize=False)) + } return res_dict, None, "" diff --git a/python/examples/pipeline/bert/README.md b/python/examples/pipeline/bert/README.md index 6074aa2b80dbe96c69726b7b8049e28db853445a..c396b77c9d2b9198d0474540872cb1c4dcdce5b1 100644 --- a/python/examples/pipeline/bert/README.md +++ b/python/examples/pipeline/bert/README.md @@ -4,7 +4,7 @@ This document will takes Imagenet service as an example to introduce how to use ## Get model ``` -sh get_model.sh +sh get_data.sh ``` ## Start server diff --git a/python/examples/pipeline/bert/README_CN.md b/python/examples/pipeline/bert/README_CN.md index ace7b76fe717c8a0922bf41aa5615b3b5da945a1..841abdadf5a3848fcf1e042d8e73c051610eefaa 100644 --- a/python/examples/pipeline/bert/README_CN.md +++ b/python/examples/pipeline/bert/README_CN.md @@ -4,7 +4,7 @@ ## 获取模型 ``` -sh get_model.sh +sh get_data.sh ``` ## 启动服务 diff --git a/python/examples/pipeline/bert/web_service.py b/python/examples/pipeline/bert/web_service.py index 7f5128f95d772a8d108e5ab3a92314eee103235d..46495a850886c6bc9f33a117a1485ec0d2ea6d9a 100644 --- a/python/examples/pipeline/bert/web_service.py +++ b/python/examples/pipeline/bert/web_service.py @@ -43,9 +43,11 @@ class BertOp(Op): print(key, feed_dict[key].shape) return feed_dict, False, None, "" - def postprocess(self, input_dicts, fetch_dict, log_id): - fetch_dict["pooled_output"] = str(fetch_dict["pooled_output"]) - return fetch_dict, None, "" + def postprocess(self, input_dicts, fetch_dict, data_id, log_id): + new_dict = {} + new_dict["pooled_output"] = str(fetch_dict["pooled_output"]) + new_dict["sequence_output"] = str(fetch_dict["sequence_output"]) + return new_dict, None, "" class BertService(WebService): diff --git a/python/examples/pipeline/imagenet/resnet50_web_service.py b/python/examples/pipeline/imagenet/resnet50_web_service.py index 53a0b6d9c5d7290b709df9c5ba7a314d29bbd08d..a4d37ed600a8eb90836b83f33f0cbe32e35d5008 100644 --- a/python/examples/pipeline/imagenet/resnet50_web_service.py +++ b/python/examples/pipeline/imagenet/resnet50_web_service.py @@ -42,7 +42,7 @@ class ImagenetOp(Op): img = self.seq(im) return {"image": img[np.newaxis, :].copy()}, False, None, "" - def postprocess(self, input_dicts, fetch_dict, log_id): + def postprocess(self, input_dicts, fetch_dict, data_id, log_id): print(fetch_dict) score_list = fetch_dict["score"] result = {"label": [], "prob": []} diff --git a/python/examples/pipeline/ocr/web_service.py b/python/examples/pipeline/ocr/web_service.py index 6724415886497e43595672b840f6ed9c7362f2ee..c19d481113a0563bbea92b5038968ae9d18e0ab5 100644 --- a/python/examples/pipeline/ocr/web_service.py +++ b/python/examples/pipeline/ocr/web_service.py @@ -54,7 +54,7 @@ class DetOp(Op): imgs.append(det_img[np.newaxis, :].copy()) return {"image": np.concatenate(imgs, axis=0)}, False, None, "" - def postprocess(self, input_dicts, fetch_dict, log_id): + def postprocess(self, input_dicts, fetch_dict, data_id, log_id): # print(fetch_dict) det_out = fetch_dict["concat_1.tmp_0"] ratio_list = [ @@ -149,7 +149,7 @@ class RecOp(Op): return feed_list, False, None, "" - def postprocess(self, input_dicts, fetch_data, log_id): + def postprocess(self, input_dicts, fetch_data, data_id, log_id): res_list = [] if isinstance(fetch_data, dict): if len(fetch_data) > 0: diff --git a/python/examples/pipeline/simple_web_service/web_service.py b/python/examples/pipeline/simple_web_service/web_service.py index ea3109cf998ab81ecf68f556c0254fe35b3f4091..5f999f94f9da10809c0128a45c115d90f05f0f41 100644 --- a/python/examples/pipeline/simple_web_service/web_service.py +++ b/python/examples/pipeline/simple_web_service/web_service.py @@ -40,9 +40,10 @@ class UciOp(Op): proc_dict = {} return input_dict, False, None, "" - def postprocess(self, input_dicts, fetch_dict, log_id): - _LOGGER.info("UciOp::postprocess >>> log_id:{}, fetch_dict:{}".format( - log_id, fetch_dict)) + def postprocess(self, input_dicts, fetch_dict, data_id, log_id): + _LOGGER.info( + "UciOp::postprocess >>> data_id:{}, log_id:{}, fetch_dict:{}". + format(data_id, log_id, fetch_dict)) fetch_dict["price"] = str(fetch_dict["price"]) return fetch_dict, None, "" diff --git a/python/examples/pipeline/simple_web_service/web_service_java.py b/python/examples/pipeline/simple_web_service/web_service_java.py index da944a1df2a3265f930eb458c11709dd6b9402ee..c4ddfb2b1b3c57b4975cac3dc048e1310aa10772 100644 --- a/python/examples/pipeline/simple_web_service/web_service_java.py +++ b/python/examples/pipeline/simple_web_service/web_service_java.py @@ -41,9 +41,10 @@ class UciOp(Op): return input_dict, False, None, "" - def postprocess(self, input_dicts, fetch_dict, log_id): - _LOGGER.info("UciOp::postprocess >>> log_id:{}, fetch_dict:{}".format( - log_id, fetch_dict)) + def postprocess(self, input_dicts, fetch_dict, data_id, log_id): + _LOGGER.info( + "UciOp::postprocess >>> data_id:{}, log_id:{}, fetch_dict:{}". + format(data_id, log_id, fetch_dict)) fetch_dict["price"] = str(fetch_dict["price"][0][0]) return fetch_dict, None, "" diff --git a/python/paddle_serving_app/local_predict.py b/python/paddle_serving_app/local_predict.py index a95aeecf99da5def9f83635e900faa89e851e8de..b4bc96e2b96f724a9d871b5a843635eba7aff4a2 100644 --- a/python/paddle_serving_app/local_predict.py +++ b/python/paddle_serving_app/local_predict.py @@ -127,7 +127,8 @@ class LocalPredictor(object): for i, var in enumerate(model_conf.fetch_var): self.fetch_names_to_idx_[var.alias_name] = i - self.fetch_names_to_type_[var.alias_name] = var.fetch_type + self.fetch_types_[var.alias_name] = var.fetch_type + self.fetch_names_to_type_[var.alias_name] = var.shape # set precision of inference. precision_type = paddle_infer.PrecisionType.Float32 @@ -253,8 +254,27 @@ class LocalPredictor(object): feed[name] = feed[name].astype("float32") elif self.feed_types_[name] == 2: feed[name] = feed[name].astype("int32") + elif self.feed_types_[name] == 3: + feed[name] = feed[name].astype("float64") + elif self.feed_types_[name] == 4: + feed[name] = feed[name].astype("int16") + elif self.feed_types_[name] == 5: + feed[name] = feed[name].astype("float16") + elif self.feed_types_[name] == 6: + feed[name] = feed[name].astype("uint16") + elif self.feed_types_[name] == 7: + feed[name] = feed[name].astype("uint8") + elif self.feed_types_[name] == 8: + feed[name] = feed[name].astype("int8") + elif self.feed_types_[name] == 9: + feed[name] = feed[name].astype("bool") + elif self.feed_types_[name] == 10: + feed[name] = feed[name].astype("complex64") + elif self.feed_types_[name] == 11: + feed[name] = feed[name].astype("complex128") else: raise ValueError("local predictor receives wrong data type") + input_tensor_handle = self.predictor.get_input_handle(name) if "{}.lod".format(name) in feed: input_tensor_handle.set_lod([feed["{}.lod".format(name)]]) diff --git a/python/paddle_serving_client/client.py b/python/paddle_serving_client/client.py index 95d6ca63099a9e13b408bf2712bd198d29a1dd56..42ba64198a04d70ec511ddd03edd9c9ef26d21a5 100755 --- a/python/paddle_serving_client/client.py +++ b/python/paddle_serving_client/client.py @@ -337,8 +337,6 @@ class Client(object): string_shape = [] fetch_names = [] - counter = 0 - for key in fetch_list: if key in self.fetch_names_: fetch_names.append(key) diff --git a/python/paddle_serving_client/io/__init__.py b/python/paddle_serving_client/io/__init__.py index b7b0898a3b3b811c8f089c8409b6c5f94185660a..7e09a53c77510a21fba993de74a4517b7267372d 100644 --- a/python/paddle_serving_client/io/__init__.py +++ b/python/paddle_serving_client/io/__init__.py @@ -31,6 +31,21 @@ import paddle.nn.functional as F import errno from paddle.jit import to_static +_PADDLE_DTYPE_2_NUMPY_DTYPE = { + core.VarDesc.VarType.BOOL: 'bool', + core.VarDesc.VarType.FP16: 'float16', + core.VarDesc.VarType.BF16: 'uint16', + core.VarDesc.VarType.FP32: 'float32', + core.VarDesc.VarType.FP64: 'float64', + core.VarDesc.VarType.INT8: 'int8', + core.VarDesc.VarType.INT16: 'int16', + core.VarDesc.VarType.INT32: 'int32', + core.VarDesc.VarType.INT64: 'int64', + core.VarDesc.VarType.UINT8: 'uint8', + core.VarDesc.VarType.COMPLEX64: 'complex64', + core.VarDesc.VarType.COMPLEX128: 'complex128', +} + def save_dygraph_model(serving_model_folder, client_config_folder, model): paddle.jit.save(model, "serving_tmp") @@ -57,13 +72,8 @@ def save_dygraph_model(serving_model_folder, client_config_folder, model): feed_var = model_conf.FeedVar() feed_var.alias_name = key feed_var.name = feed_var_dict[key].name + feed_var.feed_type = var_type_conversion(feed_var_dict[key].dtype) feed_var.is_lod_tensor = feed_var_dict[key].lod_level >= 1 - if feed_var_dict[key].dtype == core.VarDesc.VarType.INT64: - feed_var.feed_type = 0 - if feed_var_dict[key].dtype == core.VarDesc.VarType.FP32: - feed_var.feed_type = 1 - if feed_var_dict[key].dtype == core.VarDesc.VarType.INT32: - feed_var.feed_type = 2 if feed_var.is_lod_tensor: feed_var.shape.extend([-1]) else: @@ -77,13 +87,8 @@ def save_dygraph_model(serving_model_folder, client_config_folder, model): fetch_var = model_conf.FetchVar() fetch_var.alias_name = key fetch_var.name = fetch_var_dict[key].name + fetch_var.fetch_type = var_type_conversion(fetch_var_dict[key].dtype) fetch_var.is_lod_tensor = 1 - if fetch_var_dict[key].dtype == core.VarDesc.VarType.INT64: - fetch_var.fetch_type = 0 - if fetch_var_dict[key].dtype == core.VarDesc.VarType.FP32: - fetch_var.fetch_type = 1 - if fetch_var_dict[key].dtype == core.VarDesc.VarType.INT32: - fetch_var.fetch_type = 2 if fetch_var.is_lod_tensor: fetch_var.shape.extend([-1]) else: @@ -119,6 +124,59 @@ def save_dygraph_model(serving_model_folder, client_config_folder, model): fout.write(config.SerializeToString()) +def var_type_conversion(dtype): + """ + Variable type conversion + + Args: + dtype: type of core.VarDesc.VarType.xxxxx + (https://github.com/PaddlePaddle/Paddle/blob/release/2.1/python/paddle/framework/dtype.py) + + Returns: + (int)type value, -1 is type matching failed. + int64 => 0; + float32 => 1; + int32 => 2; + float64 => 3; + int16 => 4; + float16 => 5; + bfloat16 => 6; + uint8 => 7; + int8 => 8; + bool => 9; + complex64 => 10, + complex128 => 11; + """ + type_val = -1 + if dtype == core.VarDesc.VarType.INT64: + type_val = 0 + elif dtype == core.VarDesc.VarType.FP32: + type_val = 1 + elif dtype == core.VarDesc.VarType.INT32: + type_val = 2 + elif dtype == core.VarDesc.VarType.FP64: + type_val = 3 + elif dtype == core.VarDesc.VarType.INT16: + type_val = 4 + elif dtype == core.VarDesc.VarType.FP16: + type_val = 5 + elif dtype == core.VarDesc.VarType.BF16: + type_val = 6 + elif dtype == core.VarDesc.VarType.UINT8: + type_val = 7 + elif dtype == core.VarDesc.VarType.INT8: + type_val = 8 + elif dtype == core.VarDesc.VarType.BOOL: + type_val = 9 + elif dtype == core.VarDesc.VarType.COMPLEX64: + type_val = 10 + elif dtype == core.VarDesc.VarType.COMPLEX128: + type_val = 11 + else: + type_val = -1 + return type_val + + def save_model(server_model_folder, client_config_folder, feed_var_dict, @@ -164,18 +222,13 @@ def save_model(server_model_folder, config = model_conf.GeneralModelConfig() - #int64 = 0; float32 = 1; int32 = 2; for key in feed_var_dict: feed_var = model_conf.FeedVar() feed_var.alias_name = key feed_var.name = feed_var_dict[key].name + feed_var.feed_type = var_type_conversion(feed_var_dict[key].dtype) + feed_var.is_lod_tensor = feed_var_dict[key].lod_level >= 1 - if feed_var_dict[key].dtype == core.VarDesc.VarType.INT64: - feed_var.feed_type = 0 - if feed_var_dict[key].dtype == core.VarDesc.VarType.FP32: - feed_var.feed_type = 1 - if feed_var_dict[key].dtype == core.VarDesc.VarType.INT32: - feed_var.feed_type = 2 if feed_var.is_lod_tensor: feed_var.shape.extend([-1]) else: @@ -190,14 +243,10 @@ def save_model(server_model_folder, fetch_var = model_conf.FetchVar() fetch_var.alias_name = key fetch_var.name = fetch_var_dict[key].name - #fetch_var.is_lod_tensor = fetch_var_dict[key].lod_level >= 1 - fetch_var.is_lod_tensor = 1 - if fetch_var_dict[key].dtype == core.VarDesc.VarType.INT64: - fetch_var.fetch_type = 0 - if fetch_var_dict[key].dtype == core.VarDesc.VarType.FP32: - fetch_var.fetch_type = 1 - if fetch_var_dict[key].dtype == core.VarDesc.VarType.INT32: - fetch_var.fetch_type = 2 + fetch_var.fetch_type = var_type_conversion(fetch_var_dict[key].dtype) + + fetch_var.is_lod_tensor = fetch_var_dict[key].lod_level >= 1 + #fetch_var.is_lod_tensor = 1 if fetch_var.is_lod_tensor: fetch_var.shape.extend([-1]) else: diff --git a/python/paddle_serving_server/serve.py b/python/paddle_serving_server/serve.py index 284627843091cfaaaac4e6cce965984fd37be394..abccf5e179ccd4946fdf51a5f74ff6e5ee685b4c 100755 --- a/python/paddle_serving_server/serve.py +++ b/python/paddle_serving_server/serve.py @@ -101,7 +101,6 @@ def is_gpu_mode(unformatted_gpus): for ids in op_gpu_list: if int(ids) >= 0: return True - return False diff --git a/python/paddle_serving_server/server.py b/python/paddle_serving_server/server.py index 6b28e34f2ef1a94760767e63ecf0cbf05ae125e1..d1d3155112e44b0c71faa0bdd704dffa826aa077 100644 --- a/python/paddle_serving_server/server.py +++ b/python/paddle_serving_server/server.py @@ -140,7 +140,7 @@ class Server(object): def set_ir_optimize(self, flag=False): self.ir_optimization = flag - # Multi-Server does not have this Function. + # Multi-Server does not have this Function. def set_product_name(self, product_name=None): if product_name == None: raise ValueError("product_name can't be None.") @@ -437,7 +437,6 @@ class Server(object): def download_bin(self): os.chdir(self.module_path) - need_download = False #acquire lock version_file = open("{}/version.py".format(self.module_path), "r") diff --git a/python/pipeline/dag.py b/python/pipeline/dag.py index 1f8f9cefeb178f57bd613f6b4a7e7a4e4a9f90c4..69ed7124f51948e643e204001c699f820bf288f4 100644 --- a/python/pipeline/dag.py +++ b/python/pipeline/dag.py @@ -176,7 +176,7 @@ class DAGExecutor(object): "in_channel must be Channel type, but get {}". format(type(in_channel))) os._exit(-1) - in_channel.add_producer(self.name) + self._in_channel = in_channel _LOGGER.info("[DAG] set in channel succ, name [{}]".format(self.name)) @@ -669,14 +669,14 @@ class DAG(object): out_degree_ops) dag_views = list(reversed(dag_views)) if not self._build_dag_each_worker: - _LOGGER.debug("================== DAG ====================") + _LOGGER.info("================== DAG ====================") for idx, view in enumerate(dag_views): - _LOGGER.debug("(VIEW {})".format(idx)) + _LOGGER.info("(VIEW {})".format(idx)) for op in view: - _LOGGER.debug(" [{}]".format(op.name)) + _LOGGER.info(" [{}]".format(op.name)) for out_op in out_degree_ops[op.name]: - _LOGGER.debug(" - {}".format(out_op.name)) - _LOGGER.debug("-------------------------------------------") + _LOGGER.info(" - {}".format(out_op.name)) + _LOGGER.info("-------------------------------------------") # create channels and virtual ops virtual_op_name_gen = NameGenerator("vir") @@ -719,6 +719,7 @@ class DAG(object): channel = self._gen_channel(channel_name_gen) channels.append(channel) op.add_input_channel(channel) + _LOGGER.info("op:{} add input channel.".format(op.name)) pred_ops = pred_op_of_next_view_op[op.name] if v_idx == 0: input_channel = channel @@ -726,6 +727,8 @@ class DAG(object): # if pred_op is virtual op, it will use ancestors as producers to channel for pred_op in pred_ops: pred_op.add_output_channel(channel) + _LOGGER.info("pred_op:{} add output channel".format( + pred_op.name)) processed_op.add(op.name) # find same input op to combine channel for other_op in actual_next_view[o_idx + 1:]: @@ -745,6 +748,7 @@ class DAG(object): output_channel = self._gen_channel(channel_name_gen) channels.append(output_channel) last_op.add_output_channel(output_channel) + _LOGGER.info("last op:{} add output channel".format(last_op.name)) pack_func, unpack_func = None, None pack_func = response_op.pack_response_package @@ -752,7 +756,11 @@ class DAG(object): actual_ops = virtual_ops for op in used_ops: if len(op.get_input_ops()) == 0: + #set special features of the request op. + #1.set unpack function. + #2.set output channel. unpack_func = op.unpack_request_package + op.add_output_channel(input_channel) continue actual_ops.append(op) diff --git a/python/pipeline/operator.py b/python/pipeline/operator.py index f9f56219f21b2196a02fde621f8964d8445db520..87df16e060a5caec7211dba5d970afb5818c121c 100644 --- a/python/pipeline/operator.py +++ b/python/pipeline/operator.py @@ -58,13 +58,15 @@ class Op(object): retry=0, batch_size=None, auto_batching_timeout=None, - local_service_handler=None): + local_service_handler=None, + jump_to_ops=[]): # In __init__, all the parameters are just saved and Op is not initialized if name is None: name = _op_name_gen.next() self.name = name # to identify the type of OP, it must be globally unique self.concurrency = concurrency # amount of concurrency self.set_input_ops(input_ops) + self.set_jump_to_ops(jump_to_ops) self._local_service_handler = local_service_handler self._server_endpoints = server_endpoints @@ -99,9 +101,7 @@ class Op(object): conf: config.yaml Returns: - None """ - # init op if self.concurrency is None: self.concurrency = conf["concurrency"] if self._retry is None: @@ -372,6 +372,79 @@ class Op(object): os._exit(-1) self._input_ops.append(op) + def get_jump_to_ops(self): + return self._jump_to_ops + + def set_jump_to_ops(self, ops): + """ + Set jump to ops, then, this op can send channeldata to output channel. + + Args: + ops: op list to be jumpped + + Returns: + None. + """ + if not isinstance(ops, list): + ops = [] if ops is None else [ops] + + self._jump_to_ops = [] + for op in ops: + if not isinstance(op, Op): + _LOGGER.critical( + self._log("Failed to set input_ops: input op " + "must be Op type, not {}".format(type(op)))) + os._exit(-1) + self._jump_to_ops.append(op) + + def is_jump_op(self): + """ + The op has _jump_to_ops members or not. + + Args: + None + + Returns: + True or False + """ + return len(self._jump_to_ops) > 0 + + def check_jumping(self, input_data): + """ + Check whether to send data to jump ops.WhileOp needs to rewrite + this interface. this function returns False default. + + Args: + input_data: input data to be preprocessed + + Returns: + True, send data to the output channel of jump ops + False, send data to output channel. + """ + return False + + def get_output_channels_of_jump_ops(self): + """ + Get output channels of jump ops + + Args: + None + + Returns: + list of channels + """ + channels = [] + if self.is_jump_op() is False: + return channels + for op in self._jump_to_ops: + _LOGGER.info("op:{} extend op._get_output_channels:{}".format( + op.name, op._get_output_channels())) + channels.extend(op._get_output_channels()) + + _LOGGER.info("get_output_channels_of_jump_ops, channels:{}".format( + channels)) + return channels + def add_input_channel(self, channel): """ Adding one input channel to the Op. Each op have many front op, @@ -410,6 +483,7 @@ class Op(object): os._exit(-1) channel.add_producer(self.name) self._outputs.append(channel) + _LOGGER.info("op:{} add output_channel {}".format(self.name, channel)) def clean_output_channels(self): self._outputs = [] @@ -424,7 +498,7 @@ class Op(object): Args: input_dicts: input data to be preprocessed - data_id: inner unique id, 0 default + data_id: inner unique id, increase auto log_id: global unique id for RTT, 0 default Return: @@ -484,12 +558,13 @@ class Op(object): ''' return call_result - def postprocess(self, input_data, fetch_data, log_id=0): + def postprocess(self, input_data, fetch_data, data_id=0, log_id=0): """ In postprocess stage, assemble data for next op or output. Args: input_data: data returned in preprocess stage, dict(for single predict) or list(for batch predict) fetch_data: data returned in process stage, dict(for single predict) or list(for batch predict) + data_id: inner unique id, increase auto log_id: logid, 0 default Returns: @@ -593,7 +668,8 @@ class Op(object): self.device_type, self.devices, self.mem_optim, self.ir_optim, self.precision, self.use_mkldnn, self.mkldnn_cache_capacity, self.mkldnn_op_list, - self.mkldnn_bf16_op_list)) + self.mkldnn_bf16_op_list, self.is_jump_op(), + self.get_output_channels_of_jump_ops())) p.daemon = True p.start() process.append(p) @@ -629,7 +705,8 @@ class Op(object): self.device_type, self.devices, self.mem_optim, self.ir_optim, self.precision, self.use_mkldnn, self.mkldnn_cache_capacity, self.mkldnn_op_list, - self.mkldnn_bf16_op_list)) + self.mkldnn_bf16_op_list, self.is_jump_op(), + self.get_output_channels_of_jump_ops())) # When a process exits, it attempts to terminate # all of its daemonic child processes. t.daemon = True @@ -954,7 +1031,7 @@ class Op(object): prod_errcode, prod_errinfo = None, None try: postped_data, prod_errcode, prod_errinfo = self.postprocess( - parsed_data_dict[data_id], midped_data, + parsed_data_dict[data_id], midped_data, data_id, logid_dict.get(data_id)) except Exception as e: error_info = "(data_id={} log_id={}) {} Failed to postprocess: {}".format( @@ -1100,7 +1177,8 @@ class Op(object): def _run(self, concurrency_idx, input_channel, output_channels, is_thread_op, trace_buffer, model_config, workdir, thread_num, device_type, devices, mem_optim, ir_optim, precision, use_mkldnn, - mkldnn_cache_capacity, mkldnn_op_list, mkldnn_bf16_op_list): + mkldnn_cache_capacity, mkldnn_op_list, mkldnn_bf16_op_list, + is_jump_op, output_channels_of_jump_ops): """ _run() is the entry function of OP process / thread model.When client type is local_predictor in process mode, the CUDA environment needs to @@ -1127,6 +1205,8 @@ class Op(object): mkldnn_cache_capacity: cache capacity of mkldnn, 0 means no limit. mkldnn_op_list: OP list optimized by mkldnn, None default. mkldnn_bf16_op_list: OP list optimized by mkldnn bf16, None default. + is_jump_op: OP has jump op list or not, False default. + output_channels_of_jump_ops: all output channels of jump ops. Returns: None @@ -1267,27 +1347,46 @@ class Op(object): break if len(postped_data_dict) == 0: continue + # push data to channel (if run succ) start = int(round(_time() * 1000000)) try: profile_str = profiler.gen_profile_str() - for data_id, postped_data in postped_data_dict.items(): - if self._server_use_profile: - sys.stderr.write(profile_str) - self._push_to_output_channels( - data=postped_data, - channels=output_channels, - profile_str=profile_str, - client_need_profile=need_profile_dict[data_id], - profile_set=profile_dict[data_id]) - after_outchannel_time = _time() - _LOGGER.debug( - "(data_id={}) PUSH OUTPUT CHANNEL! op:{} push cost:{} ms". - format(data_id, self.name, (after_outchannel_time - - after_postp_time) * 1000)) - _LOGGER.debug( - "(data_id={}) PUSH OUTPUT CHANNEL! op:{} push data:{}". - format(data_id, self.name, postped_data.get_all_data())) + if self.is_jump_op() is True and self.check_jumping( + postped_data_dict) is True: + # push data to output channel of ops to be jumped + for data_id, postped_data in postped_data_dict.items(): + if self._server_use_profile: + sys.stderr.write(profile_str) + self._push_to_output_channels( + data=postped_data, + channels=output_channels_of_jump_ops, + profile_str=profile_str, + client_need_profile=need_profile_dict[data_id], + profile_set=profile_dict[data_id]) + after_outchannel_time = _time() + _LOGGER.debug( + "(data_id={}) PUSH OUTPUT CHANNEL OF JUMP OPs! op:{} push cost:{} ms". + format(data_id, self.name, (after_outchannel_time - + after_postp_time) * + 1000)) + else: + # push data to output channel. + for data_id, postped_data in postped_data_dict.items(): + if self._server_use_profile: + sys.stderr.write(profile_str) + self._push_to_output_channels( + data=postped_data, + channels=output_channels, + profile_str=profile_str, + client_need_profile=need_profile_dict[data_id], + profile_set=profile_dict[data_id]) + after_outchannel_time = _time() + _LOGGER.debug( + "(data_id={}) PUSH OUTPUT CHANNEL! op:{} push cost:{} ms". + format(data_id, self.name, (after_outchannel_time - + after_postp_time) * + 1000)) except ChannelStopError: _LOGGER.debug("{} Stop.".format(op_info_prefix)) self._finalize(is_thread_op) @@ -1410,7 +1509,7 @@ class RequestOp(Op): for idx, key in enumerate(request.key): dict_data[key] = request.value[idx] log_id = request.logid - _LOGGER.info("RequestOp unpack one request. log_id:{}, clientip:{} \ + _LOGGER.debug("RequestOp unpack one request. log_id:{}, clientip:{} \ name:{}, method:{}".format(log_id, request.clientip, request.name, request.method)) diff --git a/tools/scripts/ipipe_py3.sh b/tools/scripts/ipipe_py3.sh index f7b34a81f330e48337647f238ec250670ca7e355..98577060e2729842c756cd7f947616a2e2d6ece1 100644 --- a/tools/scripts/ipipe_py3.sh +++ b/tools/scripts/ipipe_py3.sh @@ -36,6 +36,7 @@ go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway@v1.15.2 go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger@v1.15.2 go get -u github.com/golang/protobuf/protoc-gen-go@v1.4.3 go get -u google.golang.org/grpc@v1.33.0 +go env -w GO111MODULE=auto build_whl_list=(build_cpu_server build_gpu_server build_client build_app) rpc_model_list=(grpc_fit_a_line grpc_yolov4 pipeline_imagenet bert_rpc_gpu bert_rpc_cpu ResNet50_rpc \