Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
PaddlePaddle
Serving
提交
0c5bb75b
S
Serving
项目概览
PaddlePaddle
/
Serving
接近 2 年 前同步成功
通知
186
Star
833
Fork
253
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
105
列表
看板
标记
里程碑
合并请求
10
Wiki
2
Wiki
分析
仓库
DevOps
项目成员
Pages
S
Serving
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
105
Issue
105
列表
看板
标记
里程碑
合并请求
10
合并请求
10
Pages
分析
分析
仓库分析
DevOps
Wiki
2
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
0c5bb75b
编写于
6月 22, 2021
作者:
Y
Ybjjwwang
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add cube op for v0.6.2
上级
9183a5ec
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
640 addition
and
1 deletion
+640
-1
core/general-server/op/general_dist_kv_infer_op.cpp
core/general-server/op/general_dist_kv_infer_op.cpp
+138
-1
python/examples/criteo_ctr_with_cube/README.md
python/examples/criteo_ctr_with_cube/README.md
+72
-0
python/examples/criteo_ctr_with_cube/README_CN.md
python/examples/criteo_ctr_with_cube/README_CN.md
+70
-0
python/examples/criteo_ctr_with_cube/criteo_reader.py
python/examples/criteo_ctr_with_cube/criteo_reader.py
+83
-0
python/examples/criteo_ctr_with_cube/get_data.sh
python/examples/criteo_ctr_with_cube/get_data.sh
+2
-0
python/examples/criteo_ctr_with_cube/local_train.py
python/examples/criteo_ctr_with_cube/local_train.py
+101
-0
python/examples/criteo_ctr_with_cube/network_conf.py
python/examples/criteo_ctr_with_cube/network_conf.py
+77
-0
python/examples/criteo_ctr_with_cube/test_client.py
python/examples/criteo_ctr_with_cube/test_client.py
+56
-0
python/examples/criteo_ctr_with_cube/test_server.py
python/examples/criteo_ctr_with_cube/test_server.py
+41
-0
未找到文件。
core/general-server/op/general_dist_kv_infer_op.cpp
浏览文件 @
0c5bb75b
...
...
@@ -38,7 +38,144 @@ using baidu::paddle_serving::predictor::general_model::FetchInst;
using
baidu
::
paddle_serving
::
predictor
::
InferManager
;
using
baidu
::
paddle_serving
::
predictor
::
PaddleGeneralModelConfig
;
// NOTE(review): stub that unconditionally returns 0. Another definition of the
// same member function follows immediately after this one; in this commit view
// the stub looks like the removed (pre-change) side of the diff -- confirm
// before treating both as live code.
int
GeneralDistKVInferOp
::
inference
()
{
return
0
;
}
int
GeneralDistKVInferOp
::
inference
()
{
VLOG
(
2
)
<<
"Going to run inference"
;
const
std
::
vector
<
std
::
string
>
pre_node_names
=
pre_names
();
if
(
pre_node_names
.
size
()
!=
1
)
{
LOG
(
ERROR
)
<<
"This op("
<<
op_name
()
<<
") can only have one predecessor op, but received "
<<
pre_node_names
.
size
();
return
-
1
;
}
const
std
::
string
pre_name
=
pre_node_names
[
0
];
const
GeneralBlob
*
input_blob
=
get_depend_argument
<
GeneralBlob
>
(
pre_name
);
if
(
!
input_blob
)
{
LOG
(
ERROR
)
<<
"input_blob is nullptr,error"
;
return
-
1
;
}
uint64_t
log_id
=
input_blob
->
GetLogId
();
VLOG
(
2
)
<<
"(logid="
<<
log_id
<<
") Get precedent op name: "
<<
pre_name
;
GeneralBlob
*
output_blob
=
mutable_data
<
GeneralBlob
>
();
if
(
!
output_blob
)
{
LOG
(
ERROR
)
<<
"output_blob is nullptr,error"
;
return
-
1
;
}
output_blob
->
SetLogId
(
log_id
);
if
(
!
input_blob
)
{
LOG
(
ERROR
)
<<
"(logid="
<<
log_id
<<
") Failed mutable depended argument, op:"
<<
pre_name
;
return
-
1
;
}
const
TensorVector
*
in
=
&
input_blob
->
tensor_vector
;
TensorVector
*
out
=
&
output_blob
->
tensor_vector
;
std
::
vector
<
uint64_t
>
keys
;
std
::
vector
<
rec
::
mcube
::
CubeValue
>
values
;
int
sparse_count
=
0
;
int
dense_count
=
0
;
std
::
vector
<
std
::
pair
<
int64_t
*
,
size_t
>>
dataptr_size_pairs
;
size_t
key_len
=
0
;
for
(
size_t
i
=
0
;
i
<
in
->
size
();
++
i
)
{
if
(
in
->
at
(
i
).
dtype
!=
paddle
::
PaddleDType
::
INT64
)
{
++
dense_count
;
continue
;
}
++
sparse_count
;
size_t
elem_num
=
1
;
for
(
size_t
s
=
0
;
s
<
in
->
at
(
i
).
shape
.
size
();
++
s
)
{
elem_num
*=
in
->
at
(
i
).
shape
[
s
];
}
key_len
+=
elem_num
;
int64_t
*
data_ptr
=
static_cast
<
int64_t
*>
(
in
->
at
(
i
).
data
.
data
());
dataptr_size_pairs
.
push_back
(
std
::
make_pair
(
data_ptr
,
elem_num
));
}
keys
.
resize
(
key_len
);
VLOG
(
2
)
<<
"(logid="
<<
log_id
<<
") cube number of keys to look up: "
<<
key_len
;
int
key_idx
=
0
;
for
(
size_t
i
=
0
;
i
<
dataptr_size_pairs
.
size
();
++
i
)
{
std
::
copy
(
dataptr_size_pairs
[
i
].
first
,
dataptr_size_pairs
[
i
].
first
+
dataptr_size_pairs
[
i
].
second
,
keys
.
begin
()
+
key_idx
);
key_idx
+=
dataptr_size_pairs
[
i
].
second
;
}
rec
::
mcube
::
CubeAPI
*
cube
=
rec
::
mcube
::
CubeAPI
::
instance
();
std
::
vector
<
std
::
string
>
table_names
=
cube
->
get_table_names
();
if
(
table_names
.
size
()
==
0
)
{
LOG
(
ERROR
)
<<
"cube init error or cube config not given."
;
return
-
1
;
}
int
ret
=
cube
->
seek
(
table_names
[
0
],
keys
,
&
values
);
VLOG
(
2
)
<<
"(logid="
<<
log_id
<<
") cube seek status: "
<<
ret
;
if
(
values
.
size
()
!=
keys
.
size
()
||
values
[
0
].
buff
.
size
()
==
0
)
{
LOG
(
ERROR
)
<<
"cube value return null"
;
}
size_t
EMBEDDING_SIZE
=
values
[
0
].
buff
.
size
()
/
sizeof
(
float
);
TensorVector
sparse_out
;
sparse_out
.
resize
(
sparse_count
);
TensorVector
dense_out
;
dense_out
.
resize
(
dense_count
);
int
cube_val_idx
=
0
;
int
sparse_idx
=
0
;
int
dense_idx
=
0
;
std
::
unordered_map
<
int
,
int
>
in_out_map
;
baidu
::
paddle_serving
::
predictor
::
Resource
&
resource
=
baidu
::
paddle_serving
::
predictor
::
Resource
::
instance
();
std
::
shared_ptr
<
PaddleGeneralModelConfig
>
model_config
=
resource
.
get_general_model_config
().
front
();
for
(
size_t
i
=
0
;
i
<
in
->
size
();
++
i
)
{
if
(
in
->
at
(
i
).
dtype
!=
paddle
::
PaddleDType
::
INT64
)
{
dense_out
[
dense_idx
]
=
in
->
at
(
i
);
++
dense_idx
;
continue
;
}
sparse_out
[
sparse_idx
].
lod
.
resize
(
in
->
at
(
i
).
lod
.
size
());
for
(
size_t
x
=
0
;
x
<
sparse_out
[
sparse_idx
].
lod
.
size
();
++
x
)
{
sparse_out
[
sparse_idx
].
lod
[
x
].
resize
(
in
->
at
(
i
).
lod
[
x
].
size
());
std
::
copy
(
in
->
at
(
i
).
lod
[
x
].
begin
(),
in
->
at
(
i
).
lod
[
x
].
end
(),
sparse_out
[
sparse_idx
].
lod
[
x
].
begin
());
}
sparse_out
[
sparse_idx
].
dtype
=
paddle
::
PaddleDType
::
FLOAT32
;
sparse_out
[
sparse_idx
].
shape
.
push_back
(
sparse_out
[
sparse_idx
].
lod
[
0
].
back
());
sparse_out
[
sparse_idx
].
shape
.
push_back
(
EMBEDDING_SIZE
);
sparse_out
[
sparse_idx
].
name
=
model_config
->
_feed_name
[
i
];
sparse_out
[
sparse_idx
].
data
.
Resize
(
sparse_out
[
sparse_idx
].
lod
[
0
].
back
()
*
EMBEDDING_SIZE
*
sizeof
(
float
));
float
*
dst_ptr
=
static_cast
<
float
*>
(
sparse_out
[
sparse_idx
].
data
.
data
());
for
(
int
x
=
0
;
x
<
sparse_out
[
sparse_idx
].
lod
[
0
].
back
();
++
x
)
{
float
*
data_ptr
=
dst_ptr
+
x
*
EMBEDDING_SIZE
;
memcpy
(
data_ptr
,
values
[
cube_val_idx
].
buff
.
data
(),
values
[
cube_val_idx
].
buff
.
size
());
cube_val_idx
++
;
}
++
sparse_idx
;
}
VLOG
(
2
)
<<
"(logid="
<<
log_id
<<
") sparse tensor load success."
;
TensorVector
infer_in
;
infer_in
.
insert
(
infer_in
.
end
(),
dense_out
.
begin
(),
dense_out
.
end
());
infer_in
.
insert
(
infer_in
.
end
(),
sparse_out
.
begin
(),
sparse_out
.
end
());
int
batch_size
=
input_blob
->
_batch_size
;
output_blob
->
_batch_size
=
batch_size
;
Timer
timeline
;
int64_t
start
=
timeline
.
TimeStampUS
();
timeline
.
Start
();
if
(
InferManager
::
instance
().
infer
(
engine_name
().
c_str
(),
&
infer_in
,
out
,
batch_size
))
{
LOG
(
ERROR
)
<<
"Failed do infer in fluid model: "
<<
engine_name
();
return
-
1
;
}
int64_t
end
=
timeline
.
TimeStampUS
();
CopyBlobInfo
(
input_blob
,
output_blob
);
AddBlobInfo
(
output_blob
,
start
);
AddBlobInfo
(
output_blob
,
end
);
return
0
;
}
DEFINE_OP
(
GeneralDistKVInferOp
);
}
// namespace serving
...
...
python/examples/criteo_ctr_with_cube/README.md
0 → 100755
浏览文件 @
0c5bb75b
## Criteo CTR with Sparse Parameter Indexing Service
(
[
简体中文
](
./README_CN.md
)
|English)
### Get Sample Dataset
go to directory
`python/examples/criteo_ctr_with_cube`
```
sh get_data.sh
```
### Download Model and Sparse Parameter Sequence Files
```
wget https://paddle-serving.bj.bcebos.com/unittest/ctr_cube_unittest.tar.gz
tar xf ctr_cube_unittest.tar.gz
mv models/ctr_client_conf ./
mv models/ctr_serving_model_kv ./
mv models/data ./cube/
```
The server model will be in ./ctr_serving_model_kv and the client config in ./ctr_client_conf.
### Start Sparse Parameter Indexing Service
```
wget https://paddle-serving.bj.bcebos.com/others/cube_app.tar.gz
tar xf cube_app.tar.gz
mv cube_app/cube* ./cube/
sh cube_prepare.sh &
```
Here, the sparse parameters are loaded by Cube, the sparse parameter indexing service.
### Start RPC Predictor, the number of serving threads is 4 (configurable in test_server.py)
```
python test_server.py ctr_serving_model_kv
```
### Run Prediction
```
python test_client.py ctr_client_conf/serving_client_conf.prototxt ./raw_data
```
### Benchmark
CPU :Intel(R) Xeon(R) CPU 6148 @ 2.40GHz
Model :
[
Criteo CTR
](
https://github.com/PaddlePaddle/Serving/blob/develop/python/examples/criteo_ctr_with_cube/network_conf.py
)
server core/thread num : 4/8
Run
```
bash benchmark.sh
```
1000 batches will be sent by every client
| client thread num | prepro | client infer | op0 | op1 | op2 | postpro | avg_latency | qps |
| ------------------ | ------ | ------------ | ------ | ----- | ------ | ------- | ----- | ----- |
| 1 | 0.035 | 1.596 | 0.021 | 0.518 | 0.0024 | 0.0025 | 6.774 | 147.7 |
| 2 | 0.034 | 1.780 | 0.027 | 0.463 | 0.0020 | 0.0023 | 6.931 | 288.3 |
| 4 | 0.038 | 2.954 | 0.025 | 0.455 | 0.0019 | 0.0027 | 8.378 | 477.5 |
| 8 | 0.044 | 8.230 | 0.028 | 0.464 | 0.0023 | 0.0034 | 14.191 | 563.8 |
| 16 | 0.048 | 21.037 | 0.028 | 0.455 | 0.0025 | 0.0041 | 27.236 | 587.5 |
The average latency of each client thread is shown below:

