diff --git a/python/paddle/distributed/launch/context/device.py b/python/paddle/distributed/launch/context/device.py index 7df7db28f7877f28e6100fa53acbf9c192bd206a..f03d0ea3d41efda8f4173b93302ea51ec27da442 100644 --- a/python/paddle/distributed/launch/context/device.py +++ b/python/paddle/distributed/launch/context/device.py @@ -21,6 +21,7 @@ class DeviceType: XPU = 'xpu' NPU = 'npu' MLU = 'mlu' + IPU = 'ipu' class Device(object): @@ -69,6 +70,8 @@ class Device(object): return 'FLAGS_selected_xpus' if self._dtype == DeviceType.MLU: return 'FLAGS_selected_mlus' + if self._dtype == DeviceType.IPU: + return 'FLAGS_selected_ipus' return 'FLAGS_selected_devices' def get_selected_devices(self, devices=''): @@ -130,6 +133,12 @@ class Device(object): dev._dtype = DeviceType.MLU num = fluid.core.get_mlu_device_count() visible_devices = os.getenv("MLU_VISIBLE_DEVICES") + elif fluid.core.is_compiled_with_ipu(): + dev._dtype = DeviceType.IPU + num = fluid.core.get_ipu_device_count() + # For IPUs, 'labels' is a list which contains the available numbers of IPU devices. + dev._labels = [str(x) for x in range(0, num + 1)] + return dev if num == 0: dev._dtype = DeviceType.CPU diff --git a/python/paddle/distributed/launch/controllers/__init__.py b/python/paddle/distributed/launch/controllers/__init__.py index f1c6ea5399a4656d5b4b7a4a0f9e32622fc78d58..c686164dbd8847c0650c064b257ef60eb95aeec5 100644 --- a/python/paddle/distributed/launch/controllers/__init__.py +++ b/python/paddle/distributed/launch/controllers/__init__.py @@ -17,9 +17,11 @@ __all__ = [] from .collective import CollectiveController from .collective import CollectiveElasticController from .ps import PSController +from .ipu_controller import IPUController # the order is extremely important _controllers = [ + IPUController, CollectiveElasticController, PSController, CollectiveController, diff --git a/python/paddle/distributed/launch/controllers/controller.py b/python/paddle/distributed/launch/controllers/controller.py index a8ae155562ae923a51b31e35a6912988a4c5c3cd..1f43679d748f1175b09cc3033f8ca63f1751286a 100644 --- a/python/paddle/distributed/launch/controllers/controller.py +++ b/python/paddle/distributed/launch/controllers/controller.py @@ -29,6 +29,7 @@ import time class ControleMode: COLLECTIVE = "collective" PS = "ps" + IPU = "ipu" class ControllerBase(object): diff --git a/python/paddle/distributed/launch/controllers/ipu_controller.py b/python/paddle/distributed/launch/controllers/ipu_controller.py new file mode 100644 index 0000000000000000000000000000000000000000..92dc2960ab6240aa1c690f8d0de0b85c04aec952 --- /dev/null +++ b/python/paddle/distributed/launch/controllers/ipu_controller.py @@ -0,0 +1,170 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import sys +import argparse + +from .collective import CollectiveController, ControleMode +from paddle.distributed.launch.job.container import Container + + +class IPUController(CollectiveController): + + @classmethod + def enable(cls, ctx): + if ctx.args.training_script == "ipu": + ctx.logger.debug("{} enabled".format(cls.__name__)) + ctx.args.run_mode = ControleMode.IPU + return True + else: + return False + + def parse_ipu_args(self, args_list): + parser = argparse.ArgumentParser() + parser.add_argument("--hosts", + type=str, + help="The hosts for IPU distributd training.") + parser.add_argument("--nproc_per_host", + type=int, + help="The number of processes launched per host.") + parser.add_argument("--ipus_per_replica", + type=int, + help="The number of IPUs requested per replica.") + parser.add_argument("--ipu_partition", + type=str, + help="The partition name of IPU devices.") + parser.add_argument("--vipu_server", + type=str, + help="The ip of the IPU device manager.") + parser.add_argument( + "training_script", + type=str, + help= + "The full path to the IPU distributed training program/script to be launched in parallel. e.g., ``training.py``." + ) + parser.add_argument('training_script_args', nargs=argparse.REMAINDER) + return parser.parse_args(args_list) + + def replace_training_script(self): + # IPU distributed computing is based on PopRun which is a wrapper of MPI. + self.ctx.args.training_script = "poprun" + poprun_args = self.parse_ipu_args(self.ctx.args.training_script_args) + + num_ipus = int(self.ctx.args.devices) + # The number of replicas for data parallel + assert (num_ipus % poprun_args.ipus_per_replica) == 0, \ + "The number of IPUs:{} mod the number of IPUs per replica:{} must == 0".format(num_ipus, poprun_args.ipus_per_replica) + num_replicas = num_ipus // poprun_args.ipus_per_replica + self.ctx.logger.info( + "The number of total replicas is {}.".format(num_replicas)) + + # The number of processes + num_nodes = len(poprun_args.hosts.split(',')) + num_procs = num_nodes * poprun_args.nproc_per_host + self.ctx.logger.info( + "The number of total processes is {}.".format(num_procs)) + assert (num_replicas % num_procs) == 0, \ + "The number of replicas:{} mod the number of processes:{} must == 0".format(num_replicas, num_procs) + + # hosts and endpoints + hosts = poprun_args.hosts.replace(' ', '').split(',') + endpoints = [x + ":8090" for x in hosts] + + # args for poprun + poprun_command = [] + + poprun_command.append('--num-instances={}'.format(num_procs)) + poprun_command.append('--num-replicas={}'.format(num_replicas)) + poprun_command.append('--ipus-per-replica={}'.format( + poprun_args.ipus_per_replica)) + poprun_command.append('--host={}'.format(','.join(hosts))) + poprun_command.append('--vipu-partition={}'.format( + poprun_args.ipu_partition)) + poprun_command.append('--vipu-server-host={}'.format( + poprun_args.vipu_server)) + + poprun_command.extend([ + '--update-partition=no', '--vipu-server-timeout=120', + '--print-topology=yes', '--numa-aware=yes' + ]) + + # global envs + global_envs = '--mpi-local-args=\'' + log_level = os.getenv('POPART_LOG_LEVEL', None) + if log_level: + global_envs += '-x POPART_LOG_LEVEL={} '.format(log_level) + global_envs += '-x PADDLE_TRAINERS_NUM={} -x PADDLE_TRAINER_ENDPOINTS={}'.format( + num_procs, ','.join(endpoints)) + global_envs += '\'' + poprun_command.append(global_envs) + + # local envs + for idx in range(num_procs): + cur_endpoint = endpoints[idx // poprun_args.nproc_per_host] + rank_in_node = idx % poprun_args.nproc_per_host + poprun_command.append( + '--instance-mpi-local-args={}:\"-x PADDLE_TRAINER_ID={} -x PADDLE_CURRENT_ENDPOINT={} -x PADDLE_RANK_IN_NODE={}\"' + .format(idx, idx, cur_endpoint, rank_in_node)) + + # executor + poprun_command.append(sys.executable) + + # script and script args + poprun_command.append(poprun_args.training_script) + poprun_command.extend(poprun_args.training_script_args) + + # for debug + print("----------- PopRun Command -----------") + print("poprun \\") + for i in range(len(poprun_command) - 1): + print("%s \\" % (poprun_command[i])) + print("%s" % (poprun_command[len(poprun_command) - 1])) + print("---------------------------------------") + + # replace training_script_args + self.ctx.args.training_script_args = poprun_command + + def _get_entrypoint(self): + entrypoint = [self.ctx.args.training_script] + entrypoint.extend(self.ctx.args.training_script_args) + entrypoint = [" ".join(entrypoint)] + return entrypoint + + def new_container(self, + entrypoint=None, + envs={}, + use_ctx_env=True, + out=None, + err=None): + c = Container( + entrypoint=(entrypoint or self._get_entrypoint()), + env=(self.ctx.get_envs() if use_ctx_env else {}), + ) + c.outfile, c.errfile = self._get_out_err_file(out, err) + c.update_env(envs) + # Need subprocess.Popen(shell=True) for PopRun command + c.shell = True + return c + + def run(self): + # Replace the training script with the PopRun command + self.replace_training_script() + + self.build_job() + self.build_pod() + + self.deploy_pod() + + self.watch() diff --git a/python/paddle/distributed/launch/job/container.py b/python/paddle/distributed/launch/job/container.py index 9f7b1733d1af2b5ff6a00a50d5b1498a5989a129..8f515d9e6f38b6770daf4abbde2c936234bdcb26 100644 --- a/python/paddle/distributed/launch/job/container.py +++ b/python/paddle/distributed/launch/job/container.py @@ -37,6 +37,7 @@ class Container(object): self._grace_period = 10 self._log_handler = None + self._shell = False @property def entrypoint(self): @@ -70,6 +71,14 @@ class Container(object): def errfile(self, err): self._err = err + @property + def shell(self): + return self._shell + + @shell.setter + def shell(self, shell): + self._shell = shell + def update_env(self, env={}, **kwargs): env = {k: v for k, v in env.items() if isinstance(v, str)} self._env.update(env) @@ -109,7 +118,8 @@ class Container(object): self._proc = ProcessContext(self._entrypoint, env=self._env, out=self._stdout, - err=self._stderr) + err=self._stderr, + shell=self._shell) self._proc.start() def terminate(self, force=False): diff --git a/python/paddle/distributed/launch/main.py b/python/paddle/distributed/launch/main.py index f90fa7401e9a023c652d415ff80ba433e223ccd5..4c1b99df178ea9e40c7f927b69f95db4d0024b05 100644 --- a/python/paddle/distributed/launch/main.py +++ b/python/paddle/distributed/launch/main.py @@ -91,6 +91,26 @@ def launch(): - ``--elastic_timeout``: Seconds to wait before elastic job begin to train. Default ``--elastic_timeout=30``. + IPU Parameters: + IPU distributed launch only requires and allowes three arguments ``--devices``, ``training_script`` and ``training_script_args``. + The ``--devices`` is the number of IPU devices. e.g., ``--devices=4`` will launch the training program with four IPU devices. + The ``training_script`` is only allowed to set as ``ipu``. + The ``training_script_args`` includes arguments required by IPU distributed launch and illustrated as below. + ``Examples 10`` has provided a example of paddle.distributed.launch with IPUs. + + - ``--hosts``: The hosts for IPU distributd training. + + - ``--nproc_per_host``: The number of processes launched per host. + + - ``--ipus_per_replica``: The number of IPUs requested per replica. + + - ``--ipu_partition``: The partition name of IPU devices. + + - ``--vipu_server``: The ip of the IPU device manager. + + - ``training_script``: The full path to the IPU distributed training program/script to be launched in parallel. e.g., ``training.py``. + + - ``training_script_args``: The args of the IPU distributed training program/script. Returns: - ``None`` @@ -229,6 +249,15 @@ def launch(): # once the number of nodes changes between 2:4 during training, the strategy holds + Examples 10 (ipu): + .. code-block:: bash + :name: code-block-example-bash10 + + # With the following command, the job will begin to run the distributhed program with IPUs. + # Only support and require the `device_num` as the arg and `ipu` as the launch script. + # Please Check the details about the following args of the launch scripte from `utils/ipu_launch.py`. + python -m paddle.distributed.launch --devices 4 ipu --hosts=localhost --nproc_per_host=2 --ipus_per_replica=1 --ipu_partition=pod16 --vipu_server=127.0.0.1 train.py + """ # initialize the context to run diff --git a/python/paddle/distributed/launch/utils/process_context.py b/python/paddle/distributed/launch/utils/process_context.py index 075536c8a8cb5ffeb3c84aad2e969b7b59488cd1..5d8505aa66eb38fc326892d1e23bc9f212fd40c4 100644 --- a/python/paddle/distributed/launch/utils/process_context.py +++ b/python/paddle/distributed/launch/utils/process_context.py @@ -24,7 +24,8 @@ class ProcessContext(object): out=sys.stdout, err=sys.stderr, group=True, - preexec_fn=None): + preexec_fn=None, + shell=False): self._cmd = cmd self._env = env self._preexec_fn = preexec_fn @@ -33,6 +34,7 @@ class ProcessContext(object): self._group = group if os.name != 'nt' else False self._proc = None self._code = None + self._shell = shell def _start(self): pre_fn = os.setsid if self._group else None @@ -40,7 +42,8 @@ class ProcessContext(object): env=self._env, stdout=self._stdout, stderr=self._stderr, - preexec_fn=self._preexec_fn or pre_fn) + preexec_fn=self._preexec_fn or pre_fn, + shell=self._shell) def _close_std(self): try: diff --git a/python/paddle/fluid/tests/unittests/ipu/distributed/run_dist_ipu.sh b/python/paddle/fluid/tests/unittests/ipu/distributed/run_dist_ipu.sh new file mode 100644 index 0000000000000000000000000000000000000000..a4221b37eb14f83100c3177436ac6fd8a3c29955 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/ipu/distributed/run_dist_ipu.sh @@ -0,0 +1,80 @@ +#!/bin/bash + +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -e + +partition_name=pod64 +vipu_server=10.137.96.62 +allclose_script=" +import sys +import numpy as np +data1 = np.loadtxt(\"ipu_res.txt\") +data2 = np.loadtxt(\"cpu_res.txt\") +if np.allclose(data1[::16], data2, atol=1e-6): + sys.exit(0) +else: + sys.exit(1) +" + +for opt in lamb sgd adam ; +do + for onchip in False True ; + do + for rts in False True ; + do + echo "Testcase: opt: ${opt}, onchip: ${onchip}, rts: ${rts}" + echo "paddle.distributed.fleet.launch test with IPUs..." + python3.7 -m paddle.distributed.launch \ + --devices=8 \ + ipu \ + --hosts=localhost \ + --nproc_per_host=2 \ + --ipus_per_replica=2 \ + --ipu_partition=${partition_name} \ + --vipu_server=${vipu_server} \ + test_dist_data_parallel_ipu.py ${opt} ipu_res.txt ${onchip} ${rts} > ipu.log + echo "paddle.distributed.fleet.launch test with IPUs...Done" + + echo "paddle normal test with CPU..." + export POPLAR_IPUMODEL=1 + python3.7 test_dist_data_parallel_ipu.py ${opt} cpu_res.txt > cpu.log + unset POPLAR_IPUMODEL + echo "paddle normal test with CPU...Done" + + echo "Compare results..." + python3.7 -c """${allclose_script}""" + if [ $? -eq 0 ];then + echo "Compare results...Done" + else + echo "Error occurs. Please check ipu.log, cpu.log, ipu_res.txt and cpu_res.txt" + exit 0 + fi + done + done +done + +if [ -f "ipu.log" ]; then + rm "ipu.log" +fi +if [ -f "cpu.log" ]; then + rm "cpu.log" +fi +if [ -f "ipu_res.txt" ]; then + rm "ipu_res.txt" +fi +if [ -f "cpu_res.txt" ]; then + rm "cpu_res.txt" +fi diff --git a/python/paddle/fluid/tests/unittests/ipu/distributed/test_dist_data_parallel_ipu.py b/python/paddle/fluid/tests/unittests/ipu/distributed/test_dist_data_parallel_ipu.py new file mode 100644 index 0000000000000000000000000000000000000000..891aa501c50796c05d04208593adcd1706279c7c --- /dev/null +++ b/python/paddle/fluid/tests/unittests/ipu/distributed/test_dist_data_parallel_ipu.py @@ -0,0 +1,193 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +import sys +import os +import random +import numpy as np +import paddle +import paddle.static +from paddle.fluid.tests.unittests.ipu.op_test_ipu import IPUOpTest + +mpi_comm = None + + +@unittest.skip('Disable distributed tests on auto CI.') +class TestBase(IPUOpTest): + + def set_attrs(self, enable_ipu, optimizer, log, onchip=False, rts=False): + self.ipu_options = { + "enable_pipelining": True, + "batches_per_step": 1, + "enable_gradient_accumulation": True, + "accumulation_factor": 4, + "enable_replicated_graphs": True, + "replicated_graph_count": 2, + "location_optimizer": { + "on_chip": onchip, + "use_replicated_tensor_sharding": rts + } + } + + self.cpu_bs = 16 + self.ipu_bs = 1 + self.optimizer = optimizer + self.log = log + self.enable_ipu = enable_ipu + + def test(self): + seed = 2021 + np.random.seed(seed) + random.seed(seed) + scope = paddle.static.Scope() + main_prog = paddle.static.Program() + startup_prog = paddle.static.Program() + main_prog.random_seed = seed + startup_prog.random_seed = seed + + bs = self.ipu_bs if self.enable_ipu else self.cpu_bs + data = np.random.rand(1, 3, 10, 10).astype(np.float32) + + with paddle.static.scope_guard(scope): + with paddle.static.program_guard(main_prog, startup_prog): + image = paddle.static.data(name='image', + shape=[bs, 3, 10, 10], + dtype='float32') + with paddle.static.ipu_shard_guard(index=0, stage=0): + conv1 = paddle.static.nn.conv2d(image, + num_filters=3, + filter_size=3, + bias_attr=False) + with paddle.static.ipu_shard_guard(index=1, stage=1): + conv2 = paddle.static.nn.conv2d(conv1, + num_filters=3, + filter_size=3, + bias_attr=False) + # should consider influence of bs + loss = paddle.mean(conv2) + + if self.optimizer == 'sgd': + opt = paddle.optimizer.SGD(learning_rate=1e-2) + elif self.optimizer == 'adam': + opt = paddle.optimizer.Adam(learning_rate=1e-2) + elif self.optimizer == 'lamb': + opt = paddle.optimizer.Lamb(learning_rate=1e-2) + else: + raise Exception('optimizer must be sgd, adam or lamb') + + opt.minimize(loss) + + if self.enable_ipu: + place = paddle.IPUPlace() + else: + place = paddle.CPUPlace() + executor = paddle.static.Executor(place) + executor.run(startup_prog) + + if self.enable_ipu: + feed_list = [image.name] + fetch_list = [loss.name] + ipu_strategy = paddle.static.IpuStrategy() + ipu_strategy.set_graph_config( + num_ipus=2 * self.ipu_options['replicated_graph_count'], + is_training=True, + enable_manual_shard=True) + ipu_strategy.set_options(self.ipu_options) + ipu_strategy.set_options({ + "enable_distribution": + True, + "enable_distributed_replicated_graphs": + True, + "global_replica_offset": + int(os.environ.get("PADDLE_TRAINER_ID")) * 2, + "global_replication_factor": + 4 + }) + program = paddle.static.IpuCompiledProgram( + main_prog, ipu_strategy=ipu_strategy).compile( + feed_list, fetch_list) + feed = { + "image": + np.tile(data, [ + self.ipu_options['replicated_graph_count'] * + self.ipu_options['batches_per_step'] * + self.ipu_options['accumulation_factor'], 1, 1, 1 + ]) + } + + else: + program = main_prog + feed = {"image": np.tile(data, [self.cpu_bs, 1, 1, 1])} + + epoch = 10 + if not self.enable_ipu: + # global replication factor + epoch *= 4 + epoch *= self.ipu_options['batches_per_step'] + epoch *= self.ipu_options['accumulation_factor'] + epoch = epoch / (self.cpu_bs / self.ipu_bs) + + results = [] + for i in range(int(epoch)): + res = executor.run(program, feed=feed, fetch_list=[loss]) + if self.enable_ipu: + res = mpi_comm.gather(res, root=0) + results.append(res) + if self.enable_ipu: + if int(os.environ.get("PADDLE_TRAINER_ID")) == 0: + np.savetxt(self.log, np.array(results).flatten()) + else: + np.savetxt(self.log, np.array(results).flatten()) + + +if __name__ == "__main__": + paddle.enable_static() + # Run distributed tests + if len(sys.argv) == 5: + from mpi4py import MPI + + DISTRIBUTED_COMM = MPI.COMM_WORLD + + def _get_comm(): + global DISTRIBUTED_COMM + if DISTRIBUTED_COMM is None: + raise RuntimeError( + "Distributed Commumication not setup. Please run setup_comm(MPI.COMM_WORLD) first." + ) + return DISTRIBUTED_COMM + + mpi_comm = _get_comm() + + optimizer = sys.argv[1] + log = sys.argv[2] + onchip = True if sys.argv[3] == "True" else False + rts = True if sys.argv[4] == "True" else False + test = TestBase() + test.set_attrs(enable_ipu=True, + optimizer=optimizer, + log=log, + onchip=onchip, + rts=rts) + test.test() + # Run cpu tests for compare + elif len(sys.argv) == 3: + test = TestBase() + test.set_attrs(enable_ipu=False, optimizer=sys.argv[1], log=sys.argv[2]) + test.test() + else: + raise ValueError( + "Only support 3 or 5 args. 3 for cpu test, 5 for ipu distributed test" + ) diff --git a/python/paddle/fluid/tests/unittests/ipu/distributed/test_dist_pod128_sample.py b/python/paddle/fluid/tests/unittests/ipu/distributed/test_dist_pod128_sample.py new file mode 100644 index 0000000000000000000000000000000000000000..f81ed48f04ffdd97361feb9d452a6ef8cab0f3af --- /dev/null +++ b/python/paddle/fluid/tests/unittests/ipu/distributed/test_dist_pod128_sample.py @@ -0,0 +1,115 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +''' +python3.7 -m paddle.distributed.launch \ +--devices=128 \ +ipu \ +--hosts=host1,host2 \ +--ipus_per_host=2 \ +--nproc_per_host=1 \ +--ipu_partition=pod128 \ +--vipu_server=lr17-1-ctrl \ +python/paddle/fluid/tests/unittests/ipu/disabled/test_dist_pod128_ipu.py +Equal to: +poprun \ +--host=localhost,host2 \ +--num-instances=2 \ +--num-replicas=64 \ +--ipus-per-replica=2 \ +--print-topology=yes \ +--vipu-partition=pod128_bert \ +--vipu-server-host=lr17-1-ctrl \ +--update-partition=yes \ +python3.7 python/paddle/fluid/tests/unittests/ipu/disabled/test_dist_pod128_ipu.py +''' + +import os +import numpy as np +import paddle + + +def TestDistTraining(): + paddle.enable_static() + + attrs = {"size": [128, 16], "padding_idx": -1, "dtype": 'float32'} + + scope = paddle.fluid.core.Scope() + main_prog = paddle.static.Program() + startup_prog = paddle.static.Program() + main_prog.random_seed = 42 + startup_prog.random_seed = 42 + + np.random.seed(42) + input_data = np.random.uniform(0, 127, size=[128, 3, 2, 1]).astype(np.int32) + + with paddle.fluid.scope_guard(scope): + with paddle.static.program_guard(main_prog, startup_prog): + x = paddle.static.data(name="x", shape=[3, 2, 1], dtype='int64') + with paddle.static.ipu_shard_guard(index=0, stage=0): + out = paddle.fluid.layers.embedding(x, **attrs) + with paddle.static.ipu_shard_guard(index=1, stage=1): + loss = paddle.mean(out) + opt = paddle.optimizer.Adam(learning_rate=1e-1) + opt.minimize(loss) + + feed_list = ["x"] + fetch_list = [loss.name] + + place = paddle.IPUPlace() + exe = paddle.static.Executor(place) + exe.run(startup_prog) + + ipu_strategy = paddle.static.IpuStrategy() + ipu_strategy.set_graph_config(num_ipus=64, + is_training=True, + enable_manual_shard=True) + ipu_strategy.set_pipelining_config( + enable_pipelining=True, + batches_per_step=1, + enable_gradient_accumulation=True, + accumulation_factor=4) + ipu_strategy.set_options({ + "enable_distribution": + True, + "enable_replicated_graphs": + True, + "replicated_graph_count": + 32, + "enable_distributed_replicated_graphs": + True, + "global_replica_offset": + # Paddle : int(os.environ.get("PADDLE_TRAINER_ID")) * 32 + # PopRun : int(os.environ.get("POPDIST_REPLICA_INDEX_OFFSET")) + int(os.environ.get("PADDLE_TRAINER_ID")) * 32, + "global_replication_factor": + 64, + "location_optimizer": { + "on_chip": False, + "use_replicated_tensor_sharding": True + } + }) + + ipu_program = paddle.static.IpuCompiledProgram( + main_prog, ipu_strategy=ipu_strategy) + program = ipu_program.compile(feed_list, fetch_list) + + for i in range(10): + res = exe.run(program, + feed={"x": input_data}, + fetch_list=fetch_list) + print("index: {}, result: {}".format(i, res)) + + +if __name__ == "__main__": + TestDistTraining() diff --git a/python/paddle/fluid/tests/unittests/ipu/distributed/test_dist_sample.py b/python/paddle/fluid/tests/unittests/ipu/distributed/test_dist_sample.py new file mode 100644 index 0000000000000000000000000000000000000000..d42977b5962d3d3a4cab782a85038c937a0b7a5d --- /dev/null +++ b/python/paddle/fluid/tests/unittests/ipu/distributed/test_dist_sample.py @@ -0,0 +1,178 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +''' +Single host: +python3.7 -m paddle.distributed.launch \ +--devices=4 \ +ipu \ +--hosts=localhost \ +--nproc_per_host=2 \ +--ipus_per_replica=1 \ +--ipu_partition=pod64 \ +--vipu_server=10.137.96.62 \ +python/paddle/fluid/tests/unittests/ipu/distributed/test_dist_sample.py +Equal to: +poprun \ +--host=localhost \ +--num-instances=2 \ +--num-replicas=4 \ +--ipus-per-replica=1 \ +--print-topology=yes \ +python3.7 python/paddle/fluid/tests/unittests/ipu/distributed/test_dist_sample.py +''' +''' +Multi hosts: +python3.7 -m paddle.distributed.launch \ +--devices=4 \ +ipu \ +--hosts=host1,host2 \ +--nproc_per_host=1 \ +--ipus_per_replica=1 \ +--ipu_partition=pod64 \ +--vipu_server=10.137.96.62 \ +python/paddle/fluid/tests/unittests/ipu/distributed/test_dist_sample.py +Equal to: +poprun \ +--host=host1,host2 \ +--num-instances=2 \ +--num-replicas=4 \ +--ipus-per-replica=1 \ +--print-topology=yes \ +python3.7 python/paddle/fluid/tests/unittests/ipu/distributed/test_dist_sample.py +''' + +import os +import sys +import paddle +import numpy as np + +mpi_comm = None + + +def Test(use_dist, file_name): + paddle.enable_static() + + attrs = {"size": [128, 16], "padding_idx": -1, "dtype": 'float32'} + + scope = paddle.fluid.core.Scope() + main_prog = paddle.static.Program() + startup_prog = paddle.static.Program() + main_prog.random_seed = 42 + startup_prog.random_seed = 42 + + with paddle.fluid.scope_guard(scope): + with paddle.static.program_guard(main_prog, startup_prog): + x = paddle.static.data(name="x", shape=[3, 2, 1], dtype='int64') + + out = paddle.fluid.layers.embedding(x, **attrs) + loss = paddle.mean(out) + opt = paddle.optimizer.Adam(learning_rate=1e-1) + opt.minimize(loss) + + feed_list = ["x"] + fetch_list = [loss.name] + + place = paddle.IPUPlace() + exe = paddle.static.Executor(place) + exe.run(startup_prog) + + ipu_strategy = paddle.static.IpuStrategy() + if use_dist: + ipu_strategy.set_graph_config(num_ipus=2, is_training=True) + # Set distributed envs + ipu_strategy.set_options({ + "enable_distribution": + True, + "enable_replicated_graphs": + True, + "replicated_graph_count": + 2, + "enable_distributed_replicated_graphs": + True, + "global_replica_offset": + int(os.environ.get("PADDLE_TRAINER_ID")) * 2, + "global_replication_factor": + 4 + }) + else: + ipu_strategy.set_graph_config(num_ipus=4, is_training=True) + ipu_strategy.set_options({ + "enable_replicated_graphs": True, + "replicated_graph_count": 4, + }) + + ipu_program = paddle.static.IpuCompiledProgram( + main_prog, ipu_strategy=ipu_strategy) + program = ipu_program.compile(feed_list, fetch_list) + + if use_dist: + if os.environ.get("PADDLE_TRAINER_ID") == "0": + input_data = np.concatenate([ + np.array([[[1], [3]], [[2], [4]], + [[4], [127]]]).astype(np.int32), + np.array([[[1], [3]], [[2], [4]], + [[4], [127]]]).astype(np.int32) + ]) + else: + input_data = np.concatenate([ + np.array([[[8], [60]], [[50], [77]], + [[90], [13]]]).astype(np.int32), + np.array([[[8], [60]], [[50], [77]], + [[90], [13]]]).astype(np.int32) + ]) + else: + input_data = np.concatenate([ + np.array([[[1], [3]], [[2], [4]], + [[4], [127]]]).astype(np.int32), + np.array([[[1], [3]], [[2], [4]], + [[4], [127]]]).astype(np.int32), + np.array([[[8], [60]], [[50], [77]], + [[90], [13]]]).astype(np.int32), + np.array([[[8], [60]], [[50], [77]], + [[90], [13]]]).astype(np.int32) + ]) + feed_data = {"x": input_data} + + for step in range(10): + res = exe.run(program, feed=feed_data, fetch_list=fetch_list) + + if use_dist: + res = mpi_comm.gather(res) + if os.getenv("PADDLE_TRAINER_ID") == "0": + np.savetxt(file_name, np.array(res).flatten()) + else: + np.savetxt(file_name, np.array(res).flatten()) + + +if __name__ == "__main__": + file_name = sys.argv[1] + + use_dist = False + if 'PADDLE_TRAINER_ID' in os.environ: + from mpi4py import MPI + + DISTRIBUTED_COMM = MPI.COMM_WORLD + + def _get_comm(): + global DISTRIBUTED_COMM + if DISTRIBUTED_COMM is None: + raise RuntimeError( + "Distributed Commumication not setup. Please run setup_comm(MPI.COMM_WORLD) first." + ) + return DISTRIBUTED_COMM + + mpi_comm = _get_comm() + use_dist = True + + Test(use_dist, file_name)