From e608c4a511a8868008e1e14c64b1f58fbf4ebf6c Mon Sep 17 00:00:00 2001 From: wangguibao Date: Wed, 13 Feb 2019 22:15:59 +0800 Subject: [PATCH] 20190213 --- CMakeLists.txt | 4 + cmake/external/pcre.cmake | 54 +++ configure/CMakeLists.txt | 4 + configure/compiler/src/idl_gram.c | 6 +- mempool/CMakeLists.txt | 4 + mempool/mempool.cpp | 81 +++++ mempool/mempool.h | 482 +++++++++++++++++++++++++ predictor/CMakeLists.txt | 16 +- predictor/common/CMakeLists.txt | 6 +- predictor/common/constant.h | 2 +- predictor/common/inner_common.h | 2 +- predictor/common/utils.h | 14 +- predictor/framework/CMakeLists.txt | 15 +- predictor/framework/bsf-inl-tensor.h | 10 +- predictor/framework/bsf.h | 8 +- predictor/framework/channel.h | 2 +- predictor/framework/dag.cpp | 12 +- predictor/framework/dag_view.cpp | 20 +- predictor/framework/dag_view.h | 6 +- predictor/framework/factory.h | 2 +- predictor/framework/infer.h | 4 +- predictor/framework/manager.h | 4 +- predictor/framework/op_repository.h | 6 +- predictor/framework/predictor_metric.h | 31 +- predictor/framework/resource.h | 10 +- predictor/framework/server.cpp | 10 +- predictor/framework/server.h | 4 +- predictor/framework/service.cpp | 14 +- predictor/framework/service.h | 12 +- predictor/framework/workflow.cpp | 8 +- predictor/op/CMakeLists.txt | 10 +- predictor/op/dense_echo_op.cpp | 2 +- predictor/op/op.cpp | 12 +- predictor/op/op.h | 4 +- predictor/op/sparse_echo_op.cpp | 2 +- predictor/op/write_json_op.cpp | 6 +- predictor/proto/CMakeLists.txt | 4 +- predictor/src/pdcodegen.cpp | 6 +- predictor/src/pdserving.cpp | 10 +- 39 files changed, 762 insertions(+), 147 deletions(-) create mode 100644 cmake/external/pcre.cmake create mode 100644 mempool/CMakeLists.txt create mode 100644 mempool/mempool.cpp create mode 100644 mempool/mempool.h diff --git a/CMakeLists.txt b/CMakeLists.txt index c4df5d42..27f9feff 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -68,6 +68,7 @@ include(external/leveldb) include(external/protobuf) include(external/snappy) include(external/brpc) +include(external/pcre) include(external/boost) include(flags) include(configure) @@ -81,6 +82,8 @@ set(EXTERNAL_LIBS glog protobuf paddlepaddle + pcre + brpc ) if(WITH_MKLML) @@ -96,4 +99,5 @@ add_subdirectory(bsl) add_subdirectory(ullib) add_subdirectory(spreg) add_subdirectory(configure) +add_subdirectory(mempool) add_subdirectory(predictor) diff --git a/cmake/external/pcre.cmake b/cmake/external/pcre.cmake new file mode 100644 index 00000000..6fab0866 --- /dev/null +++ b/cmake/external/pcre.cmake @@ -0,0 +1,54 @@ +# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +INCLUDE(ExternalProject) + +SET(PCRE_SOURCES_DIR ${THIRD_PARTY_PATH}/pcre) +SET(PCRE_INSTALL_DIR ${THIRD_PARTY_PATH}/install/pcre) +SET(PCRE_ROOT ${PCRE_INSTALL_DIR} CACHE FILEPATH "pcre root directory." FORCE) +SET(PCRE_INCLUDE_DIR "${PCRE_INSTALL_DIR}/include" CACHE PATH "pcre include directory." FORCE) + +INCLUDE_DIRECTORIES(${PCRE_INCLUDE_DIR}) # For pcre code to include its own headers. +INCLUDE_DIRECTORIES(${THIRD_PARTY_PATH}/install) # For Paddle code to include pcre.h. + +ExternalProject_Add( + extern_pcre + ${EXTERNAL_PROJECT_LOG_ARGS} + SVN_REPOSITORY "svn://vcs.exim.org/pcre/code/tags/pcre-7.7" + PREFIX ${PCRE_SOURCES_DIR} + UPDATE_COMMAND "" + PATCH_COMMAND sh autogen.sh + CONFIGURE_COMMAND ../extern_pcre/configure --prefix=${PCRE_INSTALL_DIR} --disable-shared --with-pic + BUILD_COMMAND make + INSTALL_COMMAND make install + CMAKE_CACHE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=${PCRE_INSTALL_DIR} + -DCMAKE_POSITION_INDEPENDENT_CODE:BOOL=ON + -DCMAKE_BUILD_TYPE:STRING=${THIRD_PARTY_BUILD_TYPE} +) +IF(WIN32) + IF(NOT EXISTS "${PCRE_INSTALL_DIR}/lib/libz.lib") + add_custom_command(TARGET extern_pcre POST_BUILD + COMMAND cmake -E copy ${PCRE_INSTALL_DIR}/lib/pcrestatic.lib ${PCRE_INSTALL_DIR}/lib/libz.lib + ) + ENDIF() + SET(PCRE_LIBRARIES "${PCRE_INSTALL_DIR}/lib/libpcre.lib" CACHE FILEPATH "pcre library." FORCE) +ELSE(WIN32) + SET(PCRE_LIBRARIES "${PCRE_INSTALL_DIR}/lib/libpcre.a" CACHE FILEPATH "pcre library." FORCE) +ENDIF(WIN32) + +ADD_LIBRARY(pcre STATIC IMPORTED GLOBAL) +SET_PROPERTY(TARGET pcre PROPERTY IMPORTED_LOCATION ${PCRE_LIBRARIES}) +ADD_DEPENDENCIES(pcre extern_pcre) + +LIST(APPEND external_project_dependencies pcre) diff --git a/configure/CMakeLists.txt b/configure/CMakeLists.txt index 02bb426d..41e49c72 100644 --- a/configure/CMakeLists.txt +++ b/configure/CMakeLists.txt @@ -20,6 +20,10 @@ LIST(APPEND configure_srcs ${CMAKE_CURRENT_LIST_DIR}/compiler/src/idl_gram.c ) +SET_SOURCE_FILES_PROPERTIES(${CMAKE_CURRENT_LIST_DIR}/compiler/src/idl.c + ${CMAKE_CURRENT_LIST_DIR}/compiler/src/idl_lex.c + ${CMAKE_CURRENT_LIST_DIR}/compiler/src/idl_gram.c PROPERTIES LANGUAGE CXX) + add_library(configure ${configure_srcs}) add_dependencies(configure bsl brpc ullib spreg) target_include_directories(configure PUBLIC diff --git a/configure/compiler/src/idl_gram.c b/configure/compiler/src/idl_gram.c index d738c235..609a8354 100644 --- a/configure/compiler/src/idl_gram.c +++ b/configure/compiler/src/idl_gram.c @@ -1550,11 +1550,11 @@ yyerrlab: YYSTACK_FREE (yymsg); } else - yyerror (scanner, loc, idl, "syntax error; also virtual memory exhausted"); + yyerror (scanner, loc, idl, (char *)"syntax error; also virtual memory exhausted"); } else #endif /* YYERROR_VERBOSE */ - yyerror (scanner, loc, idl, "syntax error"); + yyerror (scanner, loc, idl, (char *)"syntax error"); } @@ -1672,7 +1672,7 @@ yyabortlab: | yyoverflowlab -- parser overflow comes here. | `----------------------------------------------*/ yyoverflowlab: - yyerror (scanner, loc, idl, "parser stack overflow"); + yyerror (scanner, loc, idl, (char *)"parser stack overflow"); yyresult = 2; /* Fall through. */ #endif diff --git a/mempool/CMakeLists.txt b/mempool/CMakeLists.txt new file mode 100644 index 00000000..401e3dfe --- /dev/null +++ b/mempool/CMakeLists.txt @@ -0,0 +1,4 @@ +add_library(mempool ${CMAKE_CURRENT_LIST_DIR}/mempool.cpp) +add_dependencies(mempool brpc) +target_include_directories(mempool PUBLIC + ${CMAKE_CURRENT_BINARY_DIR}/../bsl/include) diff --git a/mempool/mempool.cpp b/mempool/mempool.cpp new file mode 100644 index 00000000..ba4a536a --- /dev/null +++ b/mempool/mempool.cpp @@ -0,0 +1,81 @@ +#include "mempool.h" + +namespace im { + +__thread Mempool* g_mempool = NULL; + +namespace fugue { + +namespace memory { + +void Region::init() { + _big_mem_capacity = 32 * 1024 * 1024; + _big_mem_start = new char[_big_mem_capacity]; +} + +void Region::reset() { + // release memory allocate from GlobalMempool + _free_blocks.unsafe_foreach(); + _free_blocks.reset(); + + // release memory from malloc + BigNode* head = _big_nodes.release(); + while (head) { + BigNode* next = head->next; + ::free(head); + head = next; + } + _mlc_mem_size.store(0, butil::memory_order_relaxed); + _mlc_mem_count.store(0, butil::memory_order_relaxed); + + // clear the large buffer + _big_mem_size.store(0, butil::memory_order_relaxed); + _big_mem_count.store(0, butil::memory_order_relaxed); + +} + +BlockReference* Region::get() { + BlockReference* ref = _free_blocks.get(); + if (ref->block == NULL) { + ref->offset = 0; + ref->block = GlobalBlockFreeList::instance()->get(); + } + return ref; +} + +void Region::put(BlockReference* block) { + _free_blocks.put(block); +} + +void* Region::malloc(size_t size) { + if (size < MLC_MEM_THRESHOLD) { + uint32_t offset = _big_mem_size.fetch_add(size, butil::memory_order_relaxed); + if (offset + size < _big_mem_capacity) { + _big_mem_count.fetch_add(1, butil::memory_order_relaxed); + return _big_mem_start + offset; + } + } + + _mlc_mem_size.fetch_add(size, butil::memory_order_relaxed); + _mlc_mem_count.fetch_add(1, butil::memory_order_relaxed); + BigNode* node = (BigNode*)::malloc(sizeof(BigNode) + size); + _big_nodes.push(node); + return node->data; +} + +Region::Region() { + _big_mem_size.store(0, butil::memory_order_relaxed); + _big_mem_count.store(0, butil::memory_order_relaxed); + + _big_mem_start = NULL; + _big_mem_capacity = 0; + + _mlc_mem_size.store(0, butil::memory_order_relaxed); + _mlc_mem_count.store(0, butil::memory_order_relaxed); +} + +} + +} + +} diff --git a/mempool/mempool.h b/mempool/mempool.h new file mode 100644 index 00000000..af80ebd4 --- /dev/null +++ b/mempool/mempool.h @@ -0,0 +1,482 @@ +#ifndef APP_ECOM_IM_MEMPOOL_SRC_MEMPOOL_H +#define APP_ECOM_IM_MEMPOOL_SRC_MEMPOOL_H + +#include +#include +#include +#include +#include +#include // for bsl::mempool +#include +#include +#include + +namespace im { +namespace fugue { + +namespace lockfree { + +template +class PushOnlyStack { +public: + PushOnlyStack() { + _head.store(NULL, butil::memory_order_relaxed); + } + + void push(T* node) { + T* head = _head.load(butil::memory_order_relaxed); + node->next = head; + while (!_head.compare_exchange_weak( + head, node, butil::memory_order_relaxed)) { + node->next = head; + } + } + + T* release() { + return _head.exchange(NULL, butil::memory_order_relaxed); + } + +private: + butil::atomic _head; +}; + +template +struct FreeListNode { + uint64_t id; + uint64_t next; + T data; +}; + +template +class FreeList { +public: + typedef FreeListNode Node; + static const uint64_t EMPTY = 0xFFFFFFFFFFFFFFFF; + + T* get() { + uint64_t head = _head.load(butil::memory_order_acquire); + if (head == EMPTY) { + return new_node(); + } + + Node* node = address(head); + while (!_head.compare_exchange_weak( + head, node->next, butil::memory_order_acquire)) { + if (head == EMPTY) { + return new_node(); + } + node = address(head); + } + return &node->data; + } + + void put(T* value) { + Node* node = container_of(value, Node, data); + + uint64_t head = _head.load(butil::memory_order_acquire); + // add version + node->id += (1UL << 32); + node->next = head; + + // NOTE: we MUST use a temp var *head* to call compare_exchange_weak + // because Boost.Atomic will update the *expected* even success + // std::atomic do not have this limitation + while (!_head.compare_exchange_weak( + head, node->id, butil::memory_order_release)) { + node->next = head; + } + } + + template + void unsafe_foreach() { + uint32_t used_blk_cnt = _slot_index.load(butil::memory_order_relaxed); + for (uint32_t i = 0; i < used_blk_cnt; ++i) { + F()(&_node[i]->data); + } + } + + uint32_t real_used_size() const { + uint32_t used_blk_cnt = _slot_index.load(butil::memory_order_relaxed); + uint64_t used_bytes = 0; + for (uint32_t i = 0; i < used_blk_cnt; ++i) { + used_bytes += _node[i]->data.offset; + } + return used_bytes >> 10; + } + + uint32_t allocate_blocks() const { + return _slot_index.load(butil::memory_order_relaxed); + } + + uint32_t free_blocks() const { + uint64_t head = _head.load(butil::memory_order_relaxed); + uint32_t size = 0; + while (head != FreeList::EMPTY) { + const Node* head_ptr = address(head); + head = head_ptr->next; + ++size; + } + return size; + } + + void reset() { + _head.store(FreeList::EMPTY, butil::memory_order_relaxed); + _slot_index.store(0, butil::memory_order_relaxed); + } + + FreeList() { + for (int i = 0; i < CAP; ++i) { + _node[i] = NULL; + } + reset(); + } + +private: + uint32_t slot(uint64_t id) const { + return static_cast(id); + } + + T* new_node() { + uint32_t index = _slot_index.fetch_add(1, butil::memory_order_relaxed); + if (index >= CAP) { + return NULL; + } + + if (_node[index] != NULL) { + return &(_node[index]->data); + } + + Node* node = (Node*)malloc(sizeof(Node)); + new (node) Node; + + node->id = index; + _node[index] = node; + + return &node->data; + } + + Node* address(uint64_t id) { + return _node[slot(id)]; + } + + const Node* address(uint64_t id) const { + return _node[slot(id)]; + } + + butil::atomic _head; + butil::atomic _slot_index; + Node* _node[CAP]; +}; + +} + +namespace memory { + +struct Block { + static const int BLOCK_SIZE = 2 * 1024 * 1024; + char data[BLOCK_SIZE]; +}; + +class GlobalBlockFreeList { +public: + static const int MAX_BLOCK_COUNT = 32 * 1024; + typedef lockfree::FreeList type; + static type* instance() { + static type singleton; + return &singleton; + } +}; + +struct BlockReference { + BlockReference() : offset(0), block(NULL) { + // do nothing + } + + void reset() { + offset = 0; + block = NULL; + } + + uint32_t offset; + Block* block; +}; + +class Region { +public: + struct GlobalPut { + void operator()(BlockReference* block_ref) { + if (block_ref->block != NULL) { + GlobalBlockFreeList::instance()->put(block_ref->block); + } + block_ref->reset(); + } + }; + + struct BigNode { + BigNode* next; + char data[0]; + }; + + ~Region() { + reset(); + delete [] _big_mem_start; + _big_mem_start = NULL; + } + + char const* debug_str() const { + _debug_str.clear(); + uint32_t alloc_blocks = _free_blocks.allocate_blocks(); + uint32_t free_blocks = _free_blocks.free_blocks(); + uint32_t used_mem_mb = _free_blocks.real_used_size(); + uint32_t big_buf_size = _big_mem_size.load(butil::memory_order_relaxed); + uint32_t big_buf_count = _big_mem_count.load(butil::memory_order_relaxed); + uint32_t mlc_mem_size = _mlc_mem_size.load(butil::memory_order_relaxed); + uint32_t mlc_mem_count = _mlc_mem_count.load(butil::memory_order_relaxed); + + _debug_str.appendf("[alloc_blks:%u,free_blks:%u,used_mem_kb:%u," + "big_mem_kb:%u,big_buf_cnt:%u,mlc_mem_kb:%u,mlc_cnt:%u]", + alloc_blocks, free_blocks, used_mem_mb, big_buf_size >> 10, + big_buf_count, mlc_mem_size >> 10, mlc_mem_count); + return _debug_str.c_str(); + } + + Region(); + + void init(); + + void reset(); + + BlockReference* get(); + + void* malloc(size_t size); + + void put(BlockReference* block); + + static const int MAX_BLOCK_COUNT = 1024; + static const int BIG_MEM_THRESHOLD = 256 * 1024; + static const int MLC_MEM_THRESHOLD = 4 * 1024 * 1024; + static const int COUNTER_SIZE = MLC_MEM_THRESHOLD / BIG_MEM_THRESHOLD + 1; + +private: + lockfree::FreeList _free_blocks; + lockfree::PushOnlyStack _big_nodes; + + butil::atomic _big_mem_size; + butil::atomic _big_mem_count; + + char* _big_mem_start; + uint32_t _big_mem_capacity; + + butil::atomic _mlc_mem_size; + butil::atomic _mlc_mem_count; + + bsl::string mutable _debug_str; +}; + +} + +} + +class Mempool { +public: + void* malloc(size_t size) { + size = _align(size); + if (size <= _free_size) { + void* p = _free_cursor; + _free_size -= size; + _free_cursor += size; + return p; + } + + return malloc_from_region(size); + } + + void free(void* p, size_t size) { + if (size >= fugue::memory::Region::BIG_MEM_THRESHOLD) { + return; + } + + if (_free_cursor - size == static_cast(p)) { + size_t down_aligned = _down_align(size); + _free_cursor -= down_aligned; + _free_size += down_aligned; + } + } + + void* realloc(void* old_data, size_t old_size, size_t new_size) { + if (old_size >= new_size) { + return old_data; + } + + size_t required = new_size - old_size; + if (_free_cursor == static_cast(old_data) + old_size) { + if (_free_size >= required) { + _free_cursor += required; + _free_size -= required; + return old_data; + } else { + _free_cursor = static_cast(old_data); + _free_size += old_size; + } + } + + void* p = this->malloc_from_region(new_size); + if (p != NULL) { + memcpy(p, old_data, old_size); + return p; + } + + return NULL; + } + + Mempool(fugue::memory::Region* blocks) : _free_size(0) + , _free_cursor(NULL) + , _blocks(blocks) { + _block = NULL; + } + + ~Mempool() { + release_block(); + } + + void release_block() { + if (_block) { + _block->offset = fugue::memory::Block::BLOCK_SIZE - _free_size; + _blocks->put(_block); + } + + _free_size = 0; + _free_cursor = NULL; + _block = NULL; + } + +private: + void* malloc_from_region(size_t size) { + if (size >= fugue::memory::Region::BIG_MEM_THRESHOLD) { + return _blocks->malloc(size); + } + + while (true) { + fugue::memory::BlockReference* block = _blocks->get(); + if (block == NULL) { + return NULL; + } + + uint32_t free_size = fugue::memory::Block::BLOCK_SIZE - block->offset; + if (size <= free_size) { + if (_block) { + _block->offset = fugue::memory::Block::BLOCK_SIZE - _free_size; + } + + char* p = block->block->data + block->offset; + _free_size = free_size - size; + _free_cursor = p + size; + _block = block; + return p; + } + } + return _blocks->malloc(size); + } + + static const int ALIGN_SIZE = sizeof(void*); + + inline size_t _align(size_t size) const { + return (size + (ALIGN_SIZE - 1)) & ~(ALIGN_SIZE - 1); + } + + inline size_t _down_align(size_t size) const { + return size & ~(ALIGN_SIZE - 1); + } + + size_t _free_size; + char* _free_cursor; + + fugue::memory::Region* _blocks; + fugue::memory::BlockReference* _block; +}; + +extern __thread Mempool* g_mempool; + +class GlobalMempool : public bsl::mempool { +public: + GlobalMempool() { + // do nothing; + } + + virtual ~GlobalMempool() { + // do nothing; + } + + static GlobalMempool* instance() { + static GlobalMempool singleton; + return &singleton; + } + + void reset(Mempool* mempool) { + g_mempool = mempool; + } + + void* malloc(size_t size) { + return g_mempool->malloc(size); + } + + void* realloc(void* old_data, size_t old_size, size_t new_size) { + return g_mempool->realloc(old_data, old_size, new_size); + } + + void free(void* p, size_t s) { + g_mempool->free(p, s); + } + + void clear() { + g_mempool->release_block(); + } + + Mempool* get() { + return g_mempool; + } + +}; + +class MempoolGuard { +public: + MempoolGuard(fugue::memory::Region* region) : _mempool(region) { + acquire(); + } + + void acquire() { + _saved_mempool = g_mempool; + g_mempool = &_mempool; + } + + void release() { + _mempool.release_block(); + g_mempool = _saved_mempool; + } + + ~MempoolGuard() { + release(); + } + +private: + Mempool _mempool; + Mempool* _saved_mempool; +}; + +inline std::string print_trace() { + const static int BT_BUF_SIZE = 400; + std::stringstream debug_stream; + + void* buffer[BT_BUF_SIZE]; + int nptrs = backtrace(buffer, BT_BUF_SIZE); + char** strings = backtrace_symbols(buffer, nptrs); + + for (int j = 0; j < nptrs; j++) { + debug_stream << strings[j] << "\t"; + } + + return debug_stream.str(); +} + +} +#endif diff --git a/predictor/CMakeLists.txt b/predictor/CMakeLists.txt index 7a785326..a55b17a4 100644 --- a/predictor/CMakeLists.txt +++ b/predictor/CMakeLists.txt @@ -2,5 +2,17 @@ include(proto/CMakeLists.txt) include(common/CMakeLists.txt) include(op/CMakeLists.txt) include(framework/CMakeLists.txt) -add_library(predictor ${predictor_SOURCES}) -add_dependencies(predictor proto op protobuf boost brpc) +add_library(predictor ${predictor_srcs}) +set_source_files_properties( + ${predictor_srcs} + PROPERTIES + COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor") +add_dependencies(predictor protobuf boost brpc) +target_include_directories(predictor PUBLIC + ${CMAKE_CURRENT_LIST_DIR}/ + ${CMAKE_CURRENT_BINARY_DIR}/ + ${CMAKE_CURRENT_LIST_DIR}/../configure + ${CMAKE_CURRENT_LIST_DIR}/../mempool + ${CMAKE_CURRENT_LIST_DIR}/../spreg + ${CMAKE_CURRENT_LIST_DIR}/../ullib/include + ${CMAKE_CURRENT_BINARY_DIR}/../bsl/include) diff --git a/predictor/common/CMakeLists.txt b/predictor/common/CMakeLists.txt index 9708b7ed..7530df97 100644 --- a/predictor/common/CMakeLists.txt +++ b/predictor/common/CMakeLists.txt @@ -1,4 +1,2 @@ -add_library(common - ${CMAKE_CURRENT_LIST_DIR}/constant.cpp - ) -add_dependencies(common brpc configure) +FILE(GLOB common_srcs ${CMAKE_CURRENT_LIST_DIR}/*.cpp) +LIST(APPEND predictor_srcs ${common_srcs}) diff --git a/predictor/common/constant.h b/predictor/common/constant.h index 5b14dd03..e64353e5 100644 --- a/predictor/common/constant.h +++ b/predictor/common/constant.h @@ -30,7 +30,7 @@ DECLARE_bool(enable_model_toolkit); DECLARE_string(enable_protocol_list); // STATIC Variables -static const char* START_OP_NAME = "startup_op"; +const char* START_OP_NAME = "startup_op"; // ERRORCODE enum { diff --git a/predictor/common/inner_common.h b/predictor/common/inner_common.h index 062f7e4f..2010063d 100644 --- a/predictor/common/inner_common.h +++ b/predictor/common/inner_common.h @@ -26,7 +26,7 @@ #include #include "Configure.h" -#include +// #include #include "common/utils.h" #include "common/types.h" diff --git a/predictor/common/utils.h b/predictor/common/utils.h index cc8df877..a1f7e341 100644 --- a/predictor/common/utils.h +++ b/predictor/common/utils.h @@ -24,7 +24,7 @@ public: } TimerFlow(const char* name) : _csize(0), _name(name) { - _last = _start = base::cpuwide_time_us(); + _last = _start = butil::cpuwide_time_us(); _auto = true; _started = true; } @@ -34,7 +34,7 @@ public: } void start() { - _last = _start = base::cpuwide_time_us(); + _last = _start = butil::cpuwide_time_us(); _started = true; } @@ -43,12 +43,12 @@ public: LOG(WARNING) << "Timer not started yet!"; return false; } - uint64_t now = base::cpuwide_time_us(); + uint64_t now = butil::cpuwide_time_us(); if (!appendf("%s:%lu|", tag, now - _last)) { LOG(WARNING) << "Failed check timer: " << _name << ", value = [" << tag << ":" - << (now - _last) << "]!" << noflush; + << (now - _last) << "]!"; return false; } @@ -61,7 +61,7 @@ public: } void end() { - uint64_t now = base::cpuwide_time_us(); + uint64_t now = butil::cpuwide_time_us(); if (!appendf("total:%lu", now - _start)) { LOG(WARNING) << "Failed dump time_info[" << _name << "]"; } @@ -72,7 +72,7 @@ public: if (!_auto) { return; } - uint64_t now = base::cpuwide_time_us(); + uint64_t now = butil::cpuwide_time_us(); if (appendf("total:%lu,%s", now - _start, _name)) { LOG(INFO) << " " << _name << "_tc=[" << _buf << "]"; @@ -88,7 +88,7 @@ private: try { int bytes = vsnprintf(_buf + _csize, MAX_SIZE - _csize, fmt, ap); if (bytes >= MAX_SIZE - _csize || bytes < 0) { - LOG(WARNING) << "Overflow when appendf!" << noflush; + LOG(WARNING) << "Overflow when appendf!"; return false; } _csize += bytes; diff --git a/predictor/framework/CMakeLists.txt b/predictor/framework/CMakeLists.txt index bc9248b3..29b29be4 100644 --- a/predictor/framework/CMakeLists.txt +++ b/predictor/framework/CMakeLists.txt @@ -1,13 +1,2 @@ -list(APPEND predictor_SOURCES - ${CMAKE_CURRENT_LIST_DIR}/dag.cpp - ${CMAKE_CURRENT_LIST_DIR}/dag_view.cpp - ${CMAKE_CURRENT_LIST_DIR}/mc_cache.cpp - ${CMAKE_CURRENT_LIST_DIR}/memory.cpp - ${CMAKE_CURRENT_LIST_DIR}/op_repository.cpp - ${CMAKE_CURRENT_LIST_DIR}/predictor_metric.cpp - ${CMAKE_CURRENT_LIST_DIR}/resource.cpp - ${CMAKE_CURRENT_LIST_DIR}/server.cpp - ${CMAKE_CURRENT_LIST_DIR}/service.cpp - ${CMAKE_CURRENT_LIST_DIR}/workflow.cpp - ) -INCLUDE_DIRECTORIES(.) +FILE(GLOB framework_srcs ${CMAKE_CURRENT_LIST_DIR}/*.cpp) +LIST(APPEND predictor_srcs ${framework_srcs}) diff --git a/predictor/framework/bsf-inl-tensor.h b/predictor/framework/bsf-inl-tensor.h index 8134923c..8b7bc42c 100644 --- a/predictor/framework/bsf-inl-tensor.h +++ b/predictor/framework/bsf-inl-tensor.h @@ -3,7 +3,7 @@ #include #include #include -#include +#include #include #include "common/inner_common.h" #include "framework/infer_data.h" @@ -46,7 +46,7 @@ struct Task index; + butil::atomic index; const BatchTensor* get(bool is_in) const { if (is_in) { @@ -72,7 +72,7 @@ struct Taskwrite_fd, &c, 1) != 1 && errno == EINTR) { ; } - base::return_object(task); + butil::return_object(task); } } } diff --git a/predictor/framework/bsf.h b/predictor/framework/bsf.h index 2ff6e120..3665968d 100644 --- a/predictor/framework/bsf.h +++ b/predictor/framework/bsf.h @@ -4,7 +4,7 @@ #include #include #include -#include +#include #include #include "common/inner_common.h" @@ -38,7 +38,7 @@ struct Task { return in->size(); } - base::atomic index; + butil::atomic index; Task() { read_fd = -1; @@ -48,7 +48,7 @@ struct Task { out = NULL; rem = -1; size = -1; - index.store(0, base::memory_order_relaxed); + index.store(0, butil::memory_order_relaxed); } }; @@ -148,7 +148,7 @@ public: while (write(task->write_fd, &c, 1) != 1 && errno == EINTR) { ; } - base::return_object(task); + butil::return_object(task); } } } diff --git a/predictor/framework/channel.h b/predictor/framework/channel.h index fb1d8eb4..b38ef157 100644 --- a/predictor/framework/channel.h +++ b/predictor/framework/channel.h @@ -76,7 +76,7 @@ public: if (bus->regist(_op, this) != 0) { LOG(FATAL) << "Failed regist channel[" << _op - << "] to bus!" << noflush; + << "] to bus!"; return -1; } diff --git a/predictor/framework/dag.cpp b/predictor/framework/dag.cpp index ce4e0dfd..823d5068 100644 --- a/predictor/framework/dag.cpp +++ b/predictor/framework/dag.cpp @@ -137,21 +137,21 @@ int Dag::init(const comcfg::Configure& conf, const std::string& name) { } if (FLAGS_el_log_level == 16) { - LOG(DEBUG) << "DAG: " << _dag_name << noflush; - LOG(DEBUG) << ", Op Num: " << _index_nodes.size(); + LOG(INFO) << "DAG: " << _dag_name << noflush; + LOG(INFO) << ", Op Num: " << _index_nodes.size(); for (uint32_t nid = 0; nid < _index_nodes.size(); nid++) { DagNode* node = _index_nodes[nid]; - LOG(DEBUG) + LOG(INFO) << ", OP-" << node->id << "-" << node->name << "-" << node->type << noflush; - LOG(DEBUG) << " depends: " << node->depends.size() << noflush; + LOG(INFO) << " depends: " << node->depends.size() << noflush; boost::unordered_map::iterator it; for (it = node->depends.begin(); it != node->depends.end(); it++) { - LOG(DEBUG) << " " << it->first << " " << it->second << noflush; + LOG(INFO) << " " << it->first << " " << it->second << noflush; } } - LOG(DEBUG) << ""; + LOG(INFO) << ""; } return ERR_OK; diff --git a/predictor/framework/dag_view.cpp b/predictor/framework/dag_view.cpp index af0527c4..3b2ced6b 100644 --- a/predictor/framework/dag_view.cpp +++ b/predictor/framework/dag_view.cpp @@ -10,7 +10,7 @@ namespace predictor { int DagView::init(Dag* dag, const std::string& service_name) { _name = dag->name(); _full_name = service_name + NAME_DELIMITER + dag->name(); - _bus = base::get_object(); + _bus = butil::get_object(); _bus->clear(); uint32_t stage_size = dag->stage_size(); // create tls stage view @@ -20,7 +20,7 @@ int DagView::init(Dag* dag, const std::string& service_name) { LOG(FATAL) << "Failed get stage by index:" << si; return ERR_INTERNAL_FAILURE; } - ViewStage* vstage = base::get_object(); + ViewStage* vstage = butil::get_object(); if (vstage == NULL) { LOG(FATAL) << "Failed get vstage from object pool" @@ -32,7 +32,7 @@ int DagView::init(Dag* dag, const std::string& service_name) { // create tls view node for (uint32_t ni = 0; ni < node_size; ni++) { DagNode* node = stage->nodes[ni]; - ViewNode* vnode = base::get_object(); + ViewNode* vnode = butil::get_object(); if (vnode == NULL) { LOG(FATAL) << "Failed get vnode at:" << ni; return ERR_MEM_ALLOC_FAILURE; @@ -72,19 +72,19 @@ int DagView::deinit() { OpRepository::instance().return_op(vnode->op); vnode->reset(); // clear item - base::return_object(vnode); + butil::return_object(vnode); } // clear vector vstage->nodes.clear(); - base::return_object(vstage); + butil::return_object(vstage); } _view.clear(); _bus->clear(); - base::return_object(_bus); + butil::return_object(_bus); return ERR_OK; } -int DagView::execute(base::IOBufBuilder* debug_os) { +int DagView::execute(butil::IOBufBuilder* debug_os) { uint32_t stage_size = _view.size(); for (uint32_t si = 0; si < stage_size; si++) { TRACEPRINTF("start to execute stage[%u]", si); @@ -104,8 +104,8 @@ int DagView::execute(base::IOBufBuilder* debug_os) { // You can derive a subclass to implement this func. // ParallelDagView maybe the one you want. int DagView::execute_one_stage(ViewStage* vstage, - base::IOBufBuilder* debug_os) { - base::Timer stage_time(base::Timer::STARTED); + butil::IOBufBuilder* debug_os) { + butil::Timer stage_time(butil::Timer::STARTED); uint32_t node_size = vstage->nodes.size(); for (uint32_t ni = 0; ni < node_size; ni++) { ViewNode* vnode = vstage->nodes[ni]; @@ -121,7 +121,7 @@ int DagView::execute_one_stage(ViewStage* vstage, } if (errcode > 0) { - LOG(TRACE) + LOG(INFO) << "Execute ignore, Op:" << op->debug_string(); continue; } diff --git a/predictor/framework/dag_view.h b/predictor/framework/dag_view.h index 29efe599..37295419 100644 --- a/predictor/framework/dag_view.h +++ b/predictor/framework/dag_view.h @@ -41,13 +41,13 @@ public: int deinit(); - int execute(base::IOBufBuilder* debug_os); + int execute(butil::IOBufBuilder* debug_os); // The default execution strategy is in sequencing // You can derive a subclass to implement this func. // ParallelDagView maybe the one you want. virtual int execute_one_stage(ViewStage* vstage, - base::IOBufBuilder* debug_os); + butil::IOBufBuilder* debug_os); int set_request_channel(Channel& request); @@ -72,7 +72,7 @@ private: // strategy, by implments the execute_one_stage(). class ParallelDagView : public DagView { public: - int execute_one_stage(ViewStage* vstage, base::IOBufBuilder*) { + int execute_one_stage(ViewStage* vstage, butil::IOBufBuilder*) { return 0; } }; diff --git a/predictor/framework/factory.h b/predictor/framework/factory.h index b3aabd35..82a945e9 100644 --- a/predictor/framework/factory.h +++ b/predictor/framework/factory.h @@ -127,7 +127,7 @@ public: return -1; } - LOG(TRACE) << "Succ insert one factory, tag: " << tag + LOG(INFO) << "Succ insert one factory, tag: " << tag << ", base type: " << typeid(B).name(); return 0; diff --git a/predictor/framework/infer.h b/predictor/framework/infer.h index b93cca5e..3f5acf58 100644 --- a/predictor/framework/infer.h +++ b/predictor/framework/infer.h @@ -833,7 +833,7 @@ public: iter->first; return -1; } - LOG(DEBUG) << "Succ thrd clear version engine: " << iter->first; + LOG(INFO) << "Succ thrd clear version engine: " << iter->first; } return 0; } @@ -1114,7 +1114,7 @@ public: return -1; } version = infer_engine->version(); - LOG(DEBUG) << "Succ get version: " << version << " for model: " + LOG(INFO) << "Succ get version: " << version << " for model: " << model; return 0; } diff --git a/predictor/framework/manager.h b/predictor/framework/manager.h index ef7bbb88..43dfe0fd 100644 --- a/predictor/framework/manager.h +++ b/predictor/framework/manager.h @@ -72,7 +72,7 @@ public: return -1; } - LOG(TRACE) + LOG(INFO) << "Succ init item:" << name << " from conf:" << path << "/" << file << ", at:" << ii << "!"; } @@ -127,7 +127,7 @@ public: } } - LOG(TRACE) << "Finish reload " + LOG(INFO) << "Finish reload " << _item_map.size() << " " << T::tag() << "(s)"; return ret; diff --git a/predictor/framework/op_repository.h b/predictor/framework/op_repository.h index ebea2260..4ecf2806 100644 --- a/predictor/framework/op_repository.h +++ b/predictor/framework/op_repository.h @@ -22,11 +22,11 @@ template class OpFactory : public Factory { public: Op* get_op() { - return base::get_object(); + return butil::get_object(); } void return_op(Op* op) { - base::return_object(dynamic_cast(op)); + butil::return_object(dynamic_cast(op)); } static OpFactory& instance() { @@ -50,7 +50,7 @@ public: template void regist_op(std::string op_type) { _repository[op_type] = &OpFactory::instance(); - LOG(TRACE) << "Succ regist op: " << op_type << "!"; + LOG(INFO) << "Succ regist op: " << op_type << "!"; } Op* get_op(std::string op_type); diff --git a/predictor/framework/predictor_metric.h b/predictor/framework/predictor_metric.h index e0cbae66..3a3afd0b 100644 --- a/predictor/framework/predictor_metric.h +++ b/predictor/framework/predictor_metric.h @@ -2,9 +2,9 @@ #define BAIDU_PADDLE_SERVING_PREDICTOR_FRAMEWORK_PREDICTOR_METRIC_H #include // bvar -#include // BAIDU_SCOPED_LOCK -#include // FlatMap -#include // DefaultSingletonTraits +#include // BAIDU_SCOPED_LOCK +#include // FlatMap +#include // DefaultSingletonTraits namespace baidu { namespace paddle_serving { @@ -30,6 +30,7 @@ public: inline AdderWindowMetric& operator<<(int count) { sum << count; + return *this; } public: @@ -76,6 +77,7 @@ public: inline AvgWindowMetric& operator<<(int64_t value) { avg << value; + return *this; } public: @@ -93,6 +95,7 @@ public: inline AvgDoubleWindowMetric& operator<<(int64_t value) { recorder << value; + return *this; } public: @@ -110,31 +113,31 @@ public: static PredictorMetric* GetInstance(); ~PredictorMetric() { - for (::base::FlatMap::iterator iter + for (::butil::FlatMap::iterator iter = latency_recorder_map.begin(); iter != latency_recorder_map.end(); ++iter) { delete iter->second; } - for (::base::FlatMap::iterator iter + for (::butil::FlatMap::iterator iter = adder_window_map.begin(); iter != adder_window_map.end(); ++iter) { delete iter->second; } - for (::base::FlatMap::iterator iter + for (::butil::FlatMap::iterator iter = avg_window_map.begin(); iter != avg_window_map.end(); ++iter) { delete iter->second; } - for (::base::FlatMap::iterator iter + for (::butil::FlatMap::iterator iter = avg_double_window_map.begin(); iter != avg_double_window_map.end(); ++iter) { delete iter->second; } - for (::base::FlatMap::iterator iter + for (::butil::FlatMap::iterator iter = rate_map.begin(); iter != rate_map.end(); ++iter) { @@ -268,14 +271,14 @@ private: private: const size_t bucket_count; - ::base::FlatMap latency_recorder_map; - ::base::FlatMap adder_window_map; - ::base::FlatMap avg_window_map; - ::base::FlatMap avg_double_window_map; - ::base::FlatMap rate_map; + ::butil::FlatMap latency_recorder_map; + ::butil::FlatMap adder_window_map; + ::butil::FlatMap avg_window_map; + ::butil::FlatMap avg_double_window_map; + ::butil::FlatMap rate_map; friend struct DefaultSingletonTraits; - mutable base::Mutex _mutex; + mutable butil::Mutex _mutex; DISALLOW_COPY_AND_ASSIGN(PredictorMetric); }; diff --git a/predictor/framework/resource.h b/predictor/framework/resource.h index 4c1be358..a21bfa1e 100644 --- a/predictor/framework/resource.h +++ b/predictor/framework/resource.h @@ -3,7 +3,6 @@ #include "common/inner_common.h" #include "framework/memory.h" -#include "framework/mc_cache.h" // McCache namespace baidu { namespace paddle_serving { @@ -23,9 +22,7 @@ struct DynamicResource { class Resource { public: - Resource() : - _mc_cache(NULL) { - } + Resource() : {} ~Resource() { finalize(); } @@ -48,10 +45,6 @@ public: return (DynamicResource*) THREAD_GETSPECIFIC(_tls_bspec_key); } - McCache* get_mc_cache() { - return _mc_cache; - } - private: int thread_finalize() { return 0; @@ -59,7 +52,6 @@ private: THREAD_KEY_T _tls_bspec_key; - McCache* _mc_cache; }; } // predictor diff --git a/predictor/framework/server.cpp b/predictor/framework/server.cpp index c437aaf2..88312b8a 100644 --- a/predictor/framework/server.cpp +++ b/predictor/framework/server.cpp @@ -15,7 +15,7 @@ namespace predictor { volatile bool ServerManager::_s_reload_starting = true; bool ServerManager::_compare_string_piece_without_case( - const base::StringPiece& s1, const char* s2) { + const butil::StringPiece& s1, const char* s2) { if (strlen(s2) != s1.size()) { return false; } @@ -91,7 +91,7 @@ int ServerManager::start_and_wait() { } void ServerManager::_set_server_option_by_protocol( - const ::base::StringPiece& protocol_type) { + const ::butil::StringPiece& protocol_type) { std::string enabled_protocols = FLAGS_enable_protocol_list; if (_compare_string_piece_without_case(protocol_type, "itp")) { _options.nshead_service = new ::baidu::rpc::policy::ItpAdaptor; @@ -129,10 +129,10 @@ int ServerManager::_wait_reloader() { } void* ServerManager::_reload_worker(void* args) { - LOG(TRACE) << "Entrence reload worker, " + LOG(INFO) << "Entrence reload worker, " << "interval_s: " << FLAGS_reload_interval_s; while (ServerManager::reload_starting()) { - LOG(TRACE) << "Begin reload framework..."; + LOG(INFO) << "Begin reload framework..."; if (Resource::instance().reload() != 0) { LOG(FATAL) << "Failed reload resource!"; } @@ -144,7 +144,7 @@ void* ServerManager::_reload_worker(void* args) { usleep(FLAGS_reload_interval_s * 1000000); } - LOG(TRACE) << "Exit reload worker!"; + LOG(INFO) << "Exit reload worker!"; return NULL; } diff --git a/predictor/framework/server.h b/predictor/framework/server.h index 09e297e3..aa000bd3 100644 --- a/predictor/framework/server.h +++ b/predictor/framework/server.h @@ -37,9 +37,9 @@ private: static void* _reload_worker(void* args); bool _compare_string_piece_without_case( - const base::StringPiece& s1, const char* s2); + const butil::StringPiece& s1, const char* s2); - void _set_server_option_by_protocol(const ::base::StringPiece& protocol_type); + void _set_server_option_by_protocol(const ::butil::StringPiece& protocol_type); baidu::rpc::ServerOptions _options; baidu::rpc::Server _server; diff --git a/predictor/framework/service.cpp b/predictor/framework/service.cpp index c8b67f3e..678ec062 100644 --- a/predictor/framework/service.cpp +++ b/predictor/framework/service.cpp @@ -2,7 +2,7 @@ #include "framework/channel.h" #include "common/constant.h" #include "framework/service.h" -#include // base::Timer +#include // butil::Timer #include "framework/server.h" #include "framework/dag_view.h" #include "framework/manager.h" @@ -116,7 +116,7 @@ int InferService::init(const comcfg::ConfigUnit& conf) { } } - LOG(TRACE) + LOG(INFO) << "Succ load infer_service: " << _infer_service_format << "!"; @@ -135,7 +135,7 @@ const std::string& InferService::name() const { int InferService::inference( const google::protobuf::Message* request, google::protobuf::Message* response, - base::IOBufBuilder* debug_os) { + butil::IOBufBuilder* debug_os) { TRACEPRINTF("start to inference"); // when funtion call begins, framework will reset @@ -188,7 +188,7 @@ int InferService::inference( int InferService::debug( const google::protobuf::Message* request, google::protobuf::Message* response, - base::IOBufBuilder* debug_os) { + butil::IOBufBuilder* debug_os) { return inference(request, response, debug_os); } @@ -196,7 +196,7 @@ int InferService::execute_one_workflow( uint32_t index, const google::protobuf::Message* request, google::protobuf::Message* response, - base::IOBufBuilder* debug_os) { + butil::IOBufBuilder* debug_os) { if (index >= _flows.size()) { LOG(FATAL) << "Faield execute workflow, index: " << index << " >= max:" << _flows.size(); @@ -210,8 +210,8 @@ int InferService::_execute_workflow( Workflow* workflow, const google::protobuf::Message* request, google::protobuf::Message* response, - base::IOBufBuilder* debug_os) { - base::Timer workflow_time(base::Timer::STARTED); + butil::IOBufBuilder* debug_os) { + butil::Timer workflow_time(butil::Timer::STARTED); // create and submit beginer channel BuiltinChannel req_channel; req_channel.init(0, START_OP_NAME); diff --git a/predictor/framework/service.h b/predictor/framework/service.h index e8c73f9f..1fdeb01b 100644 --- a/predictor/framework/service.h +++ b/predictor/framework/service.h @@ -42,24 +42,24 @@ public: virtual int inference( const google::protobuf::Message* request, google::protobuf::Message* response, - base::IOBufBuilder* debug_os = NULL); + butil::IOBufBuilder* debug_os = NULL); int debug( const google::protobuf::Message* request, google::protobuf::Message* response, - base::IOBufBuilder* debug_os); + butil::IOBufBuilder* debug_os); int execute_one_workflow( uint32_t index, const google::protobuf::Message* request, google::protobuf::Message* response, - base::IOBufBuilder* debug_os); + butil::IOBufBuilder* debug_os); private: int _execute_workflow( Workflow* workflow, const google::protobuf::Message* request, google::protobuf::Message* response, - base::IOBufBuilder* debug_os); + butil::IOBufBuilder* debug_os); std::vector* _map_request_to_workflow(const google::protobuf::Message* request); @@ -69,7 +69,7 @@ private: uint64_t _last_change_timestamp; bool _enable_map_request_to_workflow; std::string _request_field_key; - ::base::FlatMap > _request_to_workflow_map; + ::butil::FlatMap > _request_to_workflow_map; IMerger* _merger; }; @@ -79,7 +79,7 @@ public: int inference( const google::protobuf::Message* request, google::protobuf::Message* response, - base::IOBufBuilder* debug_os) { + butil::IOBufBuilder* debug_os) { return 0; } }; diff --git a/predictor/framework/workflow.cpp b/predictor/framework/workflow.cpp index 20eccb8f..4c343179 100644 --- a/predictor/framework/workflow.cpp +++ b/predictor/framework/workflow.cpp @@ -29,9 +29,9 @@ int Workflow::init(const comcfg::ConfigUnit& conf) { DagView* Workflow::fetch_dag_view(const std::string& service_name) { DagView* view = NULL; if (_type == "Sequence") { - view = base::get_object(); + view = butil::get_object(); } else if (_type == "Parallel") { - view = base::get_object(); + view = butil::get_object(); } else { LOG(FATAL) << "Unknown dag type:" << _type << "!"; @@ -48,9 +48,9 @@ DagView* Workflow::fetch_dag_view(const std::string& service_name) { void Workflow::return_dag_view(DagView* view) { view->deinit(); if (_type == "Sequence") { - base::return_object(view); + butil::return_object(view); } else if (_type == "Parallel") { - base::return_object( + butil::return_object( dynamic_cast(view)); } else { LOG(FATAL) diff --git a/predictor/op/CMakeLists.txt b/predictor/op/CMakeLists.txt index ea21de11..191fe75d 100644 --- a/predictor/op/CMakeLists.txt +++ b/predictor/op/CMakeLists.txt @@ -1,8 +1,2 @@ -add_library(op STATIC - ${CMAKE_CURRENT_LIST_DIR}/common_echo_op.cpp - ${CMAKE_CURRENT_LIST_DIR}/dense_echo_op.cpp - ${CMAKE_CURRENT_LIST_DIR}/op.cpp - ${CMAKE_CURRENT_LIST_DIR}/sparse_echo_op.cpp - ${CMAKE_CURRENT_LIST_DIR}/write_json_op.cpp - ) -add_dependencies(op proto brpc configure) +FILE(GLOB op_srcs ${CMAKE_CURRENT_LIST_DIR}/*.cpp) +LIST(APPEND predictor_srcs ${op_srcs}) diff --git a/predictor/op/dense_echo_op.cpp b/predictor/op/dense_echo_op.cpp index ae705a11..505d17ea 100644 --- a/predictor/op/dense_echo_op.cpp +++ b/predictor/op/dense_echo_op.cpp @@ -12,7 +12,7 @@ int DenseEchoOp::inference() { const Request* req = dynamic_cast(get_request_message()); Response* res = mutable_data(); - LOG(DEBUG) << "Receive request in dense service:" + LOG(INFO) << "Receive request in dense service:" << req->ShortDebugString(); uint32_t sample_size = req->instances_size(); for (uint32_t si = 0; si < sample_size; si++) { diff --git a/predictor/op/op.cpp b/predictor/op/op.cpp index 8770b340..175fa61d 100644 --- a/predictor/op/op.cpp +++ b/predictor/op/op.cpp @@ -1,5 +1,5 @@ #include "op/op.h" -#include // base::Timer +#include // butil::Timer #include "common/utils.h" #include "common/constant.h" #include "framework/channel.h" @@ -18,7 +18,7 @@ int Op::init(Bus* bus, Dag* dag, uint32_t id, const std::string& name, _type = type; set_config(conf); - _timer = base::get_object(); + _timer = butil::get_object(); if (!_timer) { LOG(FATAL) << "Invalid timerflow in op:" << this->name(); @@ -42,7 +42,7 @@ int Op::init(Bus* bus, Dag* dag, uint32_t id, const std::string& name, int Op::deinit() { if (_timer) { - base::return_object(_timer); + butil::return_object(_timer); } _bus = NULL; @@ -73,7 +73,7 @@ int Op::check_time(const char* tag) { } int Op::process(bool debug) { - base::Timer op_time(base::Timer::STARTED); + butil::Timer op_time(butil::Timer::STARTED); if (debug && _timer) { _timer->start(); } @@ -84,7 +84,7 @@ int Op::process(bool debug) { } if (_has_calc) { - LOG(DEBUG) + LOG(INFO) << "Op: " << _name << " already processed before"; return ERR_OK; } @@ -133,7 +133,7 @@ int Op::process(bool debug) { op_time.stop(); PredictorMetric::GetInstance()->update_latency_metric( OP_METRIC_PREFIX + full_name(), op_time.u_elapsed()); - LOG(NOTICE) << " " << name() << "_time=[" << op_time.u_elapsed() << "]" << noflush; + LOG(INFO) << " " << name() << "_time=[" << op_time.u_elapsed() << "]"; return ERR_OK; } diff --git a/predictor/op/op.h b/predictor/op/op.h index 0108c24c..73928f68 100644 --- a/predictor/op/op.h +++ b/predictor/op/op.h @@ -195,7 +195,7 @@ public: return _channel; } - _channel = base::get_object(); + _channel = butil::get_object(); if (!_channel) { LOG(FATAL) << "Failed mutable channel of type:" @@ -213,7 +213,7 @@ public: int release_channel() { if (_channel) { _channel->deinit(); - base::return_object(_channel); + butil::return_object(_channel); } _channel = NULL; diff --git a/predictor/op/sparse_echo_op.cpp b/predictor/op/sparse_echo_op.cpp index f060e334..25200d64 100644 --- a/predictor/op/sparse_echo_op.cpp +++ b/predictor/op/sparse_echo_op.cpp @@ -18,7 +18,7 @@ int SparseEchoOp::inference() { // get/mutable_depend_argment() // ... - LOG(DEBUG) + LOG(INFO) << "Receive request in sparse service:" << req->ShortDebugString(); uint32_t sample_size = req->instances_size(); diff --git a/predictor/op/write_json_op.cpp b/predictor/op/write_json_op.cpp index 5bc06d48..5f55e423 100644 --- a/predictor/op/write_json_op.cpp +++ b/predictor/op/write_json_op.cpp @@ -1,4 +1,4 @@ -#include "pb_to_json.h" +#include "json2pb/pb_to_json.h" #include #include "op/write_json_op.h" @@ -38,7 +38,7 @@ int WriteJsonOp::inference() { return -1; } std::string* text = ins->mutable_response_json(); - if (!ProtoMessageToJson(classify_out->predictions(si), + if (!json2pb::ProtoMessageToJson(classify_out->predictions(si), text, &err_string)) { LOG(ERROR) << "Failed convert message[" << classify_out->predictions(si).ShortDebugString() @@ -47,7 +47,7 @@ int WriteJsonOp::inference() { } } - LOG(TRACE) << "Succ write json:" + LOG(INFO) << "Succ write json:" << classify_out->ShortDebugString(); return 0; diff --git a/predictor/proto/CMakeLists.txt b/predictor/proto/CMakeLists.txt index aee78459..0ad5320a 100644 --- a/predictor/proto/CMakeLists.txt +++ b/predictor/proto/CMakeLists.txt @@ -1,5 +1,3 @@ FILE(GLOB protofiles "${CMAKE_CURRENT_LIST_DIR}/*.proto") protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS ${protofiles}) - -add_library(proto STATIC ${PROTO_SRCS} ${PROTO_HDRS}) -include_directories(${CMAKE_CURRENT_BINARY_DIR}) +LIST(APPEND predictor_srcs ${PROTO_SRCS}) diff --git a/predictor/src/pdcodegen.cpp b/predictor/src/pdcodegen.cpp index 54c1cb9d..d5a9e043 100644 --- a/predictor/src/pdcodegen.cpp +++ b/predictor/src/pdcodegen.cpp @@ -521,7 +521,7 @@ private: printer->Print( "google::protobuf::Message* cur_res = _stub_handler->fetch_response();\n" "if (cur_res == NULL) {\n" - " LOG(TRACE) << \"Failed fetch response from stub handler, new it\";\n" + " LOG(INFO) << \"Failed fetch response from stub handler, new it\";\n" " cur_res = response->New();\n" " if (cur_res == NULL) {\n" " LOG(FATAL) << \"Failed new response item!\";\n" @@ -530,7 +530,7 @@ private: " }\n" " return baidu::rpc::SubCall(method, request, cur_res, baidu::rpc::DELETE_RESPONSE);\n" "}\n"); - "LOG(DEBUG) \n" + "LOG(INFO) \n" " << \"[default] Succ map, channel_index: \" << channel_index;\n"; printer->Print( "return baidu::rpc::SubCall(method, request, cur_res, 0);\n" @@ -675,7 +675,7 @@ private: } printer->Print( - "LOG(DEBUG)\n" + "LOG(INFO)\n" " << \"[pack] Succ map req at: \"\n" " << channel_index;\n"); printer->Print( diff --git a/predictor/src/pdserving.cpp b/predictor/src/pdserving.cpp index 1511b902..5f2088d9 100644 --- a/predictor/src/pdserving.cpp +++ b/predictor/src/pdserving.cpp @@ -94,7 +94,7 @@ int main(int argc, char** argv) { return -1; } - LOG(TRACE) << "Succ initialize logger"; + LOG(INFO) << "Succ initialize logger"; // initialize resource manager if (Resource::instance().initialize( @@ -103,7 +103,7 @@ int main(int argc, char** argv) { << FLAGS_resource_path << "/" << FLAGS_resource_file; return -1; } - LOG(TRACE) << "Succ initialize resource"; + LOG(INFO) << "Succ initialize resource"; // initialize workflow manager if (WorkflowManager::instance().initialize( @@ -112,7 +112,7 @@ int main(int argc, char** argv) { << FLAGS_workflow_path << "/" << FLAGS_workflow_file; return -1; } - LOG(TRACE) << "Succ initialize workflow"; + LOG(INFO) << "Succ initialize workflow"; // initialize service manager if (InferServiceManager::instance().initialize( @@ -122,7 +122,7 @@ int main(int argc, char** argv) { << FLAGS_inferservice_path << "/" << FLAGS_inferservice_file; return -1; } - LOG(TRACE) << "Succ initialize inferservice"; + LOG(INFO) << "Succ initialize inferservice"; int errcode = bthread_set_worker_startfn(pthread_worker_start_fn); if (errcode != 0) { @@ -135,7 +135,7 @@ int main(int argc, char** argv) { LOG(ERROR) << "Failed start server and wait!"; return -1; } - LOG(TRACE) << "Succ start service manager"; + LOG(INFO) << "Succ start service manager"; if (InferServiceManager::instance().finalize() != 0) { LOG(ERROR) << "Failed finalize infer service manager."; -- GitLab