The QPS is shown below:

python/examples/criteo_ctr_with_cube/README_CN.md
0 → 100644
浏览文件 @
0c5bb75b
## 带稀疏参数索引服务的CTR预测服务
(简体中文|
[
English
](
./README.md
)
)
### 获取样例数据
进入目录
`python/examples/criteo_ctr_with_cube`
```
sh get_data.sh
```
### 下载模型和稀疏参数序列文件
```
wget https://paddle-serving.bj.bcebos.com/unittest/ctr_cube_unittest.tar.gz
tar xf ctr_cube_unittest.tar.gz
mv models/ctr_client_conf ./
mv models/ctr_serving_model_kv ./
mv models/data ./cube/
```
执行脚本后会在当前目录有ctr_serving_model_kv和ctr_client_conf文件夹。
### 启动稀疏参数索引服务
```
wget https://paddle-serving.bj.bcebos.com/others/cube_app.tar.gz
tar xf cube_app.tar.gz
mv cube_app/cube* ./cube/
sh cube_prepare.sh &
```
此处,模型当中的稀疏参数会被存放在稀疏参数索引服务Cube当中。
### 启动RPC预测服务,服务端线程数为4(可在test_server.py配置)
```
python test_server.py ctr_serving_model_kv
```
### 执行预测
```
python test_client.py ctr_client_conf/serving_client_conf.prototxt ./raw_data
```
### Benchmark
设备 :Intel(R) Xeon(R) CPU 6148 @ 2.40GHz
模型 :
[
Criteo CTR
](
https://github.com/PaddlePaddle/Serving/blob/develop/python/examples/criteo_ctr_with_cube/network_conf.py
)
server core/thread num : 4/8
执行
```
bash benchmark.sh
```
客户端每个线程会发送1000个batch
| client thread num | prepro | client infer | op0 | op1 | op2 | postpro | avg_latency | qps |
| ------------------ | ------ | ------------ | ------ | ----- | ------ | ------- | ----- | ----- |
| 1 | 0.035 | 1.596 | 0.021 | 0.518 | 0.0024 | 0.0025 | 6.774 | 147.7 |
| 2 | 0.034 | 1.780 | 0.027 | 0.463 | 0.0020 | 0.0023 | 6.931 | 288.3 |
| 4 | 0.038 | 2.954 | 0.025 | 0.455 | 0.0019 | 0.0027 | 8.378 | 477.5 |
| 8 | 0.044 | 8.230 | 0.028 | 0.464 | 0.0023 | 0.0034 | 14.191 | 563.8 |
| 16 | 0.048 | 21.037 | 0.028 | 0.455 | 0.0025 | 0.0041 | 27.236 | 587.5 |
平均每个线程耗时图如下

