#   Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Convert the fluid program to distributed data-parallelism programs.
"""
import os
import warnings

import paddle.fluid.io as io
from paddle.fluid.communicator import Communicator
from paddle.fluid.framework import default_main_program
from paddle.fluid.framework import default_startup_program
from paddle.fluid.framework import Program
from paddle.fluid.compiler import CompiledProgram
from paddle.fluid.executor import Executor
from paddle.fluid.parallel_executor import ParallelExecutor
from paddle.fluid.optimizer import Optimizer
from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspiler as OriginTranspiler
from paddle.fluid.transpiler.geo_sgd_transpiler import GeoSgdTranspiler
from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig

from paddle.fluid.incubate.fleet.base.fleet_base import DistributedOptimizer
from paddle.fluid.incubate.fleet.base.fleet_base import Fleet
from paddle.fluid.incubate.fleet.base.fleet_base import Mode
from paddle.fluid.incubate.fleet.base.role_maker import MPISymetricRoleMaker


class DistributedTranspiler(Fleet):
    """
    A subclass for compatibility with fluid.transpiler.DistributeTranspiler.
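
    Examples:
        A module-level singleton named `fleet` is created at the bottom of
        this file. The import path below is an assumption about where this
        package lives inside PaddlePaddle; adjust it to your installation:

        .. code-block:: python

            from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
            from paddle.fluid.incubate.fleet.base import role_maker

            fleet.init(role_maker.PaddleCloudRoleMaker())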
    """

    def __init__(self):
        super(DistributedTranspiler, self).__init__(Mode.TRANSPILER)
        self._transpile_config = None
        self._transpiler = None
        self._origin_program = None
        self.startup_program = None
        self.main_program = None
        self._communicator = None

    def init_worker(self):
        """
        `init_worker` has many many functions to do before training,
        first, wait for all parameter servers launch completely.
        second, run executor to initialize startup program
        third, wait for all worker initialize completely.

        Returns:
            None
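
        Examples:
            A minimal trainer-side sketch; building the network and calling
            `fleet.distributed_optimizer(...).minimize(...)` beforehand is
            assumed and not shown:

            .. code-block:: python

                if fleet.is_worker():
                    fleet.init_worker()
                    # ... run the training loop on fleet.main_program ...
                    fleet.stop_worker()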
        """
        # if MPISymetricRoleMaker is defined,
        # we assume the user wants to submit the job on an MPI cluster
        if isinstance(self._role_maker, MPISymetricRoleMaker):
            # check whether the servers have been initialized
            from paddle.fluid.transpiler.details.checkport import wait_server_ready
            wait_server_ready(fleet.server_endpoints(to_string=False))

        if not self._transpile_config.sync_mode:
            if self._transpile_config.geo_sgd_mode:
                self._communicator = Communicator(
                    self.main_program, self.vars_info,
                    fleet.worker_num(),
                    self._transpile_config.geo_sgd_need_push_nums)
            else:
                self._communicator = Communicator(self.main_program)

            if not self._communicator.is_running():
                self._communicator.start()
            else:
                warnings.warn("communicator has been initialized, skip")

    def init_server(self, model_dir=None):
        """
        `init_server` has many many functions to do before start pserver,
        first, run executor to initialize startup program,
        second, if the `model_dir` is not empty, it will load parameters from it for increment training.

        Args:
            model_dir(str): The directory path.

        Returns:
            None
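
        Examples:
            A minimal server-side sketch; "saved_model_dir" is a placeholder
            for a directory produced earlier by `save_persistables`:

            .. code-block:: python

                if fleet.is_server():
                    fleet.init_server("saved_model_dir")
                    fleet.run_server()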
        """
        if not self.startup_program:
            raise ValueError(
                "startup_program is None, need invoke DistributedOptimizer.minimize first"
            )

        self._executor.run(self.startup_program)

        if model_dir:
            if not os.path.isdir(model_dir):
                raise ValueError("There is no directory named '%s'", model_dir)

            io.load_persistables(self._executor, model_dir, self.main_program)

    def run_server(self):
        """
        `run_server` runs the executor to start the pserver main program.

        Returns:
            None
        """
        if not self.main_program:
            raise ValueError(
                "main_program is None, need invoke DistributedOptimizer.minimize first"
            )

        self._executor.run(self.main_program)

    def stop_worker(self):
        """
        Close this executor.

        For distributed training, this method frees the resources on the PServers
        that are related to the current Trainer.

        Returns:
            None
        """
        if not self._transpile_config.sync_mode:
            self._communicator.stop()
        self._executor.close()
        if isinstance(self._role_maker, MPISymetricRoleMaker):
            self._role_maker._finalize()

    def distributed_optimizer(self, optimizer, strategy=None):
        """
        Optimizer for distributed training.

        For distributed training, this method rebuilds a new instance of
        DistributedOptimizer, which has the basic Optimizer functionality plus
        special features for distributed training.

        Args:
            optimizer(Optimizer): The optimizer to be wrapped for distributed training.
            strategy(DistributeTranspilerConfig): Extra properties for the distributed optimizer.

        Returns:
            TranspilerOptimizer: subclass of DistributedOptimizer.
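
        Examples:
            An end-to-end sketch. The import path of `fleet`, the role maker
            and the `my_network()` helper are assumptions about the
            surrounding project, not guarantees made by this module:

            .. code-block:: python

                import paddle.fluid as fluid
                from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig
                from paddle.fluid.incubate.fleet.base import role_maker
                from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet

                fleet.init(role_maker.PaddleCloudRoleMaker())

                loss = my_network()  # user-defined network, placeholder here

                strategy = DistributeTranspilerConfig()
                strategy.sync_mode = False

                optimizer = fluid.optimizer.SGD(learning_rate=0.01)
                optimizer = fleet.distributed_optimizer(optimizer, strategy)
                optimizer.minimize(loss)

                if fleet.is_server():
                    fleet.init_server()
                    fleet.run_server()
                elif fleet.is_worker():
                    fleet.init_worker()
                    # ... run the training loop ...
                    fleet.stop_worker()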
        """

        if not isinstance(optimizer, Optimizer):
            raise ValueError("optimizer must be an instance of Optimizer")
        self._optimizer = TranspilerOptimizer(optimizer, strategy)
        return self._optimizer

    def save_inference_model(self,
                             executor,
                             dirname,
                             feeded_var_names,
                             target_vars,
                             main_program=None,
                             export_for_deployment=True):
        """
        Prune the given `main_program` to build a new program especially for inference,
        and then save it and all related parameters to the given `dirname` with the `executor`.
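
        Examples:
            A minimal sketch; `exe`, `loss` and the feed name come from the
            user's own program, and saving is guarded so only the first
            trainer writes the model:

            .. code-block:: python

                if fleet.is_first_worker():
                    fleet.save_inference_model(
                        executor=exe,
                        dirname="inference_model",
                        feeded_var_names=["input"],
                        target_vars=[loss])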
        """
        if isinstance(executor, ParallelExecutor):
            raise TypeError(
                "in fleet.save_inference_model() function, executor must be of Executor type; ParallelExecutor is not allowed"
            )

        if not isinstance(executor, Executor):
            raise TypeError(
                "in fleet.save_inference_model() function, executor must be of Executor type"
            )

        if main_program is not None:
            if isinstance(main_program, CompiledProgram):
                raise TypeError(
                    "in fleet.save_inference_model() function, main_program must be of Program type; CompiledProgram is not allowed"
                )
            io.save_inference_model(dirname, feeded_var_names, target_vars,
                                    executor, main_program, None, None,
                                    export_for_deployment)
        else:
            io.save_inference_model(dirname, feeded_var_names, target_vars,
                                    executor, self._origin_program, None, None,
                                    export_for_deployment, True)

            model_basename = "__model__"
            model_filename = os.path.join(dirname, model_basename)

            with open(model_filename, "rb") as f:
                program_desc_str = f.read()

            program = Program.parse_from_string(program_desc_str)
            program._copy_dist_param_info_from(self.main_program)
            self.save_persistables(executor, dirname, program)

    def save_persistables(self, executor, dirname, main_program=None):
        """
        This function filters out all variables with `persistable==True` from the
        give `main_program` and then saves these variables to the folder `dirname`
        or file `filename`.

        The `dirname` is used to specify the folder where persistable variables
        are going to be saved. If you would like to save variables in separate
        files, set `filename` None; if you would like to save all variables in a
        single file, use `filename` to specify the file name.
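
        Examples:
            A minimal sketch; `exe` is an existing `fluid.Executor` and the
            checkpoint directory name is arbitrary:

            .. code-block:: python

                if fleet.is_first_worker():
                    fleet.save_persistables(executor=exe, dirname="checkpoint")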
        """
        if isinstance(executor, ParallelExecutor):
            raise TypeError(
                "in fleet.save_persistables() function, executor must be of Executor type; ParallelExecutor is not allowed"
            )

        if not isinstance(executor, Executor):
            raise TypeError(
                "in fleet.save_persistables() function, executor must be of Executor type"
            )

        if main_program is None:
            main_program = self.main_program

        if isinstance(main_program, CompiledProgram):
            raise TypeError(
                "in fleet.save_persistables() function, main_program must be of Program type; CompiledProgram is not allowed"
            )

        if not main_program._is_distributed:
            raise ValueError(
                "main_program is a local program; fleet.save_persistables cannot be used on it")

        io.save_persistables(executor, dirname, main_program, None)

    def _transpile(self, config):
        if not isinstance(config, DistributeTranspilerConfig):
            raise TypeError(
                "config must be an instance of DistributeTranspilerConfig")

        if not config.sync_mode:
            config.runtime_split_send_recv = True

        # _origin_program is a deep copy of default_main_program, kept for inference
        self._origin_program = default_main_program().clone(for_test=False)

        self._transpile_config = config
        if config.geo_sgd_mode:
            self._transpiler = GeoSgdTranspiler(config)
        else:
            self._transpiler = OriginTranspiler(config)

        if self.is_worker():
            self._transpiler.transpile(
                trainer_id=fleet.worker_index(),
                pservers=fleet.server_endpoints(to_string=True),
                trainers=fleet.worker_num(),
                sync_mode=config.sync_mode)

            if isinstance(self._role_maker, MPISymetricRoleMaker):
                config.wait_port = False

            self.main_program = self._transpiler.get_trainer_program(
                wait_port=config.wait_port)
            self.startup_program = default_startup_program()
            if self._transpile_config.geo_sgd_mode:
                self.vars_info = self._transpiler._get_vars_info()
                self.startup_program = self._transpiler.trainer_startup_program
        else:
            self._transpiler.transpile(
                trainer_id=fleet.worker_index(),
                pservers=fleet.server_endpoints(to_string=True),
                trainers=fleet.worker_num(),
                sync_mode=config.sync_mode,
                current_endpoint=self.server_endpoints()[self.server_index()])
            self.main_program, self.startup_program = \
                self._transpiler.get_pserver_programs(
                    self.server_endpoints()[self.server_index()])


fleet = DistributedTranspiler()


class TranspilerOptimizer(DistributedOptimizer):
    """
    DistributedOptimizer is a wrapper for paddle.fluid.optimizer
    A user should pass a paddle.fluid.optimizer to DistributedOptimizer
    minimize() function is implemented.
    DistributedOptimizer is the starting point for a user who wants to
    run distributed training. The optimized information will be stored in
    Fleet() instance who holds the global information about current distributed
    training.

    Args:
        optimizer(Optimizer): subclass of Optimizer.
        strategy(DistributeTranspilerConfig): instance of DistributeTranspilerConfig.

    Returns:
        None
    """

    def __init__(self, optimizer, strategy=None):
        super(TranspilerOptimizer, self).__init__(optimizer, strategy)

        if strategy:
            if not isinstance(strategy, DistributeTranspilerConfig):
                raise TypeError(
                    "In {} mode, strategy must be an instance of DistributeTranspilerConfig".
                    format(fleet._mode))
            else:
                self._strategy = strategy
        else:
            self._strategy = DistributeTranspilerConfig()

    def backward(self,
                 loss,
                 startup_program=None,
                 parameter_list=None,
                 no_grad_set=None,
                 callbacks=None):
        """
        First part of `minimize`, do auto-diff to append backward ops for
        the current program.

        Args:
            loss (Variable): loss variable to run optimizations.
            startup_program (Program): startup_program for initializing parameters
                in `parameter_list`.
            parameter_list (list): list of Variables to update.
            no_grad_set (set|None): set of Variables that should be ignored.
            callbacks (list|None): list of callables to run when appending backward
                operator for one parameter.

        Returns:
            list: list of (param, grad) pairs; grad is the output of backward.

        Examples:
            See examples in `apply_gradients`.
        """
        return self._optimizer.backward(loss, startup_program, parameter_list,
                                        no_grad_set, callbacks)

    def apply_gradients(self, params_grads):
        """
        Second part of `minimize`, appending optimization operators for
        given `params_grads` pairs.

        Args:
            params_grads (list): list of (param, grad) pairs to do optimization.

        Returns:
            list: A list of operators appended to the current program.

        Examples:
            .. code-block:: python

                loss = network()
                optimizer = fluid.optimizer.SGD(learning_rate=0.1)
                params_grads = optimizer.backward(loss)
                # you may append operations for params_grads here
                # ...
                optimizer.apply_gradients(params_grads)
        """
        return self._optimizer.apply_gradients(params_grads)

    def minimize(self,
                 loss,
                 scopes=None,
                 startup_program=None,
                 parameter_list=None,
                 no_grad_set=None):
        """
        Add operations to minimize `loss` by updating `parameter_list`.

        This method combines interface `backward()` and
        `apply_gradients()` into one.

        Args:
            loss (Variable): loss variable to run optimizations.
            scopes (None): TranspilerOptimizer doesn't need scope parameter.
            startup_program (Program): startup_program for initializing parameters
                in `parameter_list`.
            parameter_list (list): list of Variables to update.
            no_grad_set (set|None): set of Variables that should be ignored.

        Returns:
            tuple: (optimize_ops, params_grads), where optimize_ops is the list of
            operators appended, and params_grads is the list of (param, grad) Variable
            pairs for optimization.
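
        Examples:
            See the full example in `DistributedTranspiler.distributed_optimizer`;
            a typical call, where `optimizer` is the instance returned by
            `fleet.distributed_optimizer` and `loss` is the user's loss Variable:

            .. code-block:: python

                optimize_ops, params_grads = optimizer.minimize(loss)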
        """
        if isinstance(loss, list):
            raise TypeError(
                "DistributedTranspiler's minimize does not accept a list as loss")

        if isinstance(startup_program, list):
            raise TypeError(
                "DistributedTranspiler's minimize does not accept a list as startup_program"
            )

        optimize_ops, params_grads = self._optimizer.minimize(
            loss, startup_program, parameter_list, no_grad_set)
        fleet._transpile(config=self._strategy)
        return optimize_ops, params_grads