#   Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import paddle
from paddle.fleet.proto import distributed_strategy_pb2
from paddle.fluid.framework import Variable
import google.protobuf.text_format


def get_msg_dict(msg):
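    """Collect every field of a protobuf message into a {name: value} dict.

    A small helper used by the config getters below; the keys come from the
    message descriptor, so they always match the accepted config keys.
    """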
    res_dict = {}
    fields = msg.DESCRIPTOR.fields
    for f in fields:
        res_dict[f.name] = getattr(msg, f.name)
    return res_dict


def assign_configs_value(msg, config):
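    """Copy values from a config dict onto the matching protobuf fields.

    Repeated fields (label == 3) are extended; optional or required fields
    (label == 1 or 2) are set directly.
    """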
    fields = msg.DESCRIPTOR.fields
    for key in config:
        for f in fields:
            if key == f.name:
                if f.label == 3:  # repeated field
                    getattr(msg, f.name).extend(config[f.name])
                elif f.label == 1 or f.label == 2:  # optional or required field
                    setattr(msg, f.name, config[f.name])


def check_configs_key(msg, config, field_name):
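    """Assert that every key in config is a declared field of msg."""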
    key_list = msg.DESCRIPTOR.fields_by_name.keys()
    for key in config:
        assert key in key_list, "key:{} not in {}".format(key, field_name)


class DistributedJobInfo(object):
    """
    DistributedJobInfo will serialize all distributed training information.
    For internal use only: 1) debugging 2) replicating experiments.
    """

    def __init__(self):
        self.job_info = distributed_strategy_pb2.DistributedJobInfo()

    def _set_worker_num(self, worker_num):
        self.job_info.worker_num = worker_num

    def _set_server_num(self, server_num):
        self.job_info.server_num = server_num

    def _set_worker_ips(self, worker_ips):
        self.job_info.worker_ips.extend(worker_ips)

    def _set_server_endpoints(self, server_endpoints):
        self.job_info.server_endpoints.extend(server_endpoints)

    def _set_origin_startup(self, origin_startup_prog):
        self.job_info.origin_startup = str(origin_startup_prog)

    def _set_origin_main(self, origin_main_prog):
        self.job_info.origin_main = str(origin_main_prog)

    def _distributed_main(self, distributed_main_prog):
        self.job_info.distributed_main = str(distributed_main_prog)

    def _optimizer_name(self, optimizer_name):
        self.job_info.optimizer_name = optimizer_name

    def _set_distributed_strategy(self, dist_strategy):
        self.job_info.strategy = dist_strategy


class DistributedStrategy(object):
    def __init__(self):
        """
        DistributedStrategy is the main configuration entry for distributed training of Paddle.
        All of the distributed training configurations can be configured in DistributedStrategy,
        such as automatic mixed precision (AMP), Layer-wise Adaptive Rate Scaling (LARS), 
        asynchronous update parameter server(ASGD), etc.
        
        DistributedStrategy can be serialized into protobuf file or deserialized from protobuf file

        Users who run local training usually configure BuildStrategy and ExecutionStrategy, and 
        DistributedStrategy supports configurations from BuildStrategy and ExecutionStrategy

        """
        self.strategy = distributed_strategy_pb2.DistributedStrategy()

    def save_to_prototxt(self, output):
        """
        Serialize current DistributedStrategy to string and save to output file

        Examples:
          .. code-block:: python
        
            import paddle.fleet as fleet
            strategy = fleet.DistributedStrategy()
            strategy.dgc = True
            strategy.recompute = True
            strategy.recompute_configs = {"checkpoints": ["x"]}
            strategy.save_to_prototxt("dist_strategy.prototxt")
        """
        with open(output, "w") as fout:
            fout.write(str(self.strategy))

    def load_from_prototxt(self, pb_file):
        """
        Load from prototxt file for DistributedStrategy initialization

        Examples:
          .. code-block:: python

            import paddle.fleet as fleet
            strategy = fleet.DistributedStrategy()
            strategy.load_from_prototxt("dist_strategy.prototxt")
        """
        with open(pb_file, 'r') as f:
            self.strategy = google.protobuf.text_format.Merge(
                str(f.read()), self.strategy)

    @property
    def execution_strategy(self):
        """
        Configure ExecutionStrategy for DistributedStrategy

        Examples:
          .. code-block:: python

            import paddle

            exe_strategy = paddle.fluid.ExecutionStrategy()
            exe_strategy.num_threads = 10
            exe_strategy.num_iteration_per_drop_scope = 10
            exe_strategy.num_iteration_per_run = 10

            strategy = paddle.fleet.DistributedStrategy()
            strategy.execution_strategy = exe_strategy
        """
        execution_strategy = paddle.fluid.ExecutionStrategy()
        fields = self.strategy.execution_strategy.DESCRIPTOR.fields
        for f in fields:
            setattr(execution_strategy, f.name,
                    getattr(self.strategy.execution_strategy, f.name))
        return execution_strategy

    @execution_strategy.setter
    def execution_strategy(self, strategy):
        fields = self.strategy.execution_strategy.DESCRIPTOR.fields
        for f in fields:
            setattr(self.strategy.execution_strategy, f.name,
                    getattr(strategy, f.name))

    @property
    def build_strategy(self):
        """
        Configure BuildStrategy for DistributedStrategy
        Note that a BuildStrategy property is valid in DistributedStrategy
        only if it is a non-distributed strategy.

        Examples:
          .. code-block:: python

            import paddle

            build_strategy = paddle.fluid.BuildStrategy()
            build_strategy.enable_sequential_execution = True
            build_strategy.fuse_elewise_add_act_ops = True
            build_strategy.fuse_bn_act_ops = True
            build_strategy.enable_auto_fusion = True
            build_strategy.fuse_relu_depthwise_conv = True
            build_strategy.fuse_broadcast_ops = True
            build_strategy.fuse_all_optimizer_ops = True
            build_strategy.enable_inplace = True
            
            strategy = paddle.fleet.DistributedStrategy()
            strategy.build_strategy = build_strategy
        """

        build_strategy = paddle.fluid.BuildStrategy()
        fields = self.strategy.build_strategy.DESCRIPTOR.fields
        for f in fields:
            setattr(build_strategy, f.name,
                    getattr(self.strategy.build_strategy, f.name))
        return build_strategy

    @build_strategy.setter
    def build_strategy(self, strategy):
        fields = self.strategy.build_strategy.DESCRIPTOR.fields
        for f in fields:
            if f.label == 1 or f.label == 2:  # optional and required field
                setattr(self.strategy.build_strategy, f.name,
                        getattr(strategy, f.name))
            elif f.label == 3:  # repeated field
                getattr(self.strategy.build_strategy,
                        f.name).extend(getattr(strategy, f.name))

    @property
    def async_update(self):
        """
        Indicating whether we are using asynchronous stochastic gradient descent updates
        for training. This property is valid when we are using parameter server training,
        which is implied by setting an appropriate RoleMaker.
        Default value: True

        Examples:
          .. code-block:: python

            import paddle.fleet as fleet
            role_maker = fleet.PaddleCloudRoleMaker()
            fleet.init(role_maker)

            strategy = fleet.DistributedStrategy()
            strategy.async_update = True  # by default this is True
            
            # code block for defining loss and local optimizer
            # sgd = fleet.distributed_optimizer(optimizer, strategy)
        """
        # "async" is a reserved keyword in Python 3, so the proto field must be
        # accessed via getattr rather than attribute syntax.
        return getattr(self.strategy, "async")

    @async_update.setter
    def async_update(self, flag):
        if isinstance(flag, bool):
            # setattr for the same reason: "async" is a reserved keyword in Python 3
            setattr(self.strategy, "async", flag)
        else:
            print("WARNING: async_update should have value of bool type")

    @property
    def async_update_configs(self):
        """
        Set async update configurations. In general, asynchronous parameter server
        training has several configurable settings that can be configured through
        a dict.

        **Notes**:
            **Detailed arguments for async_update_configs**
            **k_step**: number of local optimization updates before communication
            **max_merge_var_num**: maximum number of merged gradients before communication
            **send_queue_size**: the buffer queue size for worker communication
            **independent_recv_thread**: if we use an independent recv thread for communication
            **thread_pool_size**: number of threads in the thread pool
            **send_wait_times**: waiting time for sending gradients
            **runtime_split_send_recv**: if we split Tensors for send and recv at runtime

        Examples:
          .. code-block:: python

            import paddle.fleet as fleet
            role_maker = fleet.PaddleCloudRoleMaker()
            fleet.init(role_maker)

            strategy = fleet.DistributedStrategy()
            strategy.async_update = True  # by default this is True
            configs = {"k_step": 10000, "send_queue_size": 32}
            strategy.async_update_configs = configs

            # code block for defining loss and local optimizer
            # sgd = fleet.distributed_optimizer(optimizer, strategy)
        """
        return get_msg_dict(self.strategy.async_configs)

    @async_update_configs.setter
    def async_update_configs(self, configs):
        check_configs_key(self.strategy.async_configs, configs, "async_configs")
        assign_configs_value(self.strategy.async_configs, configs)

    @property
    def amp(self):
        """
        Indicating whether we are using automatic mixed precision training
        Default value: False

        Examples:
          .. code-block:: python

            import paddle.fleet as fleet
            strategy = fleet.DistributedStrategy()
            strategy.amp = True # by default this is false

        """
        return self.strategy.amp

    @amp.setter
    def amp(self, flag):
        if isinstance(flag, bool):
            self.strategy.amp = flag
        else:
            print("WARNING: amp should have value of bool type")

    @property
    def amp_configs(self):
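        """
        Set automatic mixed precision training configurations through a dict.
        Accepted keys are exactly the fields of the amp_configs protobuf
        message; this getter returns the current settings as a dict.
        """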
        return get_msg_dict(self.strategy.amp_configs)

    @amp_configs.setter
    def amp_configs(self, configs):
        check_configs_key(self.strategy.amp_configs, configs, "amp_configs")
        assign_configs_value(self.strategy.amp_configs, configs)

    @property
    def recompute(self):
        """
        Indicating whether we are using forward recomputation for memory optimization
        Default value: False

        Examples:
          .. code-block:: python

            import paddle.fleet as fleet
            strategy = fleet.DistributedStrategy()
            strategy.recompute = True
            # suppose x and y are names of checkpoint tensors for recomputation
            strategy.recompute_configs = {"checkpoints": ["x", "y"]}
        """
        return self.strategy.recompute

    @recompute.setter
    def recompute(self, flag):
        if isinstance(flag, bool):
            self.strategy.recompute = flag
        else:
            print("WARNING: recompute should have value of bool type")

    @property
    def sync_nccl_allreduce(self):
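        """
        Indicating whether the NCCL allreduce calls are executed synchronously

        Examples:
          .. code-block:: python

            import paddle.fleet as fleet
            strategy = fleet.DistributedStrategy()
            strategy.sync_nccl_allreduce = True
        """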
        return self.strategy.sync_nccl_allreduce

    @sync_nccl_allreduce.setter
    def sync_nccl_allreduce(self, flag):
        if isinstance(flag, bool):
            self.strategy.sync_nccl_allreduce = flag
        else:
            print("WARNING: sync_nccl_allreduce should have value of bool type")

    @property
    def use_hierarchical_allreduce(self):
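        """
        Indicating whether we are using hierarchical allreduce, which performs
        allreduce inside each node first and then across nodes

        Examples:
          .. code-block:: python

            import paddle.fleet as fleet
            strategy = fleet.DistributedStrategy()
            strategy.use_hierarchical_allreduce = True
        """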
        return self.strategy.use_hierarchical_allreduce

    @use_hierarchical_allreduce.setter
    def use_hierarchical_allreduce(self, flag):
        if isinstance(flag, bool):
            self.strategy.use_hierarchical_allreduce = flag
        else:
            print(
                "WARNING: use_hierarchical_allreduce should have value of bool type"
            )

    @property
    def hierarchical_allreduce_inter_nranks(self):
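        """
        Number of ranks in each inter-node communication group when
        hierarchical allreduce is enabled

        Examples:
          .. code-block:: python

            import paddle.fleet as fleet
            strategy = fleet.DistributedStrategy()
            strategy.hierarchical_allreduce_inter_nranks = 8
        """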
        return self.strategy.hierarchical_allreduce_inter_nranks

    @hierarchical_allreduce_inter_nranks.setter
    def hierarchical_allreduce_inter_nranks(self, value):
        if isinstance(value, int):
            self.strategy.hierarchical_allreduce_inter_nranks = value
        else:
            print(
                "WARNING: hierarchical_allreduce_inter_nranks should have value of int type"
            )

    @property
    def sync_batch_norm(self):
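        """
        Indicating whether we are using synchronized batch normalization, which
        synchronizes the batch statistics across all devices

        Examples:
          .. code-block:: python

            import paddle.fleet as fleet
            strategy = fleet.DistributedStrategy()
            strategy.sync_batch_norm = True
        """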
        return self.strategy.sync_batch_norm

    @sync_batch_norm.setter
    def sync_batch_norm(self, flag):
        if isinstance(flag, bool):
            self.strategy.sync_batch_norm = flag
        else:
            print("WARNING: sync_batch_norm should have value of bool type")

    @property
    def fuse_all_reduce_ops(self):
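        """
        Indicating whether multiple allreduce operations are fused into fewer,
        larger ones to reduce communication overhead

        Examples:
          .. code-block:: python

            import paddle.fleet as fleet
            strategy = fleet.DistributedStrategy()
            strategy.fuse_all_reduce_ops = True
        """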
        return self.strategy.fuse_all_reduce_ops

    @fuse_all_reduce_ops.setter
    def fuse_all_reduce_ops(self, flag):
        if isinstance(flag, bool):
            self.strategy.fuse_all_reduce_ops = flag
        else:
            print("WARNING: fuse_all_reduce_ops should have value of bool type")

    @property
    def nccl_comm_num(self):
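        """
        Number of NCCL communicators used for collective communication

        Examples:
          .. code-block:: python

            import paddle.fleet as fleet
            strategy = fleet.DistributedStrategy()
            strategy.nccl_comm_num = 2
        """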
        return self.strategy.nccl_comm_num

    @nccl_comm_num.setter
    def nccl_comm_num(self, value):
        if isinstance(value, int):
            self.strategy.nccl_comm_num = value
        else:
            print("WARNING: nccl_comm_num should have value of int type")

    @property
    def recompute_configs(self):
        """
        Set recompute configurations. In the current implementation, the
        recompute strategy requires manually assigned checkpoint Tensor names.

        Examples:
          .. code-block:: python
        
            import paddle.fleet as fleet
            strategy = fleet.DistributedStrategy()
            strategy.recompute = True
            strategy.recompute_configs = {"checkpionts": ["x", "y"]}

        """
        return get_msg_dict(self.strategy.recompute_configs)

    @recompute_configs.setter
    def recompute_configs(self, configs):
        check_configs_key(self.strategy.recompute_configs, configs,
                          "recompute_configs")
        assign_configs_value(self.strategy.recompute_configs, configs)

    @property
    def pipeline(self):
        """
        Indicating whether we are using pipeline parallelism for distributed training.
        The current implementation mainly focuses on pipeline parallelism within a single
        multi-GPU machine, combined with data parallelism across machines. The pipeline
        stages are indicated through device_guard information in the user-defined program.

        Examples:
          .. code-block:: python
        
            import paddle.fleet as fleet
            strategy = fleet.DistributedStrategy()
            strategy.pipeline = True

        """
        return self.strategy.pipeline

    @pipeline.setter
    def pipeline(self, flag):
        if isinstance(flag, bool):
            self.strategy.pipeline = flag
        else:
            print("WARNING: pipeline should have value of bool type")

    @property
    def pipeline_configs(self):
        """
        Set pipeline parallelism configurations. In pipeline parallelism,
        different parts of neural networks are running on different GPUS.
        There are Tensor queue buffer between each pair of neighborhood GPUS 
        that are responsible for synchronizing hidden Tensor results between
        GPUs. Pipeline parallelism consists of serveral producer-consumer style
        hardware pairs, such as GPU-GPU, CPU-GPU, GPU-XPU. The best way to speedup
        pipeline parallelism is to make the size of Tensor in Tensor queue smaller, 
        so that we will have a faster producer for downstream consumers.

        **Notes**:
            **Detailed arguments for pipeline_configs**
            **micro_batch**: the number of small batches in each user-defined batch

        Examples:
          .. code-block:: python
        
            import paddle.fleet as fleet
            strategy = fleet.DistributedStrategy()
            strategy.pipeline = True
            strategy.pipeline_configs = {"micro_batch": 12}

        """
        return get_msg_dict(self.strategy.pipeline_configs)

    @pipeline_configs.setter
    def pipeline_configs(self, configs):
        check_configs_key(self.strategy.pipeline_configs, configs,
                          "pipeline_configs")
        assign_configs_value(self.strategy.pipeline_configs, configs)

    @property
    def localsgd(self):
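        """
        Indicating whether we are using Local SGD training, where each worker
        performs several local updates before parameters are synchronized

        Examples:
          .. code-block:: python

            import paddle.fleet as fleet
            strategy = fleet.DistributedStrategy()
            strategy.localsgd = True
        """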
        return self.strategy.localsgd

    @localsgd.setter
    def localsgd(self, flag):
        if isinstance(flag, bool):
            self.strategy.localsgd = flag
        else:
            print("WARNING: localsgd should have value of bool type")

    @property
    def localsgd_configs(self):
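        """
        Set Local SGD training configurations through a dict. Accepted keys
        are exactly the fields of the localsgd_configs protobuf message; this
        getter returns the current settings as a dict.
        """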
        return get_msg_dict(self.strategy.localsgd_configs)

    @localsgd_configs.setter
    def localsgd_configs(self, configs):
        check_configs_key(self.strategy.localsgd_configs, configs,
                          "localsgd_configs")
        assign_configs_value(self.strategy.localsgd_configs, configs)

    @property
    def dgc(self):
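        """
        Indicating whether we are using Deep Gradient Compression training,
        which sparsifies gradients to reduce communication volume

        Examples:
          .. code-block:: python

            import paddle.fleet as fleet
            strategy = fleet.DistributedStrategy()
            strategy.dgc = True
        """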
        return self.strategy.dgc

    @dgc.setter
    def dgc(self, flag):
        if isinstance(flag, bool):
            self.strategy.dgc = flag
        else:
            print("WARNING: dgc should have value of bool type")

    @property
    def dgc_configs(self):
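        """
        Set Deep Gradient Compression configurations through a dict. Accepted
        keys are exactly the fields of the dgc_configs protobuf message; this
        getter returns the current settings as a dict.
        """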
        return get_msg_dict(self.strategy.dgc_configs)

    @dgc_configs.setter
    def dgc_configs(self, configs):
        check_configs_key(self.strategy.dgc_configs, configs, "dgc_configs")
        assign_configs_value(self.strategy.dgc_configs, configs)

    @property
    def gradient_merge(self):
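        """
        Indicating whether we are using gradient merge, which accumulates
        gradients over several micro-batches before applying one parameter
        update, simulating a larger batch size

        Examples:
          .. code-block:: python

            import paddle.fleet as fleet
            strategy = fleet.DistributedStrategy()
            strategy.gradient_merge = True
        """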
        return self.strategy.gradient_merge

    @gradient_merge.setter
    def gradient_merge(self, flag):
        if isinstance(flag, bool):
            self.strategy.gradient_merge = flag
        else:
            print("WARNING: gradient_merge should have value of bool type")

    @property
    def gradient_merge_configs(self):
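        """
        Set gradient merge configurations through a dict. Accepted keys are
        exactly the fields of the gradient_merge_configs protobuf message;
        this getter returns the current settings as a dict.
        """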
        return get_msg_dict(self.strategy.gradient_merge_configs)

    @gradient_merge_configs.setter
    def gradient_merge_configs(self, configs):
        check_configs_key(self.strategy.gradient_merge_configs, configs,
                          "gradient_merge_configs")
        assign_configs_value(self.strategy.gradient_merge_configs, configs)

    @property
    def lars(self):
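        """
        Indicating whether we are using LARS (Layer-wise Adaptive Rate Scaling)
        for large-batch training

        Examples:
          .. code-block:: python

            import paddle.fleet as fleet
            strategy = fleet.DistributedStrategy()
            strategy.lars = True
        """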
        return self.strategy.lars

    @lars.setter
    def lars(self, flag):
        if isinstance(flag, bool):
            self.strategy.lars = flag
        else:
            print("WARNING: lars should have value of bool type")

    @property
    def lamb(self):
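        """
        Indicating whether we are using the LAMB optimizer for large-batch
        training

        Examples:
          .. code-block:: python

            import paddle.fleet as fleet
            strategy = fleet.DistributedStrategy()
            strategy.lamb = True
        """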
        return self.strategy.lamb

    @lamb.setter
    def lamb(self, flag):
        if isinstance(flag, bool):
            self.strategy.lamb = flag
        else:
            print("WARNING: lamb should have value of bool type")

    @property
    def elastic(self):
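        """
        Indicating whether we are using elastic training, which can adapt to
        changes in the number of workers while a job is running

        Examples:
          .. code-block:: python

            import paddle.fleet as fleet
            strategy = fleet.DistributedStrategy()
            strategy.elastic = True
        """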
        return self.strategy.elastic

    @elastic.setter
    def elastic(self, flag):
        if isinstance(flag, bool):
            self.strategy.elastic = flag
        else:
            print("WARNING: elastic should have value of bool type")

    @property
    def auto(self):
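        """
        Indicating whether the distributed strategy is selected automatically

        Examples:
          .. code-block:: python

            import paddle.fleet as fleet
            strategy = fleet.DistributedStrategy()
            strategy.auto = True
        """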
        return self.strategy.auto

    @auto.setter
    def auto(self, flag):
        if isinstance(flag, bool):
            self.strategy.auto = flag
        else:
            print("WARNING: auto should have value of bool type")

    def __repr__(self):
        return str(self.strategy)