每个线程QPS耗时如下

python/examples/criteo_ctr_with_cube/criteo_reader.py
0 → 100755
浏览文件 @
0c5bb75b
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
import
sys
import
paddle.fluid.incubate.data_generator
as
dg
class CriteoDataset(dg.MultiSlotDataGenerator):
    """Data generator for tab-separated Criteo CTR lines.

    Column 0 of each line is parsed as the integer label, columns 1-13 as
    continuous features and columns 14-39 as categorical features.
    """

    def setup(self, sparse_feature_dim):
        """Initialise the normalisation tables, hash-space size and column ranges.

        Args:
            sparse_feature_dim: modulus applied to every hashed categorical
                feature.
        """
        self.cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
        self.cont_max_ = [
            20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50
        ]
        self.cont_diff_ = [
            20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50
        ]
        self.hash_dim_ = sparse_feature_dim
        # here, training data are lines with line_index < train_idx_
        self.train_idx_ = 41256555
        self.continuous_range_ = range(1, 14)
        self.categorical_range_ = range(14, 40)

    def _process_line(self, line):
        """Split one raw line into (dense values, sparse ids, label).

        Returns:
            A 3-tuple: a list with one float per continuous column (0.0 when
            the column is empty, otherwise (value - min) / diff), a list of
            single-element id lists (one per categorical column), and a
            single-element list holding the integer label.
        """
        columns = line.rstrip('\n').split('\t')

        dense_values = [
            0.0 if columns[col] == '' else
            (float(columns[col]) - self.cont_min_[col - 1]) /
            self.cont_diff_[col - 1] for col in self.continuous_range_
        ]

        # NOTE(review): built-in hash() of a str is salted per process on
        # Python 3 unless PYTHONHASHSEED is fixed -- confirm ids stay stable
        # across the processes that run this reader.
        sparse_values = [[hash(str(col) + columns[col]) % self.hash_dim_]
                         for col in self.categorical_range_]

        return dense_values, sparse_values, [int(columns[0])]

    def infer_reader(self, filelist, batch, buf_size):
        """Build a shuffled, batched reader over the files in `filelist`.

        Each sample is a flat list: [dense values] + sparse id lists + [label].
        """

        def _samples():
            for path in filelist:
                with open(path.strip(), "r") as handle:
                    for raw_line in handle:
                        dense, sparse, target = self._process_line(raw_line)
                        yield [dense] + sparse + [target]

        import paddle
        shuffled = paddle.reader.shuffle(_samples, buf_size=buf_size)
        return paddle.batch(shuffled, batch_size=batch)

    def generate_sample(self, line):
        """Return a generator function yielding (slot name, value) pairs for `line`.

        Slot names are "dense_input", "C1".."C26" and "label".
        """

        def data_iter():
            dense, sparse, target = self._process_line(line)
            slot_names = (
                ["dense_input"] +
                ["C" + str(col - 13) for col in self.categorical_range_] +
                ["label"])
            yield zip(slot_names, [dense] + sparse + [target])

        return data_iter
