Unverified · Commit 083d769b authored by yaozhixin, committed by GitHub

[IPU]update paddle.distributed.launch (#43311)

* update paddle.distributed.launch

* add sample code

* update shell

* fix typo

* fix typo

* update docs

* rm code

* fix doc 2

* fix doc 3

* fix doc 4
Co-authored-by: root <root@sgjur-pod004-1.ipu.graphcore.cn>
Parent 0e6462d6
@@ -21,6 +21,7 @@ class DeviceType:
    XPU = 'xpu'
    NPU = 'npu'
    MLU = 'mlu'
    IPU = 'ipu'

class Device(object):
@@ -69,6 +70,8 @@ class Device(object):
            return 'FLAGS_selected_xpus'
        if self._dtype == DeviceType.MLU:
            return 'FLAGS_selected_mlus'
        if self._dtype == DeviceType.IPU:
            return 'FLAGS_selected_ipus'
        return 'FLAGS_selected_devices'

    def get_selected_devices(self, devices=''):
@@ -130,6 +133,12 @@ class Device(object):
            dev._dtype = DeviceType.MLU
            num = fluid.core.get_mlu_device_count()
            visible_devices = os.getenv("MLU_VISIBLE_DEVICES")
        elif fluid.core.is_compiled_with_ipu():
            dev._dtype = DeviceType.IPU
            num = fluid.core.get_ipu_device_count()
            # For IPUs, 'labels' is a list which contains the available numbers of IPU devices.
            dev._labels = [str(x) for x in range(0, num + 1)]
            return dev

        if num == 0:
            dev._dtype = DeviceType.CPU
...
@@ -17,9 +17,11 @@ __all__ = []
from .collective import CollectiveController
from .collective import CollectiveElasticController
from .ps import PSController
from .ipu_controller import IPUController

# the order is extremely important
_controllers = [
    IPUController,
    CollectiveElasticController,
    PSController,
    CollectiveController,
...
@@ -29,6 +29,7 @@ import time

class ControleMode:
    COLLECTIVE = "collective"
    PS = "ps"
    IPU = "ipu"

class ControllerBase(object):
...
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import argparse
from .collective import CollectiveController, ControleMode
from paddle.distributed.launch.job.container import Container
class IPUController(CollectiveController):
@classmethod
def enable(cls, ctx):
if ctx.args.training_script == "ipu":
ctx.logger.debug("{} enabled".format(cls.__name__))
ctx.args.run_mode = ControleMode.IPU
return True
else:
return False
def parse_ipu_args(self, args_list):
parser = argparse.ArgumentParser()
parser.add_argument("--hosts",
type=str,
help="The hosts for IPU distributd training.")
parser.add_argument("--nproc_per_host",
type=int,
help="The number of processes launched per host.")
parser.add_argument("--ipus_per_replica",
type=int,
help="The number of IPUs requested per replica.")
parser.add_argument("--ipu_partition",
type=str,
help="The partition name of IPU devices.")
parser.add_argument("--vipu_server",
type=str,
help="The ip of the IPU device manager.")
parser.add_argument(
"training_script",
type=str,
help=
"The full path to the IPU distributed training program/script to be launched in parallel. e.g., ``training.py``."
)
parser.add_argument('training_script_args', nargs=argparse.REMAINDER)
return parser.parse_args(args_list)
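# Illustration only (not part of the change): parsing
#   ['--hosts=localhost', '--nproc_per_host=2', '--ipus_per_replica=1',
#    '--ipu_partition=pod16', '--vipu_server=127.0.0.1', 'train.py']
# yields hosts='localhost', nproc_per_host=2, ipus_per_replica=1,
# ipu_partition='pod16', vipu_server='127.0.0.1', training_script='train.py',
# and training_script_args=[].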
def replace_training_script(self):
# IPU distributed computing is based on PopRun which is a wrapper of MPI.
self.ctx.args.training_script = "poprun"
poprun_args = self.parse_ipu_args(self.ctx.args.training_script_args)
num_ipus = int(self.ctx.args.devices)
# The number of replicas for data parallel
assert (num_ipus % poprun_args.ipus_per_replica) == 0, \
    "The number of IPUs:{} must be divisible by the number of IPUs per replica:{}".format(num_ipus, poprun_args.ipus_per_replica)
num_replicas = num_ipus // poprun_args.ipus_per_replica
self.ctx.logger.info(
"The number of total replicas is {}.".format(num_replicas))
# The number of processes
num_nodes = len(poprun_args.hosts.split(','))
num_procs = num_nodes * poprun_args.nproc_per_host
self.ctx.logger.info(
"The number of total processes is {}.".format(num_procs))
assert (num_replicas % num_procs) == 0, \
    "The number of replicas:{} must be divisible by the number of processes:{}".format(num_replicas, num_procs)
# hosts and endpoints
hosts = poprun_args.hosts.replace(' ', '').split(',')
endpoints = [x + ":8090" for x in hosts]
# args for poprun
poprun_command = []
poprun_command.append('--num-instances={}'.format(num_procs))
poprun_command.append('--num-replicas={}'.format(num_replicas))
poprun_command.append('--ipus-per-replica={}'.format(
poprun_args.ipus_per_replica))
poprun_command.append('--host={}'.format(','.join(hosts)))
poprun_command.append('--vipu-partition={}'.format(
poprun_args.ipu_partition))
poprun_command.append('--vipu-server-host={}'.format(
poprun_args.vipu_server))
poprun_command.extend([
'--update-partition=no', '--vipu-server-timeout=120',
'--print-topology=yes', '--numa-aware=yes'
])
# global envs
global_envs = '--mpi-local-args=\''
log_level = os.getenv('POPART_LOG_LEVEL', None)
if log_level:
global_envs += '-x POPART_LOG_LEVEL={} '.format(log_level)
global_envs += '-x PADDLE_TRAINERS_NUM={} -x PADDLE_TRAINER_ENDPOINTS={}'.format(
num_procs, ','.join(endpoints))
global_envs += '\''
poprun_command.append(global_envs)
# local envs
for idx in range(num_procs):
cur_endpoint = endpoints[idx // poprun_args.nproc_per_host]
rank_in_node = idx % poprun_args.nproc_per_host
poprun_command.append(
'--instance-mpi-local-args={}:\"-x PADDLE_TRAINER_ID={} -x PADDLE_CURRENT_ENDPOINT={} -x PADDLE_RANK_IN_NODE={}\"'
.format(idx, idx, cur_endpoint, rank_in_node))
# executor
poprun_command.append(sys.executable)
# script and script args
poprun_command.append(poprun_args.training_script)
poprun_command.extend(poprun_args.training_script_args)
# for debug
print("----------- PopRun Command -----------")
print("poprun \\")
for i in range(len(poprun_command) - 1):
print("%s \\" % (poprun_command[i]))
print("%s" % (poprun_command[len(poprun_command) - 1]))
print("---------------------------------------")
# replace training_script_args
self.ctx.args.training_script_args = poprun_command
def _get_entrypoint(self):
entrypoint = [self.ctx.args.training_script]
entrypoint.extend(self.ctx.args.training_script_args)
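# Join everything into one command string; the container runs it through a
# shell (shell=True, see new_container below), so the quoted PopRun arguments
# are interpreted by the shell rather than split naively.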
entrypoint = [" ".join(entrypoint)]
return entrypoint
def new_container(self,
entrypoint=None,
envs={},
use_ctx_env=True,
out=None,
err=None):
c = Container(
entrypoint=(entrypoint or self._get_entrypoint()),
env=(self.ctx.get_envs() if use_ctx_env else {}),
)
c.outfile, c.errfile = self._get_out_err_file(out, err)
c.update_env(envs)
# Need subprocess.Popen(shell=True) for PopRun command
c.shell = True
return c
def run(self):
# Replace the training script with the PopRun command
self.replace_training_script()
self.build_job()
self.build_pod()
self.deploy_pod()
self.watch()
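For reference, a minimal sketch of the replica/instance arithmetic that replace_training_script performs. The sample values are assumptions taken from the shell test further below (--devices=8, --hosts=localhost, --nproc_per_host=2, --ipus_per_replica=2):

    num_ipus = 8                                      # from --devices
    hosts = "localhost".replace(' ', '').split(',')   # ['localhost']
    nproc_per_host = 2
    ipus_per_replica = 2

    num_replicas = num_ipus // ipus_per_replica       # 4 data-parallel replicas
    num_procs = len(hosts) * nproc_per_host           # 2 PopRun instances
    assert num_ipus % ipus_per_replica == 0
    assert num_replicas % num_procs == 0
    endpoints = [h + ":8090" for h in hosts]          # ['localhost:8090']
    # The assembled command is then roughly:
    #   poprun --num-instances=2 --num-replicas=4 --ipus-per-replica=2 --host=localhost ... python train.py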
@@ -37,6 +37,7 @@ class Container(object):
        self._grace_period = 10
        self._log_handler = None
        self._shell = False

    @property
    def entrypoint(self):
@@ -70,6 +71,14 @@ class Container(object):
    def errfile(self, err):
        self._err = err

    @property
    def shell(self):
        return self._shell

    @shell.setter
    def shell(self, shell):
        self._shell = shell

    def update_env(self, env={}, **kwargs):
        env = {k: v for k, v in env.items() if isinstance(v, str)}
        self._env.update(env)
@@ -109,7 +118,8 @@ class Container(object):
        self._proc = ProcessContext(self._entrypoint,
                                    env=self._env,
                                    out=self._stdout,
                                    err=self._stderr,
                                    shell=self._shell)
        self._proc.start()

    def terminate(self, force=False):
...
@@ -91,6 +91,26 @@ def launch():
    - ``--elastic_timeout``: Seconds to wait before elastic job begin to train. Default ``--elastic_timeout=30``.

    IPU Parameters:
        IPU distributed launch only requires and allows three arguments: ``--devices``, ``training_script`` and ``training_script_args``.
        ``--devices`` is the number of IPU devices. e.g., ``--devices=4`` launches the training program with four IPU devices.
        ``training_script`` must be set to ``ipu``.
        ``training_script_args`` contains the arguments required by IPU distributed launch, described below.
        ``Examples 10`` provides an example of paddle.distributed.launch with IPUs.

        - ``--hosts``: The hosts for IPU distributed training.
        - ``--nproc_per_host``: The number of processes launched per host.
        - ``--ipus_per_replica``: The number of IPUs requested per replica.
        - ``--ipu_partition``: The partition name of the IPU devices.
        - ``--vipu_server``: The IP of the IPU device manager.
        - ``training_script``: The full path to the IPU distributed training program/script to be launched in parallel. e.g., ``training.py``.
        - ``training_script_args``: The args of the IPU distributed training program/script.

    Returns:
        - ``None``
@@ -229,6 +249,15 @@ def launch():
            # once the number of nodes changes between 2:4 during training, the strategy holds

    Examples 10 (ipu):
        .. code-block:: bash
            :name: code-block-example-bash10

            # With the following command, the job will begin to run the distributed program with IPUs.
            # Only `--devices` is supported and required as the launch arg, and `ipu` as the launch script.
            # Please check the details of the following args of the launch script in `utils/ipu_launch.py`.
            python -m paddle.distributed.launch --devices 4 ipu --hosts=localhost --nproc_per_host=2 --ipus_per_replica=1 --ipu_partition=pod16 --vipu_server=127.0.0.1 train.py
    """
    # initialize the context to run
...
@@ -24,7 +24,8 @@ class ProcessContext(object):
                 out=sys.stdout,
                 err=sys.stderr,
                 group=True,
                 preexec_fn=None,
                 shell=False):
        self._cmd = cmd
        self._env = env
        self._preexec_fn = preexec_fn
@@ -33,6 +34,7 @@ class ProcessContext(object):
        self._group = group if os.name != 'nt' else False
        self._proc = None
        self._code = None
        self._shell = shell

    def _start(self):
        pre_fn = os.setsid if self._group else None
@@ -40,7 +42,8 @@ class ProcessContext(object):
                                      env=self._env,
                                      stdout=self._stdout,
                                      stderr=self._stderr,
                                      preexec_fn=self._preexec_fn or pre_fn,
                                      shell=self._shell)

    def _close_std(self):
        try:
...
#!/bin/bash
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -e
partition_name=pod64
vipu_server=10.137.96.62
allclose_script="
import sys
import numpy as np
data1 = np.loadtxt(\"ipu_res.txt\")
data2 = np.loadtxt(\"cpu_res.txt\")
if np.allclose(data1[::16], data2, atol=1e-6):
sys.exit(0)
else:
sys.exit(1)
"
for opt in lamb sgd adam ;
do
for onchip in False True ;
do
for rts in False True ;
do
echo "Testcase: opt: ${opt}, onchip: ${onchip}, rts: ${rts}"
echo "paddle.distributed.fleet.launch test with IPUs..."
python3.7 -m paddle.distributed.launch \
--devices=8 \
ipu \
--hosts=localhost \
--nproc_per_host=2 \
--ipus_per_replica=2 \
--ipu_partition=${partition_name} \
--vipu_server=${vipu_server} \
test_dist_data_parallel_ipu.py ${opt} ipu_res.txt ${onchip} ${rts} > ipu.log
echo "paddle.distributed.fleet.launch test with IPUs...Done"
echo "paddle normal test with CPU..."
export POPLAR_IPUMODEL=1
python3.7 test_dist_data_parallel_ipu.py ${opt} cpu_res.txt > cpu.log
unset POPLAR_IPUMODEL
echo "paddle normal test with CPU...Done"
echo "Compare results..."
python3.7 -c """${allclose_script}"""
if [ $? -eq 0 ];then
echo "Compare results...Done"
else
echo "Error occurs. Please check ipu.log, cpu.log, ipu_res.txt and cpu_res.txt"
exit 1
fi
done
done
done
if [ -f "ipu.log" ]; then
rm "ipu.log"
fi
if [ -f "cpu.log" ]; then
rm "cpu.log"
fi
if [ -f "ipu_res.txt" ]; then
rm "ipu_res.txt"
fi
if [ -f "cpu_res.txt" ]; then
rm "cpu_res.txt"
fi
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import sys
import os
import random
import numpy as np
import paddle
import paddle.static
from paddle.fluid.tests.unittests.ipu.op_test_ipu import IPUOpTest
mpi_comm = None
@unittest.skip('Disable distributed tests on auto CI.')
class TestBase(IPUOpTest):
def set_attrs(self, enable_ipu, optimizer, log, onchip=False, rts=False):
self.ipu_options = {
"enable_pipelining": True,
"batches_per_step": 1,
"enable_gradient_accumulation": True,
"accumulation_factor": 4,
"enable_replicated_graphs": True,
"replicated_graph_count": 2,
"location_optimizer": {
"on_chip": onchip,
"use_replicated_tensor_sharding": rts
}
}
self.cpu_bs = 16
self.ipu_bs = 1
self.optimizer = optimizer
self.log = log
self.enable_ipu = enable_ipu
def test(self):
seed = 2021
np.random.seed(seed)
random.seed(seed)
scope = paddle.static.Scope()
main_prog = paddle.static.Program()
startup_prog = paddle.static.Program()
main_prog.random_seed = seed
startup_prog.random_seed = seed
bs = self.ipu_bs if self.enable_ipu else self.cpu_bs
data = np.random.rand(1, 3, 10, 10).astype(np.float32)
with paddle.static.scope_guard(scope):
with paddle.static.program_guard(main_prog, startup_prog):
image = paddle.static.data(name='image',
shape=[bs, 3, 10, 10],
dtype='float32')
with paddle.static.ipu_shard_guard(index=0, stage=0):
conv1 = paddle.static.nn.conv2d(image,
num_filters=3,
filter_size=3,
bias_attr=False)
with paddle.static.ipu_shard_guard(index=1, stage=1):
conv2 = paddle.static.nn.conv2d(conv1,
num_filters=3,
filter_size=3,
bias_attr=False)
# should consider influence of bs
loss = paddle.mean(conv2)
if self.optimizer == 'sgd':
opt = paddle.optimizer.SGD(learning_rate=1e-2)
elif self.optimizer == 'adam':
opt = paddle.optimizer.Adam(learning_rate=1e-2)
elif self.optimizer == 'lamb':
opt = paddle.optimizer.Lamb(learning_rate=1e-2)
else:
raise Exception('optimizer must be sgd, adam or lamb')
opt.minimize(loss)
if self.enable_ipu:
place = paddle.IPUPlace()
else:
place = paddle.CPUPlace()
executor = paddle.static.Executor(place)
executor.run(startup_prog)
if self.enable_ipu:
feed_list = [image.name]
fetch_list = [loss.name]
ipu_strategy = paddle.static.IpuStrategy()
ipu_strategy.set_graph_config(
num_ipus=2 * self.ipu_options['replicated_graph_count'],
is_training=True,
enable_manual_shard=True)
ipu_strategy.set_options(self.ipu_options)
ipu_strategy.set_options({
"enable_distribution":
True,
"enable_distributed_replicated_graphs":
True,
"global_replica_offset":
int(os.environ.get("PADDLE_TRAINER_ID")) * 2,
"global_replication_factor":
4
})
program = paddle.static.IpuCompiledProgram(
main_prog, ipu_strategy=ipu_strategy).compile(
feed_list, fetch_list)
feed = {
"image":
np.tile(data, [
self.ipu_options['replicated_graph_count'] *
self.ipu_options['batches_per_step'] *
self.ipu_options['accumulation_factor'], 1, 1, 1
])
}
else:
program = main_prog
feed = {"image": np.tile(data, [self.cpu_bs, 1, 1, 1])}
epoch = 10
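# For the CPU baseline, scale the number of executor steps so the total number of
# samples matches the distributed IPU run: global replication factor (4) x
# batches_per_step x accumulation_factor, divided by the CPU/IPU batch-size ratio.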
if not self.enable_ipu:
# global replication factor
epoch *= 4
epoch *= self.ipu_options['batches_per_step']
epoch *= self.ipu_options['accumulation_factor']
epoch = epoch / (self.cpu_bs / self.ipu_bs)
results = []
for i in range(int(epoch)):
res = executor.run(program, feed=feed, fetch_list=[loss])
if self.enable_ipu:
res = mpi_comm.gather(res, root=0)
results.append(res)
if self.enable_ipu:
if int(os.environ.get("PADDLE_TRAINER_ID")) == 0:
np.savetxt(self.log, np.array(results).flatten())
else:
np.savetxt(self.log, np.array(results).flatten())
if __name__ == "__main__":
paddle.enable_static()
# Run distributed tests
if len(sys.argv) == 5:
from mpi4py import MPI
DISTRIBUTED_COMM = MPI.COMM_WORLD
def _get_comm():
global DISTRIBUTED_COMM
if DISTRIBUTED_COMM is None:
raise RuntimeError(
"Distributed Commumication not setup. Please run setup_comm(MPI.COMM_WORLD) first."
)
return DISTRIBUTED_COMM
mpi_comm = _get_comm()
optimizer = sys.argv[1]
log = sys.argv[2]
onchip = True if sys.argv[3] == "True" else False
rts = True if sys.argv[4] == "True" else False
test = TestBase()
test.set_attrs(enable_ipu=True,
optimizer=optimizer,
log=log,
onchip=onchip,
rts=rts)
test.test()
# Run cpu tests for compare
elif len(sys.argv) == 3:
test = TestBase()
test.set_attrs(enable_ipu=False, optimizer=sys.argv[1], log=sys.argv[2])
test.test()
else:
raise ValueError(
"Only support 3 or 5 args. 3 for cpu test, 5 for ipu distributed test"
)
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''
python3.7 -m paddle.distributed.launch \
--devices=128 \
ipu \
--hosts=host1,host2 \
--ipus_per_replica=2 \
--nproc_per_host=1 \
--ipu_partition=pod128 \
--vipu_server=lr17-1-ctrl \
python/paddle/fluid/tests/unittests/ipu/disabled/test_dist_pod128_ipu.py
Equal to:
poprun \
--host=host1,host2 \
--num-instances=2 \
--num-replicas=64 \
--ipus-per-replica=2 \
--print-topology=yes \
--vipu-partition=pod128_bert \
--vipu-server-host=lr17-1-ctrl \
--update-partition=yes \
python3.7 python/paddle/fluid/tests/unittests/ipu/disabled/test_dist_pod128_ipu.py
'''
import os
import numpy as np
import paddle
def TestDistTraining():
paddle.enable_static()
attrs = {"size": [128, 16], "padding_idx": -1, "dtype": 'float32'}
scope = paddle.fluid.core.Scope()
main_prog = paddle.static.Program()
startup_prog = paddle.static.Program()
main_prog.random_seed = 42
startup_prog.random_seed = 42
np.random.seed(42)
input_data = np.random.uniform(0, 127, size=[128, 3, 2, 1]).astype(np.int32)
with paddle.fluid.scope_guard(scope):
with paddle.static.program_guard(main_prog, startup_prog):
x = paddle.static.data(name="x", shape=[3, 2, 1], dtype='int64')
with paddle.static.ipu_shard_guard(index=0, stage=0):
out = paddle.fluid.layers.embedding(x, **attrs)
with paddle.static.ipu_shard_guard(index=1, stage=1):
loss = paddle.mean(out)
opt = paddle.optimizer.Adam(learning_rate=1e-1)
opt.minimize(loss)
feed_list = ["x"]
fetch_list = [loss.name]
place = paddle.IPUPlace()
exe = paddle.static.Executor(place)
exe.run(startup_prog)
ipu_strategy = paddle.static.IpuStrategy()
ipu_strategy.set_graph_config(num_ipus=64,
is_training=True,
enable_manual_shard=True)
ipu_strategy.set_pipelining_config(
enable_pipelining=True,
batches_per_step=1,
enable_gradient_accumulation=True,
accumulation_factor=4)
ipu_strategy.set_options({
"enable_distribution":
True,
"enable_replicated_graphs":
True,
"replicated_graph_count":
32,
"enable_distributed_replicated_graphs":
True,
"global_replica_offset":
# Paddle : int(os.environ.get("PADDLE_TRAINER_ID")) * 32
# PopRun : int(os.environ.get("POPDIST_REPLICA_INDEX_OFFSET"))
int(os.environ.get("PADDLE_TRAINER_ID")) * 32,
"global_replication_factor":
64,
"location_optimizer": {
"on_chip": False,
"use_replicated_tensor_sharding": True
}
})
ipu_program = paddle.static.IpuCompiledProgram(
main_prog, ipu_strategy=ipu_strategy)
program = ipu_program.compile(feed_list, fetch_list)
for i in range(10):
res = exe.run(program,
feed={"x": input_data},
fetch_list=fetch_list)
print("index: {}, result: {}".format(i, res))
if __name__ == "__main__":
TestDistTraining()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''
Single host:
python3.7 -m paddle.distributed.launch \
--devices=4 \
ipu \
--hosts=localhost \
--nproc_per_host=2 \
--ipus_per_replica=1 \
--ipu_partition=pod64 \
--vipu_server=10.137.96.62 \
python/paddle/fluid/tests/unittests/ipu/distributed/test_dist_sample.py
Equal to:
poprun \
--host=localhost \
--num-instances=2 \
--num-replicas=4 \
--ipus-per-replica=1 \
--print-topology=yes \
python3.7 python/paddle/fluid/tests/unittests/ipu/distributed/test_dist_sample.py
'''
'''
Multi hosts:
python3.7 -m paddle.distributed.launch \
--devices=4 \
ipu \
--hosts=host1,host2 \
--nproc_per_host=1 \
--ipus_per_replica=1 \
--ipu_partition=pod64 \
--vipu_server=10.137.96.62 \
python/paddle/fluid/tests/unittests/ipu/distributed/test_dist_sample.py
Equal to:
poprun \
--host=host1,host2 \
--num-instances=2 \
--num-replicas=4 \
--ipus-per-replica=1 \
--print-topology=yes \
python3.7 python/paddle/fluid/tests/unittests/ipu/distributed/test_dist_sample.py
'''
import os
import sys
import paddle
import numpy as np
mpi_comm = None
def Test(use_dist, file_name):
paddle.enable_static()
attrs = {"size": [128, 16], "padding_idx": -1, "dtype": 'float32'}
scope = paddle.fluid.core.Scope()
main_prog = paddle.static.Program()
startup_prog = paddle.static.Program()
main_prog.random_seed = 42
startup_prog.random_seed = 42
with paddle.fluid.scope_guard(scope):
with paddle.static.program_guard(main_prog, startup_prog):
x = paddle.static.data(name="x", shape=[3, 2, 1], dtype='int64')
out = paddle.fluid.layers.embedding(x, **attrs)
loss = paddle.mean(out)
opt = paddle.optimizer.Adam(learning_rate=1e-1)
opt.minimize(loss)
feed_list = ["x"]
fetch_list = [loss.name]
place = paddle.IPUPlace()
exe = paddle.static.Executor(place)
exe.run(startup_prog)
ipu_strategy = paddle.static.IpuStrategy()
if use_dist:
ipu_strategy.set_graph_config(num_ipus=2, is_training=True)
# Set distributed envs
ipu_strategy.set_options({
"enable_distribution":
True,
"enable_replicated_graphs":
True,
"replicated_graph_count":
2,
"enable_distributed_replicated_graphs":
True,
"global_replica_offset":
int(os.environ.get("PADDLE_TRAINER_ID")) * 2,
"global_replication_factor":
4
})
else:
ipu_strategy.set_graph_config(num_ipus=4, is_training=True)
ipu_strategy.set_options({
"enable_replicated_graphs": True,
"replicated_graph_count": 4,
})
ipu_program = paddle.static.IpuCompiledProgram(
main_prog, ipu_strategy=ipu_strategy)
program = ipu_program.compile(feed_list, fetch_list)
if use_dist:
if os.environ.get("PADDLE_TRAINER_ID") == "0":
input_data = np.concatenate([
np.array([[[1], [3]], [[2], [4]],
[[4], [127]]]).astype(np.int32),
np.array([[[1], [3]], [[2], [4]],
[[4], [127]]]).astype(np.int32)
])
else:
input_data = np.concatenate([
np.array([[[8], [60]], [[50], [77]],
[[90], [13]]]).astype(np.int32),
np.array([[[8], [60]], [[50], [77]],
[[90], [13]]]).astype(np.int32)
])
else:
input_data = np.concatenate([
np.array([[[1], [3]], [[2], [4]],
[[4], [127]]]).astype(np.int32),
np.array([[[1], [3]], [[2], [4]],
[[4], [127]]]).astype(np.int32),
np.array([[[8], [60]], [[50], [77]],
[[90], [13]]]).astype(np.int32),
np.array([[[8], [60]], [[50], [77]],
[[90], [13]]]).astype(np.int32)
])
feed_data = {"x": input_data}
for step in range(10):
res = exe.run(program, feed=feed_data, fetch_list=fetch_list)
if use_dist:
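# Gather the per-instance results to rank 0 (the default root) so that
# only one process writes the result file.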
res = mpi_comm.gather(res)
if os.getenv("PADDLE_TRAINER_ID") == "0":
np.savetxt(file_name, np.array(res).flatten())
else:
np.savetxt(file_name, np.array(res).flatten())
if __name__ == "__main__":
file_name = sys.argv[1]
use_dist = False
if 'PADDLE_TRAINER_ID' in os.environ:
from mpi4py import MPI
DISTRIBUTED_COMM = MPI.COMM_WORLD
def _get_comm():
global DISTRIBUTED_COMM
if DISTRIBUTED_COMM is None:
raise RuntimeError(
"Distributed Commumication not setup. Please run setup_comm(MPI.COMM_WORLD) first."
)
return DISTRIBUTED_COMM
mpi_comm = _get_comm()
use_dist = True
Test(use_dist, file_name)