diff --git a/paddle/fluid/API.spec b/paddle/fluid/API.spec
index 83ebefd9cdcf912d0c7a53b67b98ae620520b5b5..fbe63ba03ef9baf4f06d89c7a0ec16ef4d09f6f2 100644
--- a/paddle/fluid/API.spec
+++ b/paddle/fluid/API.spec
@@ -53,7 +53,7 @@ paddle.fluid.DistributeTranspiler.get_pserver_program ArgSpec(args=['self', 'end
 paddle.fluid.DistributeTranspiler.get_pserver_programs ArgSpec(args=['self', 'endpoint'], varargs=None, keywords=None, defaults=None)
 paddle.fluid.DistributeTranspiler.get_startup_program ArgSpec(args=['self', 'endpoint', 'pserver_program', 'startup_program'], varargs=None, keywords=None, defaults=(None, None))
 paddle.fluid.DistributeTranspiler.get_trainer_program ArgSpec(args=['self', 'wait_port'], varargs=None, keywords=None, defaults=(True,))
-paddle.fluid.DistributeTranspiler.transpile ArgSpec(args=['self', 'trainer_id', 'program', 'pservers', 'trainers', 'sync_mode', 'startup_program'], varargs=None, keywords=None, defaults=(None, '127.0.0.1:6174', 1, True, None))
+paddle.fluid.DistributeTranspiler.transpile ArgSpec(args=['self', 'trainer_id', 'program', 'pservers', 'trainers', 'sync_mode', 'startup_program', 'current_endpoint'], varargs=None, keywords=None, defaults=(None, '127.0.0.1:6174', 1, True, None, '127.0.0.1:6174'))
 paddle.fluid.memory_optimize ArgSpec(args=['input_program', 'skip_opt_set', 'print_log', 'level'], varargs=None, keywords=None, defaults=(None, False, 0))
 paddle.fluid.release_memory ArgSpec(args=['input_program', 'skip_opt_set'], varargs=None, keywords=None, defaults=(None,))
 paddle.fluid.DistributeTranspilerConfig.__init__ 
@@ -336,7 +336,7 @@ paddle.fluid.transpiler.DistributeTranspiler.get_pserver_program ArgSpec(args=['
 paddle.fluid.transpiler.DistributeTranspiler.get_pserver_programs ArgSpec(args=['self', 'endpoint'], varargs=None, keywords=None, defaults=None)
 paddle.fluid.transpiler.DistributeTranspiler.get_startup_program ArgSpec(args=['self', 'endpoint', 'pserver_program', 'startup_program'], varargs=None, keywords=None, defaults=(None, None))
 paddle.fluid.transpiler.DistributeTranspiler.get_trainer_program ArgSpec(args=['self', 'wait_port'], varargs=None, keywords=None, defaults=(True,))
-paddle.fluid.transpiler.DistributeTranspiler.transpile ArgSpec(args=['self', 'trainer_id', 'program', 'pservers', 'trainers', 'sync_mode', 'startup_program'], varargs=None, keywords=None, defaults=(None, '127.0.0.1:6174', 1, True, None))
+paddle.fluid.transpiler.DistributeTranspiler.transpile ArgSpec(args=['self', 'trainer_id', 'program', 'pservers', 'trainers', 'sync_mode', 'startup_program', 'current_endpoint'], varargs=None, keywords=None, defaults=(None, '127.0.0.1:6174', 1, True, None, '127.0.0.1:6174'))
 paddle.fluid.transpiler.memory_optimize ArgSpec(args=['input_program', 'skip_opt_set', 'print_log', 'level'], varargs=None, keywords=None, defaults=(None, False, 0))
 paddle.fluid.transpiler.release_memory ArgSpec(args=['input_program', 'skip_opt_set'], varargs=None, keywords=None, defaults=(None,))
 paddle.fluid.transpiler.HashName.__init__ ArgSpec(args=['self', 'pserver_endpoints'], varargs=None, keywords=None, defaults=None)
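
Since the spec change above only appends a trailing current_endpoint keyword with a default value, existing pserver-mode call sites keep working unchanged. A minimal sketch of such a call under the new spec, assuming a network and an optimizer have already been built in the default main program (the endpoints are illustrative):

    import paddle.fluid as fluid

    # Existing pserver-mode usage: current_endpoint is not passed; the new
    # trailing default ("127.0.0.1:6174") is simply unused in this mode.
    t = fluid.DistributeTranspiler()
    t.transpile(
        trainer_id=0,
        pservers="192.168.0.1:6174,192.168.0.2:6174",
        trainers=2)
    pserver_prog = t.get_pserver_program("192.168.0.1:6174")
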
diff --git a/python/paddle/fluid/tests/unittests/test_dist_transpiler.py b/python/paddle/fluid/tests/unittests/test_dist_transpiler.py
index a198b25520f97ce23b9c1ebb9cd82fc458222d73..1b7f0745cd758a7534dec9f48132e9cb44a994e6 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_transpiler.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_transpiler.py
@@ -659,5 +659,25 @@ class TestLoadSliceVar(TranspilerTest):
                                 pserver2._slice_vars_and_attrs[idx][2].shape))
 
 
+class TestNCCL2Transpile(TranspilerTest):
+    def test_nccl2_transpile(self):
+        main = fluid.Program()
+        startup = fluid.Program()
+        with fluid.program_guard(main, startup):
+            self.net_conf()
+
+        config = fluid.DistributeTranspilerConfig()
+        config.mode = "nccl2"
+        t = fluid.DistributeTranspiler(config=config)
+        t.transpile(
+            0,
+            trainers="127.0.0.1:6174,127.0.0.1:6175",
+            current_endpoint="127.0.0.1:6174",
+            startup_program=startup)
+        print([op.type for op in startup.global_block().ops])
+        self.assertEqual(startup.global_block().ops[-1].type, "gen_nccl_id")
+        self.assertIsNotNone(startup.global_block().vars.get("NCCLID"))
+
+
 if __name__ == "__main__":
     unittest.main()
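
The new unit test only checks the last op type and the presence of the NCCLID variable. As a rough sketch of inspecting what the nccl2 path actually generates, using the same toy endpoints as the test (the attribute names match the attrs set by the transpiler below; the expected values in the comments are illustrative):

    import paddle.fluid as fluid

    startup = fluid.Program()
    config = fluid.DistributeTranspilerConfig()
    config.mode = "nccl2"
    t = fluid.DistributeTranspiler(config=config)
    t.transpile(
        0,
        trainers="127.0.0.1:6174,127.0.0.1:6175",
        current_endpoint="127.0.0.1:6174",
        startup_program=startup)

    gen_op = startup.global_block().ops[-1]
    assert gen_op.type == "gen_nccl_id"
    # The op records this node's endpoint, its peers, and the trainer id.
    print(gen_op.attr("endpoint"))       # expected: "127.0.0.1:6174"
    print(gen_op.attr("endpoint_list"))  # expected: ["127.0.0.1:6175"]
    print(gen_op.attr("trainer_id"))     # expected: 0
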
diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py
index 3f8c7b844a9fdc8404560ba4c78f9d328af2852a..92cb0fe3732509bbaf1a2e1fe59f2ac5f6e732fe 100644
--- a/python/paddle/fluid/transpiler/distribute_transpiler.py
+++ b/python/paddle/fluid/transpiler/distribute_transpiler.py
@@ -136,6 +136,8 @@ class DistributeTranspilerConfig(object):
     slice_var_up = True
     split_method = None
     min_block_size = 8192
+    # supported modes: pserver, nccl2
+    mode = "pserver"
     print_log = False
 
 
@@ -144,27 +146,30 @@ class DistributeTranspiler(object):
     **DistributeTranspiler**
 
     Convert the fluid program to distributed data-parallelism programs.
+    Supports two modes: pserver mode and nccl2 mode.
 
-    The main_program will be transformed to use a remote parameter server
-    to do parameter optimization. And the optimization graph will be put
-    into a parameter server program.
+    In pserver mode, the main_program will be transformed to use a remote
+    parameter server to do parameter optimization, and the optimization
+    graph will be put into a parameter server program.
+
+    In nccl2 mode, the transpiler appends a NCCL ID broadcasting op to
+    the startup_program so that the NCCL ID is shared across all nodes
+    of the job. After transpile() has been called in nccl2 mode, you
+    ***must*** pass the trainer_id and num_trainers arguments to
+    ParallelExecutor to enable NCCL2 distributed mode.
 
     Examples:
         .. code-block:: python
 
-           # Define your model before these codes.
-           port = os.getenv("PADDLE_PSERVER_PORT", "6174")
-           pserver_ips = os.getenv("PADDLE_PSERVER_IPS", "")
-           eplist = []
-           for ip in pserver_ips.split(","):
-                eplist.append(':'.join([ip, port]))
-           pserver_endpoints = ",".join(eplist)
-           trainers = int(os.getenv("PADDLE_TRAINERS"))
-           current_endpoint = os.getenv("PADDLE_CURRENT_IP", "") + ":" + port
-           trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
+           # for pserver mode
+           pserver_endpoints = "192.168.0.1:6174,192.168.0.2:6174"
+           trainer_endpoints = "192.168.0.1:6174,192.168.0.2:6174"
+           current_endpoint = "192.168.0.1:6174"
+           trainer_id = 0
+           trainers = 4
            role = os.getenv("PADDLE_TRAINING_ROLE")
 
-           t = distribute_transpiler.DistributeTranspiler()
+           t = fluid.DistributeTranspiler()
            t.transpile(
                trainer_id, pservers=pserver_endpoints, trainers=trainers)
            if role == "PSERVER":
@@ -173,6 +178,18 @@ class DistributeTranspiler(object):
                                                                 pserver_program)
            elif role == "TRAINER":
                trainer_program = t.get_trainer_program()
+
+           # for nccl2 mode
+           config = fluid.DistributeTranspilerConfig()
+           config.mode = "nccl2"
+           t = fluid.DistributeTranspiler(config=config)
+           t.transpile(trainer_id, trainers=trainer_endpoints, current_endpoint=current_endpoint)
+           exe = fluid.ParallelExecutor(
+               use_cuda,
+               loss_name=loss_var.name,
+               num_trainers=len(trainer_endpoints.split(",")),
+               trainer_id=trainer_id
+           )
     """
 
     def __init__(self, config=None):
@@ -190,13 +207,41 @@ class DistributeTranspiler(object):
         assert (self.config.min_block_size >= 8192)
         assert (self.config.split_method.__bases__[0] == PSDispatcher)
 
+    def _transpile_nccl2(self,
+                         trainer_id,
+                         trainers,
+                         current_endpoint,
+                         startup_program=None):
+        if not startup_program:
+            startup_program = default_startup_program()
+        if trainer_id >= 0:
+            worker_endpoints = trainers.split(",")
+            # drop this node's own endpoint; the NCCL ID is sent to the
+            # remaining peers (or received from trainer 0)
+            worker_endpoints.remove(current_endpoint)
+
+            nccl_id_var = startup_program.global_block().create_var(
+                name="NCCLID", persistable=True, type=core.VarDesc.VarType.RAW)
+            startup_program.global_block().append_op(
+                type="gen_nccl_id",
+                inputs={},
+                outputs={"NCCLID": nccl_id_var},
+                attrs={
+                    "endpoint": current_endpoint,
+                    "endpoint_list": worker_endpoints,
+                    "trainer_id": trainer_id
+                })
+            return nccl_id_var
+        else:
+            raise ValueError("must set trainer_id >= 0")
+
     def transpile(self,
                   trainer_id,
                   program=None,
                   pservers="127.0.0.1:6174",
                   trainers=1,
                   sync_mode=True,
-                  startup_program=None):
+                  startup_program=None,
+                  current_endpoint="127.0.0.1:6174"):
         """
         Run the transpiler.
 
@@ -207,10 +252,15 @@ class DistributeTranspiler(object):
                 default is fluid.default_main_program().
             pservers (str): comma separated ip:port string for the pserver
                 list.
-            trainers (int): number of trainers in the distributed job.
+            trainers (int|str): in pserver mode this is the number of
+                trainers, in nccl2 mode this is a comma separated string
+                of trainer endpoints.
             sync_mode (bool): Do sync training or not, default is True.
             startup_program (Program|None): startup_program to transpile,
                 default is fluid.default_main_program().
+            current_endpoint (str): the endpoint of the current node;
+                required when transpiling in nccl2 distributed mode. In
+                pserver mode this argument is not used.
""" if program is None: program = default_main_program() @@ -220,6 +270,15 @@ class DistributeTranspiler(object): self.startup_program = startup_program self.origin_startup_program = self.startup_program.clone() + if self.config.mode == "nccl2": + assert (isinstance(trainers, str)) + self._transpile_nccl2( + trainer_id, + trainers, + current_endpoint, + startup_program=startup_program) + return + self.trainer_num = trainers self.sync_mode = sync_mode self.trainer_id = trainer_id