if __name__ == "__main__":
    # The first command-line argument is the sparse feature hash dimension;
    # samples are then read from stdin.
    generator = CriteoDataset()
    hash_dim = int(sys.argv[1])
    generator.setup(hash_dim)
    generator.run_from_stdin()
python/examples/criteo_ctr_with_cube/get_data.sh
0 → 100755
浏览文件 @
0c5bb75b
# Download the sample CTR dataset archive and unpack it in the current directory.
data_url="https://paddle-serving.bj.bcebos.com/data/ctr_prediction/ctr_data.tar.gz"
wget --no-check-certificate "${data_url}"
tar -zxvf ctr_data.tar.gz
python/examples/criteo_ctr_with_cube/local_train.py
0 → 100755
浏览文件 @
0c5bb75b
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
from
__future__
import
print_function
from
args
import
parse_args
import
os
import
paddle.fluid
as
fluid
import
paddle
import
sys
from
network_conf
import
dnn_model
# Width of the "dense_input" feed declared in train().
dense_feature_dim = 13

# Runs at import time, before train() declares any fluid layers.
paddle.enable_static()
def train():
    """Train the CTR DNN for one epoch and export two serving models.

    Reads options from parse_args() (model_output_dir, embedding_size,
    sparse_feature_dim), builds the network via dnn_model(), trains on up to
    the first 100 "raw_data/part-N" files through an InMemoryDataset fed by
    criteo_reader.py, then saves:

    * "ctr_serving_model" / "ctr_client_conf": fed with the raw sparse id
      inputs.
    * "ctr_serving_model_kv" / "ctr_client_conf_kv": fed with the embedding
      outputs returned by dnn_model() as `infer_vars`.
    """
    args = parse_args()
    if not os.path.isdir(args.model_output_dir):
        os.mkdir(args.model_output_dir)

    dense_input = fluid.layers.data(
        name="dense_input", shape=[dense_feature_dim], dtype='float32')
    sparse_input_ids = [
        fluid.layers.data(
            name="C" + str(i), shape=[1], lod_level=1, dtype="int64")
        for i in range(1, 27)
    ]
    label = fluid.layers.data(name='label', shape=[1], dtype='int64')

    predict_y, loss, auc_var, batch_auc_var, infer_vars = dnn_model(
        dense_input, sparse_input_ids, label, args.embedding_size,
        args.sparse_feature_dim)

    optimizer = fluid.optimizer.SGD(learning_rate=1e-4)
    optimizer.minimize(loss)

    exe = fluid.Executor(fluid.CPUPlace())
    exe.run(fluid.default_startup_program())

    dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
    dataset.set_use_var([dense_input] + sparse_input_ids + [label])
    # Run the reader with the interpreter that is executing this script so it
    # sees the same installed packages; fall back to the previous hard-coded
    # name if the interpreter path is unavailable.
    python_executable = sys.executable or "python3.6"
    pipe_command = "{} criteo_reader.py {}".format(python_executable,
                                                   args.sparse_feature_dim)
    dataset.set_pipe_command(pipe_command)
    dataset.set_batch_size(128)
    thread_num = 10
    dataset.set_thread(thread_num)

    # Assumes raw_data/ holds files named part-0 .. part-(N-1).
    whole_filelist = [
        "raw_data/part-%d" % x for x in range(len(os.listdir("raw_data")))
    ]
    print(whole_filelist)
    dataset.set_filelist(whole_filelist[:100])
    dataset.load_into_memory()

    fluid.layers.Print(auc_var)
    epochs = 1
    for i in range(epochs):
        exe.train_from_dataset(
            program=fluid.default_main_program(), dataset=dataset, debug=True)
        print("epoch {} finished".format(i))

    import paddle_serving_client.io as server_io

    # Both exported models use the same feed names; only the variables bound
    # to the "embedding_{i}.tmp_0" slots differ.
    feed_var_dict = {}
    feed_var_dict['dense_input'] = dense_input
    for i, sparse in enumerate(sparse_input_ids):
        feed_var_dict["embedding_{}.tmp_0".format(i)] = sparse

    feed_kv_dict = {}
    feed_kv_dict['dense_input'] = dense_input
    for i, emb in enumerate(infer_vars):
        feed_kv_dict["embedding_{}.tmp_0".format(i)] = emb

    fetch_var_dict = {"prob": predict_y}

    server_io.save_model("ctr_serving_model", "ctr_client_conf", feed_var_dict,
                         fetch_var_dict, fluid.default_main_program())

    server_io.save_model("ctr_serving_model_kv", "ctr_client_conf_kv",
                         feed_kv_dict, fetch_var_dict,
                         fluid.default_main_program())
