#   Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function
import paddle
from .role_maker import UserDefinedRoleMaker, PaddleCloudRoleMaker, RoleMakerBase
from .strategy_compiler import StrategyCompiler
from .distributed_strategy import DistributedStrategy
from .meta_optimizer_factory import MetaOptimizerFactory
from .runtime_factory import RuntimeFactory
from .util_factory import UtilFactory
from paddle.fluid.wrapped_decorator import wrap_decorator


def _inited_runtime_handler_(func):
    def __impl__(*args, **kwargs):
        cls = args[0]

        if cls._runtime_handle is None:
            raise ValueError("Fleet can not find suitable runtime handler")

        return func(*args, **kwargs)

    return __impl__


inited_runtime_handler = wrap_decorator(_inited_runtime_handler_)
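# Note: methods decorated with `inited_runtime_handler` (init_worker, init_server,
# run_server, stop_worker) rely on `_runtime_handle`, which is only created inside
# `Fleet.minimize()`; calling them earlier raises the ValueError above.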


class Fleet(object):
    """
    Unified API for distributed training of PaddlePaddle.

    Please refer to https://github.com/PaddlePaddle/FleetX for details.

    Returns:
        Fleet: A Fleet instance

    Example for collective training:

        .. code-block:: python

            import paddle.distributed.fleet as fleet

            fleet.init(is_collective=True)

            strategy = fleet.DistributedStrategy()
            optimizer = paddle.optimizer.SGD(learning_rate=0.001)
            optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)

            # do distributed training


    Example for parameter server training:

        .. code-block:: python

            import paddle.distributed.fleet as fleet

            fleet.init()

            strategy = fleet.DistributedStrategy()
            optimizer = paddle.optimizer.SGD(learning_rate=0.001)
            optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)

            if fleet.is_first_worker():
                print("this is first worker")

            print("current node index: {}".format(fleet.worker_index()))
            print("total number of workers: {}".format(fleet.worker_num()))

            if fleet.is_worker():
                print("this is worker")
            print("worker endpoints: {}".format(fleet.worker_endpoints(to_string=True)))

            print("server num: {}".format(fleet.server_num()))
            print("server endpoints: {}".format(fleet.server_endpoints(to_string=True)))

            if fleet.is_server():
                print("this is server")
            fleet.stop_worker()

    """

    def __init__(self):
        self._role_maker = None
        self.strategy_compiler = None
        self._is_collective = False
        self._runtime_handle = None
        self._util = None

    def init(self, role_maker=None, is_collective=False):
        """
        Initialize role_maker in Fleet.

        This function is responsible for setting up the distributed
        architecture that your code will run on.

        Args:
            role_maker (RoleMakerBase, optional): A ``RoleMakerBase`` containing the configuration
                of environment variables related to distributed training. If you did not initialize
                the role_maker by yourself, it will be automatically initialized to PaddleCloudRoleMaker.
                The default value is None.
            is_collective (bool, optional): A ``bool`` variable that determines whether the program
                runs on CPU or GPU. False means distributed training on CPU (parameter server mode),
                and True means GPU (collective mode). The default value is False.
        Returns:
            None

        Examples1:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()

        Examples2:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init(is_collective=True)

        Examples3:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                role = fleet.PaddleCloudRoleMaker()
                fleet.init(role)

        """

        if role_maker is None:
            if isinstance(is_collective, bool):
                self._is_collective = is_collective
                self._role_maker = PaddleCloudRoleMaker(
                    is_collective=self._is_collective)
            else:
                raise ValueError(
                    "`is_collective` should be instance of `bool`, but got {}".
                    format(type(is_collective)))
        else:
            if isinstance(role_maker, RoleMakerBase):
                self._role_maker = role_maker
            else:
                raise ValueError(
                    "`role_maker` should be subclass of `RoleMakerBase`, but got {}".
                    format(type(role_maker)))
        self.strategy_compiler = StrategyCompiler()
        return None

    def is_first_worker(self):
        """
        Check whether the node is the first instance of worker.

        Returns:
            bool: True if this is the first node of worker,
                  False if not.

        Examples:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()
                fleet.is_first_worker()

        """
        return self._role_maker.is_first_worker()

    def worker_index(self):
        """
        Get current worker index.

        Returns:
            int: node id

        Examples:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()
                fleet.worker_index()

        """
        return self._role_maker.worker_index()

    def worker_num(self):
        """
        Get current total worker number.

        Returns:
            int: worker numbers

        Examples:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()
                fleet.worker_num()

        """
        return self._role_maker.worker_num()

    def is_worker(self):
        """
        Check whether the node is an instance of worker.

        Returns:
            bool: True if this is a node of worker,
                  False if not.

        Examples:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()
                fleet.is_worker()

        """
        return self._role_maker.is_worker()

    def worker_endpoints(self, to_string=False):
        """
        Get current worker endpoints, such as ["127.0.0.1:1001", "127.0.0.1:1002"].

        Returns:
            list/string: worker endpoints

        Examples:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()
                fleet.worker_endpoints()

        """
        if to_string:
            return ",".join(self._role_maker.get_trainer_endpoints())
        else:
            return self._role_maker.get_trainer_endpoints()

    def server_num(self):
        """
        Get current total server number.

        Returns:
            int: server number

        Examples:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()
                fleet.server_num()

        """
        return len(self._role_maker.get_pserver_endpoints())

    def server_index(self):
        """
        Get current server index.

        Returns:
            int: node id

        Examples:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()
                fleet.server_index()

        """
        return self._role_maker.server_index()

    def server_endpoints(self, to_string=False):
        """
        Get current server endpoints, such as ["127.0.0.1:1001", "127.0.0.1:1002"].

        Returns:
            list/string: server endpoints

        Examples:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()
                fleet.server_endpoints()

        """
        if to_string:
            return ",".join(self._role_maker.get_pserver_endpoints())
        else:
            return self._role_maker.get_pserver_endpoints()

    def is_server(self):
        """
        Check whether the node is an instance of server.

        Returns:
            bool: True if this is a node of server,
                  False if not.

        Examples:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()
                fleet.is_server()

        """
        return self._role_maker.is_server(
        ) or self._role_maker._is_heter_worker()

    @property
    def util(self):
        """
        Utility functions that can be used under certain runtimes.

        Returns:
            UtilBase: instance of UtilBase, can use distributed ops/tools easily.

        Examples:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()
                util = fleet.util
                files = ["1.log", "2.log", "3.log", "4.log"]
                files = util.get_file_shard(files)

        """
        return self._util

    @util.setter
    def util(self, util):
        """
        Set Utility functions for user-defined runtime.

        Returns:
            None
        """
        self._util = util

    def barrier_worker(self):
        """
        barrier all workers

        Returns:
            None
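
        Examples:

            .. code-block:: python

                # a minimal sketch: synchronize all worker nodes
                import paddle.distributed.fleet as fleet
                fleet.init()
                fleet.barrier_worker()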
        """
        self._role_maker.barrier_worker()

    @inited_runtime_handler
    def init_worker(self):
        """
        initialize `Communicator` for parameter server training.


        Returns:
            None

        Examples:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()

                # build net
                # fleet.distributed_optimizer(...)

                fleet.init_worker()

        """
        self._runtime_handle._init_worker()

    @inited_runtime_handler
    def init_server(self, *args, **kwargs):
        """
        init_server runs the executor to initialize the startup program;
        if `args` is not empty, it will also run load_persistables for incremental training.


        Returns:
            None

        Examples:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()

                # build net
                # fleet.distributed_optimizer(...)

                fleet.init_server()
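                # a hedged sketch: for incremental training, a previously saved
                # model directory (hypothetical path) can be passed so that
                # load_persistables is run, e.g.
                # fleet.init_server("/path/to/saved_model")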

        """
        self._runtime_handle._init_server(*args, **kwargs)

    @inited_runtime_handler
    def run_server(self):
        """
        run_server will run the pserver main program with the executor.

        Returns:
            None

        Examples:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()

                # build net
                # fleet.distributed_optimizer(...)

                if fleet.is_server():
                    fleet.init_server()
                    fleet.run_server()

        """
        self._runtime_handle._run_server()

    @inited_runtime_handler
    def stop_worker(self):
        """
        stop `Communicator` and notify the parameter server that training is complete.

        Returns:
            None

        Examples:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()

                # build net
                # fleet.distributed_optimizer(...)

                fleet.init_worker()

                # ... do distributed training ...

                fleet.stop_worker()

        """
        self._runtime_handle._stop_worker()

    def save_inference_model(self,
                             executor,
                             dirname,
                             feeded_var_names,
                             target_vars,
                             main_program=None,
                             export_for_deployment=True):
        """
        save inference model for deployment.

        Returns:
            None

        Examples:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                import paddle.fluid as fluid

                fleet.init()

                # build net
                # fleet.distributed_optimizer(...)

                exe = fluid.Executor(fluid.CPUPlace())
                # `feeded_var_names` and `target_vars` come from the network built above
                fleet.save_inference_model(exe, "dirname", feeded_var_names, target_vars)

        """

        self._runtime_handle._save_inference_model(
            executor, dirname, feeded_var_names, target_vars, main_program,
            export_for_deployment)

    def save_persistables(self, executor, dirname, main_program=None):
        """

        saves all persistable variables from :code:`main_program` to
        the folder :code:`dirname`. You can refer to

        The :code:`dirname` is used to specify the folder where persistable variables
        are going to be saved. If you would like to save variables in separate
        files, set :code:`filename` None.

        Args:
            executor(Executor): The executor to run for saving persistable variables.
                                You can refer to :ref:`api_guide_executor_en` for
                                more details.

            dirname(str, optional): The saving directory path.
                                When you need to save the parameter to the memory, set it to None.
            main_program(Program, optional): The program whose persistable variables will
                                             be saved. Default: None.


        Returns:
            None

        Examples:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                import paddle.fluid as fluid

                fleet.init()

                # build net
                # fleet.distributed_optimizer(...)

                exe = fluid.Executor(fluid.CPUPlace())
                fleet.save_persistables(exe, "dirname", fluid.default_main_program())

        """

        self._runtime_handle._save_persistables(executor, dirname, main_program)

    def distributed_optimizer(self, optimizer, strategy=None):
        """
        Optimizer for distributed training.

        For distributed training, this method would rebuild a new instance of
        DistributedOptimizer, which has basic Optimizer functions and special
        features for distributed training.

        Args:
            optimizer(Optimizer): The basic Optimizer used for training.
            strategy(DistributedStrategy): Extra properties for distributed optimizer.

        Returns:
            Fleet: instance of fleet.

        Examples:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                role = fleet.role_maker.PaddleCloudRoleMaker(is_collective=True)
                fleet.init(role)
                strategy = fleet.DistributedStrategy()
                optimizer = paddle.optimizer.SGD(learning_rate=0.001)
                optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)

        """
        self.user_defined_optimizer = optimizer
        if strategy is None:
            strategy = DistributedStrategy()
        self.user_defined_strategy = strategy
        self.valid_strategy = None
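        # `distributed_optimizer` returns the Fleet instance itself, so the result
        # can be used like a regular optimizer (e.g. calling `.minimize(...)` on it
        # dispatches to `Fleet.minimize`).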
        return self

    def minimize(self,
                 loss,
                 startup_program=None,
                 parameter_list=None,
                 no_grad_set=None):
        """
        Add distributed operations to minimize ``loss`` by updating ``parameter_list``.

        Args:
            loss (Variable): A ``Variable`` containing the value to minimize.
            startup_program (Program, optional): :ref:`api_fluid_Program` for
                initializing parameters in ``parameter_list``. The default value
                is None, at this time :ref:`api_fluid_default_startup_program` will be used.
            parameter_list (Iterable, optional): Iterable of ``Variable`` or ``Variable.name`` to update
                to minimize ``loss``. The default value is None, at this time all parameters
                will be updated.
            no_grad_set (set, optional): Set of ``Variable``  or ``Variable.name`` that don't need
                to be updated. The default value is None.

        Returns:
            tuple: tuple (optimize_ops, params_grads), A list of operators appended
            by minimize and a list of (param, grad) variable pairs, param is
            ``Parameter``, grad is the gradient value corresponding to the parameter.
            The returned tuple can be passed to ``fetch_list`` in ``Executor.run()`` to 
            indicate program pruning. If so, the program will be pruned by ``feed`` and 
            ``fetch_list`` before run, see details in ``Executor``.

        Examples:

            .. code-block:: python

                import paddle
                import paddle.distributed.fleet as fleet

                # a toy network; `input_x`/`input_y` are placeholder data layers
                # added here so the example is self-contained
                hid_dim = 64
                label_dim = 2
                input_x = paddle.fluid.layers.data(name="x", shape=[32], dtype='float32')
                input_y = paddle.fluid.layers.data(name="y", shape=[1], dtype='int64')

                fc_1 = paddle.fluid.layers.fc(input=input_x, size=hid_dim, act='tanh')
                fc_2 = paddle.fluid.layers.fc(input=fc_1, size=hid_dim, act='tanh')
                prediction = paddle.fluid.layers.fc(input=[fc_2], size=label_dim, act='softmax')
                cost = paddle.fluid.layers.cross_entropy(input=prediction, label=input_y)
                avg_cost = paddle.fluid.layers.mean(x=cost)

                role = fleet.role_maker.PaddleCloudRoleMaker(is_collective=True)
                fleet.init(role)
                strategy = fleet.DistributedStrategy()
                optimizer = paddle.optimizer.SGD(learning_rate=0.001)
                optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
                optimizer.minimize(avg_cost)

                # for more examples, please reference https://github.com/PaddlePaddle/FleetX

        """
        context = {}
        # cache original feed forward program
        self.origin_main_program = loss.block.program
        context["origin_main_program"] = self.origin_main_program
        context["loss"] = loss
        if startup_program is None:
            self.origin_startup_program = \
                paddle.static.default_startup_program().clone(for_test=False)
            startup_program = paddle.static.default_startup_program()
        else:
            self.origin_startup_program = \
                startup_program.clone(for_test=False)

        context["origin_startup_program"] = startup_program
        context["role_maker"] = self._role_maker

        # compile time
        distributed_optimizer_list = \
            MetaOptimizerFactory()._get_valid_meta_optimizers(
                self.user_defined_optimizer)

        valid_optimizer_list = []
        valid_graph_optimizer_list = []
        can_not_apply_optimizer_list = []
        # recall meta optimizers for ranking
        for opt in distributed_optimizer_list:
            opt._set_basic_info(loss, self._role_maker,
                                self.user_defined_optimizer,
                                self.user_defined_strategy)
            if opt._can_apply() and not opt._is_graph_out():
                valid_optimizer_list.append(opt)
            elif opt._can_apply() and opt._is_graph_out():
                valid_graph_optimizer_list.append(opt)
            else:
                can_not_apply_optimizer_list.append(opt)
        # combine recalled meta optimizers to be a valid meta optimizer
        meta_optimizer, graph_optimizer = \
            self.strategy_compiler.generate_optimizer(
                loss, self._role_maker, self.user_defined_optimizer,
                self.user_defined_strategy, valid_optimizer_list,
                valid_graph_optimizer_list)
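
        # derive the strategy that actually takes effect from the user-defined
        # strategy and the meta optimizers that could not be applied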
        valid_strategy = self.strategy_compiler._get_valid_strategy(
            self.user_defined_strategy, can_not_apply_optimizer_list)

        context["valid_strategy"] = valid_strategy

        self.valid_strategy = valid_strategy
        self.valid_strategy._enable_env()

        optimize_ops = []
        params_grads = []

        if meta_optimizer:
            optimize_ops, params_grads = meta_optimizer.minimize(
                loss,
                startup_program=startup_program,
                parameter_list=parameter_list,
                no_grad_set=no_grad_set)

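            # the meta optimizer may have built a new main program; make sure the
            # program that owns `loss` becomes the global default main program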
            default_program = paddle.static.default_main_program()

            if id(default_program) != id(loss.block.program):
                paddle.fluid.framework.switch_main_program(loss.block.program)

        else:
            optimize_ops, params_grads = self.user_defined_optimizer.minimize(
                loss,
                startup_program=startup_program,
                parameter_list=parameter_list,
                no_grad_set=no_grad_set)

        context["program_optimize_ops"] = optimize_ops
        context["program_params_grads"] = params_grads

        if graph_optimizer:
            optimize_ops, params_grads = graph_optimizer.minimize(
                loss,
                startup_program=startup_program,
                parameter_list=parameter_list,
                no_grad_set=no_grad_set)
            # since we do not encourage users to use graph operations
            # if a graph optimizer takes effect, mostly
            # optimize_ops and params_grads are None
            # i.e. users can not modify current computation graph anymore
            context["graph_optimize_ops"] = optimize_ops
            context["graph_optimize_grads"] = params_grads

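        # lazily create the runtime handle (used by init_worker/init_server/
        # run_server/stop_worker) and the util object from the compiled context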
        if self._runtime_handle is None:
            self._runtime_handle = RuntimeFactory()._create_runtime(context)

        if self._util is None:
            self._util = UtilFactory()._create_util(context)

        return optimize_ops, params_grads