Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
PaddlePaddle
PaddleFL
提交
e16c8a63
P
PaddleFL
项目概览
PaddlePaddle
/
PaddleFL
通知
35
Star
5
Fork
1
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
6
列表
看板
标记
里程碑
合并请求
4
Wiki
3
Wiki
分析
仓库
DevOps
项目成员
Pages
P
PaddleFL
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
6
Issue
6
列表
看板
标记
里程碑
合并请求
4
合并请求
4
Pages
分析
分析
仓库分析
DevOps
Wiki
3
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
e16c8a63
编写于
12月 26, 2019
作者:
Q
qjing666
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add data_safety_training
上级
0f8b5087
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
420 addition
and
2 deletion
+420
-2
contrib/data_safety_training/image_classification/README.md
contrib/data_safety_training/image_classification/README.md
+30
-0
contrib/data_safety_training/image_classification/server/receive.py
...ta_safety_training/image_classification/server/receive.py
+28
-0
contrib/data_safety_training/image_classification/server/server.py
...ata_safety_training/image_classification/server/server.py
+239
-0
contrib/data_safety_training/image_classification/server/user.py
.../data_safety_training/image_classification/server/user.py
+104
-0
contrib/data_safety_training/image_classification/submitter.py
...ib/data_safety_training/image_classification/submitter.py
+18
-0
images/split_flow.png
images/split_flow.png
+0
-0
paddle_fl/examples/submitter_demo/scheduler_client.py
paddle_fl/examples/submitter_demo/scheduler_client.py
+1
-2
未找到文件。
contrib/data_safety_training/image_classification/README.md
0 → 100644
浏览文件 @
e16c8a63
##Training process
<img
src=
'images/split_flow.png'
width =
"1000"
height =
"320"
align=
"middle"
/>
-
User send a request to server including task type, image size, etc.
-
Server response a path for User where it can download the Encoder.
-
User encode the images and send the processed data to DB after serialization.
-
Server fetch the uploaded data and start training model.
##Start the service on Server side
"""
python server/receiver.py
"""
##Start the request on User side
"""
python submitter.py
"""
contrib/data_safety_training/image_classification/server/receive.py
0 → 100644
浏览文件 @
e16c8a63
import
zmq
import
socket
import
msgpack
import
os
random_port
=
60001
current_ip
=
socket
.
gethostbyname
(
socket
.
gethostname
())
print
(
current_ip
)
os
.
system
(
"python -m SimpleHTTPServer 8080 &"
)
#listening for client
context
=
zmq
.
Context
()
zmq_socket
=
context
.
socket
(
zmq
.
REP
)
zmq_socket
.
bind
(
"tcp://{}:{}"
.
format
(
current_ip
,
random_port
))
print
(
"binding tcp://{}:{}"
.
format
(
current_ip
,
random_port
))
#get mission and return the path of encoder
message
=
msgpack
.
loads
(
zmq_socket
.
recv
())
print
(
message
[
"mission"
])
zmq_socket
.
send
(
"user.py"
)
#wait client finish encoding
while
True
:
message
=
zmq_socket
.
recv
()
if
message
==
'complete'
:
break
#start training
os
.
system
(
"python -u server.py > server.log &"
)
contrib/data_safety_training/image_classification/server/server.py
0 → 100644
浏览文件 @
e16c8a63
from
__future__
import
print_function
import
os
import
paddle
import
paddle.fluid
as
fluid
import
numpy
import
sys
from
paddle.fluid
import
layers
import
redis
import
time
from
paddle.fluid.param_attr
import
ParamAttr
import
math
import
msgpack
def
data_generater
(
samples
,
r
):
# data generater
def
train_data
():
for
item
in
samples
:
sample
=
msgpack
.
loads
(
r
.
get
(
str
(
item
)))
conv
=
sample
[
0
]
label
=
sample
[
1
]
yield
conv
,
label
return
train_data
class
ResNet
():
def
__init__
(
self
,
layers
=
50
):
self
.
layers
=
layers
def
net
(
self
,
input
,
class_dim
=
10
):
layers
=
self
.
layers
supported_layers
=
[
18
,
34
,
50
,
101
,
152
]
assert
layers
in
supported_layers
,
\
"supported layers are {} but input layer is {}"
.
format
(
supported_layers
,
layers
)
if
layers
==
18
:
depth
=
[
2
,
2
,
2
,
2
]
elif
layers
==
34
or
layers
==
50
:
depth
=
[
3
,
4
,
6
,
3
]
elif
layers
==
101
:
depth
=
[
3
,
4
,
23
,
3
]
elif
layers
==
152
:
depth
=
[
3
,
8
,
36
,
3
]
num_filters
=
[
64
,
128
,
256
,
512
]
conv
=
input
if
layers
>=
50
:
for
block
in
range
(
len
(
depth
)):
for
i
in
range
(
depth
[
block
]):
if
layers
in
[
101
,
152
]
and
block
==
2
:
if
i
==
0
:
conv_name
=
"res"
+
str
(
block
+
2
)
+
"a"
else
:
conv_name
=
"res"
+
str
(
block
+
2
)
+
"b"
+
str
(
i
)
else
:
conv_name
=
"res"
+
str
(
block
+
2
)
+
chr
(
97
+
i
)
conv
=
self
.
bottleneck_block
(
input
=
conv
,
num_filters
=
num_filters
[
block
],
stride
=
2
if
i
==
0
and
block
!=
0
else
1
,
name
=
conv_name
)
pool
=
fluid
.
layers
.
pool2d
(
input
=
conv
,
pool_type
=
'avg'
,
global_pooling
=
True
)
stdv
=
1.0
/
math
.
sqrt
(
pool
.
shape
[
1
]
*
1.0
)
out
=
fluid
.
layers
.
fc
(
input
=
pool
,
size
=
class_dim
,
param_attr
=
fluid
.
param_attr
.
ParamAttr
(
initializer
=
fluid
.
initializer
.
Uniform
(
-
stdv
,
stdv
)),
act
=
"softmax"
)
else
:
for
block
in
range
(
len
(
depth
)):
for
i
in
range
(
depth
[
block
]):
conv_name
=
"res"
+
str
(
block
+
2
)
+
chr
(
97
+
i
)
conv
=
self
.
basic_block
(
input
=
conv
,
num_filters
=
num_filters
[
block
],
stride
=
2
if
i
==
0
and
block
!=
0
else
1
,
is_first
=
block
==
i
==
0
,
name
=
conv_name
)
pool
=
fluid
.
layers
.
pool2d
(
input
=
conv
,
pool_type
=
'avg'
,
global_pooling
=
True
)
stdv
=
1.0
/
math
.
sqrt
(
pool
.
shape
[
1
]
*
1.0
)
out
=
fluid
.
layers
.
fc
(
input
=
pool
,
size
=
class_dim
,
param_attr
=
fluid
.
param_attr
.
ParamAttr
(
initializer
=
fluid
.
initializer
.
Uniform
(
-
stdv
,
stdv
)),
act
=
"softmax"
)
return
out
def
conv_bn_layer
(
self
,
input
,
num_filters
,
filter_size
,
stride
=
1
,
groups
=
1
,
act
=
None
,
name
=
None
):
conv
=
fluid
.
layers
.
conv2d
(
input
=
input
,
num_filters
=
num_filters
,
filter_size
=
filter_size
,
stride
=
stride
,
padding
=
(
filter_size
-
1
)
//
2
,
groups
=
groups
,
act
=
None
,
param_attr
=
ParamAttr
(
name
=
name
+
"_weights"
),
bias_attr
=
False
,
name
=
name
+
'.conv2d.output.1'
)
if
name
==
"conv1"
:
bn_name
=
"bn_"
+
name
else
:
bn_name
=
"bn"
+
name
[
3
:]
return
fluid
.
layers
.
batch_norm
(
input
=
conv
,
act
=
act
,
name
=
bn_name
+
'.output.1'
,
param_attr
=
ParamAttr
(
name
=
bn_name
+
'_scale'
),
bias_attr
=
ParamAttr
(
bn_name
+
'_offset'
),
moving_mean_name
=
bn_name
+
'_mean'
,
moving_variance_name
=
bn_name
+
'_variance'
,
)
def
shortcut
(
self
,
input
,
ch_out
,
stride
,
is_first
,
name
):
ch_in
=
input
.
shape
[
1
]
if
ch_in
!=
ch_out
or
stride
!=
1
or
is_first
==
True
:
return
self
.
conv_bn_layer
(
input
,
ch_out
,
1
,
stride
,
name
=
name
)
else
:
return
input
def
bottleneck_block
(
self
,
input
,
num_filters
,
stride
,
name
):
conv0
=
self
.
conv_bn_layer
(
input
=
input
,
num_filters
=
num_filters
,
filter_size
=
1
,
act
=
'relu'
,
name
=
name
+
"_branch2a"
)
conv1
=
self
.
conv_bn_layer
(
input
=
conv0
,
num_filters
=
num_filters
,
filter_size
=
3
,
stride
=
stride
,
act
=
'relu'
,
name
=
name
+
"_branch2b"
)
conv2
=
self
.
conv_bn_layer
(
input
=
conv1
,
num_filters
=
num_filters
*
4
,
filter_size
=
1
,
act
=
None
,
name
=
name
+
"_branch2c"
)
short
=
self
.
shortcut
(
input
,
num_filters
*
4
,
stride
,
is_first
=
False
,
name
=
name
+
"_branch1"
)
return
fluid
.
layers
.
elementwise_add
(
x
=
short
,
y
=
conv2
,
act
=
'relu'
,
name
=
name
+
".add.output.5"
)
def
basic_block
(
self
,
input
,
num_filters
,
stride
,
is_first
,
name
):
conv0
=
self
.
conv_bn_layer
(
input
=
input
,
num_filters
=
num_filters
,
filter_size
=
3
,
act
=
'relu'
,
stride
=
stride
,
name
=
name
+
"_branch2a"
)
conv1
=
self
.
conv_bn_layer
(
input
=
conv0
,
num_filters
=
num_filters
,
filter_size
=
3
,
act
=
None
,
name
=
name
+
"_branch2b"
)
short
=
self
.
shortcut
(
input
,
num_filters
,
stride
,
is_first
,
name
=
name
+
"_branch1"
)
return
fluid
.
layers
.
elementwise_add
(
x
=
short
,
y
=
conv1
,
act
=
'relu'
)
# local redis config
redis_host
=
"127.0.0.1"
redis_port
=
6379
redis_password
=
""
r
=
redis
.
StrictRedis
(
host
=
redis_host
,
port
=
redis_port
,
password
=
redis_password
)
# reader generation
reader
=
fluid
.
layers
.
py_reader
(
capacity
=
64
,
shapes
=
[(
-
1
,
64
,
8
,
8
),
(
-
1
,
1
)],
dtypes
=
[
'float32'
,
'int64'
])
samples
=
r
.
keys
()
train_data
=
data_generater
(
samples
,
r
)
reader
.
decorate_paddle_reader
(
paddle
.
batch
(
paddle
.
reader
.
shuffle
(
train_data
,
buf_size
=
5000
),
batch_size
=
64
))
conv1
,
label
=
fluid
.
layers
.
read_file
(
reader
)
# train program
place
=
fluid
.
CUDAPlace
(
0
)
model
=
ResNet
(
layers
=
50
)
predicts
=
model
.
net
(
conv1
,
10
)
cost
=
fluid
.
layers
.
cross_entropy
(
input
=
predicts
,
label
=
label
)
accuracy
=
fluid
.
layers
.
accuracy
(
input
=
predicts
,
label
=
label
)
loss
=
fluid
.
layers
.
mean
(
cost
)
optimizer
=
fluid
.
optimizer
.
Adam
(
learning_rate
=
0.001
)
optimizer
.
minimize
(
loss
)
exe
=
fluid
.
Executor
(
place
)
exe
.
run
(
fluid
.
default_startup_program
())
total_time
=
0
EPOCH_NUM
=
1
step
=
0
train_start
=
time
.
time
()
# start training
for
pass_id
in
range
(
EPOCH_NUM
):
reader
.
start
()
try
:
while
True
:
start_time
=
time
.
time
()
loss_value
,
acc_value
=
exe
.
run
(
fetch_list
=
[
loss
.
name
,
accuracy
.
name
])
step
+=
1
if
step
%
10
==
0
:
print
(
"epoch: "
+
str
(
pass_id
)
+
"step: "
+
str
(
step
)
+
"loss: "
+
str
(
loss_value
)
+
"acc: "
+
str
(
acc_value
))
end_time
=
time
.
time
()
total_time
+=
(
end_time
-
start_time
)
except
fluid
.
core
.
EOFException
:
reader
.
reset
()
train_end
=
time
.
time
()
print
(
"total time: %d"
%
(
train_end
-
train_start
))
print
(
"computation time: %d"
%
total_time
)
contrib/data_safety_training/image_classification/server/user.py
0 → 100644
浏览文件 @
e16c8a63
from
__future__
import
print_function
import
os
import
paddle
import
paddle.fluid
as
fluid
import
numpy
import
sys
import
redis
import
time
from
paddle.fluid
import
layers
from
paddle.fluid.param_attr
import
ParamAttr
import
msgpack
def
conv_bn_layer
(
input
,
num_filters
,
filter_size
,
stride
=
1
,
groups
=
1
,
act
=
None
,
name
=
None
):
conv
=
fluid
.
layers
.
conv2d
(
input
=
input
,
num_filters
=
num_filters
,
filter_size
=
filter_size
,
stride
=
stride
,
padding
=
(
filter_size
-
1
)
//
2
,
groups
=
groups
,
act
=
None
,
param_attr
=
ParamAttr
(
name
=
name
+
"_weights"
),
bias_attr
=
False
,
name
=
name
+
'.conv2d.output.1'
)
if
name
==
"conv1"
:
bn_name
=
"bn_"
+
name
else
:
bn_name
=
"bn"
+
name
[
3
:]
return
fluid
.
layers
.
batch_norm
(
input
=
conv
,
act
=
act
,
name
=
bn_name
+
'.output.1'
,
param_attr
=
ParamAttr
(
name
=
bn_name
+
'_scale'
),
bias_attr
=
ParamAttr
(
bn_name
+
'_offset'
),
moving_mean_name
=
bn_name
+
'_mean'
,
moving_variance_name
=
bn_name
+
'_variance'
,
)
def
load_conf
(
conf_file
,
local_dict
):
with
open
(
conf_file
)
as
fin
:
for
line
in
fin
:
group
=
line
.
strip
().
split
(
"="
)
if
len
(
group
)
!=
2
:
continue
local_dict
[
group
[
0
]]
=
group
[
1
]
return
local_dict
# redis DB configuration
redis_host
=
"127.0.0.1"
redis_port
=
6379
redis_password
=
""
start_time
=
time
.
time
()
# start a redis client and empty the DB
r
=
redis
.
StrictRedis
(
host
=
redis_host
,
port
=
redis_port
,
password
=
redis_password
)
r
.
flushall
()
# encoding program
images
=
fluid
.
layers
.
data
(
name
=
'images'
,
shape
=
[
3
,
32
,
32
],
dtype
=
'float32'
)
label
=
fluid
.
layers
.
data
(
name
=
'label'
,
shape
=
[
1
],
dtype
=
'int64'
)
place
=
fluid
.
CPUPlace
()
conv1
=
conv_bn_layer
(
input
=
images
,
num_filters
=
64
,
filter_size
=
7
,
stride
=
2
,
act
=
'relu'
,
name
=
"conv1"
)
pool
=
fluid
.
layers
.
pool2d
(
input
=
conv1
,
pool_size
=
3
,
pool_stride
=
2
,
pool_padding
=
1
,
pool_type
=
'max'
)
feeder
=
fluid
.
DataFeeder
(
place
=
place
,
feed_list
=
[
images
,
label
])
pretrained_model
=
'ResNet50_pretrained'
exe
=
fluid
.
Executor
(
place
)
exe
.
run
(
fluid
.
default_startup_program
())
# load pretrained mode and prepare datal
def
if_exist
(
var
):
return
os
.
path
.
exists
(
os
.
path
.
join
(
pretrained_model
,
var
.
name
))
fluid
.
io
.
load_vars
(
exe
,
pretrained_model
,
main_program
=
fluid
.
default_main_program
(),
predicate
=
if_exist
)
train_data
=
paddle
.
dataset
.
cifar
.
train10
()
step
=
0
# start encoding and uploading
for
data
in
train_data
():
pre_data
=
[]
pre_data
.
append
(
data
)
res
=
exe
.
run
(
program
=
fluid
.
default_main_program
(),
feed
=
feeder
.
feed
(
pre_data
),
fetch_list
=
[
pool
.
name
])
sample
=
[
res
[
0
][
0
].
tolist
(),
data
[
1
]]
step
+=
1
file
=
msgpack
.
dumps
(
sample
)
r
.
set
(
step
,
file
)
if
step
%
100
==
0
:
print
(
numpy
.
array
(
sample
[
0
]).
shape
)
print
(
"%dstart"
%
step
)
files
=
r
.
keys
()
print
(
"upload file numbers: %d"
%
len
(
files
))
end_time
=
time
.
time
()
total_time
=
end_time
-
start_time
print
(
"total time: %d"
%
total_time
)
contrib/data_safety_training/image_classification/submitter.py
0 → 100644
浏览文件 @
e16c8a63
import
zmq
import
socket
import
msgpack
import
os
mission_dict
=
{
"mission"
:
"image classification"
,
"image_size"
:
[
3
,
32
,
32
]}
#send request
context
=
zmq
.
Context
()
zmq_socket
=
context
.
socket
(
zmq
.
REQ
)
zmq_socket
.
connect
(
"tcp://127.0.0.1:60001"
)
zmq_socket
.
send
(
msgpack
.
dumps
(
mission_dict
))
#get and download encoder
file
=
zmq_socket
.
recv
()
os
.
system
(
"wget 127.0.0.1:8080/{}"
.
format
(
file
))
#data encoding
os
.
system
(
"python -u user.py > user.log"
)
zmq_socket
.
send
(
"complete"
)
images/split_flow.png
0 → 100644
浏览文件 @
e16c8a63
101.2 KB
paddle_fl/examples/submitter_demo/scheduler_client.py
浏览文件 @
e16c8a63
...
@@ -122,7 +122,7 @@ ip_role = {}
...
@@ -122,7 +122,7 @@ ip_role = {}
for
i
in
range
(
len
(
ip_list
)):
for
i
in
range
(
len
(
ip_list
)):
if
i
<
int
(
default_dict
[
"server_nodes"
]):
if
i
<
int
(
default_dict
[
"server_nodes"
]):
ip_role
[
ip_list
[
i
]]
=
'server%d'
%
i
ip_role
[
ip_list
[
i
]]
=
'server%d'
%
i
else
:
else
:
ip_role
[
ip_list
[
i
]]
=
'trainer%d'
%
(
i
-
int
(
default_dict
[
"server_nodes"
]))
ip_role
[
ip_list
[
i
]]
=
'trainer%d'
%
(
i
-
int
(
default_dict
[
"server_nodes"
]))
print
(
ip_role
)
print
(
ip_role
)
...
@@ -190,4 +190,3 @@ while not all_job_sent:
...
@@ -190,4 +190,3 @@ while not all_job_sent:
scheduler
.
init_env
()
scheduler
.
init_env
()
print
(
"init env done."
)
print
(
"init env done."
)
scheduler
.
start_fl_training
()
scheduler
.
start_fl_training
()
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录