test_dist_mnist_fleetapi.py 2.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest
from test_dist_base import TestDistBase
P
pangyoki 已提交
17 18 19
import paddle

paddle.enable_static()
20 21 22 23 24 25 26 27


class TestDistMnistNCCL2FleetApi(TestDistBase):
    def _setup_config(self):
        self._sync_mode = True
        self._use_reduce = False
        self._use_reader_alloc = False
        self._nccl2_mode = True
28
        self._use_fleet_api = True
29
        self._sync_batch_norm = True
30 31 32

    def test_dist_train(self):
        import paddle.fluid as fluid
33

34
        if fluid.core.is_compiled_with_cuda():
35 36 37 38
            self.check_with_place(
                "dist_mnist.py",
                delta=1e-5,
                check_error_log=True,
39 40
                need_envs={'FLAGS_allreduce_record_one_event': '1'},
            )
41 42


43 44 45 46
class FleetCollectiveTest(unittest.TestCase):
    def test_open_sync_batch_norm(self):
        import paddle.fluid as fluid
        import paddle.fluid.incubate.fleet.base.role_maker as role_maker
47 48 49 50
        from paddle.fluid.incubate.fleet.collective import (
            fleet,
            DistributedStrategy,
        )
51

52 53 54 55
        if not fluid.core.is_compiled_with_cuda():
            # Operator "gen_nccl_id" has not been registered
            return

56 57
        data = fluid.layers.data(name='X', shape=[1], dtype='float32')
        hidden = fluid.layers.fc(input=data, size=10)
58
        loss = paddle.mean(hidden)
59 60 61 62 63 64 65 66 67

        optimizer = fluid.optimizer.AdamOptimizer()

        role = role_maker.UserDefinedCollectiveRoleMaker(0, ['127.0.0.1:6170'])
        fleet.init(role)

        dist_strategy = DistributedStrategy()
        dist_strategy.sync_batch_norm = True

68 69 70
        dist_optimizer = fleet.distributed_optimizer(
            optimizer, strategy=dist_strategy
        )
71 72 73 74 75
        dist_optimizer.minimize(loss)

        self.assertEqual(dist_strategy.exec_strategy.num_threads, 1)


76 77
if __name__ == "__main__":
    unittest.main()