# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest
import paddle
import os
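
# These tests exercise the configuration setters of
# paddle.distributed.fleet.DistributedStrategy: boolean switches, typed
# options, nested config dicts, and prototxt (de)serialization.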


class TestStrategyConfig(unittest.TestCase):
    def test_amp(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.amp = True
        self.assertEqual(strategy.amp, True)
        strategy.amp = False
        self.assertEqual(strategy.amp, False)
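        # A non-bool assignment is ignored, so the flag keeps its previous value;
        # the same pattern is exercised for the other boolean switches below.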
        strategy.amp = "True"
        self.assertEqual(strategy.amp, False)

    def test_amp_configs(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        configs = {
            "init_loss_scaling": 32768,
            "decr_every_n_nan_or_inf": 2,
            "incr_every_n_steps": 1000,
            "incr_ratio": 2.0,
            "use_dynamic_loss_scaling": True,
            "decr_ratio": 0.5
        }
        strategy.amp_configs = configs
        self.assertEqual(strategy.amp_configs["init_loss_scaling"], 32768)

    def test_recompute(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.recompute = True
        self.assertEqual(strategy.recompute, True)
        strategy.recompute = False
        self.assertEqual(strategy.recompute, False)
        strategy.recompute = "True"
        self.assertEqual(strategy.recompute, False)

    def test_recompute_configs(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        configs = {"checkpoints": ["x", "y"]}
        strategy.recompute_configs = configs
        self.assertEqual(len(strategy.recompute_configs["checkpoints"]), 2)

    def test_pipeline(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.pipeline = True
        self.assertEqual(strategy.pipeline, True)
        strategy.pipeline = False
        self.assertEqual(strategy.pipeline, False)
        strategy.pipeline = "True"
        self.assertEqual(strategy.pipeline, False)

    def test_pipeline_configs(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        configs = {"micro_batch_size": 4}
        strategy.pipeline_configs = configs
        self.assertEqual(strategy.pipeline_configs["micro_batch_size"], 4)
        configs = {"accumulate_steps": 2}
        strategy.pipeline_configs = configs
        self.assertEqual(strategy.pipeline_configs["accumulate_steps"], 2)

    def test_hybrid_parallel_configs(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
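        # dp/mp/pp degrees configure the data-, model(tensor)- and
        # pipeline-parallel group sizes.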
        strategy.hybrid_configs = {
            "dp_degree": 1,
            "mp_degree": 2,
            "pp_degree": 4
        }
        self.assertEqual(strategy.hybrid_configs["dp_degree"], 1)
        self.assertEqual(strategy.hybrid_configs["mp_degree"], 2)
        self.assertEqual(strategy.hybrid_configs["pp_degree"], 4)

    def test_localsgd(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.localsgd = True
        self.assertEqual(strategy.localsgd, True)
        strategy.localsgd = False
        self.assertEqual(strategy.localsgd, False)
        strategy.localsgd = "True"
        self.assertEqual(strategy.localsgd, False)

    def test_localsgd_configs(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        configs = {"k_steps": 4, "begin_step": 120}
        strategy.localsgd_configs = configs
        self.assertEqual(strategy.localsgd_configs["k_steps"], 4)
        self.assertEqual(strategy.localsgd_configs["begin_step"], 120)

    def test_adaptive_localsgd_configs(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        configs = {"init_k_steps": 1, "begin_step": 120}
        strategy.adaptive_localsgd_configs = configs
        self.assertEqual(strategy.adaptive_localsgd_configs["init_k_steps"], 1)
        self.assertEqual(strategy.adaptive_localsgd_configs["begin_step"], 120)

    def test_dgc(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.dgc = True
        self.assertEqual(strategy.dgc, True)
        strategy.dgc = False
        self.assertEqual(strategy.dgc, False)
        strategy.dgc = "True"
        self.assertEqual(strategy.dgc, False)

    def test_fp16_allreduce(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.fp16_allreduce = True
        self.assertEqual(strategy.fp16_allreduce, True)
        strategy.fp16_allreduce = False
        self.assertEqual(strategy.fp16_allreduce, False)
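        # Unlike most boolean flags, fp16_allreduce raises TypeError on a
        # non-bool assignment instead of silently ignoring it.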
        with self.assertRaises(TypeError):
            strategy.fp16_allreduce = "True"
        self.assertEqual(strategy.fp16_allreduce, False)

    def test_sync_nccl_allreduce(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.sync_nccl_allreduce = True
        self.assertEqual(strategy.sync_nccl_allreduce, True)
        strategy.sync_nccl_allreduce = False
        self.assertEqual(strategy.sync_nccl_allreduce, False)
        strategy.sync_nccl_allreduce = "True"
        self.assertEqual(strategy.sync_nccl_allreduce, False)

    def test_nccl_comm_num(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.nccl_comm_num = 1
        self.assertEqual(strategy.nccl_comm_num, 1)
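        # Integer options likewise ignore assignments of the wrong type.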
        strategy.nccl_comm_num = "2"
        self.assertEqual(strategy.nccl_comm_num, 1)

    def test_use_hierarchical_allreduce(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.use_hierarchical_allreduce = True
        self.assertEqual(strategy.use_hierarchical_allreduce, True)
        strategy.use_hierarchical_allreduce = False
        self.assertEqual(strategy.use_hierarchical_allreduce, False)
        strategy.use_hierarchical_allreduce = "True"
        self.assertEqual(strategy.use_hierarchical_allreduce, False)

    def test_hierarchical_allreduce_inter_nranks(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.hierarchical_allreduce_inter_nranks = 8
        self.assertEqual(strategy.hierarchical_allreduce_inter_nranks, 8)
        strategy.hierarchical_allreduce_inter_nranks = "4"
        self.assertEqual(strategy.hierarchical_allreduce_inter_nranks, 8)

    def test_sync_batch_norm(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.sync_batch_norm = True
        self.assertEqual(strategy.sync_batch_norm, True)
        strategy.sync_batch_norm = False
        self.assertEqual(strategy.sync_batch_norm, False)
        strategy.sync_batch_norm = "True"
        self.assertEqual(strategy.sync_batch_norm, False)

    def test_fuse_all_reduce_ops(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.fuse_all_reduce_ops = True
        self.assertEqual(strategy.fuse_all_reduce_ops, True)
        strategy.fuse_all_reduce_ops = False
        self.assertEqual(strategy.fuse_all_reduce_ops, False)
        strategy.fuse_all_reduce_ops = "True"
        self.assertEqual(strategy.fuse_all_reduce_ops, False)

    def test_fuse_grad_size_in_MB(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.fuse_grad_size_in_MB = 50
        self.assertEqual(strategy.fuse_grad_size_in_MB, 50)
        strategy.fuse_grad_size_in_MB = "40"
        self.assertEqual(strategy.fuse_grad_size_in_MB, 50)

    def test_last_comm_group_size_MB(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.last_comm_group_size_MB = 50
        self.assertEqual(strategy.last_comm_group_size_MB, 50)
        with self.assertRaises(ValueError):
            strategy.last_comm_group_size_MB = -1

    def test_find_unused_parameters(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.find_unused_parameters = True
        self.assertEqual(strategy.find_unused_parameters, True)
        strategy.find_unused_parameters = False
        self.assertEqual(strategy.find_unused_parameters, False)
        strategy.find_unused_parameters = "True"
        self.assertEqual(strategy.find_unused_parameters, False)

    def test_fuse_grad_size_in_TFLOPS(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy._fuse_grad_size_in_TFLOPS = 0.1
        self.assertGreater(strategy._fuse_grad_size_in_TFLOPS, 0.09)
        strategy._fuse_grad_size_in_TFLOPS = "0.3"
        self.assertGreater(strategy._fuse_grad_size_in_TFLOPS, 0.09)

    def test_gradient_merge(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.gradient_merge = True
        self.assertEqual(strategy.gradient_merge, True)
        strategy.gradient_merge = False
        self.assertEqual(strategy.gradient_merge, False)
        strategy.gradient_merge = "True"
        self.assertEqual(strategy.gradient_merge, False)

    def test_gradient_merge_configs(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        configs = {"k_steps": 4}
        strategy.gradient_merge_configs = configs
        self.assertEqual(strategy.gradient_merge_configs["k_steps"], 4)

    def test_lars(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.lars = True
        self.assertEqual(strategy.lars, True)
        strategy.lars = False
        self.assertEqual(strategy.lars, False)
        strategy.lars = "True"
        self.assertEqual(strategy.lars, False)

    def test_lamb(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.lamb = True
        self.assertEqual(strategy.lamb, True)
        strategy.lamb = False
        self.assertEqual(strategy.lamb, False)
        strategy.lamb = "True"
        self.assertEqual(strategy.lamb, False)

    def test_a_sync(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.a_sync = True
        self.assertEqual(strategy.a_sync, True)
        strategy.a_sync = False
        self.assertEqual(strategy.a_sync, False)

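        # a_sync is stricter: assigning a non-bool value raises ValueError.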
        with self.assertRaises(ValueError):
            strategy.a_sync = "True"

    def test_a_sync_configs(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        configs = {"k_steps": 1000}
        strategy.a_sync_configs = configs
        self.assertEqual(strategy.a_sync_configs["k_steps"], 1000)

    def test_sparse_table_configs(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        configs = {}
        configs['emb'] = {
            "table_parameters.emb.accessor.embed_sgd_param.adagrad.learning_rate":
            0.05,
            "table_parameters.emb.accessor.table_accessor_save_param.num": 2,
            "table_parameters.emb.accessor.table_accessor_save_param.param":
            [1, 2]
        }
        strategy.sparse_table_configs = configs
        self.assertEqual(strategy.sparse_table_configs[0]
                         .accessor.embed_sgd_param.adagrad.learning_rate, 0.05)
        self.assertEqual(strategy.sparse_table_configs[0]
                         .accessor.table_accessor_save_param[0].param, 1)

        strategy.adam_d2sum = True
        self.assertEqual(strategy.adam_d2sum, True)
        strategy.fs_client_param = {
            "uri": "123",
            "user": "456",
            "passwd": "789",
            "hadoop_bin": "hadoop"
        }
        self.assertEqual(strategy.fs_client_param.user, "456")

    def test_fleet_desc_configs(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
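        # Each sparse_optimizer choice should fill sparse_table_configs with
        # its default hyper-parameters.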
        configs = {}
        configs['emb'] = {"sparse_optimizer": "adagrad"}
        strategy.fleet_desc_configs = configs
        self.assertEqual(strategy.sparse_table_configs[0]
                         .accessor.embed_sgd_param.adagrad.learning_rate, 0.05)

        strategy = paddle.distributed.fleet.DistributedStrategy()
        configs = {}
        configs['emb'] = {"sparse_optimizer": "naive"}
        strategy.fleet_desc_configs = configs
        self.assertEqual(strategy.sparse_table_configs[0]
                         .accessor.embed_sgd_param.naive.learning_rate, 0.05)

        strategy = paddle.distributed.fleet.DistributedStrategy()
        configs = {}
        configs['emb'] = {"sparse_optimizer": "adam"}
        strategy.fleet_desc_configs = configs
        self.assertEqual(strategy.sparse_table_configs[0]
                         .accessor.embed_sgd_param.adam.beta1_decay_rate, 0.9)

        strategy = paddle.distributed.fleet.DistributedStrategy()
        configs = {}
        configs['emb'] = {
            "sparse_accessor_class": "DownpourUnitAccessor",
            "embed_sparse_optimizer": "std_adagrad"
        }
        strategy.fleet_desc_configs = configs
        self.assertEqual(strategy.sparse_table_configs[0]
                         .accessor.embed_sgd_param.adagrad.initial_range, 0)

    def test_trainer_desc_configs(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        configs = {
            "dump_fields_path": "dump_data",
            "dump_fields": ["xxx", "yyy"],
            "dump_param": ['zzz']
        }
        strategy.trainer_desc_configs = configs
        self.assertEqual(strategy.trainer_desc_configs["dump_fields_path"],
                         "dump_data")
        self.assertEqual(len(strategy.trainer_desc_configs["dump_fields"]), 2)
        self.assertEqual(len(strategy.trainer_desc_configs["dump_param"]), 1)

    def test_elastic(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.elastic = True
        self.assertEqual(strategy.elastic, True)
        strategy.elastic = False
        self.assertEqual(strategy.elastic, False)
        strategy.elastic = "True"
        self.assertEqual(strategy.elastic, False)

    def test_auto(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.auto = True
        self.assertEqual(strategy.auto, True)
        strategy.auto = False
        self.assertEqual(strategy.auto, False)
        strategy.auto = "True"
        self.assertEqual(strategy.auto, False)

    def test_strategy_prototxt(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.a_sync = True
        strategy.localsgd = True
        strategy.dgc = True
        localsgd_configs = {"k_steps": 5, "begin_step": 1}
        strategy.localsgd_configs = localsgd_configs
        build_strategy = paddle.fluid.BuildStrategy()
        build_strategy.enable_sequential_execution = True
        build_strategy.nccl_comm_num = 10
        build_strategy.use_hierarchical_allreduce = True
        build_strategy.hierarchical_allreduce_inter_nranks = 1
        build_strategy.fuse_elewise_add_act_ops = True
        build_strategy.fuse_bn_act_ops = True
        build_strategy.enable_auto_fusion = True
        build_strategy.fuse_relu_depthwise_conv = True
        build_strategy.fuse_broadcast_ops = True
        build_strategy.fuse_all_optimizer_ops = True
        build_strategy.sync_batch_norm = True
        build_strategy.enable_inplace = True
        build_strategy.fuse_all_reduce_ops = True
        build_strategy.enable_backward_optimizer_op_deps = True
        build_strategy.trainers_endpoints = ["1", "2"]
        strategy.build_strategy = build_strategy
        exe_strategy = paddle.fluid.ExecutionStrategy()
        exe_strategy.num_threads = 10
        exe_strategy.num_iteration_per_drop_scope = 10
        exe_strategy.num_iteration_per_run = 10
        strategy.execution_strategy = exe_strategy
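        # Round-trip the populated strategy through a prototxt file and verify
        # a field survives reload.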
        strategy.save_to_prototxt("dist_strategy.prototxt")
        strategy2 = paddle.distributed.fleet.DistributedStrategy()
        strategy2.load_from_prototxt("dist_strategy.prototxt")
        self.assertEqual(strategy.dgc, strategy2.dgc)

    def test_build_strategy(self):
        build_strategy = paddle.fluid.BuildStrategy()
        build_strategy.enable_sequential_execution = True
        build_strategy.nccl_comm_num = 10
        build_strategy.use_hierarchical_allreduce = True
        build_strategy.hierarchical_allreduce_inter_nranks = 1
        build_strategy.fuse_elewise_add_act_ops = True
        build_strategy.fuse_bn_act_ops = True
        build_strategy.enable_auto_fusion = True
        build_strategy.fuse_relu_depthwise_conv = True
        build_strategy.fuse_broadcast_ops = True
        build_strategy.fuse_all_optimizer_ops = True
        build_strategy.sync_batch_norm = True
        build_strategy.enable_inplace = True
        build_strategy.fuse_all_reduce_ops = True
        build_strategy.enable_backward_optimizer_op_deps = True
        build_strategy.trainers_endpoints = ["1", "2"]

        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.build_strategy = build_strategy

    def test_execution_strategy(self):
        exe_strategy = paddle.fluid.ExecutionStrategy()
        exe_strategy.num_threads = 10
        exe_strategy.num_iteration_per_drop_scope = 10
        exe_strategy.num_iteration_per_run = 10

        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.execution_strategy = exe_strategy

    def test_unknown_strategy(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        with self.assertRaises(TypeError):
            strategy.unknown_key = 'UNK'

    def test_cudnn_exhaustive_search(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.cudnn_exhaustive_search = False
        self.assertEqual(strategy.cudnn_exhaustive_search, False)
        strategy.cudnn_exhaustive_search = "True"
        self.assertEqual(strategy.cudnn_exhaustive_search, False)

    def test_cudnn_batchnorm_spatial_persistent(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.cudnn_batchnorm_spatial_persistent = False
        self.assertEqual(strategy.cudnn_batchnorm_spatial_persistent, False)
        strategy.cudnn_batchnorm_spatial_persistent = "True"
        self.assertEqual(strategy.cudnn_batchnorm_spatial_persistent, False)

    def test_conv_workspace_size_limit(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.conv_workspace_size_limit = 1000
        self.assertEqual(strategy.conv_workspace_size_limit, 1000)
        strategy.conv_workspace_size_limit = "400"
        self.assertEqual(strategy.conv_workspace_size_limit, 1000)
        strategy._enable_env()

    def test_distributed_strategy_repr(self):
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.recompute = True
        strategy.recompute_configs = {"checkpoints": ["a1", "a2", "a3"]}
        strategy.amp = True
        strategy.localsgd = True
        print(str(strategy))


if __name__ == '__main__':
    unittest.main()