Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Greenplum
DeepSpeed
提交
cf1f1601
D
DeepSpeed
项目概览
Greenplum
/
DeepSpeed
上一次同步 大约 1 年
通知
10
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
D
DeepSpeed
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
cf1f1601
编写于
11月 06, 2021
作者:
C
Chunyang Wen
提交者:
GitHub
11月 05, 2021
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Use fstr in launcher (#1521)
* Use fstr in launcher * Fix wrong condition for world_info * Fix typo
上级
f9b37801
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
34 additions
and
38 deletions
+34
-38
deepspeed/launcher/launch.py
deepspeed/launcher/launch.py
+34
-38
未找到文件。
deepspeed/launcher/launch.py
浏览文件 @
cf1f1601
...
...
@@ -70,23 +70,21 @@ def main():
for
k
in
current_env
.
keys
():
if
"NCCL"
in
k
:
logger
.
info
(
"%s %s %s"
,
args
.
node_rank
,
k
,
current_env
[
k
]
)
logger
.
info
(
f
"
{
args
.
node_rank
}
{
k
}
=
{
current_env
[
k
]
}
"
)
world_info
=
None
assert
args
.
world_info
!=
"None"
,
"must provide world info dict"
if
args
.
world_info
==
"None"
:
raise
ValueError
(
"world_info can not be None"
)
world_info
=
base64
.
urlsafe_b64decode
(
args
.
world_info
)
world_info
=
json
.
loads
(
world_info
)
logger
.
info
(
"WORLD INFO DICT: {}"
.
format
(
world_info
)
)
logger
.
info
(
f
"WORLD INFO DICT:
{
world_info
}
"
)
node_list
=
list
(
world_info
.
keys
())
args
.
nnodes
=
len
(
node_list
)
local_node
=
node_list
[
args
.
node_rank
]
local_gpu_ids
=
world_info
[
local_node
]
num_local_procs
=
len
(
local_gpu_ids
)
logger
.
info
(
"nnodes={}, num_local_procs={}, node_rank={}"
.
format
(
args
.
nnodes
,
num_local_procs
,
args
.
node_rank
),
f
"nnodes=
{
args
.
nnodes
}
, num_local_procs=
{
num_local_procs
}
, node_rank=
{
args
.
node_rank
}
"
)
global_rank_mapping
=
defaultdict
(
list
)
...
...
@@ -98,12 +96,10 @@ def main():
for
gid
in
gids
:
global_rank_mapping
[
node_id
].
append
(
curr_global_rank
)
curr_global_rank
+=
1
logger
.
info
(
"global_rank_mapping={}"
.
format
(
global_rank_mapping
)
)
logger
.
info
(
"dist_world_size={}"
.
format
(
dist_world_size
)
)
logger
.
info
(
f
"global_rank_mapping=
{
global_rank_mapping
}
"
)
logger
.
info
(
f
"dist_world_size=
{
dist_world_size
}
"
)
current_env
[
"CUDA_VISIBLE_DEVICES"
]
=
","
.
join
(
map
(
str
,
local_gpu_ids
))
logger
.
info
(
"Setting CUDA_VISIBLE_DEVICES={}"
.
format
(
current_env
[
"CUDA_VISIBLE_DEVICES"
]))
exclusion_counts_per_node
=
None
logger
.
info
(
f
"Setting CUDA_VISIBLE_DEVICES=
{
current_env
[
'CUDA_VISIBLE_DEVICES'
]
}
"
)
# set PyTorch distributed related environmental variables
current_env
[
"MASTER_ADDR"
]
=
args
.
master_addr
...
...
@@ -111,6 +107,7 @@ def main():
current_env
[
"WORLD_SIZE"
]
=
str
(
dist_world_size
)
processes
=
[]
cmd
=
[]
for
local_rank
in
range
(
0
,
num_local_procs
):
# each process's rank
dist_rank
=
global_rank_mapping
[
local_node
][
local_rank
]
...
...
@@ -118,36 +115,34 @@ def main():
current_env
[
"LOCAL_RANK"
]
=
str
(
local_rank
)
# spawn the processes
cmd
=
[
sys
.
executable
,
"-u"
,
args
.
training_script
,
"--local_rank={}"
.
format
(
local_rank
)
]
+
args
.
training_script_args
sig_names
=
{
2
:
"SIGINT"
,
15
:
"SIGTERM"
}
last_return_code
=
None
def
sigkill_handler
(
signum
,
frame
):
for
process
in
processes
:
print
(
f
"Killing subprocess
{
process
.
pid
}
"
)
try
:
process
.
kill
()
except
Exception
as
e
:
pass
if
last_return_code
is
not
None
:
raise
subprocess
.
CalledProcessError
(
returncode
=
last_return_code
,
cmd
=
cmd
)
if
signum
in
sig_names
:
print
(
f
"Main process received
{
sig_names
[
signum
]
}
, exiting"
)
sys
.
exit
(
1
)
# pass SIGINT/SIGTERM to children if the parent is being terminated
signal
.
signal
(
signal
.
SIGINT
,
sigkill_handler
)
signal
.
signal
(
signal
.
SIGTERM
,
sigkill_handler
)
cmd
=
[
sys
.
executable
,
"-u"
,
args
.
training_script
,
f
"--local_rank=
{
local_rank
}
"
]
+
args
.
training_script_args
process
=
subprocess
.
Popen
(
cmd
,
env
=
current_env
)
processes
.
append
(
process
)
sig_names
=
{
2
:
"SIGINT"
,
15
:
"SIGTERM"
}
last_return_code
=
None
def
sigkill_handler
(
signum
,
frame
):
for
process
in
processes
:
logger
.
info
(
f
"Killing subprocess
{
process
.
pid
}
"
)
try
:
process
.
kill
()
except
Exception
:
pass
if
last_return_code
is
not
None
:
raise
subprocess
.
CalledProcessError
(
returncode
=
last_return_code
,
cmd
=
cmd
)
if
signum
in
sig_names
:
logger
.
info
(
f
"Main process received
{
sig_names
[
signum
]
}
, exiting"
)
sys
.
exit
(
1
)
# pass SIGINT/SIGTERM to children if the parent is being terminated
signal
.
signal
(
signal
.
SIGINT
,
sigkill_handler
)
signal
.
signal
(
signal
.
SIGTERM
,
sigkill_handler
)
alive_processes
=
set
(
processes
)
while
len
(
alive_processes
):
finished_processes
=
[]
...
...
@@ -161,6 +156,7 @@ def main():
sigkill_handler
(
signal
.
SIGTERM
,
None
)
# not coming back
else
:
# exited cleanly
logger
.
info
(
f
"Process
{
process
.
pid
}
exits successfully."
)
finished_processes
.
append
(
process
)
alive_processes
=
set
(
alive_processes
)
-
set
(
finished_processes
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录