Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
PaddlePaddle
PaddleFL
提交
0c7d94c1
P
PaddleFL
项目概览
PaddlePaddle
/
PaddleFL
通知
35
Star
5
Fork
1
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
6
列表
看板
标记
里程碑
合并请求
4
Wiki
3
Wiki
分析
仓库
DevOps
项目成员
Pages
P
PaddleFL
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
6
Issue
6
列表
看板
标记
里程碑
合并请求
4
合并请求
4
Pages
分析
分析
仓库分析
DevOps
Wiki
3
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
0c7d94c1
编写于
5月 18, 2020
作者:
J
jingqinghe
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
update document
上级
6e76a892
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
0 additions
and
217 deletions
+0
-217
python/paddle_fl/paddle_fl/examples/submitter_demo/scheduler_client.py
..._fl/paddle_fl/examples/submitter_demo/scheduler_client.py
+0
-217
未找到文件。
python/paddle_fl/paddle_fl/examples/submitter_demo/scheduler_client.py
已删除
100644 → 0
浏览文件 @
6e76a892
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import
os
import
socket
import
random
import
zmq
import
time
import
sys
from
paddle_fl.paddle_fl.core.submitter.client_base
import
HPCClient
from
paddle_fl.paddle_fl.core.scheduler.agent_master
import
FLScheduler
import
paddle.fluid
as
fluid
from
paddle_fl.paddle_fl.core.master.job_generator
import
JobGenerator
from
paddle_fl.paddle_fl.core.strategy.fl_strategy_base
import
FLStrategyFactory
from
model
import
Model
import
tarfile
# Port the scheduler listens on. Fixed so remote workers can find the
# scheduler deterministically; the commented line shows how to randomize it.
#random_port = random.randint(60001, 64001)
random_port = 60001
print(random_port)

# Resolve this host's IP and build the endpoint string workers will use.
current_ip = socket.gethostbyname(socket.gethostname())
endpoints = "{}:{}".format(current_ip, random_port)

# Start a web server for remote endpoints to download their config.
# NOTE(review): SimpleHTTPServer exists only on Python 2; on Python 3 the
# commented http.server form must be used instead.
os.system("python -m SimpleHTTPServer 8080 &")
#os.system("python -m http.server 8080 &")

# Clear artifacts of a previous run, then stage the training package.
if os.path.exists("job_config"):
    os.system("rm -rf job_config")
if os.path.exists("package"):
    os.system("rm -rf package")
os.system("mkdir package")
os.system("cp train_program.py package")

# Record the scheduler endpoint where the submitted job can read it back.
with open("package/scheduler.conf", "w") as fout:
    fout.write("ENDPOINT\t{}\n".format(endpoints))
# submit a job with current endpoint
# Built-in job-submission settings; any entry can be overridden by the conf
# file passed as the first CLI argument (parsed by load_conf).
# NOTE(review): client.submit() additionally reads "hdfs_output",
# "train_cmd" and "monitor_cmd", which have no defaults here — the conf
# file must supply them or submission fails with a KeyError. Verify against
# the demo conf shipped alongside this script.
default_dict = {
    "task_name": "test_submit_job",
    "hdfs_path": "afs://xingtian.afs.baidu.com:9902",
    "ugi": "",
    "worker_nodes": 5,
    "server_nodes": 5,
    "hadoop_home": "/home/jingqinghe/hadoop-xingtian/hadoop",
    "hpc_home": "/home/jingqinghe/mpi_feed4/smart_client",
    "package_path": "./package",
    "priority": "high",
    "queue": "paddle-dev-amd",
    "server": "yq01-hpc-lvliang01-smart-master.dmop.baidu.com",
    "mpi_node_mem": 11000,
    "pcpu": 180,
    "python_tar": "./python.tar.gz",
    "wheel": "./paddlepaddle-0.0.0-cp27-cp27mu-linux_x86_64-0.whl",
}
def load_conf(conf_file, local_dict):
    """Merge ``key=value`` lines from *conf_file* into *local_dict*.

    Args:
        conf_file: Path to a text file with one ``key=value`` pair per line.
        local_dict: Dict of defaults; parsed pairs are written into it,
            overriding existing keys.

    Returns:
        The same *local_dict*, updated in place. Lines without ``=`` are
        skipped; parsed values are always strings.
    """
    with open(conf_file) as fin:
        for line in fin:
            # Split on the first '=' only, so values that themselves
            # contain '=' (HDFS paths, URLs, command lines) are kept
            # intact instead of being silently dropped.
            group = line.strip().split("=", 1)
            if len(group) != 2:
                continue
            local_dict[group[0]] = group[1]
    return local_dict
# Build the HPC client and fold the user's conf file (first CLI argument)
# over the built-in defaults.
client = HPCClient()
default_dict = load_conf(sys.argv[1], default_dict)

# Submit the MPI job with the merged settings.
# NOTE(review): "hdfs_output", "train_cmd" and "monitor_cmd" are absent
# from the built-in defaults, so the conf file must define them or the
# lookups below raise KeyError — confirm against the demo conf.
client.submit(
    task_name=default_dict["task_name"],
    hdfs_path=default_dict["hdfs_path"],
    ugi=default_dict["ugi"],
    hdfs_output=default_dict["hdfs_output"],
    worker_nodes=default_dict["worker_nodes"],
    server_nodes=default_dict["server_nodes"],
    hadoop_home=default_dict["hadoop_home"],
    hpc_home=default_dict["hpc_home"],
    train_cmd=default_dict["train_cmd"],
    monitor_cmd=default_dict["monitor_cmd"],
    package_path=default_dict["package_path"],
    priority=default_dict["priority"],
    queue=default_dict["queue"],
    server=default_dict["server"],
    mpi_node_mem=default_dict["mpi_node_mem"],
    pcpu=default_dict["pcpu"],
    python_tar=default_dict["python_tar"],
    wheel=default_dict["wheel"])
