Unverified commit aeb2dc2b, authored by W Wu Yi, committed by GitHub

Nccl2 dist API (#13506)

* add nccl2 dist api

* update apispec

* update

* update api spec
Parent c66a8d2c
@@ -53,7 +53,7 @@ paddle.fluid.DistributeTranspiler.get_pserver_program ArgSpec(args=['self', 'end
paddle.fluid.DistributeTranspiler.get_pserver_programs ArgSpec(args=['self', 'endpoint'], varargs=None, keywords=None, defaults=None)
paddle.fluid.DistributeTranspiler.get_startup_program ArgSpec(args=['self', 'endpoint', 'pserver_program', 'startup_program'], varargs=None, keywords=None, defaults=(None, None))
paddle.fluid.DistributeTranspiler.get_trainer_program ArgSpec(args=['self', 'wait_port'], varargs=None, keywords=None, defaults=(True,))
paddle.fluid.DistributeTranspiler.transpile ArgSpec(args=['self', 'trainer_id', 'program', 'pservers', 'trainers', 'sync_mode', 'startup_program', 'current_endpoint'], varargs=None, keywords=None, defaults=(None, '127.0.0.1:6174', 1, True, None, '127.0.0.1:6174'))
paddle.fluid.memory_optimize ArgSpec(args=['input_program', 'skip_opt_set', 'print_log', 'level'], varargs=None, keywords=None, defaults=(None, False, 0))
paddle.fluid.release_memory ArgSpec(args=['input_program', 'skip_opt_set'], varargs=None, keywords=None, defaults=(None,))
paddle.fluid.DistributeTranspilerConfig.__init__
@@ -336,7 +336,7 @@ paddle.fluid.transpiler.DistributeTranspiler.get_pserver_program ArgSpec(args=['
paddle.fluid.transpiler.DistributeTranspiler.get_pserver_programs ArgSpec(args=['self', 'endpoint'], varargs=None, keywords=None, defaults=None)
paddle.fluid.transpiler.DistributeTranspiler.get_startup_program ArgSpec(args=['self', 'endpoint', 'pserver_program', 'startup_program'], varargs=None, keywords=None, defaults=(None, None))
paddle.fluid.transpiler.DistributeTranspiler.get_trainer_program ArgSpec(args=['self', 'wait_port'], varargs=None, keywords=None, defaults=(True,))
paddle.fluid.transpiler.DistributeTranspiler.transpile ArgSpec(args=['self', 'trainer_id', 'program', 'pservers', 'trainers', 'sync_mode', 'startup_program', 'current_endpoint'], varargs=None, keywords=None, defaults=(None, '127.0.0.1:6174', 1, True, None, '127.0.0.1:6174'))
paddle.fluid.transpiler.memory_optimize ArgSpec(args=['input_program', 'skip_opt_set', 'print_log', 'level'], varargs=None, keywords=None, defaults=(None, False, 0))
paddle.fluid.transpiler.release_memory ArgSpec(args=['input_program', 'skip_opt_set'], varargs=None, keywords=None, defaults=(None,))
paddle.fluid.transpiler.HashName.__init__ ArgSpec(args=['self', 'pserver_endpoints'], varargs=None, keywords=None, defaults=None)
...
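The only signature change in the API spec above is the new trailing current_endpoint argument to transpile, which has a default value so existing pserver-mode callers keep working. A minimal sketch of both call styles against this signature (endpoints here are placeholder values):

import paddle.fluid as fluid

# pserver mode: unchanged; current_endpoint keeps its default and is not used
t = fluid.DistributeTranspiler()
t.transpile(
    trainer_id=0,
    pservers="127.0.0.1:6174,127.0.0.1:6175",
    trainers=2)

# nccl2 mode: trainers becomes a comma separated endpoint string and
# current_endpoint must name this node
config = fluid.DistributeTranspilerConfig()
config.mode = "nccl2"
t = fluid.DistributeTranspiler(config=config)
t.transpile(
    trainer_id=0,
    trainers="127.0.0.1:6174,127.0.0.1:6175",
    current_endpoint="127.0.0.1:6174")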
@@ -659,5 +659,25 @@ class TestLoadSliceVar(TranspilerTest):
                                   pserver2._slice_vars_and_attrs[idx][2].shape))
class TestNCCL2Transpile(TranspilerTest):
def test_nccl2_transpile(self):
main = fluid.Program()
startup = fluid.Program()
with fluid.program_guard(main, startup):
self.net_conf()
config = fluid.DistributeTranspilerConfig()
config.mode = "nccl2"
t = fluid.DistributeTranspiler(config=config)
t.transpile(
0,
trainers="127.0.0.1:6174,127.0.0.1:6175",
current_endpoint="127.0.0.1:6174",
startup_program=startup)
print([op.type for op in startup.global_block().ops])
self.assertEqual(startup.global_block().ops[-1].type, "gen_nccl_id")
self.assertIsNotNone(startup.global_block().vars.get("NCCLID"))
if __name__ == "__main__":
    unittest.main()
@@ -136,6 +136,8 @@ class DistributeTranspilerConfig(object):
    slice_var_up = True
    split_method = None
    min_block_size = 8192
    # supported modes: pserver, nccl2
    mode = "pserver"
    print_log = False
@@ -144,27 +146,30 @@ class DistributeTranspiler(object):
    **DistributeTranspiler**

    Convert the fluid program to distributed data-parallelism programs.
    Supports two modes: pserver mode and nccl2 mode.

    In pserver mode, the main_program will be transformed to use a remote
    parameter server to do parameter optimization, and the optimization
    graph will be put into a parameter server program.

    In nccl2 mode, the transpiler will append an NCCL_ID broadcasting
    op to startup_program to share the NCCL_ID across the job nodes.
    After the nccl2 transpile is called, you ***must*** pass the trainer_id
    and num_trainers arguments to ParallelExecutor to enable NCCL2
    distributed mode.
    Examples:
        .. code-block:: python

            # for pserver mode
            pserver_endpoints = "192.168.0.1:6174,192.168.0.2:6174"
            trainer_endpoints = "192.168.0.1:6174,192.168.0.2:6174"
            current_endpoint = "192.168.0.1:6174"
            trainer_id = 0
            trainers = 4
            role = os.getenv("PADDLE_TRAINING_ROLE")

            t = fluid.DistributeTranspiler()
            t.transpile(
                trainer_id, pservers=pserver_endpoints, trainers=trainers)
            if role == "PSERVER":
@@ -173,6 +178,18 @@ class DistributeTranspiler(object):
                    pserver_program)
            elif role == "TRAINER":
                trainer_program = t.get_trainer_program()
            # for nccl2 mode
            config = fluid.DistributeTranspilerConfig()
            config.mode = "nccl2"
            t = fluid.DistributeTranspiler(config=config)
            t.transpile(trainer_id, trainers=trainer_endpoints,
                        current_endpoint=current_endpoint)
            exe = fluid.ParallelExecutor(
                use_cuda,
                loss_name=loss_var.name,
                num_trainers=len(trainer_endpoints.split(",")),
                trainer_id=trainer_id
            )
""" """
def __init__(self, config=None): def __init__(self, config=None):
...@@ -190,13 +207,41 @@ class DistributeTranspiler(object): ...@@ -190,13 +207,41 @@ class DistributeTranspiler(object):
assert (self.config.min_block_size >= 8192) assert (self.config.min_block_size >= 8192)
assert (self.config.split_method.__bases__[0] == PSDispatcher) assert (self.config.split_method.__bases__[0] == PSDispatcher)
def _transpile_nccl2(self,
trainer_id,
trainers,
current_endpoint,
startup_program=None):
if not startup_program:
startup_program = default_startup_program()
if trainer_id >= 0:
worker_endpoints = trainers.split(",")
# send NCCL_ID to others or recv from trainer 0
worker_endpoints.remove(current_endpoint)
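            # endpoint_list holds only the peer endpoints; this node's own
            # address is passed separately as the "endpoint" attribute below.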
nccl_id_var = startup_program.global_block().create_var(
name="NCCLID", persistable=True, type=core.VarDesc.VarType.RAW)
startup_program.global_block().append_op(
type="gen_nccl_id",
inputs={},
outputs={"NCCLID": nccl_id_var},
attrs={
"endpoint": current_endpoint,
"endpoint_list": worker_endpoints,
"trainer_id": trainer_id
})
return nccl_id_var
else:
            raise ValueError("must set trainer_id >= 0")
    def transpile(self,
                  trainer_id,
                  program=None,
                  pservers="127.0.0.1:6174",
                  trainers=1,
                  sync_mode=True,
                  startup_program=None,
                  current_endpoint="127.0.0.1:6174"):
""" """
Run the transpiler. Run the transpiler.
...@@ -207,10 +252,15 @@ class DistributeTranspiler(object): ...@@ -207,10 +252,15 @@ class DistributeTranspiler(object):
default is fluid.default_main_program(). default is fluid.default_main_program().
pservers (str): comma separated ip:port string for the pserver pservers (str): comma separated ip:port string for the pserver
list. list.
trainers (int): number of trainers in the distributed job. trainers (int|str): in pserver mode this is the number of
trainers, in nccl2 mode this is a string of trainer
endpoints.
sync_mode (bool): Do sync training or not, default is True. sync_mode (bool): Do sync training or not, default is True.
startup_program (Program|None): startup_program to transpile, startup_program (Program|None): startup_program to transpile,
default is fluid.default_main_program(). default is fluid.default_main_program().
current_endpoint (str): need pass current endpoint when
transpile as nccl2 distributed mode. In pserver mode
this argument is not used.
""" """
        if program is None:
            program = default_main_program()
@@ -220,6 +270,15 @@ class DistributeTranspiler(object):
        self.startup_program = startup_program
        self.origin_startup_program = self.startup_program.clone()
if self.config.mode == "nccl2":
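            # nccl2 mode only patches the startup program with a gen_nccl_id
            # op; the main program is left unchanged, so there is no
            # pserver/trainer program split in this mode.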
assert (isinstance(trainers, str))
self._transpile_nccl2(
trainer_id,
trainers,
current_endpoint,
startup_program=startup_program)
return
        self.trainer_num = trainers
        self.sync_mode = sync_mode
        self.trainer_id = trainer_id
...
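As the updated docstring notes, nccl2 mode only rewrites the startup program. A minimal sketch of how a trainer might wire this up afterwards (loss, main, startup, trainer_endpoints, and trainer_id are illustrative names, not part of this change):

import paddle.fluid as fluid

exe = fluid.Executor(fluid.CUDAPlace(0))
# Running the transpiled startup program executes gen_nccl_id:
# trainer 0 generates the NCCLID and the other trainers receive it.
exe.run(startup)

pe = fluid.ParallelExecutor(
    use_cuda=True,
    loss_name=loss.name,
    main_program=main,
    num_trainers=len(trainer_endpoints.split(",")),
    trainer_id=trainer_id)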