#   Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function
import warnings
import paddle
from paddle.fluid import compiler
from .role_maker import UserDefinedRoleMaker, PaddleCloudRoleMaker, RoleMakerBase
from .strategy_compiler import StrategyCompiler
from .distributed_strategy import DistributedStrategy
from .meta_optimizer_factory import MetaOptimizerFactory
from .runtime_factory import RuntimeFactory
from .util_factory import UtilFactory
from paddle.fluid.wrapped_decorator import wrap_decorator


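# Decorator guard: the wrapped Fleet method requires an initialized runtime
# handle (created in Fleet.minimize()) and raises a ValueError otherwise.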
def _inited_runtime_handler_(func):
    def __impl__(*args, **kwargs):
        cls = args[0]

        if cls._runtime_handle is None:
            raise ValueError("Fleet can not find suitable runtime handler")

        return func(*args, **kwargs)

    return __impl__


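# Decorator guard: when the role maker reports a non-distributed run, the
# wrapped Fleet method becomes a no-op and only emits a warning.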
def _is_non_distributed_check_(func):
    def __impl__(*args, **kwargs):
        cls = args[0]

        if cls._role_maker is not None and cls._role_maker._is_non_distributed(
        ) is True:
            warnings.warn(
                "%s() function does not work when using a non-distributed fleet." %
                (func.__name__))
            return

        return func(*args, **kwargs)

    return __impl__


inited_runtime_handler = wrap_decorator(_inited_runtime_handler_)
is_non_distributed_check = wrap_decorator(_is_non_distributed_check_)


class Fleet(object):
    """
    Unified API for distributed training of PaddlePaddle
63
    Please reference the https://github.com/PaddlePaddle/FleetX for details
64 65 66 67 68


    Returns:
        Fleet: A Fleet instance

69
    Example for collective training:
70 71
        .. code-block:: python

72
            import paddle.distributed.fleet as fleet
73 74 75

            fleet.init(is_collective=True)

76 77 78
            strategy = fleet.DistributedStrategy()
            optimizer = paddle.optimizer.SGD(learning_rate=0.001)
            optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94

            # do distributed training


    Example for parameter server training:

        .. code-block:: python

            import paddle.distributed.fleet as fleet

            fleet.init()

            strategy = fleet.DistributedStrategy()
            optimizer = paddle.optimizer.SGD(learning_rate=0.001)
            optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)

95 96
            if fleet.is_first_worker():
                print("this is first worker")
97

98 99
            print("current node index: {}".format(fleet.worker_index()))
            print("total number of worker num: {}".format(fleet.worker_num()))
100

101 102 103
            if fleet.is_worker():
                print("this is worker")
            print("worker endpoints: {}".format(fleet.worker_endpoints(to_string=True)))
104

105 106
            print("server num: {}".format(fleet.server_num()))
            print("server endpoints: {}".format(fleet.server_endpoints(to_string=True)))
107

108 109 110
            if fleet.is_server():
                print("this is server")
            fleet.stop_worker()
111 112


113 114 115
    """

    def __init__(self):
        self._role_maker = None
        self.strategy_compiler = None
        self._is_collective = False
        self._runtime_handle = None
        self._util = None

    def init(self, role_maker=None, is_collective=False):
        """
        Initialize role_maker in Fleet.

126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
        This function is responsible for the distributed architecture
        what you want to run your code behind.

        Args:
            role_maker (RoleMakerBase, optional): A ``RoleMakerBase`` containing the configuration
                of environment variables related to distributed training.If you did not initialize 
                the rolemaker by yourself, it will be automatically initialized to PaddleRoleMaker.
                The default value is None.
            is_collective (Boolean, optional): A ``Boolean`` variable determines whether the program 
                runs on the CPU or GPU. False means set distributed training using CPU, and True means
                GPU.The default value is False.The default value is False.
        Returns:
            None

        Examples1:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()

        Examples2:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init(is_collective=True)

        Examples3:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                role = fleet.PaddleCloudRoleMaker
                fleet.init(role)
161

162
        """
163 164

        if role_maker is None:
            if isinstance(is_collective, bool):
                self._is_collective = is_collective
                self._role_maker = PaddleCloudRoleMaker(
                    is_collective=self._is_collective)
            else:
                raise ValueError(
                    "`is_collective` should be instance of `bool`, but got {}".
                    format(type(is_collective)))
        else:
            if isinstance(role_maker, RoleMakerBase):
                self._role_maker = role_maker
            else:
                raise ValueError(
                    "`role_maker` should be subclass of `RoleMakerBase`, but got {}".
                    format(type(role_maker)))
        self.strategy_compiler = StrategyCompiler()
        return None

    def is_first_worker(self):
        """
        Check whether the node is the first instance of worker.

        Returns:
            bool: True if this is the first node of worker,
                  False if not.
190

191 192 193 194 195 196 197 198
        Examples:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()
                fleet.is_first_worker()

199 200 201 202 203 204 205 206 207
        """
        return self._role_maker.is_first_worker()

    def worker_index(self):
        """
        Get current worker index.

        Returns:
            int: node id
208 209 210 211 212 213 214 215

        Examples:

            .. code-block:: python
                import paddle.distributed.fleet as fleet
                fleet.init()
                fleet.worker_index()

216 217 218 219 220 221 222 223 224
        """
        return self._role_maker.worker_index()

    def worker_num(self):
        """
        Get current total worker number.

        Returns:
            int: worker numbers
225 226 227 228 229 230 231 232
        
        Examples:
            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()
                fleet.worker_num()

233 234 235 236 237 238 239 240 241 242
        """
        return self._role_maker.worker_num()

    def is_worker(self):
        """
        Check whether the node is an instance of worker.

        Returns:
            bool: True if this is a node of worker,
                  False if not.
243 244 245 246 247 248 249 250

        Examples:
            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()
                fleet.is_worker()

251 252 253 254 255
        """
        return self._role_maker.is_worker()

    def worker_endpoints(self, to_string=False):
        """
256
        Get current worker endpoints, such as ["127.0.0.1:1001", "127.0.0.1:1002"].
257 258 259

        Returns:
            list/string: server endpoints
260 261 262 263 264 265 266 267

        Examples:
            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()
                fleet.worker_endpoints()

268 269 270 271 272 273 274 275 276 277 278 279 280 281 282
        """
        '''
        if to_string:
            return ",".join(self._role_maker.get_trainer_endpoints())
        else:
            return self._role_maker.get_trainer_endpoints()
        '''
        return ["127.0.0.1:1001", "127.0.0.1:1002"]

    def server_num(self):
        """
        Get current total worker number.

        Returns:
            int: server number
283 284 285 286 287 288

        Examples:
            .. code-block:: python
            import paddle.distributed.fleet as fleet
            fleet.init()
            fleet.server_num()
289 290 291 292 293 294 295 296 297
        """
        return len(self._role_maker.get_pserver_endpoints())

    def server_index(self):
        """
        Get current server index.

        Returns:
            int: node id
298 299 300 301 302 303 304 305

        Examples:
            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()
                fleet.server_index()

306 307 308 309 310 311 312 313 314
        """
        return self._role_maker.server_index()

    def server_endpoints(self, to_string=False):
        """
        Get current server endpoints, such as ["127.0.0.1:1001", "127.0.0.1:1002"].

        Returns:
            list/string: server endpoints
315 316 317 318 319 320 321 322

        Examples:
            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()
                fleet.server_endpoints()

323
        """
324

325 326 327 328 329 330 331 332 333 334 335 336
        if to_string:
            return ",".join(self._role_maker.get_pserver_endpoints())
        else:
            return self._role_maker.get_pserver_endpoints()

    def is_server(self):
        """
        Check whether the node is an instance of server.

        Returns:
            bool: True if this is a node of server,
                  False if not.
337 338 339 340 341 342 343 344

        Examples:

            .. code-block:: python
                import paddle.distributed.fleet as fleet
                fleet.init()
                fleet.is_server()

345
        """
        return (self._role_maker.is_server() or
                self._role_maker._is_heter_worker())

    @property
    def util(self):
        """
        Utility functions that can be used under certain runtime
        return util
354 355 356 357 358 359 360 361 362 363 364 365 366

        Returns:
            UtilBase: instance of UtilBase, can use distributed ops/tools easily.

        Examples:

            .. code-block:: python
                import paddle.distributed.fleet as fleet
                fleet.init()
                util = fleet.util
                files = ["1.log", "2.log", "3.log", "4.log"]
                files = util.get_file_shard()

367 368 369 370 371 372 373
        """
        return self._util

    @util.setter
    def util(self, util):
        """
        Set Utility functions for userd-defined runtime
374 375 376

        Returns:
            None
377 378 379 380 381
        """
        self._util = util

    def barrier_worker(self):
        """
382 383 384 385
        barrier all workers

        Returns:
            None
386 387 388
        """
        self._role_maker.barrier_worker()

    @is_non_distributed_check
    @inited_runtime_handler
    def init_worker(self):
        """
        Initialize the ``Communicator`` for parameter server training.

        Returns:
            None

        Examples:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()

                # build net
                # fleet.distributed_optimizer(...)

                fleet.init_worker()

        """
        self._runtime_handle._init_worker()

    @is_non_distributed_check
    @inited_runtime_handler
    def init_server(self, *args, **kwargs):
        """
        init_server runs the executor to initialize the startup program; if
        ``args`` is not empty, ``load_persistables`` will also be run for
        incremental training.

        Returns:
            None

        Examples:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()

                # build net
                # fleet.distributed_optimizer(...)

                fleet.init_server()

        """
        self._runtime_handle._init_server(*args, **kwargs)

    @is_non_distributed_check
    @inited_runtime_handler
    def run_server(self):
        """
        run_server will run the pserver main program with the executor.

        Returns:
            None

        Examples:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()

                # build net
                # fleet.distributed_optimizer(...)

                if fleet.is_server():
                    fleet.init_server()
                    fleet.run_server()

        """
        self._runtime_handle._run_server()

    @is_non_distributed_check
    @inited_runtime_handler
    def stop_worker(self):
        """
        Stop the ``Communicator`` and notify the parameter servers that
        training is complete.

        Returns:
            None

        Examples:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()

                # build net
                # fleet.distributed_optimizer(...)

                fleet.init_worker()
                # do training
                fleet.stop_worker()

        """
        self._runtime_handle._stop_worker()

    def save_inference_model(self,
                             executor,
                             dirname,
                             feeded_var_names,
                             target_vars,
                             main_program=None,
                             export_for_deployment=True):
        """
        Save the inference model for deployment.

        Returns:
            None

        Examples:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                import paddle.fluid as fluid

                fleet.init()

                # build net; suppose it feeds a variable named "x" and
                # produces a target variable `prediction`
                # fleet.distributed_optimizer(...)

                exe = fluid.Executor(fluid.CPUPlace())
                fleet.save_inference_model(exe, "dirname", ["x"], [prediction])

        """

        self._runtime_handle._save_inference_model(
            executor, dirname, feeded_var_names, target_vars, main_program,
            export_for_deployment)

    def save_persistables(self, executor, dirname, main_program=None):
        """
        Save all persistable variables from :code:`main_program` to
        the folder :code:`dirname`.

        The :code:`dirname` is used to specify the folder where persistable
        variables are going to be saved.

        Args:
            executor(Executor): The executor to run for saving persistable variables.
                                You can refer to :ref:`api_guide_executor_en` for
                                more details.
            dirname(str, optional): The saving directory path.
                                When you need to save the parameter to the memory, set it to None.
            main_program(Program, optional): The program whose persistable variables will
                                             be saved. Default: None.

        Returns:
            None

        Examples:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                import paddle.fluid as fluid

                fleet.init()

                # build net
                # fleet.distributed_optimizer(...)

                exe = fluid.Executor(fluid.CPUPlace())
                fleet.save_persistables(exe, "dirname", fluid.default_main_program())

        """

        self._runtime_handle._save_persistables(executor, dirname, main_program)

    def distributed_optimizer(self, optimizer, strategy=None):
        """
        Optimizer for distributed training.

        For distributed training, this method rebuilds a new instance of
        DistributedOptimizer, which has the basic Optimizer functionality as
        well as special features for distributed training.

        Args:
            optimizer(Optimizer): The optimizer for model training.
            strategy(DistributedStrategy): Extra properties for distributed optimizer.

        Returns:
            Fleet: instance of fleet.

        Examples:

            .. code-block:: python

                import paddle
                import paddle.distributed.fleet as fleet

                role = fleet.PaddleCloudRoleMaker(is_collective=True)
                fleet.init(role)
                strategy = fleet.DistributedStrategy()
                optimizer = paddle.optimizer.SGD(learning_rate=0.001)
                optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)

        """
        self.user_defined_optimizer = optimizer
        if strategy is None:
            strategy = DistributedStrategy()
        self.user_defined_strategy = strategy
        self.valid_strategy = None
        return self

    def minimize(self,
                 loss,
                 startup_program=None,
                 parameter_list=None,
                 no_grad_set=None):
        """
        Add distributed operations to minimize ``loss`` by updating ``parameter_list``.

        Args:
            loss (Variable): A ``Variable`` containing the value to minimize.
            startup_program (Program, optional): :ref:`api_fluid_Program` for
                initializing parameters in ``parameter_list``. The default value
                is None, at this time :ref:`api_fluid_default_startup_program` will be used.
            parameter_list (Iterable, optional): Iterable of ``Variable`` or ``Variable.name`` to update
                to minimize ``loss``. The default value is None, at this time all parameters
                will be updated.
            no_grad_set (set, optional): Set of ``Variable``  or ``Variable.name`` that don't need
                to be updated. The default value is None.

        Returns:
            tuple: tuple (optimize_ops, params_grads), A list of operators appended
            by minimize and a list of (param, grad) variable pairs, param is
            ``Parameter``, grad is the gradient value corresponding to the parameter.
            The returned tuple can be passed to ``fetch_list`` in ``Executor.run()`` to
            indicate program pruning. If so, the program will be pruned by ``feed`` and
            ``fetch_list`` before run, see details in ``Executor``.

        Examples:

            .. code-block:: python

                import paddle
                import paddle.distributed.fleet as fleet

                # toy network; the inputs and sizes below are illustrative
                hid_dim = 10
                label_dim = 2
                input_x = paddle.fluid.layers.data(name='x', shape=[32], dtype='float32')
                input_y = paddle.fluid.layers.data(name='y', shape=[1], dtype='int64')
                fc_1 = paddle.fluid.layers.fc(input=input_x, size=hid_dim, act='tanh')
                fc_2 = paddle.fluid.layers.fc(input=fc_1, size=hid_dim, act='tanh')
                prediction = paddle.fluid.layers.fc(input=[fc_2], size=label_dim, act='softmax')
                cost = paddle.fluid.layers.cross_entropy(input=prediction, label=input_y)
                avg_cost = paddle.fluid.layers.mean(x=cost)

                role = fleet.PaddleCloudRoleMaker(is_collective=True)
                fleet.init(role)
                strategy = fleet.DistributedStrategy()
                optimizer = paddle.optimizer.SGD(learning_rate=0.001)
                optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
                optimizer.minimize(avg_cost)

                # for more examples, please reference https://github.com/PaddlePaddle/FleetX

        """
        context = {}
        # cache original feed forward program
        self.origin_main_program = loss.block.program
        context["origin_main_program"] = self.origin_main_program
        context["loss"] = loss
        if startup_program is None:
            self.origin_startup_program = \
                paddle.static.default_startup_program().clone(for_test=False)
            startup_program = paddle.static.default_startup_program()
        else:
            self.origin_startup_program = \
                startup_program.clone(for_test=False)

        context["origin_startup_program"] = startup_program
        context["role_maker"] = self._role_maker

        # compile time
        distributed_optimizer_list = \
            MetaOptimizerFactory()._get_valid_meta_optimizers(
                self.user_defined_optimizer)

        valid_optimizer_list = []
        valid_graph_optimizer_list = []
        can_not_apply_optimizer_list = []
        # recall meta optimizers for ranking
        for opt in distributed_optimizer_list:
            opt._set_basic_info(loss, self._role_maker,
                                self.user_defined_optimizer,
                                self.user_defined_strategy)
            if opt._can_apply() and not opt._is_graph_out():
                valid_optimizer_list.append(opt)
            elif opt._can_apply() and opt._is_graph_out():
                valid_graph_optimizer_list.append(opt)
            else:
                can_not_apply_optimizer_list.append(opt)
        # combine recalled meta optimizers to be a valid meta optimizer
        meta_optimizer, graph_optimizer = \
            self.strategy_compiler.generate_optimizer(
                loss, self._role_maker, self.user_defined_optimizer,
                self.user_defined_strategy, valid_optimizer_list,
                valid_graph_optimizer_list)

        valid_strategy = self.strategy_compiler._get_valid_strategy(
            self.user_defined_strategy, can_not_apply_optimizer_list)

        context["valid_strategy"] = valid_strategy

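        # record the strategy that is actually in effect after strategy compilation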
        self.valid_strategy = valid_strategy
        self.valid_strategy._enable_env()

        optimize_ops = []
        params_grads = []

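        # Single-process fallback: for a non-distributed, non-collective run,
        # compile the original program and minimize with the user-defined
        # optimizer directly.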
        if self._role_maker._is_non_distributed() and not self._is_collective:
            if self._runtime_handle is None:
                self._runtime_handle = RuntimeFactory()._create_runtime(context)

            compiled_program = compiler.CompiledProgram(
                self.origin_main_program).with_data_parallel(
                    loss_name=loss.name, share_vars_from=None)
            loss.block.program._graph = compiled_program
            return self.user_defined_optimizer.minimize(
                loss,
                startup_program=startup_program,
                parameter_list=parameter_list,
                no_grad_set=no_grad_set)

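        # Apply the selected meta optimizer when one was generated; otherwise
        # fall back to the user-defined optimizer below.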
        if meta_optimizer:
            optimize_ops, params_grads = meta_optimizer.minimize(
                loss,
                startup_program=startup_program,
                parameter_list=parameter_list,
                no_grad_set=no_grad_set)

            default_program = paddle.static.default_main_program()

            if id(default_program) != id(loss.block.program):
                paddle.fluid.framework.switch_main_program(loss.block.program)

        else:
            optimize_ops, params_grads = self.user_defined_optimizer.minimize(
                loss,
                startup_program=startup_program,
                parameter_list=parameter_list,
                no_grad_set=no_grad_set)

        context["program_optimize_ops"] = optimize_ops
        context["program_params_grads"] = params_grads

        if graph_optimizer:
            optimize_ops, params_grads = graph_optimizer.minimize(
                loss,
                startup_program=startup_program,
                parameter_list=parameter_list,
                no_grad_set=no_grad_set)
            # since we do not encourage users to use graph operations
            # if a graph optimizer takes effect, mostly
            # optimize_ops and params_grads are None
            # i.e. users cannot modify the current computation graph anymore
            context["graph_optimize_ops"] = optimize_ops
            context["graph_optimize_grads"] = params_grads

        if self._runtime_handle is None:
            self._runtime_handle = RuntimeFactory()._create_runtime(context)

        if self._util is None:
            self._util = UtilFactory()._create_util(context)

        return optimize_ops, params_grads