diff --git a/python/paddle/distributed/fleet/base/fleet_base.py b/python/paddle/distributed/fleet/base/fleet_base.py
index f246afd5679e0ee38af5580d0a317a79318f7d70..a1830f3ba47ac0072dd9d5c68a29de7505306850 100644
--- a/python/paddle/distributed/fleet/base/fleet_base.py
+++ b/python/paddle/distributed/fleet/base/fleet_base.py
@@ -186,6 +186,15 @@ class Fleet(object):
         fleet.util._set_role_maker(self._role_maker)
 
         self.strategy_compiler = StrategyCompiler()
+
+        if self._role_maker._is_non_distributed() and self._is_collective:
+            if paddle.fluid.core.is_compiled_with_cuda():
+                gpus_num = paddle.fluid.core.get_cuda_device_count()
+                if gpus_num != 1:
+                    raise ValueError(
+                        "CUDA_VISIBLE_DEVICES should be set to only 1 card if you use `python` to launch a fleet program."
+                    )
+
         if paddle.fluid.framework.in_dygraph_mode():
             if self.worker_num() == 1:
                 return
@@ -569,8 +578,6 @@ class Fleet(object):
 
         """
         self.user_defined_optimizer = optimizer
-        if paddle.fluid.framework.in_dygraph_mode():
-            return self
 
         if strategy == None:
             strategy = DistributedStrategy()
diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt
index 8f1b963ac8a5f7ee375578207840e9ed2e1907f4..6f24659515651848c8a864bd33beb2f4784f4c72 100644
--- a/python/paddle/fluid/tests/unittests/CMakeLists.txt
+++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt
@@ -129,6 +129,7 @@ if (NOT ${WITH_GPU})
     LIST(REMOVE_ITEM TEST_OPS test_parallel_dygraph_transformer)
     LIST(REMOVE_ITEM TEST_OPS test_parallel_dygraph_sync_batch_norm)
     LIST(REMOVE_ITEM TEST_OPS test_imperative_auto_mixed_precision)
+    LIST(REMOVE_ITEM TEST_OPS test_fleet_base_single)
 elseif(${CUDNN_VERSION} VERSION_LESS 7100)
     LIST(REMOVE_ITEM TEST_OPS test_conv2d_fusion_op)
 endif()
diff --git a/python/paddle/fluid/tests/unittests/test_fleet_base.py b/python/paddle/fluid/tests/unittests/test_fleet_base.py
index 3d4b2e218f725a0debbad3460e80e6c9b7eaf641..f50d80d215da8e6927544901fdd0dd7e814e064c 100644
--- a/python/paddle/fluid/tests/unittests/test_fleet_base.py
+++ b/python/paddle/fluid/tests/unittests/test_fleet_base.py
@@ -171,45 +171,7 @@ class TestFleetDygraph(unittest.TestCase):
         final_strategy = fleet._final_strategy()
 
 
-class LinearNet(nn.Layer):
-    def __init__(self):
-        super(LinearNet, self).__init__()
-        self._linear1 = nn.Linear(10, 10)
-        self._linear2 = nn.Linear(10, 1)
-
-    def forward(self, x):
-        return self._linear2(self._linear1(x))
-
-
-class TestFleetDygraphSingle(unittest.TestCase):
-    def setUp(self):
-        os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36213"
-        os.environ["PADDLE_CURRENT_ENDPOINTS"] = "127.0.0.1:36213"
-        os.environ["PADDLE_TRAINERS_NUM"] = "1"
-        os.environ["PADDLE_TRAINER_ID"] = "0"
-
-    def test_dygraph_single(self):
-        paddle.disable_static()
-        fleet.init(is_collective=True)
-
-        layer = LinearNet()
-        loss_fn = nn.MSELoss()
-        adam = paddle.optimizer.Adam(
-            learning_rate=0.001, parameters=layer.parameters())
-
-        adam = fleet.distributed_optimizer(adam)
-        dp_layer = fleet.distributed_model(layer)
-        for step in range(2):
-            inputs = paddle.randn([10, 10], 'float32')
-            outputs = dp_layer(inputs)
-            labels = paddle.randn([10, 1], 'float32')
-            loss = loss_fn(outputs, labels)
-            loss.backward()
-            adam.step()
-            adam.clear_grad()
-
-
-class TestFleetBaseSingleRunCollective(unittest.TestCase):
+class TestFleetBaseSingleError(unittest.TestCase):
     def setUp(self):
         os.environ.pop("PADDLE_TRAINER_ENDPOINTS")
 
@@ -221,71 +183,23 @@ class TestFleetBaseSingleRunCollective(unittest.TestCase):
         }
 
     def test_single_run_collective_minimize(self):
-        input_x = paddle.static.data(name="x", shape=[-1, 32], dtype='float32')
-        input_y = paddle.static.data(name="y", shape=[-1, 1], dtype='int64')
-
-        fc_1 = fluid.layers.fc(input=input_x, size=64, act='tanh')
-        prediction = fluid.layers.fc(input=fc_1, size=2, act='softmax')
-        cost = fluid.layers.cross_entropy(input=prediction, label=input_y)
-        avg_cost = paddle.mean(x=cost)
-
-        fleet.init(is_collective=True)
-        optimizer = fluid.optimizer.SGD(learning_rate=0.001)
-        optimizer = fleet.distributed_optimizer(optimizer)
-        optimizer.minimize(avg_cost)
-
-        place = fluid.CUDAPlace(0) if paddle.fluid.is_compiled_with_cuda(
-        ) else fluid.CPUPlace()
-
-        exe = fluid.Executor(place)
-        exe.run(paddle.static.default_startup_program())
-
-        for i in range(10):
-            cost_val = exe.run(feed=self.gen_data(), fetch_list=[avg_cost.name])
-            print("cost of step[{}] = {}".format(i, cost_val))
-
-
-class TestFleetBaseSingleRunPS(unittest.TestCase):
-    def setUp(self):
-        os.environ.pop("PADDLE_PSERVERS_IP_PORT_LIST")
-
-    def gen_data(self):
-        return {
-            "x": np.random.random(size=(128, 32)).astype('float32'),
-            "y": np.random.randint(
-                2, size=(128, 1)).astype('int64')
-        }
-
-    def test_single_run_ps_minimize(self):
-        input_x = paddle.static.data(name="x", shape=[-1, 32], dtype='float32')
-        input_y = paddle.static.data(name="y", shape=[-1, 1], dtype='int64')
-
-        fc_1 = fluid.layers.fc(input=input_x, size=64, act='tanh')
-        prediction = fluid.layers.fc(input=fc_1, size=2, act='softmax')
-        cost = fluid.layers.cross_entropy(input=prediction, label=input_y)
-        avg_cost = paddle.mean(x=cost)
-
-        fleet.init()
-        strategy = paddle.distributed.fleet.DistributedStrategy()
-        optimizer = fluid.optimizer.SGD(learning_rate=0.01)
-        optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
-        optimizer.minimize(avg_cost)
-        if fleet.is_server():
-            fleet.init_server()
-            fleet.run_server()
-        elif fleet.is_worker():
-            place = fluid.CPUPlace()
-            exe = fluid.Executor(place)
-            exe.run(paddle.static.default_startup_program())
-            step = 100
-            for i in range(step):
-                cost_val = exe.run(program=fluid.default_main_program(),
-                                   feed=self.gen_data(),
-                                   fetch_list=[avg_cost.name])
-                print("worker_index: %d, step%d cost = %f" %
-                      (fleet.worker_index(), i, cost_val[0]))
-            fleet.save_persistables(exe, "fleet_single_model/")
-            print("save fleet models done.")
+        def test_single_error():
+            input_x = paddle.static.data(
+                name="x", shape=[-1, 32], dtype='float32')
+            input_y = paddle.static.data(name="y", shape=[-1, 1], dtype='int64')
+
+            fc_1 = fluid.layers.fc(input=input_x, size=64, act='tanh')
+            prediction = fluid.layers.fc(input=fc_1, size=2, act='softmax')
+            cost = fluid.layers.cross_entropy(input=prediction, label=input_y)
+            avg_cost = paddle.mean(x=cost)
+            fleet.init(is_collective=True)
+
+        # In non-distributed mode (launched with plain `python`), raise an error if multiple cards are visible
+        if fluid.core.is_compiled_with_cuda(
+        ) and fluid.core.get_cuda_device_count() > 1:
+            self.assertRaises(ValueError, test_single_error)
+        else:
+            test_single_error()
 
 
 if __name__ == "__main__":
diff --git a/python/paddle/fluid/tests/unittests/test_fleet_base_single.py b/python/paddle/fluid/tests/unittests/test_fleet_base_single.py
new file mode 100644
index 0000000000000000000000000000000000000000..111a6331958ca28c94ea608967d9b0c80f5ef55e
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_fleet_base_single.py
@@ -0,0 +1,148 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import numpy as np
+import os
+cuda_visible_devices = os.getenv('CUDA_VISIBLE_DEVICES')
+if cuda_visible_devices is None or cuda_visible_devices == "":
+    os.environ['CUDA_VISIBLE_DEVICES'] = '0'
+else:
+    os.environ['CUDA_VISIBLE_DEVICES'] = cuda_visible_devices.split(',')[0]
+import paddle
+import paddle.distributed.fleet as fleet
+import paddle.distributed.fleet.base.role_maker as role_maker
+import paddle.fluid as fluid
+import unittest
+import paddle.nn as nn
+
+
+class LinearNet(nn.Layer):
+    def __init__(self):
+        super(LinearNet, self).__init__()
+        self._linear1 = nn.Linear(10, 10)
+        self._linear2 = nn.Linear(10, 1)
+
+    def forward(self, x):
+        return self._linear2(self._linear1(x))
+
+
+class TestFleetDygraphSingle(unittest.TestCase):
+    def setUp(self):
+        os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36213"
+        os.environ["PADDLE_CURRENT_ENDPOINTS"] = "127.0.0.1:36213"
+        os.environ["PADDLE_TRAINERS_NUM"] = "1"
+        os.environ["PADDLE_TRAINER_ID"] = "0"
+
+    def test_dygraph_single(self):
+        paddle.disable_static()
+        fleet.init(is_collective=True)
+
+        layer = LinearNet()
+        loss_fn = nn.MSELoss()
+        adam = paddle.optimizer.Adam(
+            learning_rate=0.001, parameters=layer.parameters())
+
+        adam = fleet.distributed_optimizer(adam)
+        dp_layer = fleet.distributed_model(layer)
+        for step in range(2):
+            inputs = paddle.randn([10, 10], 'float32')
+            outputs = dp_layer(inputs)
+            labels = paddle.randn([10, 1], 'float32')
+            loss = loss_fn(outputs, labels)
+            loss = dp_layer.scale_loss(loss)
+            loss.backward()
+            dp_layer.apply_collective_grads()
+            adam.step()
+            adam.clear_grad()
+
+
+class TestFleetBaseSingleRunCollective(unittest.TestCase):
+    def setUp(self):
+        pass
+
+    def gen_data(self):
+        return {
+            "x": np.random.random(size=(128, 32)).astype('float32'),
+            "y": np.random.randint(
+                2, size=(128, 1)).astype('int64')
+        }
+
+    def test_single_run_collective_minimize(self):
+        input_x = paddle.static.data(name="x", shape=[-1, 32], dtype='float32')
+        input_y = paddle.static.data(name="y", shape=[-1, 1], dtype='int64')
+
+        fc_1 = fluid.layers.fc(input=input_x, size=64, act='tanh')
+        prediction = fluid.layers.fc(input=fc_1, size=2, act='softmax')
+        cost = fluid.layers.cross_entropy(input=prediction, label=input_y)
+        avg_cost = paddle.mean(x=cost)
+
+        fleet.init(is_collective=True)
+        optimizer = fluid.optimizer.SGD(learning_rate=0.001)
+        optimizer = fleet.distributed_optimizer(optimizer)
+        optimizer.minimize(avg_cost)
+
+        place = fluid.CUDAPlace(0) if paddle.fluid.is_compiled_with_cuda(
+        ) else fluid.CPUPlace()
+
+        exe = fluid.Executor(place)
+        exe.run(paddle.static.default_startup_program())
+
+        for i in range(10):
+            cost_val = exe.run(feed=self.gen_data(), fetch_list=[avg_cost.name])
+            print("cost of step[{}] = {}".format(i, cost_val))
+
+
+class TestFleetBaseSingleRunPS(unittest.TestCase):
+    def setUp(self):
+        pass
+
+    def gen_data(self):
+        return {
+            "x": np.random.random(size=(128, 32)).astype('float32'),
+            "y": np.random.randint(
+                2, size=(128, 1)).astype('int64')
+        }
+
+    def test_single_run_ps_minimize(self):
+        input_x = paddle.static.data(name="x", shape=[-1, 32], dtype='float32')
+        input_y = paddle.static.data(name="y", shape=[-1, 1], dtype='int64')
+
+        fc_1 = fluid.layers.fc(input=input_x, size=64, act='tanh')
+        prediction = fluid.layers.fc(input=fc_1, size=2, act='softmax')
+        cost = fluid.layers.cross_entropy(input=prediction, label=input_y)
+        avg_cost = paddle.mean(x=cost)
+
+        fleet.init()
+        strategy = paddle.distributed.fleet.DistributedStrategy()
+        optimizer = fluid.optimizer.SGD(learning_rate=0.01)
+        optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
+        optimizer.minimize(avg_cost)
+        if fleet.is_server():
+            fleet.init_server()
+            fleet.run_server()
+        elif fleet.is_worker():
+            place = fluid.CPUPlace()
+            exe = fluid.Executor(place)
+            exe.run(paddle.static.default_startup_program())
+            step = 10
+            for i in range(step):
+                cost_val = exe.run(program=fluid.default_main_program(),
+                                   feed=self.gen_data(),
+                                   fetch_list=[avg_cost.name])
+                print("worker_index: %d, step%d cost = %f" %
+                      (fleet.worker_index(), i, cost_val[0]))
+
+
+if __name__ == "__main__":
+    unittest.main()
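
The practical effect of the fleet_base.py change above: a collective program launched with plain `python` (non-distributed mode) must see exactly one CUDA device, otherwise `fleet.init(is_collective=True)` now raises the new ValueError. Below is a minimal standalone sketch (illustrative, not part of the patch) of the single-card setup, mirroring the CUDA_VISIBLE_DEVICES trimming that test_fleet_base_single.py performs before importing paddle:

    # Sketch only: pin this process to its first visible card *before*
    # importing paddle, so the non-distributed collective guard in
    # Fleet.__init__ sees exactly one CUDA device.
    import os
    cuda_visible_devices = os.getenv('CUDA_VISIBLE_DEVICES')
    if cuda_visible_devices:
        os.environ['CUDA_VISIBLE_DEVICES'] = cuda_visible_devices.split(',')[0]
    else:
        os.environ['CUDA_VISIBLE_DEVICES'] = '0'

    import paddle
    import paddle.distributed.fleet as fleet

    # With more than one visible CUDA card this would raise the ValueError
    # added in fleet_base.py; with a single card it initializes normally.
    fleet.init(is_collective=True)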