#   Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import paddle
from paddle.fleet.proto import distributed_strategy_pb2
from paddle.fluid.framework import Variable
import google.protobuf.text_format


def get_msg_dict(msg):
    """
    Return a dict mapping every field name of protobuf ``msg`` to its value.

    Values are returned as-is from the message (``getattr``), in the order the
    fields appear in ``msg.DESCRIPTOR.fields``.
    """
    return {
        descriptor.name: getattr(msg, descriptor.name)
        for descriptor in msg.DESCRIPTOR.fields
    }


def assign_configs_value(msg, config):
    """
    Copy the entries of ``config`` into the same-named fields of protobuf ``msg``.

    Keys that do not name a field of ``msg`` are skipped; callers validate keys
    beforehand with ``check_configs_key``.

    Args:
        msg: protobuf message, updated in place.
        config (dict): mapping of field name to new value. For repeated fields
            the value is an iterable whose items REPLACE the current contents.
    """
    fields = {f.name: f for f in msg.DESCRIPTOR.fields}
    for key, value in config.items():
        field = fields.get(key)
        if field is None:
            continue
        # Protobuf FieldDescriptor labels: 1 = optional, 2 = required, 3 = repeated.
        if field.label == 3:
            container = getattr(msg, key)
            # Clear before extending so that assigning the same configs twice
            # does not duplicate entries.
            del container[:]
            container.extend(value)
        elif field.label in (1, 2):
            setattr(msg, key, value)


def check_configs_key(msg, config, field_name):
    """
    Validate that every key of ``config`` is a field name of protobuf ``msg``.

    Args:
        msg: protobuf message whose fields define the accepted keys.
        config (dict): user-supplied configuration to validate.
        field_name (str): name of the config property, used in the error message.

    Raises:
        AssertionError: on the first key that is not a field of ``msg``. Raised
            explicitly rather than via ``assert`` so that validation is not
            stripped when Python runs with ``-O``.
    """
    valid_keys = msg.DESCRIPTOR.fields_by_name.keys()
    for key in config:
        if key not in valid_keys:
            raise AssertionError("key:{} not in {}".format(key, field_name))


class DistributedJobInfo(object):
    """
    DistributedJobInfo will serialize all distributed training information.
    Just for inner use: 1) debug 2) replicate experiments.

    All setters write into ``self.job_info``, a
    ``distributed_strategy_pb2.DistributedJobInfo`` message.
    """

    def __init__(self):
        self.job_info = distributed_strategy_pb2.DistributedJobInfo()

    def _set_worker_num(self, worker_num):
        self.job_info.worker_num = worker_num

    def _set_server_num(self, server_num):
        self.job_info.server_num = server_num

    def _set_worker_ips(self, worker_ips):
        # Replace the current list; a second call must not append duplicates.
        del self.job_info.worker_ips[:]
        self.job_info.worker_ips.extend(worker_ips)

    def _set_server_endpoints(self, server_endpoints):
        # Replace the current list; a second call must not append duplicates.
        del self.job_info.server_endpoints[:]
        self.job_info.server_endpoints.extend(server_endpoints)

    def _set_origin_startup(self, origin_startup_prog):
        # Programs are stored as their string (text) representation.
        self.job_info.origin_startup = str(origin_startup_prog)

    def _set_origin_main(self, origin_main_prog):
        self.job_info.origin_main = str(origin_main_prog)

    def _distributed_main(self, distributed_main_prog):
        self.job_info.distributed_main = str(distributed_main_prog)

    def _optimizer_name(self, optimizer_name):
        self.job_info.optimizer_name = optimizer_name

    def _set_distributed_strategy(self, dist_strategy):
        """
        Copy ``dist_strategy`` (a DistributedStrategy protobuf message) into the job info.
        """
        # NOTE: `strategy` is a message-typed field in distributed_strategy.proto
        # (not visible here). Protobuf forbids assigning message fields with `=`,
        # so the contents are copied instead.
        self.job_info.strategy.CopyFrom(dist_strategy)


