# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest
import paddle
import os


class TestStrategyConfig(unittest.TestCase):
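    """Unit tests for the switches and config dictionaries exposed by
    paddle.distributed.fleet.DistributedStrategy."""
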
    def test_amp(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.amp = True
        self.assertEqual(strategy.amp, True)
        strategy.amp = False
        self.assertEqual(strategy.amp, False)
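        # A value of the wrong type is expected to be ignored, leaving the
        # previous value in place; the same pattern is checked for the other
        # boolean switches below.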
        strategy.amp = "True"
        self.assertEqual(strategy.amp, False)

    def test_amp_configs(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
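        # Hyper-parameters for AMP dynamic loss scaling.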
        configs = {
            "init_loss_scaling": 32768,
            "decr_every_n_nan_or_inf": 2,
            "incr_every_n_steps": 1000,
            "incr_ratio": 2.0,
            "use_dynamic_loss_scaling": True,
            "decr_ratio": 0.5
        }
        strategy.amp_configs = configs
        self.assertEqual(strategy.amp_configs["init_loss_scaling"], 32768)

    def test_recompute(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.recompute = True
        self.assertEqual(strategy.recompute, True)
        strategy.recompute = False
        self.assertEqual(strategy.recompute, False)
        strategy.recompute = "True"
        self.assertEqual(strategy.recompute, False)

    def test_recompute_configs(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        configs = {"checkpoints": ["x", "y"]}
        strategy.recompute_configs = configs
        self.assertEqual(len(strategy.recompute_configs["checkpoints"]), 2)

    def test_pipeline(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.pipeline = True
        self.assertEqual(strategy.pipeline, True)
        strategy.pipeline = False
        self.assertEqual(strategy.pipeline, False)
        strategy.pipeline = "True"
        self.assertEqual(strategy.pipeline, False)

    def test_pipeline_configs(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        configs = {"micro_batch_size": 4}
        strategy.pipeline_configs = configs
        self.assertEqual(strategy.pipeline_configs["micro_batch_size"], 4)
        configs = {"accumulate_steps": 2}
        strategy.pipeline_configs = configs
        self.assertEqual(strategy.pipeline_configs["accumulate_steps"], 2)

    def test_hybrid_parallel_configs(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
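        # Degrees of data-, model- (tensor-) and pipeline-parallelism.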
        strategy.hybrid_configs = {
            "dp_degree": 1,
            "mp_degree": 2,
            "pp_degree": 4
        }
        self.assertEqual(strategy.hybrid_configs["dp_degree"], 1)
        self.assertEqual(strategy.hybrid_configs["mp_degree"], 2)
        self.assertEqual(strategy.hybrid_configs["pp_degree"], 4)

    def test_localsgd(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.localsgd = True
        self.assertEqual(strategy.localsgd, True)
        strategy.localsgd = False
        self.assertEqual(strategy.localsgd, False)
        strategy.localsgd = "True"
        self.assertEqual(strategy.localsgd, False)

    def test_localsgd_configs(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        configs = {"k_steps": 4, "begin_step": 120}
        strategy.localsgd_configs = configs
        self.assertEqual(strategy.localsgd_configs["k_steps"], 4)
        self.assertEqual(strategy.localsgd_configs["begin_step"], 120)

    def test_adaptive_localsgd_configs(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        configs = {"init_k_steps": 1, "begin_step": 120}
        strategy.adaptive_localsgd_configs = configs
        self.assertEqual(strategy.adaptive_localsgd_configs["init_k_steps"], 1)
        self.assertEqual(strategy.adaptive_localsgd_configs["begin_step"], 120)

    def test_dgc(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.dgc = True
        self.assertEqual(strategy.dgc, True)
        strategy.dgc = False
        self.assertEqual(strategy.dgc, False)
        strategy.dgc = "True"
        self.assertEqual(strategy.dgc, False)

    def test_fp16_allreduce(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.fp16_allreduce = True
        self.assertEqual(strategy.fp16_allreduce, True)
        strategy.fp16_allreduce = False
        self.assertEqual(strategy.fp16_allreduce, False)
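        # Unlike most switches, a non-bool assignment raises TypeError here
        # instead of being silently ignored.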
        with self.assertRaises(TypeError):
            strategy.fp16_allreduce = "True"
        self.assertEqual(strategy.fp16_allreduce, False)

    def test_sync_nccl_allreduce(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.sync_nccl_allreduce = True
        self.assertEqual(strategy.sync_nccl_allreduce, True)
        strategy.sync_nccl_allreduce = False
        self.assertEqual(strategy.sync_nccl_allreduce, False)
        strategy.sync_nccl_allreduce = "True"
        self.assertEqual(strategy.sync_nccl_allreduce, False)

    def test_nccl_comm_num(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.nccl_comm_num = 1
        self.assertEqual(strategy.nccl_comm_num, 1)
        strategy.nccl_comm_num = "2"
        self.assertEqual(strategy.nccl_comm_num, 1)

    def test_use_hierarchical_allreduce(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.use_hierarchical_allreduce = True
        self.assertEqual(strategy.use_hierarchical_allreduce, True)
        strategy.use_hierarchical_allreduce = False
        self.assertEqual(strategy.use_hierarchical_allreduce, False)
        strategy.use_hierarchical_allreduce = "True"
        self.assertEqual(strategy.use_hierarchical_allreduce, False)

    def test_hierarchical_allreduce_inter_nranks(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.hierarchical_allreduce_inter_nranks = 8
        self.assertEqual(strategy.hierarchical_allreduce_inter_nranks, 8)
        strategy.hierarchical_allreduce_inter_nranks = "4"
        self.assertEqual(strategy.hierarchical_allreduce_inter_nranks, 8)

    def test_sync_batch_norm(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.sync_batch_norm = True
        self.assertEqual(strategy.sync_batch_norm, True)
        strategy.sync_batch_norm = False
        self.assertEqual(strategy.sync_batch_norm, False)
        strategy.sync_batch_norm = "True"
        self.assertEqual(strategy.sync_batch_norm, False)

    def test_fuse_all_reduce_ops(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.fuse_all_reduce_ops = True
        self.assertEqual(strategy.fuse_all_reduce_ops, True)
        strategy.fuse_all_reduce_ops = False
        self.assertEqual(strategy.fuse_all_reduce_ops, False)
        strategy.fuse_all_reduce_ops = "True"
        self.assertEqual(strategy.fuse_all_reduce_ops, False)

    def test_fuse_grad_size_in_MB(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.fuse_grad_size_in_MB = 50
        self.assertEqual(strategy.fuse_grad_size_in_MB, 50)
        strategy.fuse_grad_size_in_MB = "40"
        self.assertEqual(strategy.fuse_grad_size_in_MB, 50)

    def test_last_comm_group_size_MB(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.last_comm_group_size_MB = 50
        self.assertEqual(strategy.last_comm_group_size_MB, 50)
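        # A negative group size is rejected with ValueError.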
        with self.assertRaises(ValueError):
            strategy.last_comm_group_size_MB = -1

    def test_find_unused_parameters(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.find_unused_parameters = True
        self.assertEqual(strategy.find_unused_parameters, True)
        strategy.find_unused_parameters = False
        self.assertEqual(strategy.find_unused_parameters, False)
        strategy.find_unused_parameters = "True"
        self.assertEqual(strategy.find_unused_parameters, False)

    def test_fuse_grad_size_in_TFLOPS(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy._fuse_grad_size_in_TFLOPS = 0.1
        self.assertGreater(strategy._fuse_grad_size_in_TFLOPS, 0.09)
        strategy._fuse_grad_size_in_TFLOPS = "0.3"
        self.assertGreater(strategy._fuse_grad_size_in_TFLOPS, 0.09)

    def test_gradient_merge(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.gradient_merge = True
        self.assertEqual(strategy.gradient_merge, True)
        strategy.gradient_merge = False
        self.assertEqual(strategy.gradient_merge, False)
        strategy.gradient_merge = "True"
        self.assertEqual(strategy.gradient_merge, False)

    def test_gradient_merge_configs(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        configs = {"k_steps": 4}
        strategy.gradient_merge_configs = configs
        self.assertEqual(strategy.gradient_merge_configs["k_steps"], 4)

    def test_lars(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.lars = True
        self.assertEqual(strategy.lars, True)
        strategy.lars = False
        self.assertEqual(strategy.lars, False)
        strategy.lars = "True"
        self.assertEqual(strategy.lars, False)

    def test_lamb(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.lamb = True
        self.assertEqual(strategy.lamb, True)
        strategy.lamb = False
        self.assertEqual(strategy.lamb, False)
        strategy.lamb = "True"
        self.assertEqual(strategy.lamb, False)

    def test_a_sync(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.a_sync = True
        self.assertEqual(strategy.a_sync, True)
        strategy.a_sync = False
        self.assertEqual(strategy.a_sync, False)

        # Unlike the silently-ignored switches above, assigning a non-bool
        # value to a_sync raises ValueError.
        with self.assertRaises(ValueError):
            strategy.a_sync = "True"

    def test_a_sync_configs(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        configs = {"k_steps": 1000}
        strategy.a_sync_configs = configs
        self.assertEqual(strategy.a_sync_configs["k_steps"], 1000)

    def test_trainer_desc_configs(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        configs = {
            "dump_fields_path": "dump_data",
            "dump_fields": ["xxx", "yyy"],
            "dump_param": []
        }
        strategy.trainer_desc_configs = configs
        self.assertEqual(strategy.trainer_desc_configs["dump_fields_path"],
                         "dump_data")
        self.assertEqual(len(strategy.trainer_desc_configs["dump_fields"]), 2)
        self.assertEqual(len(strategy.trainer_desc_configs["dump_param"]), 0)

    def test_elastic(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.elastic = True
        self.assertEqual(strategy.elastic, True)
        strategy.elastic = False
        self.assertEqual(strategy.elastic, False)
        strategy.elastic = "True"
        self.assertEqual(strategy.elastic, False)

    def test_auto(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.auto = True
        self.assertEqual(strategy.auto, True)
        strategy.auto = False
        self.assertEqual(strategy.auto, False)
        strategy.auto = "True"
        self.assertEqual(strategy.auto, False)

    def test_strategy_prototxt(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.a_sync = True
        strategy.localsgd = True
        strategy.dgc = True
        localsgd_configs = {"k_steps": 5, "begin_step": 1}
        strategy.localsgd_configs = localsgd_configs
        build_strategy = paddle.fluid.BuildStrategy()
        build_strategy.enable_sequential_execution = True
        build_strategy.nccl_comm_num = 10
        build_strategy.use_hierarchical_allreduce = True
        build_strategy.hierarchical_allreduce_inter_nranks = 1
        build_strategy.fuse_elewise_add_act_ops = True
        build_strategy.fuse_bn_act_ops = True
        build_strategy.enable_auto_fusion = True
        build_strategy.fuse_relu_depthwise_conv = True
        build_strategy.fuse_broadcast_ops = True
        build_strategy.fuse_all_optimizer_ops = True
        build_strategy.sync_batch_norm = True
        build_strategy.enable_inplace = True
        build_strategy.fuse_all_reduce_ops = True
        build_strategy.enable_backward_optimizer_op_deps = True
        build_strategy.trainers_endpoints = ["1", "2"]
        strategy.build_strategy = build_strategy
        exe_strategy = paddle.fluid.ExecutionStrategy()
        exe_strategy.num_threads = 10
        exe_strategy.num_iteration_per_drop_scope = 10
        exe_strategy.num_iteration_per_run = 10
        strategy.execution_strategy = exe_strategy
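        # Round-trip the strategy through a protobuf text file and check that
        # a field survives the save/load cycle.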
        strategy.save_to_prototxt("dist_strategy.prototxt")
        strategy2 = paddle.distributed.fleet.DistributedStrategy()
        strategy2.load_from_prototxt("dist_strategy.prototxt")
        self.assertEqual(strategy.dgc, strategy2.dgc)

    def test_build_strategy(self):
        build_strategy = paddle.fluid.BuildStrategy()
        build_strategy.enable_sequential_execution = True
        build_strategy.nccl_comm_num = 10
        build_strategy.use_hierarchical_allreduce = True
        build_strategy.hierarchical_allreduce_inter_nranks = 1
        build_strategy.fuse_elewise_add_act_ops = True
        build_strategy.fuse_bn_act_ops = True
        build_strategy.enable_auto_fusion = True
        build_strategy.fuse_relu_depthwise_conv = True
        build_strategy.fuse_broadcast_ops = True
        build_strategy.fuse_all_optimizer_ops = True
        build_strategy.sync_batch_norm = True
        build_strategy.enable_inplace = True
        build_strategy.fuse_all_reduce_ops = True
        build_strategy.enable_backward_optimizer_op_deps = True
        build_strategy.trainers_endpoints = ["1", "2"]

        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.build_strategy = build_strategy

    def test_execution_strategy(self):
        exe_strategy = paddle.fluid.ExecutionStrategy()
        exe_strategy.num_threads = 10
        exe_strategy.num_iteration_per_drop_scope = 10
        exe_strategy.num_iteration_per_run = 10

        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.execution_strategy = exe_strategy

    def test_unknown_strategy(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        with self.assertRaises(TypeError):
            strategy.unknown_key = 'UNK'

    def test_cudnn_exhaustive_search(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.cudnn_exhaustive_search = False
        self.assertEqual(strategy.cudnn_exhaustive_search, False)
        strategy.cudnn_exhaustive_search = "True"
        self.assertEqual(strategy.cudnn_exhaustive_search, False)

    def test_cudnn_batchnorm_spatial_persistent(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.cudnn_batchnorm_spatial_persistent = False
        self.assertEqual(strategy.cudnn_batchnorm_spatial_persistent, False)
        strategy.cudnn_batchnorm_spatial_persistent = "True"
        self.assertEqual(strategy.cudnn_batchnorm_spatial_persistent, False)

    def test_conv_workspace_size_limit(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.conv_workspace_size_limit = 1000
        self.assertEqual(strategy.conv_workspace_size_limit, 1000)
        strategy.conv_workspace_size_limit = "400"
        self.assertEqual(strategy.conv_workspace_size_limit, 1000)
        strategy._enable_env()

    def test_distributed_strategy_repr(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.recompute = True
        strategy.recompute_configs = {"checkpoints": ["a1", "a2", "a3"]}
        strategy.amp = True
        strategy.localsgd = True
        print(str(strategy))


if __name__ == '__main__':
    unittest.main()