parallel_executor.py 18.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

15 16 17 18
from __future__ import print_function
from . import core
from . import framework
from . import executor
19
from . import compiler
20
from .data_feeder import check_type
Y
Yu Yang 已提交
21
import sys
22

X
Xin Pan 已提交
23
# Public API of this module.
__all__ = ['ParallelExecutor']

# Re-export the strategy classes implemented in the C++ core so that users
# can configure how the graph is executed (thread pool size, etc.) and how
# it is built/optimized without importing ``core`` directly.
ExecutionStrategy = core.ParallelExecutor.ExecutionStrategy
BuildStrategy = core.ParallelExecutor.BuildStrategy
27 28 29


class ParallelExecutor(object):
    """
    :api_attr: Static Graph

    The ParallelExecutor is an upgraded version of :code:`paddle.static.Executor` that supports multi-node model
    training and testing based on the data-parallel mode. In data-parallel mode,
    ParallelExecutor will broadcast the parameters from Node0 to other nodes during
    construction and copy the input Program to other nodes from Node0 to make sure
    that the initial state on each node is the same. Each node runs the model independently
    and the parameters' gradient is aggregated between those nodes during backward
    computation, and then each node independently updates its parameters. If you use
    the GPU to run the model, i.e. use_cuda=True, the node refers to the GPU,
    ParallelExecutor will automatically get the GPU resources available on the
    current machine, users can also set the available GPU resources in the environment
    variable, for example: want to use GPU0, GPU1, export CUDA_VISIBLE_DEVICES=0,1;
    If the operation is performed on the CPU, i.e. use_cuda=False, the node refers to the CPU.
    **Note: At this time, the user needs to manually add CPU_NUM to the environment variable
    and set the number of CPU devices. For example, export CPU_NUM=4, if the environment
    variable is not set, the executor will add the variable to the environment variable
    and set it to 1.**

    Args:
        use_cuda (bool): Whether to use CUDA or not.
        loss_name (str): This parameter is the name of the loss Tensor of the
            model. **Note: If it is data-parallel model training, you must set loss_name,
            otherwise, the results may be wrong**. The default is None.
        main_program (Program): This parameter represents the Program to be executed.
            If this parameter is not provided, that parameter is None, the program will
            be set to :code:`paddle.static.default_main_program()`. The default is None.
        share_vars_from(ParallelExecutor): If share_vars_from is set, the current
            ParallelExecutor will share the parameters with the ParallelExecutor
            specified by share_vars_from. This parameter needs to be set when model testing
            is required during model training, and the data parallel mode is used for
            training and testing. Since ParallelExecutor will only distribute parameter
            variables to other devices when it is first executed, the ParallelExecutor
            specified by share_vars_from must be run before the current ParallelExecutor.
            The default is None.
        exec_strategy(ExecutionStrategy): exec_strategy specifies the options that can
            be changed when running the current model, such as the thread pool size.
            For more information about exec_strategy, please refer to :code:`paddle.static.ExecutionStrategy`.
            The default is None.
        build_strategy(BuildStrategy): By configuring build_strategy, we can
            optimize the computational graph, such as operators' fusion in the
            computational graph and memory optimization during the execution
            of the computational graph. For more information about build_strategy,
            please refer to :code:`paddle.static.BuildStrategy`.  The default is None.
        num_trainers(int): This parameter needs to be set in GPU distributed training.
            If the parameter value is greater than 1, NCCL will be initialized by multi-level
            nodes. Each node should have the same number of GPUs. The default is 1.
        trainer_id(int): This parameter needs to be set when performing GPU distributed
            training. This parameter must be used with the num_trainers parameter.
            Trainer_id indicates the "rank" of the current node. The trainer_id starts
            counting from 0. The default is 0.
        scope(Scope): Specifies the scope in which the program is executed.
            The default is paddle.static.global_scope().

    Returns:
        ParallelExecutor: The initialized ParallelExecutor object.

    Raises:
        AssertionError: If share_vars_from is provided, but it is not a
            ParallelExecutor object.

    NOTES:

        1. If you only use ParallelExecutor to do multi-card test, you don't need to set loss_name
           and share_vars_from.

        2. If you need to train and test the model with ParallelExecutor, the share_vars_from
           must be set when building the ParallelExecutor corresponding to the model test.
           Otherwise, the parameters used in the model test and the model training are inconsistent.

    Examples:
        .. code-block:: python

          import paddle
          import numpy
          import os

          use_cuda = True
          paddle.enable_static()
          place = paddle.CUDAPlace(0) if use_cuda else paddle.CPUPlace()

          # NOTE: If you use CPU to run the program, you need
          # to specify the CPU_NUM, otherwise, PaddlePaddle will use
          # all the number of the logic core as the CPU_NUM,
          # in that case, the batch size of the input should be
          # greater than CPU_NUM, if not, the process will be
          # failed by an exception.
          if not use_cuda:
              os.environ['CPU_NUM'] = str(2)

          exe = paddle.static.Executor(place)

          train_program = paddle.static.Program()
          startup_program = paddle.static.Program()
          with paddle.static.program_guard(train_program, startup_program):
              data = paddle.static.data(name='X', shape=[None, 1], dtype='float32')
              hidden = paddle.static.nn.fc(data, 10)
              loss = paddle.mean(hidden)
              test_program = paddle.static.default_main_program().clone(for_test=True)
              paddle.optimizer.SGD(learning_rate=0.01).minimize(loss)

          exe.run(startup_program)

          train_exe = paddle.static.ParallelExecutor(use_cuda=use_cuda,
                                                     main_program=train_program,
                                                     loss_name=loss.name)
          # Note: if share_vars_from is not set here, the test parameter is different to the train one
          test_exe = paddle.static.ParallelExecutor(use_cuda=use_cuda,
                                                    main_program=test_program,
                                                    share_vars_from=train_exe)

          x = numpy.random.random(size=(10, 1)).astype('float32')
          loss_data, = train_exe.run(feed={"X": x},
                                     fetch_list=[loss.name])

          loss_data, = test_exe.run(feed={"X": x},
                                    fetch_list=[loss.name])

    """

    def __init__(self,
                 use_cuda,
                 loss_name=None,
                 main_program=None,
                 share_vars_from=None,
                 exec_strategy=None,
                 build_strategy=None,
                 num_trainers=1,
                 trainer_id=0,
                 scope=None):
        if build_strategy is None:
            build_strategy = BuildStrategy()

        # TODO(paddle-dev): trainer_id and num_trainers should be removed
        # from the parameter list; they duplicate the build_strategy fields.
        # Explicitly passed values win over the ones already set on
        # build_strategy; warn on stderr when we overwrite a non-default.
        if num_trainers != 1 and build_strategy.num_trainers != num_trainers:
            sys.stderr.write(
                'The value of build_strategy.num_trainers[%d] is overwritten '
                'by the passed num_trainers[%d].\n' %
                (build_strategy.num_trainers, num_trainers))
            build_strategy.num_trainers = num_trainers
        if trainer_id != 0 and build_strategy.trainer_id != trainer_id:
            sys.stderr.write(
                'The value of build_strategy.trainer_id[%d] is overwritten '
                'by the passed trainer_id[%d].\n' %
                (build_strategy.trainer_id, trainer_id))
            build_strategy.trainer_id = trainer_id

        # One place per device: all visible GPUs when use_cuda, otherwise
        # CPU_NUM logical CPU places.
        self._places = framework.cuda_places(
        ) if use_cuda else framework.cpu_places()
        self._scope = scope if scope is not None else executor.global_scope()

        main_program = main_program if main_program is not None \
            else framework.default_main_program()

        self._compiled_program = compiler.CompiledProgram(main_program)
        if share_vars_from:
            assert isinstance(
                share_vars_from, ParallelExecutor
            ), "The share_vars_from should be ParallelExecutor."

        # Delegate the actual multi-device setup to CompiledProgram;
        # ParallelExecutor is now a thin wrapper around it.
        self._compiled_program.with_data_parallel(
            loss_name=loss_name,
            build_strategy=build_strategy,
            exec_strategy=exec_strategy,
            share_vars_from=share_vars_from._compiled_program
            if share_vars_from else None)

        self._place = core.CUDAPlace(0) if use_cuda else core.CPUPlace()
        self._exe = executor.Executor(self._place)

    def run(self, fetch_list, feed=None, feed_dict=None, return_numpy=True):
        """
        This interface is used to run the current model. It should be noted
        that the executor will execute all the operators in the Program,
        and will not prune some operators in the Program according to the
        fetch_list.

        Args:
            fetch_list(list): This parameter represents the Tensors that need to be returned
                after the model runs. The default is None.
            feed(list|dict): This parameter represents the input Tensors of the model.
                If it is single card training, the feed is dict type, and if it is multi-card
                training, the parameter feed can be dict or list of Tensor. If the
                parameter type is dict, the data in the feed will be split and sent to
                multiple devices (CPU/GPU), that is to say, the input data will be evenly
                sent to different devices, so you should make sure the number of samples of
                the current mini-batch must be greater than the number of places;
                if the parameter type is list, those data are copied directly to each device,
                so the length of this list should be equal to the number of places.
                The default is None.
            feed_dict: Deprecated alias for the feed parameter, kept for backward
                compatibility. It is only consulted when feed is not given.
                Default None.
            return_numpy(bool): This parameter indicates whether convert the fetched Tensors
                (the Tensor specified in the fetch list) to numpy.ndarray. if it is False,
                the type of the return value is a list of :code:`LoDTensor`. The default is True.

        Returns:
            List: The fetched result list.

        Raises:
            ValueError: If the feed is a list, but its length is not equal the
                length of active places, or its element's is not dict.

        NOTES:
            1. If the feed parameter is dict type, the input data will be evenly distributed
               to different cards. For example, using two GPUs to run the model, the input
               sample number is 3, that is, [0, 1, 2], the sample number on GPU0 is 1,
               that is, [0], and the sample number on GPU1 is 2, that is, [1, 2].
               If the number of samples is less than the number of devices, the program will
               throw an exception, so when running the model, you should make sure that the
               number of samples of the last batch of the data set should be greater than the
               number of CPU cores or GPU cards, if it is less than, it is recommended that
               the batch be discarded.
            2. If the number of CPU cores or GPU cards available is greater than 1, the fetch
               results are spliced together in dimension 0 for the same Tensor values
               (Tensors in fetch_list) on different devices.

        Examples:
            .. code-block:: python

              import paddle
              import numpy
              import os

              use_cuda = True
              paddle.enable_static()
              place = paddle.CUDAPlace(0) if use_cuda else paddle.CPUPlace()

              # NOTE: If you use CPU to run the program, you need
              # to specify the CPU_NUM, otherwise, PaddlePaddle will use
              # all the number of the logic core as the CPU_NUM,
              # in that case, the batch size of the input should be
              # greater than CPU_NUM, if not, the process will be
              # failed by an exception.
              if not use_cuda:
                  os.environ['CPU_NUM'] = str(2)

              exe = paddle.static.Executor(place)

              train_program = paddle.static.Program()
              startup_program = paddle.static.Program()
              with paddle.static.program_guard(train_program, startup_program):
                  data = paddle.static.data(name='X', shape=[None, 1], dtype='float32')
                  hidden = paddle.static.nn.fc(data, 10)
                  loss = paddle.mean(hidden)
                  paddle.optimizer.SGD(learning_rate=0.01).minimize(loss)

              exe.run(startup_program)

              train_exe = paddle.static.ParallelExecutor(use_cuda=use_cuda,
                                                         main_program=train_program,
                                                         loss_name=loss.name)

              # If the feed is a dict:
              # the image will be split into devices. If there is two devices
              # each device will process an image with shape (5, 1)
              x = numpy.random.random(size=(10, 1)).astype('float32')
              loss_data, = train_exe.run(feed={"X": x},
                                         fetch_list=[loss.name])

              # If the feed is a list:
              # each device will process each element in the list.
              # the 1st device will process an image with shape (10, 1)
              # the 2nd device will process an image with shape (9, 1)
              #
              # you can use exe.device_count to get the device number.
              x2 = numpy.random.random(size=(9, 1)).astype('float32')
              loss_data, = train_exe.run(feed=[{"X": x}, {"X": x2}],
                                         fetch_list=[loss.name])

        """
        # Honor the deprecated ``feed_dict`` alias: the parameter is
        # documented as an alias of ``feed`` but used to be silently ignored.
        if feed is None and feed_dict is not None:
            feed = feed_dict
        return self._exe.run(program=self._compiled_program,
                             scope=self._scope,
                             feed=feed,
                             fetch_list=fetch_list,
                             return_numpy=return_numpy)

    @property
    def device_count(self):
        # Number of devices (places) this executor distributes work across.
        return len(self._places)

    def drop_local_exe_scopes(self):
        """
        Drop the local execution scopes immediately. In order to avoid frequently
        application and release of temporary variables, the strategy adopted by
        ParallelExecutor is to drop the local execution scopes after several iterations.
        ParallelExecutor provides the num_iteration_per_drop_scope option in
        :code:`paddle.static.ExecutionStrategy`, which indicates how many iterations are intervened to
        drop the local execution scopes. If the num_iteration_per_drop_scope value
        is 100, but you want to drop the local execution scopes after 50 iterations,
        you can call the interface manually.

        Returns:
            None

        Examples:
            .. code-block:: python

              import paddle
              import numpy
              import os

              use_cuda = True
              # NOTE: If you use CPU to run the program, you need
              # to specify the CPU_NUM, otherwise, PaddlePaddle will use
              # all the number of the logic core as the CPU_NUM,
              # in that case, the batch size of the input should be
              # greater than CPU_NUM, if not, the process will be
              # failed by an exception.
              if not use_cuda:
                  os.environ['CPU_NUM'] = str(2)

              paddle.enable_static()
              train_program = paddle.static.Program()
              startup_program = paddle.static.Program()
              with paddle.static.program_guard(train_program, startup_program):
                  data = paddle.static.data(name='X', shape=[None, 1], dtype='float32')
                  hidden = paddle.static.nn.fc(data, 10)
                  loss = paddle.mean(hidden)

              place = paddle.CUDAPlace(0) if use_cuda else paddle.CPUPlace()
              exe = paddle.static.Executor(place)
              exe.run(startup_program)

              parallel_exe = paddle.static.ParallelExecutor(use_cuda=use_cuda,
                                                            main_program=train_program,
                                                            loss_name=loss.name)

              x = numpy.random.random(size=(10, 1)).astype('float32')
              loss_data, = parallel_exe.run(feed={"X": x},
                                            fetch_list=[loss.name])

              parallel_exe.drop_local_exe_scopes()

        """
        # The underlying executor only exists after the compiled program has
        # been built as a core.ParallelExecutor; fail loudly otherwise.
        check_type(self._compiled_program._executor,
                   "the Executor of compiled program", core.ParallelExecutor,
                   "ParallelExecutor.drop_local_exe_scopes")
        self._compiled_program._executor.drop_local_exe_scopes()

    # This API is used to check whether DropLocalExeScopes can work.
    def _need_create_local_exe_scopes(self):
        check_type(self._compiled_program._executor,
                   "the Executor of compiled program", core.ParallelExecutor,
                   "ParallelExecutor._need_create_local_exe_scopes")
        return self._compiled_program._executor._need_create_local_exe_scopes()