# test_dist_mnist_fleetapi.py
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest
from test_dist_base import TestDistBase
# Last committed by pangyoki.
import paddle

# Switch Paddle to static-graph mode at import time, i.e. before any test
# in this module runs.
paddle.enable_static()
20 21 22


class TestDistMnistNCCL2FleetApi(TestDistBase):
    """Distributed MNIST training check configured for NCCL2 + fleet API.

    The flags set in ``_setup_config`` and the ``check_with_place`` helper
    are provided/consumed by ``TestDistBase``, which lives outside this file.
    """

    def _setup_config(self):
        """Set the run-configuration flags for this test case."""
        self._sync_mode = True
        self._use_reduce = False
        self._use_reader_alloc = False
        self._nccl2_mode = True
        self._use_fleet_api = True
        self._sync_batch_norm = True

    def test_dist_train(self):
        # Imported lazily, matching the other tests in this module.
        import paddle.fluid as fluid

        # Report an explicit skip on non-CUDA builds instead of silently
        # passing, so the test report reflects that nothing was exercised.
        # NOTE(review): relies on TestDistBase being a unittest.TestCase.
        if not fluid.core.is_compiled_with_cuda():
            self.skipTest("PaddlePaddle is not compiled with CUDA")

        self.check_with_place(
            "dist_mnist.py",
            delta=1e-5,
            check_error_log=True,
            need_envs={'FLAGS_allreduce_record_one_event': '1'})
40 41


42
class FleetCollectiveTest(unittest.TestCase):
    """Checks the collective fleet optimizer with ``sync_batch_norm`` on."""

    def test_open_sync_batch_norm(self):
        # Imported lazily, matching the other tests in this module.
        import paddle.fluid as fluid
        import paddle.fluid.incubate.fleet.base.role_maker as role_maker
        from paddle.fluid.incubate.fleet.collective import (
            fleet, DistributedStrategy)

        # Without CUDA the "gen_nccl_id" operator has not been registered.
        # Report an explicit skip rather than returning, so the test is not
        # counted as a pass when nothing was actually checked.
        if not fluid.core.is_compiled_with_cuda():
            self.skipTest(
                'Operator "gen_nccl_id" has not been registered '
                '(PaddlePaddle is not compiled with CUDA)')

        # Minimal network: one fc layer reduced to a scalar loss.
        data = fluid.layers.data(name='X', shape=[1], dtype='float32')
        hidden = fluid.layers.fc(input=data, size=10)
        loss = paddle.mean(hidden)

        optimizer = fluid.optimizer.AdamOptimizer()

        # Single-endpoint collective role: current id 0, one local address.
        role = role_maker.UserDefinedCollectiveRoleMaker(0, ['127.0.0.1:6170'])
        fleet.init(role)

        dist_strategy = DistributedStrategy()
        dist_strategy.sync_batch_norm = True

        dist_optimizer = fleet.distributed_optimizer(optimizer,
                                                     strategy=dist_strategy)
        dist_optimizer.minimize(loss)

        # After minimize(), the strategy is expected to carry a single
        # executor thread (presumably forced because sync_batch_norm is on).
        self.assertEqual(dist_strategy.exec_strategy.num_threads, 1)


72 73
# Run every TestCase in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()