class DistributedStrategy(object):
    """
    DistributedStrategy is the main configuration entry for distributed training of Paddle.
    All of the distributed training configurations can be configured in DistributedStrategy,
    such as automatic mixed precision (AMP), Layer-wise Adaptive Rate Scaling (LARS),
    asynchronous update parameter server (ASGD), etc.

    DistributedStrategy can be serialized into a protobuf text file or deserialized
    from one (see ``save_to_prototxt`` and ``load_from_prototxt``).

    Users who run local training usually configure BuildStrategy and ExecutionStrategy, and
    DistributedStrategy supports configurations from BuildStrategy and ExecutionStrategy.

    Simple bool/int/float options are type-checked on assignment: a value of the
    wrong type prints a warning and leaves the option unchanged. ``a_sync`` is the
    exception and raises ``ValueError`` instead.
    """

    def __init__(self):
        # Every option lives in this protobuf message; the properties below are
        # typed views over its fields.
        self.strategy = distributed_strategy_pb2.DistributedStrategy()

    def _set_typed_field(self, name, value, expected_type):
        """
        Assign ``value`` to field ``name`` of ``self.strategy`` if it is an
        instance of ``expected_type``; otherwise print a warning and keep the
        current value. ``bool`` is a subclass of ``int``, so bools are accepted
        for int-typed options.
        """
        if isinstance(value, expected_type):
            setattr(self.strategy, name, value)
        else:
            print("WARNING: {} should have value of {} type".format(
                name, expected_type.__name__))

    def save_to_prototxt(self, output):
        """
        Serialize current DistributedStrategy to protobuf text format and save it to a file.

        Args:
            output (str): path of the file to write; an existing file is overwritten.

        Examples:
          .. code-block:: python

            import paddle.fleet as fleet
            strategy = fleet.DistributedStrategy()
            strategy.dgc = True
            strategy.recompute = True
            strategy.recompute_configs = {"checkpoints": ["x"]}
            strategy.save_to_prototxt("dist_strategy.prototxt")
        """
        with open(output, "w") as fout:
            fout.write(str(self.strategy))

    def load_from_prototxt(self, pb_file):
        """
        Load from prototxt file for DistributedStrategy initialization.

        The file content is merged into the current strategy
        (``google.protobuf.text_format.Merge``) rather than replacing it.

        Args:
            pb_file (str): path of a protobuf text-format file, e.g. one written
                by ``save_to_prototxt``.

        Examples:
          .. code-block:: python

            import paddle.fleet as fleet
            strategy = fleet.DistributedStrategy()
            strategy.load_from_prototxt("dist_strategy.prototxt")
        """
        with open(pb_file, 'r') as f:
            self.strategy = google.protobuf.text_format.Merge(
                str(f.read()), self.strategy)

    @property
    def execution_strategy(self):
        """
        Configure ExecutionStrategy for DistributedStrategy.

        Reading returns a new ``paddle.fluid.ExecutionStrategy`` populated from the
        stored fields; assigning copies every stored field from the given object.

        Examples:
          .. code-block:: python

            exe_strategy = paddle.fluid.ExecutionStrategy()
            exe_strategy.num_threads = 10
            exe_strategy.num_iteration_per_drop_scope = 10
            exe_strategy.num_iteration_per_run = 10

            strategy = paddle.fleet.DistributedStrategy()
            strategy.execution_strategy = exe_strategy
        """
        execution_strategy = paddle.fluid.ExecutionStrategy()
        fields = self.strategy.execution_strategy.DESCRIPTOR.fields
        for f in fields:
            setattr(execution_strategy, f.name,
                    getattr(self.strategy.execution_strategy, f.name))
        return execution_strategy

    @execution_strategy.setter
    def execution_strategy(self, strategy):
        fields = self.strategy.execution_strategy.DESCRIPTOR.fields
        for f in fields:
            setattr(self.strategy.execution_strategy, f.name,
                    getattr(strategy, f.name))

    @property
    def build_strategy(self):
        """
        Configure BuildStrategy for DistributedStrategy.
        Note that the properties of BuildStrategy are valid in DistributedStrategy
        only if the property is non-distributed strategy.

        Reading returns a new ``paddle.fluid.BuildStrategy`` populated from the
        stored fields; assigning copies every stored field from the given object.

        Examples:
          .. code-block:: python

            build_strategy = paddle.fluid.BuildStrategy()
            build_strategy.enable_sequential_execution = True
            build_strategy.fuse_elewise_add_act_ops = True
            build_strategy.fuse_bn_act_ops = True
            build_strategy.enable_auto_fusion = True
            build_strategy.fuse_relu_depthwise_conv = True
            build_strategy.fuse_broadcast_ops = True
            build_strategy.fuse_all_optimizer_ops = True
            build_strategy.enable_inplace = True

            strategy = paddle.fleet.DistributedStrategy()
            strategy.build_strategy = build_strategy
        """
        build_strategy = paddle.fluid.BuildStrategy()
        fields = self.strategy.build_strategy.DESCRIPTOR.fields
        for f in fields:
            setattr(build_strategy, f.name,
                    getattr(self.strategy.build_strategy, f.name))
        return build_strategy

    @build_strategy.setter
    def build_strategy(self, strategy):
        fields = self.strategy.build_strategy.DESCRIPTOR.fields
        for f in fields:
            if f.label == 1 or f.label == 2:  # optional and required field
                setattr(self.strategy.build_strategy, f.name,
                        getattr(strategy, f.name))
            elif f.label == 3:  # repeated field
                repeated = getattr(self.strategy.build_strategy, f.name)
                # Replace rather than append, so re-assigning a BuildStrategy
                # does not accumulate duplicate entries.
                del repeated[:]
                repeated.extend(getattr(strategy, f.name))

    @property
    def a_sync(self):
        """
        Indicating whether we are using asynchronous stochastic gradient descent updates
        for training. This property is valid when we are using parameter server training,
        which is implied by setting an appropriate RoleMaker.
        Default value: True

        Assigning a non-bool raises ValueError. Every assignment also resets
        ``a_sync_configs["k_steps"]`` to 0.

        Examples:
          .. code-block:: python

            import paddle.fleet as fleet
            role_maker = fleet.PaddleCloudRoleMaker()
            fleet.init(role_maker)

            strategy = fleet.DistributedStrategy()
            strategy.a_sync = True  # by default this is True

            # code block for defining loss and local optimizer
            # sgd = fleet.distributed_optimizer(optimizer, strategy)
        """
        return self.strategy.a_sync

    @a_sync.setter
    def a_sync(self, flag):
        if isinstance(flag, bool):
            self.strategy.a_sync = flag
            self.a_sync_configs = {"k_steps": 0}
        else:
            raise ValueError(
                "The type of `flag` is invalid, expected type is bool, but received {}".
                format(type(flag)))

    @property
    def a_sync_configs(self):
        """
        Set a_sync update configurations. In general, asynchronous parameter server
        training has several configurable settings that can be configured through
        a dict.

        **Notes**:
            **Detailed arguments for a_sync_configs**
            **k_steps**: number of local optimization updates before communication
            **max_merge_var_num**: maximum number of merged gradients before communication
            **send_queue_size**: a buffer size of worker communication
            **independent_recv_thread**: if we are using independent recv thread for communication
            **thread_pool_size**: number of thread pool
            **send_wait_times**: waiting time for sending gradients
            **runtime_split_send_recv**: if we are using Tensor split for send and recv during runtime

        Examples:
          .. code-block:: python

            import paddle.fleet as fleet
            role_maker = fleet.PaddleCloudRoleMaker()
            fleet.init(role_maker)

            strategy = fleet.DistributedStrategy()
            strategy.a_sync = True  # by default this is True
            configs = {"k_steps": 10000, "send_queue_size": 32}
            strategy.a_sync_configs = configs

            # code block for defining loss and local optimizer
            # sgd = fleet.distributed_optimizer(optimizer, strategy)
        """
        return get_msg_dict(self.strategy.a_sync_configs)

    @a_sync_configs.setter
    def a_sync_configs(self, configs):
        check_configs_key(self.strategy.a_sync_configs, configs,
                          "a_sync_configs")
        assign_configs_value(self.strategy.a_sync_configs, configs)

    @property
    def amp(self):
        """
        Indicating whether we are using automatic mixed precision training.
        Default Value: False

        Examples:
          .. code-block:: python

            import paddle.fleet as fleet
            strategy = fleet.DistributedStrategy()
            strategy.amp = True # by default this is false
        """
        return self.strategy.amp

    @amp.setter
    def amp(self, flag):
        self._set_typed_field("amp", flag, bool)

    @property
    def amp_configs(self):
        """
        AMP configurations as a dict (field name -> value). Assigning a dict
        updates only the given keys; unknown keys are rejected by check_configs_key.
        """
        return get_msg_dict(self.strategy.amp_configs)

    @amp_configs.setter
    def amp_configs(self, configs):
        check_configs_key(self.strategy.amp_configs, configs, "amp_configs")
        assign_configs_value(self.strategy.amp_configs, configs)

    @property
    def recompute(self):
        """
        Indicating whether we are using forward recomputation for memory optimization.
        Default value: False

        Examples:
          .. code-block:: python

            import paddle.fleet as fleet
            strategy = fleet.DistributedStrategy()
            strategy.recompute = True
            # suppose x and y are names of checkpoint tensors for recomputation
            strategy.recompute_configs = {"checkpoints": ["x", "y"]}
        """
        return self.strategy.recompute

    @recompute.setter
    def recompute(self, flag):
        self._set_typed_field("recompute", flag, bool)

    @property
    def recompute_configs(self):
        """
        Set recompute configurations. In general, the recompute strategy of the
        current implementation requires manually assigned checkpoints.

        Examples:
          .. code-block:: python

            import paddle.fleet as fleet
            strategy = fleet.DistributedStrategy()
            strategy.recompute = True
            strategy.recompute_configs = {"checkpoints": ["x", "y"]}
        """
        return get_msg_dict(self.strategy.recompute_configs)

    @recompute_configs.setter
    def recompute_configs(self, configs):
        check_configs_key(self.strategy.recompute_configs, configs,
                          "recompute_configs")
        assign_configs_value(self.strategy.recompute_configs, configs)

    @property
    def sync_nccl_allreduce(self):
        return self.strategy.sync_nccl_allreduce

    @sync_nccl_allreduce.setter
    def sync_nccl_allreduce(self, flag):
        self._set_typed_field("sync_nccl_allreduce", flag, bool)

    @property
    def use_hierarchical_allreduce(self):
        return self.strategy.use_hierarchical_allreduce

    @use_hierarchical_allreduce.setter
    def use_hierarchical_allreduce(self, flag):
        self._set_typed_field("use_hierarchical_allreduce", flag, bool)

    @property
    def hierarchical_allreduce_inter_nranks(self):
        return self.strategy.hierarchical_allreduce_inter_nranks

    @hierarchical_allreduce_inter_nranks.setter
    def hierarchical_allreduce_inter_nranks(self, value):
        self._set_typed_field("hierarchical_allreduce_inter_nranks", value,
                              int)

    @property
    def sync_batch_norm(self):
        return self.strategy.sync_batch_norm

    @sync_batch_norm.setter
    def sync_batch_norm(self, flag):
        self._set_typed_field("sync_batch_norm", flag, bool)

    @property
    def fuse_all_reduce_ops(self):
        return self.strategy.fuse_all_reduce_ops

    @fuse_all_reduce_ops.setter
    def fuse_all_reduce_ops(self, flag):
        self._set_typed_field("fuse_all_reduce_ops", flag, bool)

    @property
    def fuse_grad_size_in_MB(self):
        return self.strategy.fuse_grad_size_in_MB

    @fuse_grad_size_in_MB.setter
    def fuse_grad_size_in_MB(self, value):
        self._set_typed_field("fuse_grad_size_in_MB", value, int)

    @property
    def _fuse_grad_size_in_TFLOPS(self):
        return self.strategy.fuse_grad_size_in_TFLOPS

    @_fuse_grad_size_in_TFLOPS.setter
    def _fuse_grad_size_in_TFLOPS(self, value):
        self._set_typed_field("fuse_grad_size_in_TFLOPS", value, float)

    @property
    def nccl_comm_num(self):
        return self.strategy.nccl_comm_num

    @nccl_comm_num.setter
    def nccl_comm_num(self, value):
        self._set_typed_field("nccl_comm_num", value, int)

    @property
    def pipeline(self):
        """
        Indicating whether we are using pipeline parallelism for distributed training.
        Current implementation mainly focuses on single GPU machine pipeline parallelism and
        data parallelism across GPU machines. The pipeline information is indicated through
        device_guard information in the user-defined program.

        Examples:
          .. code-block:: python

            import paddle.fleet as fleet
            strategy = fleet.DistributedStrategy()
            strategy.pipeline = True
        """
        return self.strategy.pipeline

    @pipeline.setter
    def pipeline(self, flag):
        self._set_typed_field("pipeline", flag, bool)

    @property
    def pipeline_configs(self):
        """
        Set pipeline parallelism configurations. In pipeline parallelism,
        different parts of neural networks are running on different GPUs.
        There are Tensor queue buffers between each pair of neighboring GPUs
        that are responsible for synchronizing hidden Tensor results between
        GPUs. Pipeline parallelism consists of several producer-consumer style
        hardware pairs, such as GPU-GPU, CPU-GPU, GPU-XPU. The best way to speed up
        pipeline parallelism is to make the size of Tensor in Tensor queue smaller,
        so that we will have a faster producer for downstream consumers.

        **Notes**:
            **Detailed arguments for pipeline_configs**
            **micro_batch**: the number of small batches in each user defined batch

        Examples:
          .. code-block:: python

            import paddle.fleet as fleet
            strategy = fleet.DistributedStrategy()
            strategy.pipeline = True
            strategy.pipeline_configs = {"micro_batch": 12}
        """
        return get_msg_dict(self.strategy.pipeline_configs)

    @pipeline_configs.setter
    def pipeline_configs(self, configs):
        check_configs_key(self.strategy.pipeline_configs, configs,
                          "pipeline_configs")
        assign_configs_value(self.strategy.pipeline_configs, configs)

    @property
    def localsgd(self):
        return self.strategy.localsgd

    @localsgd.setter
    def localsgd(self, flag):
        self._set_typed_field("localsgd", flag, bool)

    @property
    def localsgd_configs(self):
        """
        LocalSGD configurations as a dict (field name -> value). Assigning a dict
        updates only the given keys; unknown keys are rejected by check_configs_key.
        """
        return get_msg_dict(self.strategy.localsgd_configs)

    @localsgd_configs.setter
    def localsgd_configs(self, configs):
        check_configs_key(self.strategy.localsgd_configs, configs,
                          "localsgd_configs")
        assign_configs_value(self.strategy.localsgd_configs, configs)

    @property
    def dgc(self):
        return self.strategy.dgc

    @dgc.setter
    def dgc(self, flag):
        self._set_typed_field("dgc", flag, bool)

    @property
    def dgc_configs(self):
        """
        DGC configurations as a dict (field name -> value). Assigning a dict
        updates only the given keys; unknown keys are rejected by check_configs_key.
        """
        return get_msg_dict(self.strategy.dgc_configs)

    @dgc_configs.setter
    def dgc_configs(self, configs):
        check_configs_key(self.strategy.dgc_configs, configs, "dgc_configs")
        assign_configs_value(self.strategy.dgc_configs, configs)

    @property
    def gradient_merge(self):
        """
        Gradient Merge, also called Gradient Accumulation,
        is a strategy for large batch training. With this strategy,
        model parameters will not be updated until user-defined steps.
        For each step, the forward network and the backward network
        will run to calculate the gradient of model parameters.
        For every k steps, the optimization network will run,
        applying a specific optimization method (such as SGD, Adam)
        to model parameters.

        Examples:
          .. code-block:: python

            import paddle.fleet as fleet
            strategy = fleet.DistributedStrategy()
            strategy.gradient_merge = True
            strategy.gradient_merge_configs = {"k_steps": 4, "avg": True}
        """
        return self.strategy.gradient_merge

    @gradient_merge.setter
    def gradient_merge(self, flag):
        self._set_typed_field("gradient_merge", flag, bool)

    @property
    def gradient_merge_configs(self):
        """
        Key-value configurations of gradient merge.

        Keys:
            k_steps (int): the update period of the parameters
            avg (bool): whether to average the gradients of each mini-batch,
                the default value is `True`

        Examples:
          .. code-block:: python

            import paddle.fleet as fleet
            strategy = fleet.DistributedStrategy()
            strategy.gradient_merge = True
            strategy.gradient_merge_configs = {"k_steps": 4, "avg": True}
        """
        return get_msg_dict(self.strategy.gradient_merge_configs)

    @gradient_merge_configs.setter
    def gradient_merge_configs(self, configs):
        check_configs_key(self.strategy.gradient_merge_configs, configs,
                          "gradient_merge_configs")
        assign_configs_value(self.strategy.gradient_merge_configs, configs)

    @property
    def lars(self):
        return self.strategy.lars

    @lars.setter
    def lars(self, flag):
        self._set_typed_field("lars", flag, bool)

    @property
    def lars_configs(self):
        """
        LARS configurations as a dict (field name -> value). Assigning a dict
        updates only the given keys; unknown keys are rejected by check_configs_key.
        """
        return get_msg_dict(self.strategy.lars_configs)

    @lars_configs.setter
    def lars_configs(self, configs):
        check_configs_key(self.strategy.lars_configs, configs, "lars_configs")
        assign_configs_value(self.strategy.lars_configs, configs)

    @property
    def lamb(self):
        return self.strategy.lamb

    @lamb.setter
    def lamb(self, flag):
        self._set_typed_field("lamb", flag, bool)

    @property
    def lamb_configs(self):
        """
        LAMB configurations as a dict (field name -> value). Assigning a dict
        updates only the given keys; unknown keys are rejected by check_configs_key.
        """
        return get_msg_dict(self.strategy.lamb_configs)

    @lamb_configs.setter
    def lamb_configs(self, configs):
        check_configs_key(self.strategy.lamb_configs, configs, "lamb_configs")
        assign_configs_value(self.strategy.lamb_configs, configs)

    @property
    def elastic(self):
        return self.strategy.elastic

    @elastic.setter
    def elastic(self, flag):
        self._set_typed_field("elastic", flag, bool)

    @property
    def auto(self):
        return self.strategy.auto

    @auto.setter
    def auto(self, flag):
        self._set_typed_field("auto", flag, bool)

    def __repr__(self):
        # Text representation of the current (not default) settings. __repr__
        # must not print: it is called implicitly by debuggers, logging, etc.
        return str(self.strategy)