# Script entry point: run training only when executed directly.
if __name__ == '__main__':
    train()
python/examples/criteo_ctr_with_cube/network_conf.py
0 → 100755
浏览文件 @
0c5bb75b
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
import
paddle.fluid
as
fluid
import
math
def dnn_model(dense_input, sparse_inputs, label, embedding_size,
              sparse_feature_dim):
    """Build the CTR network: shared embedding + sum pooling + 3x400 MLP + softmax.

    Args:
        dense_input: dense feature variable, concatenated after the pooled
            embeddings.
        sparse_inputs: list of sparse id variables, one per categorical slot.
        label: label variable used for the loss, accuracy and AUC ops.
        embedding_size: width of each embedding row.
        sparse_feature_dim: number of rows in the embedding table.

    Returns:
        (predict, avg_cost, auc_var, batch_auc_var, infer_vars), where
        `avg_cost` is the reduce_sum of the cross-entropy cost and
        `infer_vars` are the un-pooled embedding outputs, one per sparse input.
    """

    def embedding_layer(ids):
        # Every slot looks up the same "SparseFeatFactors" parameter.
        looked_up = fluid.layers.embedding(
            input=ids,
            is_sparse=True,
            is_distributed=False,
            size=[sparse_feature_dim, embedding_size],
            param_attr=fluid.ParamAttr(
                name="SparseFeatFactors",
                initializer=fluid.initializer.Uniform()))
        pooled = fluid.layers.sequence_pool(input=looked_up, pool_type='sum')
        return looked_up, pooled

    def mlp_input_tensor(emb_sums, dense_tensor):
        return fluid.layers.concat(emb_sums + [dense_tensor], axis=1)

    def _dense(source, width, activation):
        # Normal initializer scaled by 1/sqrt(fan-in of this layer).
        init = fluid.initializer.Normal(scale=1 / math.sqrt(source.shape[1]))
        return fluid.layers.fc(input=source,
                               size=width,
                               act=activation,
                               param_attr=fluid.ParamAttr(initializer=init))

    def mlp(mlp_input):
        hidden = mlp_input
        for _ in range(3):
            hidden = _dense(hidden, 400, 'relu')
        return _dense(hidden, 2, 'softmax')

    infer_vars = []
    emb_sums = []
    for sparse_var in sparse_inputs:
        raw_emb, pooled_emb = embedding_layer(sparse_var)
        infer_vars.append(raw_emb)
        emb_sums.append(pooled_emb)

    predict = mlp(mlp_input_tensor(emb_sums, dense_input))

    cost = fluid.layers.cross_entropy(input=predict, label=label)
    avg_cost = fluid.layers.reduce_sum(cost)
    # Result unused here; the call is kept so the op is still created.
    accuracy = fluid.layers.accuracy(input=predict, label=label)
    auc_var, batch_auc_var, auc_states = fluid.layers.auc(
        input=predict, label=label, num_thresholds=2**12, slide_steps=20)

    return predict, avg_cost, auc_var, batch_auc_var, infer_vars
