Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
BaiXuePrincess
PaddleRec
提交
f6b622a5
P
PaddleRec
项目概览
BaiXuePrincess
/
PaddleRec
与 Fork 源项目一致
Fork自
PaddlePaddle / PaddleRec
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
PaddleRec
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
f6b622a5
编写于
9月 24, 2020
作者:
T
tangwei12
提交者:
GitHub
9月 24, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'master' into model_fix
上级
f0ba997a
a4bd3a69
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
426 addition
and
32 deletion
+426
-32
core/engine/cluster/cluster.py
core/engine/cluster/cluster.py
+38
-2
core/engine/cluster_utils.py
core/engine/cluster_utils.py
+324
-0
core/engine/local_cluster.py
core/engine/local_cluster.py
+63
-30
run.py
run.py
+1
-0
未找到文件。
core/engine/cluster/cluster.py
浏览文件 @
f6b622a5
...
@@ -19,10 +19,16 @@ import copy
...
@@ -19,10 +19,16 @@ import copy
import
os
import
os
import
subprocess
import
subprocess
import
warnings
import
warnings
import
sys
import
logging
from
paddlerec.core.engine.engine
import
Engine
from
paddlerec.core.engine.engine
import
Engine
from
paddlerec.core.factory
import
TrainerFactory
from
paddlerec.core.factory
import
TrainerFactory
from
paddlerec.core.utils
import
envs
from
paddlerec.core.utils
import
envs
import
paddlerec.core.engine.cluster_utils
as
cluster_utils
logger
=
logging
.
getLogger
(
"root"
)
logger
.
propagate
=
False
class
ClusterEngine
(
Engine
):
class
ClusterEngine
(
Engine
):
...
@@ -47,8 +53,38 @@ class ClusterEngine(Engine):
...
@@ -47,8 +53,38 @@ class ClusterEngine(Engine):
self
.
backend
))
self
.
backend
))
def
start_worker_procs
(
self
):
def
start_worker_procs
(
self
):
trainer
=
TrainerFactory
.
create
(
self
.
trainer
)
if
(
envs
.
get_runtime_environ
(
"fleet_mode"
)
==
"COLLECTIVE"
):
trainer
.
run
()
#trainer_ports = os.getenv("TRAINER_PORTS", None).split(",")
cuda_visible_devices
=
os
.
getenv
(
"CUDA_VISIBLE_DEVICES"
)
if
cuda_visible_devices
is
None
or
cuda_visible_devices
==
""
:
selected_gpus
=
range
(
int
(
os
.
getenv
(
"TRAINER_GPU_CARD_COUNT"
)))
else
:
# change selected_gpus into relative values
# e.g. CUDA_VISIBLE_DEVICES=4,5,6,7; args.selected_gpus=4,5,6,7;
# therefore selected_gpus=0,1,2,3
cuda_visible_devices_list
=
cuda_visible_devices
.
split
(
','
)
for
x
in
range
(
int
(
os
.
getenv
(
"TRAINER_GPU_CARD_COUNT"
))):
assert
x
in
cuda_visible_devices_list
,
"Can't find "
\
"your selected_gpus %s in CUDA_VISIBLE_DEVICES[%s]."
\
%
(
x
,
cuda_visible_devices
)
selected_gpus
=
[
cuda_visible_devices_list
.
index
(
x
)]
print
(
"selected_gpus:{}"
.
format
(
selected_gpus
))
factory
=
"paddlerec.core.factory"
cmd
=
[
sys
.
executable
,
"-u"
,
"-m"
,
factory
,
self
.
trainer
]
logs_dir
=
envs
.
get_runtime_environ
(
"log_dir"
)
print
(
"use_paddlecloud_flag:{}"
.
format
(
cluster_utils
.
use_paddlecloud
()))
if
cluster_utils
.
use_paddlecloud
():
cluster
,
pod
=
cluster_utils
.
get_cloud_cluster
(
selected_gpus
)
logger
.
info
(
"get cluster from cloud:{}"
.
format
(
cluster
))
procs
=
cluster_utils
.
start_local_trainers
(
cluster
,
pod
,
cmd
,
log_dir
=
logs_dir
)
print
(
"cluster:{}"
.
format
(
cluster
))
print
(
"pod:{}"
.
format
(
pod
))
else
:
trainer
=
TrainerFactory
.
create
(
self
.
trainer
)
trainer
.
run
()
def
start_master_procs
(
self
):
def
start_master_procs
(
self
):
if
self
.
backend
==
"PADDLECLOUD"
:
if
self
.
backend
==
"PADDLECLOUD"
:
...
...
core/engine/cluster_utils.py
0 → 100644
浏览文件 @
f6b622a5
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import
functools
import
logging
import
socket
import
time
import
os
import
signal
import
copy
import
sys
import
subprocess
from
contextlib
import
closing
import
socket
logger
=
logging
.
getLogger
(
"root"
)
logger
.
propagate
=
False
class
Cluster
(
object
):
def
__init__
(
self
,
hdfs
):
self
.
job_server
=
None
self
.
pods
=
[]
self
.
hdfs
=
None
self
.
job_stage_flag
=
None
def
__str__
(
self
):
return
"job_server:{} pods:{} job_stage_flag:{} hdfs:{}"
.
format
(
self
.
job_server
,
[
str
(
pod
)
for
pod
in
self
.
pods
],
self
.
job_stage_flag
,
self
.
hdfs
)
def
__eq__
(
self
,
cluster
):
if
len
(
self
.
pods
)
!=
len
(
cluster
.
pods
):
return
False
for
a
,
b
in
zip
(
self
.
pods
,
cluster
.
pods
):
if
a
!=
b
:
return
False
if
self
.
job_stage_flag
!=
cluster
.
job_stage_flag
:
return
False
return
True
def
__ne__
(
self
,
cluster
):
return
not
self
.
__eq__
(
cluster
)
def
update_pods
(
cluster
):
self
.
pods
=
copy
.
copy
(
cluster
.
pods
)
def
trainers_nranks
(
self
):
return
len
(
self
.
trainers_endpoints
())
def
pods_nranks
(
self
):
return
len
(
self
.
pods
)
def
trainers_endpoints
(
self
):
r
=
[]
for
pod
in
self
.
pods
:
for
t
in
pod
.
trainers
:
r
.
append
(
t
.
endpoint
)
return
r
def
pods_endpoints
(
self
):
r
=
[]
for
pod
in
self
.
pods
:
ep
=
"{}:{}"
.
format
(
pod
.
addr
,
pod
.
port
)
assert
pod
.
port
!=
None
and
pod
.
addr
!=
None
,
"{} not a valid endpoint"
.
format
(
ep
)
r
.
append
(
ep
)
return
r
def
get_pod_by_id
(
self
,
pod_id
):
for
pod
in
self
.
pods
:
if
str
(
pod_id
)
==
str
(
pod
.
id
):
return
pod
return
None
class
JobServer
(
object
):
def
__init__
(
self
):
self
.
endpoint
=
None
def
__str__
(
self
):
return
"{}"
.
format
(
self
.
endpoint
)
def
__eq__
(
self
,
j
):
return
self
.
endpint
==
j
.
endpoint
def
__ne__
(
self
,
j
):
return
not
self
==
j
class
Trainer
(
object
):
def
__init__
(
self
):
self
.
gpus
=
[]
self
.
endpoint
=
None
self
.
rank
=
None
def
__str__
(
self
):
return
"gpu:{} endpoint:{} rank:{}"
.
format
(
self
.
gpus
,
self
.
endpoint
,
self
.
rank
)
def
__eq__
(
self
,
t
):
if
len
(
self
.
gpus
)
!=
len
(
t
.
gpus
):
return
False
if
self
.
endpoint
!=
t
.
endpoint
or
\
self
.
rank
!=
t
.
rank
:
return
False
for
a
,
b
in
zip
(
self
.
gpus
,
t
.
gpus
):
if
a
!=
b
:
return
False
return
True
def
__ne__
(
self
,
t
):
return
not
self
==
t
def
rank
(
self
):
return
self
.
rank
class
Pod
(
object
):
def
__init__
(
self
):
self
.
rank
=
None
self
.
id
=
None
self
.
addr
=
None
self
.
port
=
None
self
.
trainers
=
[]
self
.
gpus
=
[]
def
__str__
(
self
):
return
"rank:{} id:{} addr:{} port:{} visible_gpu:{} trainers:{}"
.
format
(
self
.
rank
,
self
.
id
,
self
.
addr
,
self
.
port
,
self
.
gpus
,
[
str
(
t
)
for
t
in
self
.
trainers
])
def
__eq__
(
self
,
pod
):
if
self
.
rank
!=
pod
.
rank
or
\
self
.
id
!=
pod
.
id
or
\
self
.
addr
!=
pod
.
addr
or
\
self
.
port
!=
pod
.
port
:
logger
.
debug
(
"pod {} != pod"
.
format
(
self
,
pod
))
return
False
if
len
(
self
.
trainers
)
!=
len
(
pod
.
trainers
):
logger
.
debug
(
"trainers {} != {}"
.
format
(
self
.
trainers
,
pod
.
trainers
))
return
False
for
i
in
range
(
len
(
self
.
trainers
)):
if
self
.
trainers
[
i
]
!=
pod
.
trainers
[
i
]:
logger
.
debug
(
"trainer {} != {}"
.
format
(
self
.
trainers
[
i
],
pod
.
trainers
[
i
]))
return
False
return
True
def
__ne__
(
self
,
pod
):
return
not
self
==
pod
def
parse_response
(
self
,
res_pods
):
pass
def
rank
(
self
):
return
self
.
rank
def
get_visible_gpus
(
self
):
r
=
""
for
g
in
self
.
gpus
:
r
+=
"{},"
.
format
(
g
)
assert
r
!=
""
,
"this pod {} can't see any gpus"
.
format
(
self
)
r
=
r
[:
-
1
]
return
r
def
get_cluster
(
node_ips
,
node_ip
,
paddle_ports
,
selected_gpus
):
assert
type
(
paddle_ports
)
is
list
,
"paddle_ports must be list"
cluster
=
Cluster
(
hdfs
=
None
)
trainer_rank
=
0
for
node_rank
,
ip
in
enumerate
(
node_ips
):
pod
=
Pod
()
pod
.
rank
=
node_rank
pod
.
addr
=
ip
for
i
in
range
(
len
(
selected_gpus
)):
trainer
=
Trainer
()
trainer
.
gpus
.
append
(
selected_gpus
[
i
])
trainer
.
endpoint
=
"%s:%d"
%
(
ip
,
paddle_ports
[
i
])
trainer
.
rank
=
trainer_rank
trainer_rank
+=
1
pod
.
trainers
.
append
(
trainer
)
cluster
.
pods
.
append
(
pod
)
pod_rank
=
node_ips
.
index
(
node_ip
)
return
cluster
,
cluster
.
pods
[
pod_rank
]
def
get_cloud_cluster
(
selected_gpus
,
args_port
=
None
):
#you can automatically get ip info while using paddlecloud multi nodes mode.
node_ips
=
os
.
getenv
(
"PADDLE_TRAINERS"
)
assert
node_ips
is
not
None
,
"PADDLE_TRAINERS should not be None"
print
(
"node_ips:{}"
.
format
(
node_ips
))
node_ip
=
os
.
getenv
(
"POD_IP"
)
assert
node_ip
is
not
None
,
"POD_IP should not be None"
print
(
"node_ip:{}"
.
format
(
node_ip
))
node_rank
=
os
.
getenv
(
"PADDLE_TRAINER_ID"
)
assert
node_rank
is
not
None
,
"PADDLE_TRAINER_ID should not be None"
print
(
"node_rank:{}"
.
format
(
node_rank
))
node_ips
=
node_ips
.
split
(
","
)
num_nodes
=
len
(
node_ips
)
node_rank
=
int
(
node_rank
)
started_port
=
args_port
print
(
"num_nodes:"
,
num_nodes
)
if
num_nodes
>
1
:
try
:
paddle_port
=
int
(
os
.
getenv
(
"PADDLE_PORT"
,
""
))
paddle_port_num
=
int
(
os
.
getenv
(
"TRAINER_PORTS_NUM"
,
""
))
if
paddle_port_num
>=
len
(
selected_gpus
)
and
paddle_port
!=
args_port
:
logger
.
warning
(
"Use Cloud specified port:{}."
.
format
(
paddle_port
))
started_port
=
paddle_port
except
Exception
as
e
:
print
(
e
)
pass
if
started_port
is
None
:
started_port
=
6170
logger
.
debug
(
"parsed from args:node_ips:{}
\
node_ip:{} node_rank:{} started_port:{}"
.
format
(
node_ips
,
node_ip
,
node_rank
,
started_port
))
ports
=
[
x
for
x
in
range
(
started_port
,
started_port
+
len
(
selected_gpus
))]
cluster
,
pod
=
get_cluster
(
node_ips
,
node_ip
,
ports
,
selected_gpus
)
return
cluster
,
cluster
.
pods
[
node_rank
]
def
use_paddlecloud
():
node_ips
=
os
.
getenv
(
"PADDLE_TRAINERS"
,
None
)
node_ip
=
os
.
getenv
(
"POD_IP"
,
None
)
node_rank
=
os
.
getenv
(
"PADDLE_TRAINER_ID"
,
None
)
if
node_ips
is
None
or
node_ip
is
None
or
node_rank
is
None
:
return
False
else
:
return
True
class
TrainerProc
(
object
):
def
__init__
(
self
):
self
.
proc
=
None
self
.
log_fn
=
None
self
.
log_offset
=
None
self
.
rank
=
None
self
.
local_rank
=
None
self
.
cmd
=
None
def
start_local_trainers
(
cluster
,
pod
,
cmd
,
log_dir
=
None
):
current_env
=
copy
.
copy
(
os
.
environ
.
copy
())
#paddle broadcast ncclUniqueId use socket, and
#proxy maybe make trainers unreachable, so delete them.
#if we set them to "", grpc will log error message "bad uri"
#so just delete them.
current_env
.
pop
(
"http_proxy"
,
None
)
current_env
.
pop
(
"https_proxy"
,
None
)
procs
=
[]
for
idx
,
t
in
enumerate
(
pod
.
trainers
):
proc_env
=
{
"FLAGS_selected_gpus"
:
"%s"
%
","
.
join
([
str
(
g
)
for
g
in
t
.
gpus
]),
"PADDLE_TRAINER_ID"
:
"%d"
%
t
.
rank
,
"PADDLE_CURRENT_ENDPOINT"
:
"%s"
%
t
.
endpoint
,
"PADDLE_TRAINERS_NUM"
:
"%d"
%
cluster
.
trainers_nranks
(),
"PADDLE_TRAINER_ENDPOINTS"
:
","
.
join
(
cluster
.
trainers_endpoints
())
}
current_env
.
update
(
proc_env
)
logger
.
debug
(
"trainer proc env:{}"
.
format
(
current_env
))
# cmd = [sys.executable, "-u", training_script]
logger
.
info
(
"start trainer proc:{} env:{}"
.
format
(
cmd
,
proc_env
))
fn
=
None
if
log_dir
is
not
None
:
os
.
system
(
"mkdir -p {}"
.
format
(
log_dir
))
fn
=
open
(
"%s/workerlog.%d"
%
(
log_dir
,
idx
),
"a"
)
proc
=
subprocess
.
Popen
(
cmd
,
env
=
current_env
,
stdout
=
fn
,
stderr
=
fn
)
else
:
proc
=
subprocess
.
Popen
(
cmd
,
env
=
current_env
)
tp
=
TrainerProc
()
tp
.
proc
=
proc
tp
.
rank
=
t
.
rank
tp
.
local_rank
=
idx
tp
.
log_fn
=
fn
tp
.
log_offset
=
fn
.
tell
()
if
fn
else
None
tp
.
cmd
=
cmd
procs
.
append
(
proc
)
return
procs
core/engine/local_cluster.py
浏览文件 @
f6b622a5
...
@@ -19,9 +19,14 @@ import copy
...
@@ -19,9 +19,14 @@ import copy
import
os
import
os
import
sys
import
sys
import
subprocess
import
subprocess
import
logging
from
paddlerec.core.engine.engine
import
Engine
from
paddlerec.core.engine.engine
import
Engine
from
paddlerec.core.utils
import
envs
from
paddlerec.core.utils
import
envs
import
paddlerec.core.engine.cluster_utils
as
cluster_utils
logger
=
logging
.
getLogger
(
"root"
)
logger
.
propagate
=
False
class
LocalClusterEngine
(
Engine
):
class
LocalClusterEngine
(
Engine
):
...
@@ -97,42 +102,70 @@ class LocalClusterEngine(Engine):
...
@@ -97,42 +102,70 @@ class LocalClusterEngine(Engine):
stderr
=
fn
,
stderr
=
fn
,
cwd
=
os
.
getcwd
())
cwd
=
os
.
getcwd
())
procs
.
append
(
proc
)
procs
.
append
(
proc
)
elif
fleet_mode
.
upper
()
==
"COLLECTIVE"
:
elif
fleet_mode
.
upper
()
==
"COLLECTIVE"
:
selected_gpus
=
self
.
envs
[
"selected_gpus"
].
split
(
","
)
cuda_visible_devices
=
os
.
getenv
(
"CUDA_VISIBLE_DEVICES"
)
if
cuda_visible_devices
is
None
or
cuda_visible_devices
==
""
:
selected_gpus
=
[
x
.
strip
()
for
x
in
self
.
envs
[
"selected_gpus"
].
split
(
","
)
]
else
:
# change selected_gpus into relative values
# e.g. CUDA_VISIBLE_DEVICES=4,5,6,7; args.selected_gpus=4,5,6,7;
# therefore selected_gpus=0,1,2,3
cuda_visible_devices_list
=
cuda_visible_devices
.
split
(
','
)
for
x
in
self
.
envs
[
"selected_gpus"
].
split
(
","
):
assert
x
in
cuda_visible_devices_list
,
"Can't find "
\
"your selected_gpus %s in CUDA_VISIBLE_DEVICES[%s]."
\
%
(
x
,
cuda_visible_devices
)
selected_gpus
=
[
cuda_visible_devices_list
.
index
(
x
.
strip
())
for
x
in
self
.
envs
[
"selected_gpus"
].
split
(
","
)
]
selected_gpus_num
=
len
(
selected_gpus
)
selected_gpus_num
=
len
(
selected_gpus
)
for
i
in
range
(
selected_gpus_num
-
1
):
while
True
:
new_port
=
envs
.
find_free_port
()
if
new_port
not
in
ports
:
ports
.
append
(
new_port
)
break
user_endpoints
=
","
.
join
([
"127.0.0.1:"
+
str
(
x
)
for
x
in
ports
])
factory
=
"paddlerec.core.factory"
factory
=
"paddlerec.core.factory"
cmd
=
[
sys
.
executable
,
"-u"
,
"-m"
,
factory
,
self
.
trainer
]
cmd
=
[
sys
.
executable
,
"-u"
,
"-m"
,
factory
,
self
.
trainer
]
for
i
in
range
(
selected_gpus_num
):
print
(
"use_paddlecloud_flag:{}"
.
format
(
current_env
.
update
({
cluster_utils
.
use_paddlecloud
()))
"PADDLE_TRAINER_ENDPOINTS"
:
user_endpoints
,
if
cluster_utils
.
use_paddlecloud
():
"PADDLE_CURRENT_ENDPOINTS"
:
user_endpoints
[
i
],
cluster
,
pod
=
cluster_utils
.
get_cloud_cluster
(
selected_gpus
)
"PADDLE_TRAINERS_NUM"
:
str
(
worker_num
),
logger
.
info
(
"get cluster from cloud:{}"
.
format
(
cluster
))
"TRAINING_ROLE"
:
"TRAINER"
,
procs
=
cluster_utils
.
start_local_trainers
(
"PADDLE_TRAINER_ID"
:
str
(
i
),
cluster
,
pod
,
cmd
,
log_dir
=
logs_dir
)
"FLAGS_selected_gpus"
:
str
(
selected_gpus
[
i
]),
"PADDLEREC_GPU_NUMS"
:
str
(
selected_gpus_num
)
else
:
})
# trainers_num = 1 or not use paddlecloud ips="a,b"
for
i
in
range
(
selected_gpus_num
-
1
):
os
.
system
(
"mkdir -p {}"
.
format
(
logs_dir
))
while
True
:
fn
=
open
(
"%s/worker.%d"
%
(
logs_dir
,
i
),
"w"
)
new_port
=
envs
.
find_free_port
()
log_fns
.
append
(
fn
)
if
new_port
not
in
ports
:
proc
=
subprocess
.
Popen
(
ports
.
append
(
new_port
)
cmd
,
break
env
=
current_env
,
user_endpoints
=
","
.
join
(
stdout
=
fn
,
[
"127.0.0.1:"
+
str
(
x
)
for
x
in
ports
])
stderr
=
fn
,
for
i
in
range
(
selected_gpus_num
):
cwd
=
os
.
getcwd
())
current_env
.
update
({
procs
.
append
(
proc
)
"PADDLE_TRAINER_ENDPOINTS"
:
user_endpoints
,
"PADDLE_CURRENT_ENDPOINTS"
:
user_endpoints
[
i
],
"PADDLE_TRAINERS_NUM"
:
str
(
worker_num
),
"TRAINING_ROLE"
:
"TRAINER"
,
"PADDLE_TRAINER_ID"
:
str
(
i
),
"FLAGS_selected_gpus"
:
str
(
selected_gpus
[
i
]),
"PADDLEREC_GPU_NUMS"
:
str
(
selected_gpus_num
)
})
os
.
system
(
"mkdir -p {}"
.
format
(
logs_dir
))
fn
=
open
(
"%s/worker.%d"
%
(
logs_dir
,
i
),
"w"
)
log_fns
.
append
(
fn
)
proc
=
subprocess
.
Popen
(
cmd
,
env
=
current_env
,
stdout
=
fn
,
stderr
=
fn
,
cwd
=
os
.
getcwd
())
procs
.
append
(
proc
)
# only wait worker to finish here
# only wait worker to finish here
for
i
,
proc
in
enumerate
(
procs
):
for
i
,
proc
in
enumerate
(
procs
):
...
...
run.py
浏览文件 @
f6b622a5
...
@@ -348,6 +348,7 @@ def cluster_engine(args):
...
@@ -348,6 +348,7 @@ def cluster_engine(args):
cluster_envs
[
"fleet_mode"
]
=
fleet_mode
cluster_envs
[
"fleet_mode"
]
=
fleet_mode
cluster_envs
[
"engine_role"
]
=
"WORKER"
cluster_envs
[
"engine_role"
]
=
"WORKER"
cluster_envs
[
"log_dir"
]
=
"logs"
cluster_envs
[
"train.trainer.trainer"
]
=
trainer
cluster_envs
[
"train.trainer.trainer"
]
=
trainer
cluster_envs
[
"train.trainer.engine"
]
=
"cluster"
cluster_envs
[
"train.trainer.engine"
]
=
"cluster"
cluster_envs
[
"train.trainer.executor_mode"
]
=
executor_mode
cluster_envs
[
"train.trainer.executor_mode"
]
=
executor_mode
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录