print("submit mpi job done.")
# start scheduler and receive the ip of allocated endpoints
context
=
zmq
.
Context
()
zmq_socket
=
context
.
socket
(
zmq
.
REP
)
zmq_socket
.
bind
(
"tcp://{}:{}"
.
format
(
current_ip
,
random_port
))
print
(
"binding tcp://{}:{}"
.
format
(
current_ip
,
random_port
))
all_ips_ready
=
False
ip_list
=
[]
scheduler
=
FLScheduler
(
int
(
default_dict
[
"worker_nodes"
]),
int
(
default_dict
[
"server_nodes"
]),
port
=
random_port
,
socket
=
zmq_socket
)
scheduler
.
set_sample_worker_num
(
int
(
default_dict
[
"worker_nodes"
]))
print
(
"going to wait all ips ready"
)
# Collect every allocated node's endpoint. A node registers by sending
# "ENDPOINT\t<ip:port>"; anything else is told to wait and retry.
# NOTE(review): recv() returns bytes on Python 3 — this str handling
# assumes Python 2 (consistent with SimpleHTTPServer above); confirm.
while not all_ips_ready:
    message = zmq_socket.recv()
    group = message.split("\t")
    if group[0] == "ENDPOINT":
        ip_list.append(group[1])
        zmq_socket.send("ACCEPT\t{}".format(group[1]))
    else:
        zmq_socket.send("WAIT\t0")
    expected_nodes = int(default_dict["worker_nodes"]) + \
        int(default_dict["server_nodes"])
    if len(ip_list) == expected_nodes:
        all_ips_ready = True
print("all worker ips are collected")
print(ip_list)
#allocate the role of each endpoint and their ids
ip_role
=
{}
for
i
in
range
(
len
(
ip_list
)):
if
i
<
int
(
default_dict
[
"server_nodes"
]):
ip_role
[
ip_list
[
i
]]
=
'server%d'
%
i
else
:
ip_role
[
ip_list
[
i
]]
=
'trainer%d'
%
(
i
-
int
(
default_dict
[
"server_nodes"
]))
print
(
ip_role
)
def job_generate():
    """Generate an FL job matching the fl_master example, write it under
    ``job_config`` and tar each endpoint's sub-directory so remote nodes
    can fetch it over the HTTP server started at the top of this script.

    Reads module state: ``ip_list`` (first entry becomes the server
    endpoint) and ``default_dict["worker_nodes"]``.
    """
    # Same feature/label layout as the fl_master example: three dense
    # float32 slots of width 5 plus an int64 label.
    inputs = [fluid.layers.data(name=str(slot_id), shape=[5], dtype="float32")
              for slot_id in range(3)]
    label = fluid.layers.data(name="label", shape=[1], dtype='int64')
    model = Model()
    model.mlp(inputs, label)

    job_generator = JobGenerator()
    optimizer = fluid.optimizer.SGD(learning_rate=0.1)
    job_generator.set_optimizer(optimizer)
    job_generator.set_losses([model.loss])
    job_generator.set_startup_program(model.startup_program)
    job_generator.set_infer_feed_and_target_names(
        [x.name for x in inputs], [model.predict.name])

    # FedAvg strategy with 10 local steps between aggregations.
    build_strategy = FLStrategyFactory()
    build_strategy.fed_avg = True
    build_strategy.inner_step = 10
    strategy = build_strategy.create_fl_strategy()

    # Endpoints will be collected through the cluster; in this example we
    # suppose they have been collected, and the first one is the server.
    server_ip = ["{}".format(ip_list[0])]
    output = "job_config"
    job_generator.generate_fl_job(
        strategy,
        server_endpoints=server_ip,
        worker_num=int(default_dict["worker_nodes"]),
        output=output)

    # Pack each generated per-endpoint directory into <name>.tar.gz.
    # Fixed: loop variables no longer shadow the builtins `file`/`dir`,
    # and the archive handle is closed even if add() raises.
    for entry in os.listdir(output):
        tar = tarfile.open('{}/{}.tar.gz'.format(output, entry), 'w:gz')
        try:
            for root, dirs, files in os.walk("{}/{}".format(output, entry)):
                for fname in files:
                    tar.add(os.path.join(root, fname))
        finally:
            tar.close()
job_generate()

# Hand each endpoint its allocated role. A node requests its job with
# "GET_FL_JOB\t<ip:port>" and receives back its role string (e.g.
# "trainer0"); any other message is told to wait and retry.
all_job_sent = False
download_job = []
while not all_job_sent:
    message = zmq_socket.recv()
    group = message.split("\t")
    if group[0] == "GET_FL_JOB":
        download_job.append(group[1])
        zmq_socket.send(ip_role[group[1]])
    else:
        zmq_socket.send("WAIT\t0")
    if len(download_job) == len(ip_list):
        all_job_sent = True

# All nodes have their job config — set up the environment and start
# federated training.
scheduler.init_env()
print("init env done.")
scheduler.start_fl_training()
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录