Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
PaddlePaddle
Serving
提交
e608c4a5
S
Serving
项目概览
PaddlePaddle
/
Serving
大约 1 年 前同步成功
通知
185
Star
833
Fork
253
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
105
列表
看板
标记
里程碑
合并请求
10
Wiki
2
Wiki
分析
仓库
DevOps
项目成员
Pages
S
Serving
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
105
Issue
105
列表
看板
标记
里程碑
合并请求
10
合并请求
10
Pages
分析
分析
仓库分析
DevOps
Wiki
2
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
e608c4a5
编写于
2月 13, 2019
作者:
W
wangguibao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
20190213
上级
9f08b178
变更
39
隐藏空白更改
内联
并排
Showing
39 changed file
with
762 addition
and
147 deletion
+762
-147
CMakeLists.txt
CMakeLists.txt
+4
-0
cmake/external/pcre.cmake
cmake/external/pcre.cmake
+54
-0
configure/CMakeLists.txt
configure/CMakeLists.txt
+4
-0
configure/compiler/src/idl_gram.c
configure/compiler/src/idl_gram.c
+3
-3
mempool/CMakeLists.txt
mempool/CMakeLists.txt
+4
-0
mempool/mempool.cpp
mempool/mempool.cpp
+81
-0
mempool/mempool.h
mempool/mempool.h
+482
-0
predictor/CMakeLists.txt
predictor/CMakeLists.txt
+14
-2
predictor/common/CMakeLists.txt
predictor/common/CMakeLists.txt
+2
-4
predictor/common/constant.h
predictor/common/constant.h
+1
-1
predictor/common/inner_common.h
predictor/common/inner_common.h
+1
-1
predictor/common/utils.h
predictor/common/utils.h
+7
-7
predictor/framework/CMakeLists.txt
predictor/framework/CMakeLists.txt
+2
-13
predictor/framework/bsf-inl-tensor.h
predictor/framework/bsf-inl-tensor.h
+5
-5
predictor/framework/bsf.h
predictor/framework/bsf.h
+4
-4
predictor/framework/channel.h
predictor/framework/channel.h
+1
-1
predictor/framework/dag.cpp
predictor/framework/dag.cpp
+6
-6
predictor/framework/dag_view.cpp
predictor/framework/dag_view.cpp
+10
-10
predictor/framework/dag_view.h
predictor/framework/dag_view.h
+3
-3
predictor/framework/factory.h
predictor/framework/factory.h
+1
-1
predictor/framework/infer.h
predictor/framework/infer.h
+2
-2
predictor/framework/manager.h
predictor/framework/manager.h
+2
-2
predictor/framework/op_repository.h
predictor/framework/op_repository.h
+3
-3
predictor/framework/predictor_metric.h
predictor/framework/predictor_metric.h
+17
-14
predictor/framework/resource.h
predictor/framework/resource.h
+1
-9
predictor/framework/server.cpp
predictor/framework/server.cpp
+5
-5
predictor/framework/server.h
predictor/framework/server.h
+2
-2
predictor/framework/service.cpp
predictor/framework/service.cpp
+7
-7
predictor/framework/service.h
predictor/framework/service.h
+6
-6
predictor/framework/workflow.cpp
predictor/framework/workflow.cpp
+4
-4
predictor/op/CMakeLists.txt
predictor/op/CMakeLists.txt
+2
-8
predictor/op/dense_echo_op.cpp
predictor/op/dense_echo_op.cpp
+1
-1
predictor/op/op.cpp
predictor/op/op.cpp
+6
-6
predictor/op/op.h
predictor/op/op.h
+2
-2
predictor/op/sparse_echo_op.cpp
predictor/op/sparse_echo_op.cpp
+1
-1
predictor/op/write_json_op.cpp
predictor/op/write_json_op.cpp
+3
-3
predictor/proto/CMakeLists.txt
predictor/proto/CMakeLists.txt
+1
-3
predictor/src/pdcodegen.cpp
predictor/src/pdcodegen.cpp
+3
-3
predictor/src/pdserving.cpp
predictor/src/pdserving.cpp
+5
-5
未找到文件。
CMakeLists.txt
浏览文件 @
e608c4a5
...
...
@@ -68,6 +68,7 @@ include(external/leveldb)
include
(
external/protobuf
)
include
(
external/snappy
)
include
(
external/brpc
)
include
(
external/pcre
)
include
(
external/boost
)
include
(
flags
)
include
(
configure
)
...
...
@@ -81,6 +82,8 @@ set(EXTERNAL_LIBS
glog
protobuf
paddlepaddle
pcre
brpc
)
if
(
WITH_MKLML
)
...
...
@@ -96,4 +99,5 @@ add_subdirectory(bsl)
add_subdirectory
(
ullib
)
add_subdirectory
(
spreg
)
add_subdirectory
(
configure
)
add_subdirectory
(
mempool
)
add_subdirectory
(
predictor
)
cmake/external/pcre.cmake
0 → 100644
浏览文件 @
e608c4a5
# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
INCLUDE
(
ExternalProject
)
SET
(
PCRE_SOURCES_DIR
${
THIRD_PARTY_PATH
}
/pcre
)
SET
(
PCRE_INSTALL_DIR
${
THIRD_PARTY_PATH
}
/install/pcre
)
SET
(
PCRE_ROOT
${
PCRE_INSTALL_DIR
}
CACHE FILEPATH
"pcre root directory."
FORCE
)
SET
(
PCRE_INCLUDE_DIR
"
${
PCRE_INSTALL_DIR
}
/include"
CACHE PATH
"pcre include directory."
FORCE
)
INCLUDE_DIRECTORIES
(
${
PCRE_INCLUDE_DIR
}
)
# For pcre code to include its own headers.
INCLUDE_DIRECTORIES
(
${
THIRD_PARTY_PATH
}
/install
)
# For Paddle code to include pcre.h.
ExternalProject_Add
(
extern_pcre
${
EXTERNAL_PROJECT_LOG_ARGS
}
SVN_REPOSITORY
"svn://vcs.exim.org/pcre/code/tags/pcre-7.7"
PREFIX
${
PCRE_SOURCES_DIR
}
UPDATE_COMMAND
""
PATCH_COMMAND sh autogen.sh
CONFIGURE_COMMAND ../extern_pcre/configure --prefix=
${
PCRE_INSTALL_DIR
}
--disable-shared --with-pic
BUILD_COMMAND make
INSTALL_COMMAND make install
CMAKE_CACHE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=
${
PCRE_INSTALL_DIR
}
-DCMAKE_POSITION_INDEPENDENT_CODE:BOOL=ON
-DCMAKE_BUILD_TYPE:STRING=
${
THIRD_PARTY_BUILD_TYPE
}
)
IF
(
WIN32
)
IF
(
NOT EXISTS
"
${
PCRE_INSTALL_DIR
}
/lib/libz.lib"
)
add_custom_command
(
TARGET extern_pcre POST_BUILD
COMMAND cmake -E copy
${
PCRE_INSTALL_DIR
}
/lib/pcrestatic.lib
${
PCRE_INSTALL_DIR
}
/lib/libz.lib
)
ENDIF
()
SET
(
PCRE_LIBRARIES
"
${
PCRE_INSTALL_DIR
}
/lib/libpcre.lib"
CACHE FILEPATH
"pcre library."
FORCE
)
ELSE
(
WIN32
)
SET
(
PCRE_LIBRARIES
"
${
PCRE_INSTALL_DIR
}
/lib/libpcre.a"
CACHE FILEPATH
"pcre library."
FORCE
)
ENDIF
(
WIN32
)
ADD_LIBRARY
(
pcre STATIC IMPORTED GLOBAL
)
SET_PROPERTY
(
TARGET pcre PROPERTY IMPORTED_LOCATION
${
PCRE_LIBRARIES
}
)
ADD_DEPENDENCIES
(
pcre extern_pcre
)
LIST
(
APPEND external_project_dependencies pcre
)
configure/CMakeLists.txt
浏览文件 @
e608c4a5
...
...
@@ -20,6 +20,10 @@ LIST(APPEND configure_srcs
${
CMAKE_CURRENT_LIST_DIR
}
/compiler/src/idl_gram.c
)
SET_SOURCE_FILES_PROPERTIES
(
${
CMAKE_CURRENT_LIST_DIR
}
/compiler/src/idl.c
${
CMAKE_CURRENT_LIST_DIR
}
/compiler/src/idl_lex.c
${
CMAKE_CURRENT_LIST_DIR
}
/compiler/src/idl_gram.c PROPERTIES LANGUAGE CXX
)
add_library
(
configure
${
configure_srcs
}
)
add_dependencies
(
configure bsl brpc ullib spreg
)
target_include_directories
(
configure PUBLIC
...
...
configure/compiler/src/idl_gram.c
浏览文件 @
e608c4a5
...
...
@@ -1550,11 +1550,11 @@ yyerrlab:
YYSTACK_FREE
(
yymsg
);
}
else
yyerror
(
scanner
,
loc
,
idl
,
"syntax error; also virtual memory exhausted"
);
yyerror
(
scanner
,
loc
,
idl
,
(
char
*
)
"syntax error; also virtual memory exhausted"
);
}
else
#endif
/* YYERROR_VERBOSE */
yyerror
(
scanner
,
loc
,
idl
,
"syntax error"
);
yyerror
(
scanner
,
loc
,
idl
,
(
char
*
)
"syntax error"
);
}
...
...
@@ -1672,7 +1672,7 @@ yyabortlab:
| yyoverflowlab -- parser overflow comes here. |
`----------------------------------------------*/
yyoverflowlab:
yyerror
(
scanner
,
loc
,
idl
,
"parser stack overflow"
);
yyerror
(
scanner
,
loc
,
idl
,
(
char
*
)
"parser stack overflow"
);
yyresult
=
2
;
/* Fall through. */
#endif
...
...
mempool/CMakeLists.txt
0 → 100644
浏览文件 @
e608c4a5
add_library
(
mempool
${
CMAKE_CURRENT_LIST_DIR
}
/mempool.cpp
)
add_dependencies
(
mempool brpc
)
target_include_directories
(
mempool PUBLIC
${
CMAKE_CURRENT_BINARY_DIR
}
/../bsl/include
)
mempool/mempool.cpp
0 → 100644
浏览文件 @
e608c4a5
#include "mempool.h"
namespace
im
{
__thread
Mempool
*
g_mempool
=
NULL
;
namespace
fugue
{
namespace
memory
{
void
Region
::
init
()
{
_big_mem_capacity
=
32
*
1024
*
1024
;
_big_mem_start
=
new
char
[
_big_mem_capacity
];
}
void
Region
::
reset
()
{
// release memory allocate from GlobalMempool
_free_blocks
.
unsafe_foreach
<
GlobalPut
>
();
_free_blocks
.
reset
();
// release memory from malloc
BigNode
*
head
=
_big_nodes
.
release
();
while
(
head
)
{
BigNode
*
next
=
head
->
next
;
::
free
(
head
);
head
=
next
;
}
_mlc_mem_size
.
store
(
0
,
butil
::
memory_order_relaxed
);
_mlc_mem_count
.
store
(
0
,
butil
::
memory_order_relaxed
);
// clear the large buffer
_big_mem_size
.
store
(
0
,
butil
::
memory_order_relaxed
);
_big_mem_count
.
store
(
0
,
butil
::
memory_order_relaxed
);
}
BlockReference
*
Region
::
get
()
{
BlockReference
*
ref
=
_free_blocks
.
get
();
if
(
ref
->
block
==
NULL
)
{
ref
->
offset
=
0
;
ref
->
block
=
GlobalBlockFreeList
::
instance
()
->
get
();
}
return
ref
;
}
void
Region
::
put
(
BlockReference
*
block
)
{
_free_blocks
.
put
(
block
);
}
void
*
Region
::
malloc
(
size_t
size
)
{
if
(
size
<
MLC_MEM_THRESHOLD
)
{
uint32_t
offset
=
_big_mem_size
.
fetch_add
(
size
,
butil
::
memory_order_relaxed
);
if
(
offset
+
size
<
_big_mem_capacity
)
{
_big_mem_count
.
fetch_add
(
1
,
butil
::
memory_order_relaxed
);
return
_big_mem_start
+
offset
;
}
}
_mlc_mem_size
.
fetch_add
(
size
,
butil
::
memory_order_relaxed
);
_mlc_mem_count
.
fetch_add
(
1
,
butil
::
memory_order_relaxed
);
BigNode
*
node
=
(
BigNode
*
)
::
malloc
(
sizeof
(
BigNode
)
+
size
);
_big_nodes
.
push
(
node
);
return
node
->
data
;
}
Region
::
Region
()
{
_big_mem_size
.
store
(
0
,
butil
::
memory_order_relaxed
);
_big_mem_count
.
store
(
0
,
butil
::
memory_order_relaxed
);
_big_mem_start
=
NULL
;
_big_mem_capacity
=
0
;
_mlc_mem_size
.
store
(
0
,
butil
::
memory_order_relaxed
);
_mlc_mem_count
.
store
(
0
,
butil
::
memory_order_relaxed
);
}
}
}
}
mempool/mempool.h
0 → 100644
浏览文件 @
e608c4a5
#ifndef APP_ECOM_IM_MEMPOOL_SRC_MEMPOOL_H
#define APP_ECOM_IM_MEMPOOL_SRC_MEMPOOL_H
#include <execinfo.h>
#include <sstream>
#include <pthread.h>
#include <new>
#include <butil/atomicops.h>
#include <bsl/pool.h> // for bsl::mempool
#include <bsl/string.h>
#include <butil/logging.h>
#include <iostream>
namespace
im
{
namespace
fugue
{
namespace
lockfree
{
template
<
class
T
>
class
PushOnlyStack
{
public:
PushOnlyStack
()
{
_head
.
store
(
NULL
,
butil
::
memory_order_relaxed
);
}
void
push
(
T
*
node
)
{
T
*
head
=
_head
.
load
(
butil
::
memory_order_relaxed
);
node
->
next
=
head
;
while
(
!
_head
.
compare_exchange_weak
(
head
,
node
,
butil
::
memory_order_relaxed
))
{
node
->
next
=
head
;
}
}
T
*
release
()
{
return
_head
.
exchange
(
NULL
,
butil
::
memory_order_relaxed
);
}
private:
butil
::
atomic
<
T
*>
_head
;
};
template
<
class
T
>
struct
FreeListNode
{
uint64_t
id
;
uint64_t
next
;
T
data
;
};
template
<
class
T
,
int
CAP
>
class
FreeList
{
public:
typedef
FreeListNode
<
T
>
Node
;
static
const
uint64_t
EMPTY
=
0xFFFFFFFFFFFFFFFF
;
T
*
get
()
{
uint64_t
head
=
_head
.
load
(
butil
::
memory_order_acquire
);
if
(
head
==
EMPTY
)
{
return
new_node
();
}
Node
*
node
=
address
(
head
);
while
(
!
_head
.
compare_exchange_weak
(
head
,
node
->
next
,
butil
::
memory_order_acquire
))
{
if
(
head
==
EMPTY
)
{
return
new_node
();
}
node
=
address
(
head
);
}
return
&
node
->
data
;
}
void
put
(
T
*
value
)
{
Node
*
node
=
container_of
(
value
,
Node
,
data
);
uint64_t
head
=
_head
.
load
(
butil
::
memory_order_acquire
);
// add version
node
->
id
+=
(
1UL
<<
32
);
node
->
next
=
head
;
// NOTE: we MUST use a temp var *head* to call compare_exchange_weak
// because Boost.Atomic will update the *expected* even success
// std::atomic do not have this limitation
while
(
!
_head
.
compare_exchange_weak
(
head
,
node
->
id
,
butil
::
memory_order_release
))
{
node
->
next
=
head
;
}
}
template
<
class
F
>
void
unsafe_foreach
()
{
uint32_t
used_blk_cnt
=
_slot_index
.
load
(
butil
::
memory_order_relaxed
);
for
(
uint32_t
i
=
0
;
i
<
used_blk_cnt
;
++
i
)
{
F
()(
&
_node
[
i
]
->
data
);
}
}
uint32_t
real_used_size
()
const
{
uint32_t
used_blk_cnt
=
_slot_index
.
load
(
butil
::
memory_order_relaxed
);
uint64_t
used_bytes
=
0
;
for
(
uint32_t
i
=
0
;
i
<
used_blk_cnt
;
++
i
)
{
used_bytes
+=
_node
[
i
]
->
data
.
offset
;
}
return
used_bytes
>>
10
;
}
uint32_t
allocate_blocks
()
const
{
return
_slot_index
.
load
(
butil
::
memory_order_relaxed
);
}
uint32_t
free_blocks
()
const
{
uint64_t
head
=
_head
.
load
(
butil
::
memory_order_relaxed
);
uint32_t
size
=
0
;
while
(
head
!=
FreeList
::
EMPTY
)
{
const
Node
*
head_ptr
=
address
(
head
);
head
=
head_ptr
->
next
;
++
size
;
}
return
size
;
}
void
reset
()
{
_head
.
store
(
FreeList
::
EMPTY
,
butil
::
memory_order_relaxed
);
_slot_index
.
store
(
0
,
butil
::
memory_order_relaxed
);
}
FreeList
()
{
for
(
int
i
=
0
;
i
<
CAP
;
++
i
)
{
_node
[
i
]
=
NULL
;
}
reset
();
}
private:
uint32_t
slot
(
uint64_t
id
)
const
{
return
static_cast
<
uint32_t
>
(
id
);
}
T
*
new_node
()
{
uint32_t
index
=
_slot_index
.
fetch_add
(
1
,
butil
::
memory_order_relaxed
);
if
(
index
>=
CAP
)
{
return
NULL
;
}
if
(
_node
[
index
]
!=
NULL
)
{
return
&
(
_node
[
index
]
->
data
);
}
Node
*
node
=
(
Node
*
)
malloc
(
sizeof
(
Node
));
new
(
node
)
Node
;
node
->
id
=
index
;
_node
[
index
]
=
node
;
return
&
node
->
data
;
}
Node
*
address
(
uint64_t
id
)
{
return
_node
[
slot
(
id
)];
}
const
Node
*
address
(
uint64_t
id
)
const
{
return
_node
[
slot
(
id
)];
}
butil
::
atomic
<
uint64_t
>
_head
;
butil
::
atomic
<
uint32_t
>
_slot_index
;
Node
*
_node
[
CAP
];
};
}
namespace
memory
{
struct
Block
{
static
const
int
BLOCK_SIZE
=
2
*
1024
*
1024
;
char
data
[
BLOCK_SIZE
];
};
class
GlobalBlockFreeList
{
public:
static
const
int
MAX_BLOCK_COUNT
=
32
*
1024
;
typedef
lockfree
::
FreeList
<
Block
,
MAX_BLOCK_COUNT
>
type
;
static
type
*
instance
()
{
static
type
singleton
;
return
&
singleton
;
}
};
struct
BlockReference
{
BlockReference
()
:
offset
(
0
),
block
(
NULL
)
{
// do nothing
}
void
reset
()
{
offset
=
0
;
block
=
NULL
;
}
uint32_t
offset
;
Block
*
block
;
};
class
Region
{
public:
struct
GlobalPut
{
void
operator
()(
BlockReference
*
block_ref
)
{
if
(
block_ref
->
block
!=
NULL
)
{
GlobalBlockFreeList
::
instance
()
->
put
(
block_ref
->
block
);
}
block_ref
->
reset
();
}
};
struct
BigNode
{
BigNode
*
next
;
char
data
[
0
];
};
~
Region
()
{
reset
();
delete
[]
_big_mem_start
;
_big_mem_start
=
NULL
;
}
char
const
*
debug_str
()
const
{
_debug_str
.
clear
();
uint32_t
alloc_blocks
=
_free_blocks
.
allocate_blocks
();
uint32_t
free_blocks
=
_free_blocks
.
free_blocks
();
uint32_t
used_mem_mb
=
_free_blocks
.
real_used_size
();
uint32_t
big_buf_size
=
_big_mem_size
.
load
(
butil
::
memory_order_relaxed
);
uint32_t
big_buf_count
=
_big_mem_count
.
load
(
butil
::
memory_order_relaxed
);
uint32_t
mlc_mem_size
=
_mlc_mem_size
.
load
(
butil
::
memory_order_relaxed
);
uint32_t
mlc_mem_count
=
_mlc_mem_count
.
load
(
butil
::
memory_order_relaxed
);
_debug_str
.
appendf
(
"[alloc_blks:%u,free_blks:%u,used_mem_kb:%u,"
"big_mem_kb:%u,big_buf_cnt:%u,mlc_mem_kb:%u,mlc_cnt:%u]"
,
alloc_blocks
,
free_blocks
,
used_mem_mb
,
big_buf_size
>>
10
,
big_buf_count
,
mlc_mem_size
>>
10
,
mlc_mem_count
);
return
_debug_str
.
c_str
();
}
Region
();
void
init
();
void
reset
();
BlockReference
*
get
();
void
*
malloc
(
size_t
size
);
void
put
(
BlockReference
*
block
);
static
const
int
MAX_BLOCK_COUNT
=
1024
;
static
const
int
BIG_MEM_THRESHOLD
=
256
*
1024
;
static
const
int
MLC_MEM_THRESHOLD
=
4
*
1024
*
1024
;
static
const
int
COUNTER_SIZE
=
MLC_MEM_THRESHOLD
/
BIG_MEM_THRESHOLD
+
1
;
private:
lockfree
::
FreeList
<
BlockReference
,
MAX_BLOCK_COUNT
>
_free_blocks
;
lockfree
::
PushOnlyStack
<
BigNode
>
_big_nodes
;
butil
::
atomic
<
uint32_t
>
_big_mem_size
;
butil
::
atomic
<
uint32_t
>
_big_mem_count
;
char
*
_big_mem_start
;
uint32_t
_big_mem_capacity
;
butil
::
atomic
<
uint32_t
>
_mlc_mem_size
;
butil
::
atomic
<
uint32_t
>
_mlc_mem_count
;
bsl
::
string
mutable
_debug_str
;
};
}
}
class
Mempool
{
public:
void
*
malloc
(
size_t
size
)
{
size
=
_align
(
size
);
if
(
size
<=
_free_size
)
{
void
*
p
=
_free_cursor
;
_free_size
-=
size
;
_free_cursor
+=
size
;
return
p
;
}
return
malloc_from_region
(
size
);
}
void
free
(
void
*
p
,
size_t
size
)
{
if
(
size
>=
fugue
::
memory
::
Region
::
BIG_MEM_THRESHOLD
)
{
return
;
}
if
(
_free_cursor
-
size
==
static_cast
<
char
*>
(
p
))
{
size_t
down_aligned
=
_down_align
(
size
);
_free_cursor
-=
down_aligned
;
_free_size
+=
down_aligned
;
}
}
void
*
realloc
(
void
*
old_data
,
size_t
old_size
,
size_t
new_size
)
{
if
(
old_size
>=
new_size
)
{
return
old_data
;
}
size_t
required
=
new_size
-
old_size
;
if
(
_free_cursor
==
static_cast
<
char
*>
(
old_data
)
+
old_size
)
{
if
(
_free_size
>=
required
)
{
_free_cursor
+=
required
;
_free_size
-=
required
;
return
old_data
;
}
else
{
_free_cursor
=
static_cast
<
char
*>
(
old_data
);
_free_size
+=
old_size
;
}
}
void
*
p
=
this
->
malloc_from_region
(
new_size
);
if
(
p
!=
NULL
)
{
memcpy
(
p
,
old_data
,
old_size
);
return
p
;
}
return
NULL
;
}
Mempool
(
fugue
::
memory
::
Region
*
blocks
)
:
_free_size
(
0
)
,
_free_cursor
(
NULL
)
,
_blocks
(
blocks
)
{
_block
=
NULL
;
}
~
Mempool
()
{
release_block
();
}
void
release_block
()
{
if
(
_block
)
{
_block
->
offset
=
fugue
::
memory
::
Block
::
BLOCK_SIZE
-
_free_size
;
_blocks
->
put
(
_block
);
}
_free_size
=
0
;
_free_cursor
=
NULL
;
_block
=
NULL
;
}
private:
void
*
malloc_from_region
(
size_t
size
)
{
if
(
size
>=
fugue
::
memory
::
Region
::
BIG_MEM_THRESHOLD
)
{
return
_blocks
->
malloc
(
size
);
}
while
(
true
)
{
fugue
::
memory
::
BlockReference
*
block
=
_blocks
->
get
();
if
(
block
==
NULL
)
{
return
NULL
;
}
uint32_t
free_size
=
fugue
::
memory
::
Block
::
BLOCK_SIZE
-
block
->
offset
;
if
(
size
<=
free_size
)
{
if
(
_block
)
{
_block
->
offset
=
fugue
::
memory
::
Block
::
BLOCK_SIZE
-
_free_size
;
}
char
*
p
=
block
->
block
->
data
+
block
->
offset
;
_free_size
=
free_size
-
size
;
_free_cursor
=
p
+
size
;
_block
=
block
;
return
p
;
}
}
return
_blocks
->
malloc
(
size
);
}
static
const
int
ALIGN_SIZE
=
sizeof
(
void
*
);
inline
size_t
_align
(
size_t
size
)
const
{
return
(
size
+
(
ALIGN_SIZE
-
1
))
&
~
(
ALIGN_SIZE
-
1
);
}
inline
size_t
_down_align
(
size_t
size
)
const
{
return
size
&
~
(
ALIGN_SIZE
-
1
);
}
size_t
_free_size
;
char
*
_free_cursor
;
fugue
::
memory
::
Region
*
_blocks
;
fugue
::
memory
::
BlockReference
*
_block
;
};
extern
__thread
Mempool
*
g_mempool
;
class
GlobalMempool
:
public
bsl
::
mempool
{
public:
GlobalMempool
()
{
// do nothing;
}
virtual
~
GlobalMempool
()
{
// do nothing;
}
static
GlobalMempool
*
instance
()
{
static
GlobalMempool
singleton
;
return
&
singleton
;
}
void
reset
(
Mempool
*
mempool
)
{
g_mempool
=
mempool
;
}
void
*
malloc
(
size_t
size
)
{
return
g_mempool
->
malloc
(
size
);
}
void
*
realloc
(
void
*
old_data
,
size_t
old_size
,
size_t
new_size
)
{
return
g_mempool
->
realloc
(
old_data
,
old_size
,
new_size
);
}
void
free
(
void
*
p
,
size_t
s
)
{
g_mempool
->
free
(
p
,
s
);
}
void
clear
()
{
g_mempool
->
release_block
();
}
Mempool
*
get
()
{
return
g_mempool
;
}
};
class
MempoolGuard
{
public:
MempoolGuard
(
fugue
::
memory
::
Region
*
region
)
:
_mempool
(
region
)
{
acquire
();
}
void
acquire
()
{
_saved_mempool
=
g_mempool
;
g_mempool
=
&
_mempool
;
}
void
release
()
{
_mempool
.
release_block
();
g_mempool
=
_saved_mempool
;
}
~
MempoolGuard
()
{
release
();
}
private:
Mempool
_mempool
;
Mempool
*
_saved_mempool
;
};
inline
std
::
string
print_trace
()
{
const
static
int
BT_BUF_SIZE
=
400
;
std
::
stringstream
debug_stream
;
void
*
buffer
[
BT_BUF_SIZE
];
int
nptrs
=
backtrace
(
buffer
,
BT_BUF_SIZE
);
char
**
strings
=
backtrace_symbols
(
buffer
,
nptrs
);
for
(
int
j
=
0
;
j
<
nptrs
;
j
++
)
{
debug_stream
<<
strings
[
j
]
<<
"
\t
"
;
}
return
debug_stream
.
str
();
}
}
#endif
predictor/CMakeLists.txt
浏览文件 @
e608c4a5
...
...
@@ -2,5 +2,17 @@ include(proto/CMakeLists.txt)
include
(
common/CMakeLists.txt
)
include
(
op/CMakeLists.txt
)
include
(
framework/CMakeLists.txt
)
add_library
(
predictor
${
predictor_SOURCES
}
)
add_dependencies
(
predictor proto op protobuf boost brpc
)
add_library
(
predictor
${
predictor_srcs
}
)
set_source_files_properties
(
${
predictor_srcs
}
PROPERTIES
COMPILE_FLAGS
"-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor"
)
add_dependencies
(
predictor protobuf boost brpc
)
target_include_directories
(
predictor PUBLIC
${
CMAKE_CURRENT_LIST_DIR
}
/
${
CMAKE_CURRENT_BINARY_DIR
}
/
${
CMAKE_CURRENT_LIST_DIR
}
/../configure
${
CMAKE_CURRENT_LIST_DIR
}
/../mempool
${
CMAKE_CURRENT_LIST_DIR
}
/../spreg
${
CMAKE_CURRENT_LIST_DIR
}
/../ullib/include
${
CMAKE_CURRENT_BINARY_DIR
}
/../bsl/include
)
predictor/common/CMakeLists.txt
浏览文件 @
e608c4a5
add_library
(
common
${
CMAKE_CURRENT_LIST_DIR
}
/constant.cpp
)
add_dependencies
(
common brpc configure
)
FILE
(
GLOB common_srcs
${
CMAKE_CURRENT_LIST_DIR
}
/*.cpp
)
LIST
(
APPEND predictor_srcs
${
common_srcs
}
)
predictor/common/constant.h
浏览文件 @
e608c4a5
...
...
@@ -30,7 +30,7 @@ DECLARE_bool(enable_model_toolkit);
DECLARE_string
(
enable_protocol_list
);
// STATIC Variables
static
const
char
*
START_OP_NAME
=
"startup_op"
;
const
char
*
START_OP_NAME
=
"startup_op"
;
// ERRORCODE
enum
{
...
...
predictor/common/inner_common.h
浏览文件 @
e608c4a5
...
...
@@ -26,7 +26,7 @@
#include <error.h>
#include "Configure.h"
#include <comlog/comlog.h>
//
#include <comlog/comlog.h>
#include "common/utils.h"
#include "common/types.h"
...
...
predictor/common/utils.h
浏览文件 @
e608c4a5
...
...
@@ -24,7 +24,7 @@ public:
}
TimerFlow
(
const
char
*
name
)
:
_csize
(
0
),
_name
(
name
)
{
_last
=
_start
=
b
ase
::
cpuwide_time_us
();
_last
=
_start
=
b
util
::
cpuwide_time_us
();
_auto
=
true
;
_started
=
true
;
}
...
...
@@ -34,7 +34,7 @@ public:
}
void
start
()
{
_last
=
_start
=
b
ase
::
cpuwide_time_us
();
_last
=
_start
=
b
util
::
cpuwide_time_us
();
_started
=
true
;
}
...
...
@@ -43,12 +43,12 @@ public:
LOG
(
WARNING
)
<<
"Timer not started yet!"
;
return
false
;
}
uint64_t
now
=
b
ase
::
cpuwide_time_us
();
uint64_t
now
=
b
util
::
cpuwide_time_us
();
if
(
!
appendf
(
"%s:%lu|"
,
tag
,
now
-
_last
))
{
LOG
(
WARNING
)
<<
"Failed check timer: "
<<
_name
<<
", value = ["
<<
tag
<<
":"
<<
(
now
-
_last
)
<<
"]!"
<<
noflush
;
<<
(
now
-
_last
)
<<
"]!"
;
return
false
;
}
...
...
@@ -61,7 +61,7 @@ public:
}
void
end
()
{
uint64_t
now
=
b
ase
::
cpuwide_time_us
();
uint64_t
now
=
b
util
::
cpuwide_time_us
();
if
(
!
appendf
(
"total:%lu"
,
now
-
_start
))
{
LOG
(
WARNING
)
<<
"Failed dump time_info["
<<
_name
<<
"]"
;
}
...
...
@@ -72,7 +72,7 @@ public:
if
(
!
_auto
)
{
return
;
}
uint64_t
now
=
b
ase
::
cpuwide_time_us
();
uint64_t
now
=
b
util
::
cpuwide_time_us
();
if
(
appendf
(
"total:%lu,%s"
,
now
-
_start
,
_name
))
{
LOG
(
INFO
)
<<
" "
<<
_name
<<
"_tc=["
<<
_buf
<<
"]"
;
...
...
@@ -88,7 +88,7 @@ private:
try
{
int
bytes
=
vsnprintf
(
_buf
+
_csize
,
MAX_SIZE
-
_csize
,
fmt
,
ap
);
if
(
bytes
>=
MAX_SIZE
-
_csize
||
bytes
<
0
)
{
LOG
(
WARNING
)
<<
"Overflow when appendf!"
<<
noflush
;
LOG
(
WARNING
)
<<
"Overflow when appendf!"
;
return
false
;
}
_csize
+=
bytes
;
...
...
predictor/framework/CMakeLists.txt
浏览文件 @
e608c4a5
list
(
APPEND predictor_SOURCES
${
CMAKE_CURRENT_LIST_DIR
}
/dag.cpp
${
CMAKE_CURRENT_LIST_DIR
}
/dag_view.cpp
${
CMAKE_CURRENT_LIST_DIR
}
/mc_cache.cpp
${
CMAKE_CURRENT_LIST_DIR
}
/memory.cpp
${
CMAKE_CURRENT_LIST_DIR
}
/op_repository.cpp
${
CMAKE_CURRENT_LIST_DIR
}
/predictor_metric.cpp
${
CMAKE_CURRENT_LIST_DIR
}
/resource.cpp
${
CMAKE_CURRENT_LIST_DIR
}
/server.cpp
${
CMAKE_CURRENT_LIST_DIR
}
/service.cpp
${
CMAKE_CURRENT_LIST_DIR
}
/workflow.cpp
)
INCLUDE_DIRECTORIES
(
.
)
FILE
(
GLOB framework_srcs
${
CMAKE_CURRENT_LIST_DIR
}
/*.cpp
)
LIST
(
APPEND predictor_srcs
${
framework_srcs
}
)
predictor/framework/bsf-inl-tensor.h
浏览文件 @
e608c4a5
...
...
@@ -3,7 +3,7 @@
#include <errno.h>
#include <vector>
#include <deque>
#include <b
ase
/atomicops.h>
#include <b
util
/atomicops.h>
#include <comlog/comlog.h>
#include "common/inner_common.h"
#include "framework/infer_data.h"
...
...
@@ -46,7 +46,7 @@ struct Task<baidu::paddle_serving::predictor::Tensor,
size_t
rem
;
size_t
size
;
b
ase
::
atomic
<
size_t
>
index
;
b
util
::
atomic
<
size_t
>
index
;
const
BatchTensor
*
get
(
bool
is_in
)
const
{
if
(
is_in
)
{
...
...
@@ -72,7 +72,7 @@ struct Task<baidu::paddle_serving::predictor::Tensor,
out
=
NULL
;
rem
=
-
1
;
size
=
-
1
;
index
.
store
(
0
,
b
ase
::
memory_order_relaxed
);
index
.
store
(
0
,
b
util
::
memory_order_relaxed
);
}
};
...
...
@@ -269,7 +269,7 @@ public:
}
}
LOG
(
TRACE
)
<<
"merge input("
<<
is_in
<<
") samples: "
LOG
(
INFO
)
<<
"merge input("
<<
is_in
<<
") samples: "
<<
batch_size
<<
" from "
<<
_tasks
.
size
()
<<
" pvs"
;
}
...
...
@@ -327,7 +327,7 @@ public:
while
(
write
(
task
->
write_fd
,
&
c
,
1
)
!=
1
&&
errno
==
EINTR
)
{
;
}
b
ase
::
return_object
(
task
);
b
util
::
return_object
(
task
);
}
}
}
...
...
predictor/framework/bsf.h
浏览文件 @
e608c4a5
...
...
@@ -4,7 +4,7 @@
#include <errno.h>
#include <vector>
#include <deque>
#include <b
ase
/atomicops.h>
#include <b
util
/atomicops.h>
#include <comlog/comlog.h>
#include "common/inner_common.h"
...
...
@@ -38,7 +38,7 @@ struct Task {
return
in
->
size
();
}
b
ase
::
atomic
<
size_t
>
index
;
b
util
::
atomic
<
size_t
>
index
;
Task
()
{
read_fd
=
-
1
;
...
...
@@ -48,7 +48,7 @@ struct Task {
out
=
NULL
;
rem
=
-
1
;
size
=
-
1
;
index
.
store
(
0
,
b
ase
::
memory_order_relaxed
);
index
.
store
(
0
,
b
util
::
memory_order_relaxed
);
}
};
...
...
@@ -148,7 +148,7 @@ public:
while
(
write
(
task
->
write_fd
,
&
c
,
1
)
!=
1
&&
errno
==
EINTR
)
{
;
}
b
ase
::
return_object
(
task
);
b
util
::
return_object
(
task
);
}
}
}
...
...
predictor/framework/channel.h
浏览文件 @
e608c4a5
...
...
@@ -76,7 +76,7 @@ public:
if
(
bus
->
regist
(
_op
,
this
)
!=
0
)
{
LOG
(
FATAL
)
<<
"Failed regist channel["
<<
_op
<<
"] to bus!"
<<
noflush
;
<<
"] to bus!"
;
return
-
1
;
}
...
...
predictor/framework/dag.cpp
浏览文件 @
e608c4a5
...
...
@@ -137,21 +137,21 @@ int Dag::init(const comcfg::Configure& conf, const std::string& name) {
}
if
(
FLAGS_el_log_level
==
16
)
{
LOG
(
DEBUG
)
<<
"DAG: "
<<
_dag_name
<<
noflush
;
LOG
(
DEBUG
)
<<
", Op Num: "
<<
_index_nodes
.
size
();
LOG
(
INFO
)
<<
"DAG: "
<<
_dag_name
<<
noflush
;
LOG
(
INFO
)
<<
", Op Num: "
<<
_index_nodes
.
size
();
for
(
uint32_t
nid
=
0
;
nid
<
_index_nodes
.
size
();
nid
++
)
{
DagNode
*
node
=
_index_nodes
[
nid
];
LOG
(
DEBUG
)
LOG
(
INFO
)
<<
", OP-"
<<
node
->
id
<<
"-"
<<
node
->
name
<<
"-"
<<
node
->
type
<<
noflush
;
LOG
(
DEBUG
)
<<
" depends: "
<<
node
->
depends
.
size
()
<<
noflush
;
LOG
(
INFO
)
<<
" depends: "
<<
node
->
depends
.
size
()
<<
noflush
;
boost
::
unordered_map
<
std
::
string
,
EdgeMode
>::
iterator
it
;
for
(
it
=
node
->
depends
.
begin
();
it
!=
node
->
depends
.
end
();
it
++
)
{
LOG
(
DEBUG
)
<<
" "
<<
it
->
first
<<
" "
<<
it
->
second
<<
noflush
;
LOG
(
INFO
)
<<
" "
<<
it
->
first
<<
" "
<<
it
->
second
<<
noflush
;
}
}
LOG
(
DEBUG
)
<<
""
;
LOG
(
INFO
)
<<
""
;
}
return
ERR_OK
;
...
...
predictor/framework/dag_view.cpp
浏览文件 @
e608c4a5
...
...
@@ -10,7 +10,7 @@ namespace predictor {
int
DagView
::
init
(
Dag
*
dag
,
const
std
::
string
&
service_name
)
{
_name
=
dag
->
name
();
_full_name
=
service_name
+
NAME_DELIMITER
+
dag
->
name
();
_bus
=
b
ase
::
get_object
<
Bus
>
();
_bus
=
b
util
::
get_object
<
Bus
>
();
_bus
->
clear
();
uint32_t
stage_size
=
dag
->
stage_size
();
// create tls stage view
...
...
@@ -20,7 +20,7 @@ int DagView::init(Dag* dag, const std::string& service_name) {
LOG
(
FATAL
)
<<
"Failed get stage by index:"
<<
si
;
return
ERR_INTERNAL_FAILURE
;
}
ViewStage
*
vstage
=
b
ase
::
get_object
<
ViewStage
>
();
ViewStage
*
vstage
=
b
util
::
get_object
<
ViewStage
>
();
if
(
vstage
==
NULL
)
{
LOG
(
FATAL
)
<<
"Failed get vstage from object pool"
...
...
@@ -32,7 +32,7 @@ int DagView::init(Dag* dag, const std::string& service_name) {
// create tls view node
for
(
uint32_t
ni
=
0
;
ni
<
node_size
;
ni
++
)
{
DagNode
*
node
=
stage
->
nodes
[
ni
];
ViewNode
*
vnode
=
b
ase
::
get_object
<
ViewNode
>
();
ViewNode
*
vnode
=
b
util
::
get_object
<
ViewNode
>
();
if
(
vnode
==
NULL
)
{
LOG
(
FATAL
)
<<
"Failed get vnode at:"
<<
ni
;
return
ERR_MEM_ALLOC_FAILURE
;
...
...
@@ -72,19 +72,19 @@ int DagView::deinit() {
OpRepository
::
instance
().
return_op
(
vnode
->
op
);
vnode
->
reset
();
// clear item
b
ase
::
return_object
(
vnode
);
b
util
::
return_object
(
vnode
);
}
// clear vector
vstage
->
nodes
.
clear
();
b
ase
::
return_object
(
vstage
);
b
util
::
return_object
(
vstage
);
}
_view
.
clear
();
_bus
->
clear
();
b
ase
::
return_object
(
_bus
);
b
util
::
return_object
(
_bus
);
return
ERR_OK
;
}
int
DagView
::
execute
(
b
ase
::
IOBufBuilder
*
debug_os
)
{
int
DagView
::
execute
(
b
util
::
IOBufBuilder
*
debug_os
)
{
uint32_t
stage_size
=
_view
.
size
();
for
(
uint32_t
si
=
0
;
si
<
stage_size
;
si
++
)
{
TRACEPRINTF
(
"start to execute stage[%u]"
,
si
);
...
...
@@ -104,8 +104,8 @@ int DagView::execute(base::IOBufBuilder* debug_os) {
// You can derive a subclass to implement this func.
// ParallelDagView maybe the one you want.
int
DagView
::
execute_one_stage
(
ViewStage
*
vstage
,
b
ase
::
IOBufBuilder
*
debug_os
)
{
b
ase
::
Timer
stage_time
(
base
::
Timer
::
STARTED
);
b
util
::
IOBufBuilder
*
debug_os
)
{
b
util
::
Timer
stage_time
(
butil
::
Timer
::
STARTED
);
uint32_t
node_size
=
vstage
->
nodes
.
size
();
for
(
uint32_t
ni
=
0
;
ni
<
node_size
;
ni
++
)
{
ViewNode
*
vnode
=
vstage
->
nodes
[
ni
];
...
...
@@ -121,7 +121,7 @@ int DagView::execute_one_stage(ViewStage* vstage,
}
if
(
errcode
>
0
)
{
LOG
(
TRACE
)
LOG
(
INFO
)
<<
"Execute ignore, Op:"
<<
op
->
debug_string
();
continue
;
}
...
...
predictor/framework/dag_view.h
浏览文件 @
e608c4a5
...
...
@@ -41,13 +41,13 @@ public:
int
deinit
();
int
execute
(
b
ase
::
IOBufBuilder
*
debug_os
);
int
execute
(
b
util
::
IOBufBuilder
*
debug_os
);
// The default execution strategy is in sequencing
// You can derive a subclass to implement this func.
// ParallelDagView maybe the one you want.
virtual
int
execute_one_stage
(
ViewStage
*
vstage
,
b
ase
::
IOBufBuilder
*
debug_os
);
b
util
::
IOBufBuilder
*
debug_os
);
int
set_request_channel
(
Channel
&
request
);
...
...
@@ -72,7 +72,7 @@ private:
// strategy, by implments the execute_one_stage().
class
ParallelDagView
:
public
DagView
{
public:
int
execute_one_stage
(
ViewStage
*
vstage
,
b
ase
::
IOBufBuilder
*
)
{
int
execute_one_stage
(
ViewStage
*
vstage
,
b
util
::
IOBufBuilder
*
)
{
return
0
;
}
};
...
...
predictor/framework/factory.h
浏览文件 @
e608c4a5
...
...
@@ -127,7 +127,7 @@ public:
return
-
1
;
}
LOG
(
TRACE
)
<<
"Succ insert one factory, tag: "
<<
tag
LOG
(
INFO
)
<<
"Succ insert one factory, tag: "
<<
tag
<<
", base type: "
<<
typeid
(
B
).
name
();
return
0
;
...
...
predictor/framework/infer.h
浏览文件 @
e608c4a5
...
...
@@ -833,7 +833,7 @@ public:
iter
->
first
;
return
-
1
;
}
LOG
(
DEBUG
)
<<
"Succ thrd clear version engine: "
<<
iter
->
first
;
LOG
(
INFO
)
<<
"Succ thrd clear version engine: "
<<
iter
->
first
;
}
return
0
;
}
...
...
@@ -1114,7 +1114,7 @@ public:
return
-
1
;
}
version
=
infer_engine
->
version
();
LOG
(
DEBUG
)
<<
"Succ get version: "
<<
version
<<
" for model: "
LOG
(
INFO
)
<<
"Succ get version: "
<<
version
<<
" for model: "
<<
model
;
return
0
;
}
...
...
predictor/framework/manager.h
浏览文件 @
e608c4a5
...
...
@@ -72,7 +72,7 @@ public:
return
-
1
;
}
LOG
(
TRACE
)
LOG
(
INFO
)
<<
"Succ init item:"
<<
name
<<
" from conf:"
<<
path
<<
"/"
<<
file
<<
", at:"
<<
ii
<<
"!"
;
}
...
...
@@ -127,7 +127,7 @@ public:
}
}
LOG
(
TRACE
)
<<
"Finish reload "
LOG
(
INFO
)
<<
"Finish reload "
<<
_item_map
.
size
()
<<
" "
<<
T
::
tag
()
<<
"(s)"
;
return
ret
;
...
...
predictor/framework/op_repository.h
浏览文件 @
e608c4a5
...
...
@@ -22,11 +22,11 @@ template<typename OP_TYPE>
class
OpFactory
:
public
Factory
{
public:
Op
*
get_op
()
{
return
b
ase
::
get_object
<
OP_TYPE
>
();
return
b
util
::
get_object
<
OP_TYPE
>
();
}
void
return_op
(
Op
*
op
)
{
b
ase
::
return_object
<
OP_TYPE
>
(
dynamic_cast
<
OP_TYPE
*>
(
op
));
b
util
::
return_object
<
OP_TYPE
>
(
dynamic_cast
<
OP_TYPE
*>
(
op
));
}
static
OpFactory
<
OP_TYPE
>&
instance
()
{
...
...
@@ -50,7 +50,7 @@ public:
template
<
typename
OP_TYPE
>
void
regist_op
(
std
::
string
op_type
)
{
_repository
[
op_type
]
=
&
OpFactory
<
OP_TYPE
>::
instance
();
LOG
(
TRACE
)
<<
"Succ regist op: "
<<
op_type
<<
"!"
;
LOG
(
INFO
)
<<
"Succ regist op: "
<<
op_type
<<
"!"
;
}
Op
*
get_op
(
std
::
string
op_type
);
...
...
predictor/framework/predictor_metric.h
浏览文件 @
e608c4a5
...
...
@@ -2,9 +2,9 @@
#define BAIDU_PADDLE_SERVING_PREDICTOR_FRAMEWORK_PREDICTOR_METRIC_H
#include <bvar/bvar.h> // bvar
#include <b
ase
/scoped_lock.h> // BAIDU_SCOPED_LOCK
#include <b
ase
/containers/flat_map.h> // FlatMap
#include <b
ase
/memory/singleton.h> // DefaultSingletonTraits
#include <b
util
/scoped_lock.h> // BAIDU_SCOPED_LOCK
#include <b
util
/containers/flat_map.h> // FlatMap
#include <b
util
/memory/singleton.h> // DefaultSingletonTraits
namespace
baidu
{
namespace
paddle_serving
{
...
...
@@ -30,6 +30,7 @@ public:
inline
AdderWindowMetric
&
operator
<<
(
int
count
)
{
sum
<<
count
;
return
*
this
;
}
public:
...
...
@@ -76,6 +77,7 @@ public:
inline
AvgWindowMetric
&
operator
<<
(
int64_t
value
)
{
avg
<<
value
;
return
*
this
;
}
public:
...
...
@@ -93,6 +95,7 @@ public:
inline
AvgDoubleWindowMetric
&
operator
<<
(
int64_t
value
)
{
recorder
<<
value
;
return
*
this
;
}
public:
...
...
@@ -110,31 +113,31 @@ public:
static
PredictorMetric
*
GetInstance
();
~
PredictorMetric
()
{
for
(
::
b
ase
::
FlatMap
<
std
::
string
,
bvar
::
LatencyRecorder
*>::
iterator
iter
for
(
::
b
util
::
FlatMap
<
std
::
string
,
bvar
::
LatencyRecorder
*>::
iterator
iter
=
latency_recorder_map
.
begin
();
iter
!=
latency_recorder_map
.
end
();
++
iter
)
{
delete
iter
->
second
;
}
for
(
::
b
ase
::
FlatMap
<
std
::
string
,
AdderWindowMetric
*>::
iterator
iter
for
(
::
b
util
::
FlatMap
<
std
::
string
,
AdderWindowMetric
*>::
iterator
iter
=
adder_window_map
.
begin
();
iter
!=
adder_window_map
.
end
();
++
iter
)
{
delete
iter
->
second
;
}
for
(
::
b
ase
::
FlatMap
<
std
::
string
,
AvgWindowMetric
*>::
iterator
iter
for
(
::
b
util
::
FlatMap
<
std
::
string
,
AvgWindowMetric
*>::
iterator
iter
=
avg_window_map
.
begin
();
iter
!=
avg_window_map
.
end
();
++
iter
)
{
delete
iter
->
second
;
}
for
(
::
b
ase
::
FlatMap
<
std
::
string
,
AvgDoubleWindowMetric
*>::
iterator
iter
for
(
::
b
util
::
FlatMap
<
std
::
string
,
AvgDoubleWindowMetric
*>::
iterator
iter
=
avg_double_window_map
.
begin
();
iter
!=
avg_double_window_map
.
end
();
++
iter
)
{
delete
iter
->
second
;
}
for
(
::
b
ase
::
FlatMap
<
std
::
string
,
RateBaseMetric
*>::
iterator
iter
for
(
::
b
util
::
FlatMap
<
std
::
string
,
RateBaseMetric
*>::
iterator
iter
=
rate_map
.
begin
();
iter
!=
rate_map
.
end
();
++
iter
)
{
...
...
@@ -268,14 +271,14 @@ private:
private:
const
size_t
bucket_count
;
::
b
ase
::
FlatMap
<
std
::
string
,
bvar
::
LatencyRecorder
*>
latency_recorder_map
;
::
b
ase
::
FlatMap
<
std
::
string
,
AdderWindowMetric
*>
adder_window_map
;
::
b
ase
::
FlatMap
<
std
::
string
,
AvgWindowMetric
*>
avg_window_map
;
::
b
ase
::
FlatMap
<
std
::
string
,
AvgDoubleWindowMetric
*>
avg_double_window_map
;
::
b
ase
::
FlatMap
<
std
::
string
,
RateBaseMetric
*>
rate_map
;
::
b
util
::
FlatMap
<
std
::
string
,
bvar
::
LatencyRecorder
*>
latency_recorder_map
;
::
b
util
::
FlatMap
<
std
::
string
,
AdderWindowMetric
*>
adder_window_map
;
::
b
util
::
FlatMap
<
std
::
string
,
AvgWindowMetric
*>
avg_window_map
;
::
b
util
::
FlatMap
<
std
::
string
,
AvgDoubleWindowMetric
*>
avg_double_window_map
;
::
b
util
::
FlatMap
<
std
::
string
,
RateBaseMetric
*>
rate_map
;
friend
struct
DefaultSingletonTraits
<
PredictorMetric
>
;
mutable
b
ase
::
Mutex
_mutex
;
mutable
b
util
::
Mutex
_mutex
;
DISALLOW_COPY_AND_ASSIGN
(
PredictorMetric
);
};
...
...
predictor/framework/resource.h
浏览文件 @
e608c4a5
...
...
@@ -3,7 +3,6 @@
#include "common/inner_common.h"
#include "framework/memory.h"
#include "framework/mc_cache.h" // McCache
namespace
baidu
{
namespace
paddle_serving
{
...
...
@@ -23,9 +22,7 @@ struct DynamicResource {
class
Resource
{
public:
Resource
()
:
_mc_cache
(
NULL
)
{
}
Resource
()
:
{}
~
Resource
()
{
finalize
();
}
...
...
@@ -48,10 +45,6 @@ public:
return
(
DynamicResource
*
)
THREAD_GETSPECIFIC
(
_tls_bspec_key
);
}
McCache
*
get_mc_cache
()
{
return
_mc_cache
;
}
private:
int
thread_finalize
()
{
return
0
;
...
...
@@ -59,7 +52,6 @@ private:
THREAD_KEY_T
_tls_bspec_key
;
McCache
*
_mc_cache
;
};
}
// predictor
...
...
predictor/framework/server.cpp
浏览文件 @
e608c4a5
...
...
@@ -15,7 +15,7 @@ namespace predictor {
volatile
bool
ServerManager
::
_s_reload_starting
=
true
;
bool
ServerManager
::
_compare_string_piece_without_case
(
const
b
ase
::
StringPiece
&
s1
,
const
char
*
s2
)
{
const
b
util
::
StringPiece
&
s1
,
const
char
*
s2
)
{
if
(
strlen
(
s2
)
!=
s1
.
size
())
{
return
false
;
}
...
...
@@ -91,7 +91,7 @@ int ServerManager::start_and_wait() {
}
void
ServerManager
::
_set_server_option_by_protocol
(
const
::
b
ase
::
StringPiece
&
protocol_type
)
{
const
::
b
util
::
StringPiece
&
protocol_type
)
{
std
::
string
enabled_protocols
=
FLAGS_enable_protocol_list
;
if
(
_compare_string_piece_without_case
(
protocol_type
,
"itp"
))
{
_options
.
nshead_service
=
new
::
baidu
::
rpc
::
policy
::
ItpAdaptor
;
...
...
@@ -129,10 +129,10 @@ int ServerManager::_wait_reloader() {
}
void
*
ServerManager
::
_reload_worker
(
void
*
args
)
{
LOG
(
TRACE
)
<<
"Entrence reload worker, "
LOG
(
INFO
)
<<
"Entrence reload worker, "
<<
"interval_s: "
<<
FLAGS_reload_interval_s
;
while
(
ServerManager
::
reload_starting
())
{
LOG
(
TRACE
)
<<
"Begin reload framework..."
;
LOG
(
INFO
)
<<
"Begin reload framework..."
;
if
(
Resource
::
instance
().
reload
()
!=
0
)
{
LOG
(
FATAL
)
<<
"Failed reload resource!"
;
}
...
...
@@ -144,7 +144,7 @@ void* ServerManager::_reload_worker(void* args) {
usleep
(
FLAGS_reload_interval_s
*
1000000
);
}
LOG
(
TRACE
)
<<
"Exit reload worker!"
;
LOG
(
INFO
)
<<
"Exit reload worker!"
;
return
NULL
;
}
...
...
predictor/framework/server.h
浏览文件 @
e608c4a5
...
...
@@ -37,9 +37,9 @@ private:
static
void
*
_reload_worker
(
void
*
args
);
bool
_compare_string_piece_without_case
(
const
b
ase
::
StringPiece
&
s1
,
const
char
*
s2
);
const
b
util
::
StringPiece
&
s1
,
const
char
*
s2
);
void
_set_server_option_by_protocol
(
const
::
b
ase
::
StringPiece
&
protocol_type
);
void
_set_server_option_by_protocol
(
const
::
b
util
::
StringPiece
&
protocol_type
);
baidu
::
rpc
::
ServerOptions
_options
;
baidu
::
rpc
::
Server
_server
;
...
...
predictor/framework/service.cpp
浏览文件 @
e608c4a5
...
...
@@ -2,7 +2,7 @@
#include "framework/channel.h"
#include "common/constant.h"
#include "framework/service.h"
#include <b
ase/time.h> // base
::Timer
#include <b
util/time.h> // butil
::Timer
#include "framework/server.h"
#include "framework/dag_view.h"
#include "framework/manager.h"
...
...
@@ -116,7 +116,7 @@ int InferService::init(const comcfg::ConfigUnit& conf) {
}
}
LOG
(
TRACE
)
LOG
(
INFO
)
<<
"Succ load infer_service: "
<<
_infer_service_format
<<
"!"
;
...
...
@@ -135,7 +135,7 @@ const std::string& InferService::name() const {
int
InferService
::
inference
(
const
google
::
protobuf
::
Message
*
request
,
google
::
protobuf
::
Message
*
response
,
b
ase
::
IOBufBuilder
*
debug_os
)
{
b
util
::
IOBufBuilder
*
debug_os
)
{
TRACEPRINTF
(
"start to inference"
);
// when funtion call begins, framework will reset
...
...
@@ -188,7 +188,7 @@ int InferService::inference(
int
InferService
::
debug
(
const
google
::
protobuf
::
Message
*
request
,
google
::
protobuf
::
Message
*
response
,
b
ase
::
IOBufBuilder
*
debug_os
)
{
b
util
::
IOBufBuilder
*
debug_os
)
{
return
inference
(
request
,
response
,
debug_os
);
}
...
...
@@ -196,7 +196,7 @@ int InferService::execute_one_workflow(
uint32_t
index
,
const
google
::
protobuf
::
Message
*
request
,
google
::
protobuf
::
Message
*
response
,
b
ase
::
IOBufBuilder
*
debug_os
)
{
b
util
::
IOBufBuilder
*
debug_os
)
{
if
(
index
>=
_flows
.
size
())
{
LOG
(
FATAL
)
<<
"Faield execute workflow, index: "
<<
index
<<
" >= max:"
<<
_flows
.
size
();
...
...
@@ -210,8 +210,8 @@ int InferService::_execute_workflow(
Workflow
*
workflow
,
const
google
::
protobuf
::
Message
*
request
,
google
::
protobuf
::
Message
*
response
,
b
ase
::
IOBufBuilder
*
debug_os
)
{
b
ase
::
Timer
workflow_time
(
base
::
Timer
::
STARTED
);
b
util
::
IOBufBuilder
*
debug_os
)
{
b
util
::
Timer
workflow_time
(
butil
::
Timer
::
STARTED
);
// create and submit beginer channel
BuiltinChannel
req_channel
;
req_channel
.
init
(
0
,
START_OP_NAME
);
...
...
predictor/framework/service.h
浏览文件 @
e608c4a5
...
...
@@ -42,24 +42,24 @@ public:
virtual
int
inference
(
const
google
::
protobuf
::
Message
*
request
,
google
::
protobuf
::
Message
*
response
,
b
ase
::
IOBufBuilder
*
debug_os
=
NULL
);
b
util
::
IOBufBuilder
*
debug_os
=
NULL
);
int
debug
(
const
google
::
protobuf
::
Message
*
request
,
google
::
protobuf
::
Message
*
response
,
b
ase
::
IOBufBuilder
*
debug_os
);
b
util
::
IOBufBuilder
*
debug_os
);
int
execute_one_workflow
(
uint32_t
index
,
const
google
::
protobuf
::
Message
*
request
,
google
::
protobuf
::
Message
*
response
,
b
ase
::
IOBufBuilder
*
debug_os
);
b
util
::
IOBufBuilder
*
debug_os
);
private:
int
_execute_workflow
(
Workflow
*
workflow
,
const
google
::
protobuf
::
Message
*
request
,
google
::
protobuf
::
Message
*
response
,
b
ase
::
IOBufBuilder
*
debug_os
);
b
util
::
IOBufBuilder
*
debug_os
);
std
::
vector
<
Workflow
*>*
_map_request_to_workflow
(
const
google
::
protobuf
::
Message
*
request
);
...
...
@@ -69,7 +69,7 @@ private:
uint64_t
_last_change_timestamp
;
bool
_enable_map_request_to_workflow
;
std
::
string
_request_field_key
;
::
b
ase
::
FlatMap
<
std
::
string
,
std
::
vector
<
Workflow
*>
>
_request_to_workflow_map
;
::
b
util
::
FlatMap
<
std
::
string
,
std
::
vector
<
Workflow
*>
>
_request_to_workflow_map
;
IMerger
*
_merger
;
};
...
...
@@ -79,7 +79,7 @@ public:
int
inference
(
const
google
::
protobuf
::
Message
*
request
,
google
::
protobuf
::
Message
*
response
,
b
ase
::
IOBufBuilder
*
debug_os
)
{
b
util
::
IOBufBuilder
*
debug_os
)
{
return
0
;
}
};
...
...
predictor/framework/workflow.cpp
浏览文件 @
e608c4a5
...
...
@@ -29,9 +29,9 @@ int Workflow::init(const comcfg::ConfigUnit& conf) {
DagView
*
Workflow
::
fetch_dag_view
(
const
std
::
string
&
service_name
)
{
DagView
*
view
=
NULL
;
if
(
_type
==
"Sequence"
)
{
view
=
b
ase
::
get_object
<
DagView
>
();
view
=
b
util
::
get_object
<
DagView
>
();
}
else
if
(
_type
==
"Parallel"
)
{
view
=
b
ase
::
get_object
<
ParallelDagView
>
();
view
=
b
util
::
get_object
<
ParallelDagView
>
();
}
else
{
LOG
(
FATAL
)
<<
"Unknown dag type:"
<<
_type
<<
"!"
;
...
...
@@ -48,9 +48,9 @@ DagView* Workflow::fetch_dag_view(const std::string& service_name) {
void
Workflow
::
return_dag_view
(
DagView
*
view
)
{
view
->
deinit
();
if
(
_type
==
"Sequence"
)
{
b
ase
::
return_object
<
DagView
>
(
view
);
b
util
::
return_object
<
DagView
>
(
view
);
}
else
if
(
_type
==
"Parallel"
)
{
b
ase
::
return_object
<
ParallelDagView
>
(
b
util
::
return_object
<
ParallelDagView
>
(
dynamic_cast
<
ParallelDagView
*>
(
view
));
}
else
{
LOG
(
FATAL
)
...
...
predictor/op/CMakeLists.txt
浏览文件 @
e608c4a5
add_library
(
op STATIC
${
CMAKE_CURRENT_LIST_DIR
}
/common_echo_op.cpp
${
CMAKE_CURRENT_LIST_DIR
}
/dense_echo_op.cpp
${
CMAKE_CURRENT_LIST_DIR
}
/op.cpp
${
CMAKE_CURRENT_LIST_DIR
}
/sparse_echo_op.cpp
${
CMAKE_CURRENT_LIST_DIR
}
/write_json_op.cpp
)
add_dependencies
(
op proto brpc configure
)
FILE
(
GLOB op_srcs
${
CMAKE_CURRENT_LIST_DIR
}
/*.cpp
)
LIST
(
APPEND predictor_srcs
${
op_srcs
}
)
predictor/op/dense_echo_op.cpp
浏览文件 @
e608c4a5
...
...
@@ -12,7 +12,7 @@ int DenseEchoOp::inference() {
const
Request
*
req
=
dynamic_cast
<
const
Request
*>
(
get_request_message
());
Response
*
res
=
mutable_data
<
Response
>
();
LOG
(
DEBUG
)
<<
"Receive request in dense service:"
LOG
(
INFO
)
<<
"Receive request in dense service:"
<<
req
->
ShortDebugString
();
uint32_t
sample_size
=
req
->
instances_size
();
for
(
uint32_t
si
=
0
;
si
<
sample_size
;
si
++
)
{
...
...
predictor/op/op.cpp
浏览文件 @
e608c4a5
#include "op/op.h"
#include <b
ase/time.h> // base
::Timer
#include <b
util/time.h> // butil
::Timer
#include "common/utils.h"
#include "common/constant.h"
#include "framework/channel.h"
...
...
@@ -18,7 +18,7 @@ int Op::init(Bus* bus, Dag* dag, uint32_t id, const std::string& name,
_type
=
type
;
set_config
(
conf
);
_timer
=
b
ase
::
get_object
<
TimerFlow
>
();
_timer
=
b
util
::
get_object
<
TimerFlow
>
();
if
(
!
_timer
)
{
LOG
(
FATAL
)
<<
"Invalid timerflow in op:"
<<
this
->
name
();
...
...
@@ -42,7 +42,7 @@ int Op::init(Bus* bus, Dag* dag, uint32_t id, const std::string& name,
int
Op
::
deinit
()
{
if
(
_timer
)
{
b
ase
::
return_object
(
_timer
);
b
util
::
return_object
(
_timer
);
}
_bus
=
NULL
;
...
...
@@ -73,7 +73,7 @@ int Op::check_time(const char* tag) {
}
int
Op
::
process
(
bool
debug
)
{
b
ase
::
Timer
op_time
(
base
::
Timer
::
STARTED
);
b
util
::
Timer
op_time
(
butil
::
Timer
::
STARTED
);
if
(
debug
&&
_timer
)
{
_timer
->
start
();
}
...
...
@@ -84,7 +84,7 @@ int Op::process(bool debug) {
}
if
(
_has_calc
)
{
LOG
(
DEBUG
)
LOG
(
INFO
)
<<
"Op: "
<<
_name
<<
" already processed before"
;
return
ERR_OK
;
}
...
...
@@ -133,7 +133,7 @@ int Op::process(bool debug) {
op_time
.
stop
();
PredictorMetric
::
GetInstance
()
->
update_latency_metric
(
OP_METRIC_PREFIX
+
full_name
(),
op_time
.
u_elapsed
());
LOG
(
NOTICE
)
<<
" "
<<
name
()
<<
"_time=["
<<
op_time
.
u_elapsed
()
<<
"]"
<<
noflush
;
LOG
(
INFO
)
<<
" "
<<
name
()
<<
"_time=["
<<
op_time
.
u_elapsed
()
<<
"]"
;
return
ERR_OK
;
}
...
...
predictor/op/op.h
浏览文件 @
e608c4a5
...
...
@@ -195,7 +195,7 @@ public:
return
_channel
;
}
_channel
=
b
ase
::
get_object
<
ChannelType
>
();
_channel
=
b
util
::
get_object
<
ChannelType
>
();
if
(
!
_channel
)
{
LOG
(
FATAL
)
<<
"Failed mutable channel of type:"
...
...
@@ -213,7 +213,7 @@ public:
int
release_channel
()
{
if
(
_channel
)
{
_channel
->
deinit
();
b
ase
::
return_object
<
ChannelType
>
(
_channel
);
b
util
::
return_object
<
ChannelType
>
(
_channel
);
}
_channel
=
NULL
;
...
...
predictor/op/sparse_echo_op.cpp
浏览文件 @
e608c4a5
...
...
@@ -18,7 +18,7 @@ int SparseEchoOp::inference() {
// get/mutable_depend_argment()
// ...
LOG
(
DEBUG
)
LOG
(
INFO
)
<<
"Receive request in sparse service:"
<<
req
->
ShortDebugString
();
uint32_t
sample_size
=
req
->
instances_size
();
...
...
predictor/op/write_json_op.cpp
浏览文件 @
e608c4a5
#include "pb_to_json.h"
#include "
json2pb/
pb_to_json.h"
#include <google/protobuf/text_format.h>
#include "op/write_json_op.h"
...
...
@@ -38,7 +38,7 @@ int WriteJsonOp::inference() {
return
-
1
;
}
std
::
string
*
text
=
ins
->
mutable_response_json
();
if
(
!
ProtoMessageToJson
(
classify_out
->
predictions
(
si
),
if
(
!
json2pb
::
ProtoMessageToJson
(
classify_out
->
predictions
(
si
),
text
,
&
err_string
))
{
LOG
(
ERROR
)
<<
"Failed convert message["
<<
classify_out
->
predictions
(
si
).
ShortDebugString
()
...
...
@@ -47,7 +47,7 @@ int WriteJsonOp::inference() {
}
}
LOG
(
TRACE
)
<<
"Succ write json:"
LOG
(
INFO
)
<<
"Succ write json:"
<<
classify_out
->
ShortDebugString
();
return
0
;
...
...
predictor/proto/CMakeLists.txt
浏览文件 @
e608c4a5
FILE
(
GLOB protofiles
"
${
CMAKE_CURRENT_LIST_DIR
}
/*.proto"
)
protobuf_generate_cpp
(
PROTO_SRCS PROTO_HDRS
${
protofiles
}
)
add_library
(
proto STATIC
${
PROTO_SRCS
}
${
PROTO_HDRS
}
)
include_directories
(
${
CMAKE_CURRENT_BINARY_DIR
}
)
LIST
(
APPEND predictor_srcs
${
PROTO_SRCS
}
)
predictor/src/pdcodegen.cpp
浏览文件 @
e608c4a5
...
...
@@ -521,7 +521,7 @@ private:
printer
->
Print
(
"google::protobuf::Message* cur_res = _stub_handler->fetch_response();
\n
"
"if (cur_res == NULL) {
\n
"
" LOG(
TRACE
) <<
\"
Failed fetch response from stub handler, new it
\"
;
\n
"
" LOG(
INFO
) <<
\"
Failed fetch response from stub handler, new it
\"
;
\n
"
" cur_res = response->New();
\n
"
" if (cur_res == NULL) {
\n
"
" LOG(FATAL) <<
\"
Failed new response item!
\"
;
\n
"
...
...
@@ -530,7 +530,7 @@ private:
" }
\n
"
" return baidu::rpc::SubCall(method, request, cur_res, baidu::rpc::DELETE_RESPONSE);
\n
"
"}
\n
"
);
"LOG(
DEBUG
)
\n
"
"LOG(
INFO
)
\n
"
" <<
\"
[default] Succ map, channel_index:
\"
<< channel_index;
\n
"
;
printer
->
Print
(
"return baidu::rpc::SubCall(method, request, cur_res, 0);
\n
"
...
...
@@ -675,7 +675,7 @@ private:
}
printer
->
Print
(
"LOG(
DEBUG
)
\n
"
"LOG(
INFO
)
\n
"
" <<
\"
[pack] Succ map req at:
\"\n
"
" << channel_index;
\n
"
);
printer
->
Print
(
...
...
predictor/src/pdserving.cpp
浏览文件 @
e608c4a5
...
...
@@ -94,7 +94,7 @@ int main(int argc, char** argv) {
return
-
1
;
}
LOG
(
TRACE
)
<<
"Succ initialize logger"
;
LOG
(
INFO
)
<<
"Succ initialize logger"
;
// initialize resource manager
if
(
Resource
::
instance
().
initialize
(
...
...
@@ -103,7 +103,7 @@ int main(int argc, char** argv) {
<<
FLAGS_resource_path
<<
"/"
<<
FLAGS_resource_file
;
return
-
1
;
}
LOG
(
TRACE
)
<<
"Succ initialize resource"
;
LOG
(
INFO
)
<<
"Succ initialize resource"
;
// initialize workflow manager
if
(
WorkflowManager
::
instance
().
initialize
(
...
...
@@ -112,7 +112,7 @@ int main(int argc, char** argv) {
<<
FLAGS_workflow_path
<<
"/"
<<
FLAGS_workflow_file
;
return
-
1
;
}
LOG
(
TRACE
)
<<
"Succ initialize workflow"
;
LOG
(
INFO
)
<<
"Succ initialize workflow"
;
// initialize service manager
if
(
InferServiceManager
::
instance
().
initialize
(
...
...
@@ -122,7 +122,7 @@ int main(int argc, char** argv) {
<<
FLAGS_inferservice_path
<<
"/"
<<
FLAGS_inferservice_file
;
return
-
1
;
}
LOG
(
TRACE
)
<<
"Succ initialize inferservice"
;
LOG
(
INFO
)
<<
"Succ initialize inferservice"
;
int
errcode
=
bthread_set_worker_startfn
(
pthread_worker_start_fn
);
if
(
errcode
!=
0
)
{
...
...
@@ -135,7 +135,7 @@ int main(int argc, char** argv) {
LOG
(
ERROR
)
<<
"Failed start server and wait!"
;
return
-
1
;
}
LOG
(
TRACE
)
<<
"Succ start service manager"
;
LOG
(
INFO
)
<<
"Succ start service manager"
;
if
(
InferServiceManager
::
instance
().
finalize
()
!=
0
)
{
LOG
(
ERROR
)
<<
"Failed finalize infer service manager."
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录