parallel_executor.py
#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import core
import multiprocessing
import framework
import executor
import warnings

__all__ = ['ParallelExecutor']


class ParallelExecutor(object):
    def __init__(self,
                 use_cuda,
                 loss_name=None,
                 main_program=None,
                 num_threads=None,
                 allow_op_delay=False,
                 share_vars_from=None):
        """
        ParallelExecutor can run a program in parallel across multiple devices.

        Args:
            use_cuda(bool): Whether to use CUDA or not.
            loss_name(str, default None): The name of the loss variable;
                it must be set for training.
            main_program(Program, default None): The program to run. If not
                provided, default_main_program will be used.
            num_threads(int, default None): How many threads are used for
                training.
            allow_op_delay(bool, default False): Whether to delay and buffer
                some operators together for scheduling, which may improve
                performance in some cases. Default False.
            share_vars_from(ParallelExecutor, default None): If provided,
                it will share variables from the specified ParallelExecutor.

        Returns:
            A ParallelExecutor object.

        Raises:
            TypeError: If share_vars_from is provided but is not a
                ParallelExecutor object.

        Examples:
            .. code-block:: python

              train_exe = fluid.ParallelExecutor(
                  use_cuda=True, loss_name=loss.name)
              test_exe = fluid.ParallelExecutor(
                  use_cuda=True,
                  main_program=test_program,
                  share_vars_from=train_exe)

              train_loss, = train_exe.run([loss.name], feed=feed_dict)
              test_loss, = test_exe.run([loss.name], feed=feed_dict)
        """

        self._places = []
        self._act_places = []
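        # Create one execution place per device: every visible CUDA device
        # when use_cuda is set, otherwise one place per host CPU core.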
        if use_cuda:
            for i in xrange(core.get_cuda_device_count()):
                p = core.Place()
                self._act_places.append(core.CUDAPlace(i))
                p.set_place(self._act_places[-1])
                self._places.append(p)
        else:
            for i in xrange(multiprocessing.cpu_count()):
                p = core.Place()
                self._act_places.append(core.CPUPlace())
                p.set_place(self._act_places[-1])
                self._places.append(p)
        assert self._places, "no place for execution"

        if num_threads is None:
            if use_cuda:
                # Experiments on se-resnext show that too many threads hurt
                # performance. Worth tuning for other models in the future.
                num_threads = len(self._places)
            else:
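                # On CPU, default to two threads per place, capped by the
                # number of host CPU cores.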
                num_threads = min(
                    len(self._places) * 2, multiprocessing.cpu_count())

        main = main_program
        main = main if main else framework.default_main_program()
        scope = executor.global_scope()

        if share_vars_from and not isinstance(share_vars_from,
                                              ParallelExecutor):
            raise TypeError("share_vars_from must be ParallelExecutor.")
        local_scopes = share_vars_from.executor.local_scopes(
        ) if share_vars_from else []

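        # Names of persistable variables (e.g. parameters and optimizer
        # state); RAW-type variables are internal handles and are skipped.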
        self.persistable_vars = [
            v.name for v in main.list_vars()
            if v.persistable and v.type != core.VarDesc.VarType.RAW
        ]

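        # Construct the underlying C++ executor with the devices, the
        # trainable parameter names, the persistable variable names, the
        # program desc and the (possibly shared) scopes.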
        self.executor = core.ParallelExecutor(
            num_threads,
            True if use_cuda else False,  # use_event
            self._places,
            set([
                p.name for p in main.global_block().iter_parameters()
                if not p.stop_gradient
            ]),
            set(self.persistable_vars),
            main.desc,
            loss_name if loss_name else '',
            scope,
            local_scopes,
            allow_op_delay)
        self.scope = scope

    def run(self, fetch_list, feed={}, feed_dict={}):
        """
        :param fetch_list: A list of variable names that will be fetched.
130
        :param feed: A dict mapping for feed variable name to LoDTensor
X
Xin Pan 已提交
131 132 133
          or numpy array.
        :return: fetched value list.
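
        Example (a minimal sketch; assumes exe is a ParallelExecutor
        created with loss_name=loss.name and that feed maps input
        variable names to numpy arrays):

        .. code-block:: python

            loss_value, = exe.run([loss.name], feed=feed)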
        """
        if not feed_dict == {}:
            warnings.warn(
                "The 'feed_dict' of ParallelExecutor.run() is deprecated. Please use 'feed' instead."
            )
        if feed == {}:
            feed = feed_dict
        if not isinstance(feed, dict):
            raise TypeError("feed should be a dict")

        feed_tensor_dict = {}
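        # Convert each feed value to a LoDTensor; plain numpy arrays are
        # copied onto the first place before being handed to the executor.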
        for i, feed_name in enumerate(feed):
            feed_tensor = feed[feed_name]
            if not isinstance(feed_tensor, core.LoDTensor):
                feed_tensor = core.LoDTensor()
                feed_tensor.set(feed[feed_name], self._act_places[0])
            feed_tensor_dict[feed_name] = feed_tensor

        fetch_var_name = '@FETCHED_VAR_NAME@'
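        # The executor writes all fetched results into a LoDTensorArray
        # variable with this name in the scope; it is unpacked below.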
        self.executor.run(fetch_list, fetch_var_name, feed_tensor_dict)
        arr = self.scope.find_var(fetch_var_name).get_lod_tensor_array()
        return [arr[i] for i in range(len(arr))]

    def bcast_params(self):
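        """
        Broadcast the persistable variables (e.g. parameters) to all
        devices, so that every local scope holds the same copy.
        """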
        self.executor.bcast_params(set(self.persistable_vars))