python/examples/criteo_ctr_with_cube/test_client.py
0 → 100755
浏览文件 @
0c5bb75b
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
from
paddle_serving_client
import
Client
import
sys
import
os
import
criteo
as
criteo
import
time
from
paddle_serving_client.metric
import
auc
import
numpy
as
np
# Client-side smoke test: sends 10000 single-sample requests to the local
# server, prints every response, then prints the AUC and the elapsed seconds.
py_version = sys.version_info[0]

client = Client()
client.load_client_config(sys.argv[1])
client.connect(["127.0.0.1:9292"])

batch = 1
buf_size = 100
dataset = criteo.CriteoDataset()
dataset.setup(1000001)
test_filelists = ["{}/part-0".format(sys.argv[2])]
reader = dataset.infer_reader(test_filelists, batch, buf_size)

label_list = []
prob_list = []
start = time.time()
for request_idx in range(10000):
    # reader() is invoked again on every iteration, so each request takes the
    # first batch of a freshly created reader.
    batch_iter = reader()
    data = batch_iter.next() if py_version == 2 else batch_iter.__next__()
    sample = data[0]

    feed_dict = {}
    feed_dict['dense_input'] = sample[0]
    for slot in range(26):
        ids = sample[slot + 1]
        feed_dict["embedding_{}.tmp_0".format(slot)] = np.array(ids).reshape(
            -1)
        feed_dict["embedding_{}.tmp_0.lod".format(slot)] = [0, len(ids)]

    fetch_map = client.predict(feed=feed_dict, fetch=["prob"])
    print(fetch_map)
    prob_list.append(fetch_map['prob'][0][1])
    label_list.append(sample[-1][0])

print(auc(label_list, prob_list))
end = time.time()
print(end - start)
python/examples/criteo_ctr_with_cube/test_server.py
0 → 100755
浏览文件 @
0c5bb75b
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
import
os
import
sys
from
paddle_serving_server
import
OpMaker
from
paddle_serving_server
import
OpSeqMaker
from
paddle_serving_server
import
Server
# Assemble the reader -> dist-kv-infer -> response op pipeline and serve it
# on port 9292 with 4 threads, using the cube config under ./cube/conf.
op_maker = OpMaker()
pipeline_ops = [
    op_maker.create(op_type)
    for op_type in ('general_reader', 'general_dist_kv_infer',
                    'general_response')
]

op_seq_maker = OpSeqMaker()
for pipeline_op in pipeline_ops:
    op_seq_maker.add_op(pipeline_op)

server = Server()
server.set_op_sequence(op_seq_maker.get_op_sequence())
server.set_num_threads(4)
# The first command-line argument is the server model config directory.
server.load_model_config(sys.argv[1])
server.prepare_server(
    workdir="work_dir1",
    port=9292,
    device="cpu",
    cube_conf="./cube/conf/cube.conf")
server.run_server()
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录