parallel_executor.py 5.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import core
import multiprocessing
import framework
import executor

__all__ = ['ParallelExecutor']


class ParallelExecutor(object):
    """Python front end of ``core.ParallelExecutor``.

    The constructor creates one place per CUDA device (``use_cuda=True``) or
    one per CPU core reported by ``multiprocessing.cpu_count()`` and hands
    them, together with the program description, to ``core.ParallelExecutor``.
    """

    def __init__(self,
                 use_cuda,
                 loss_name=None,
                 main_program=None,
                 num_threads=None,
                 allow_op_delay=False,
                 share_vars_from=None):
        """
        ParallelExecutor can run program in parallel.

        Args:
            use_cuda(bool): Whether to use CUDA or not.
            loss_name(str, default None): The loss name must set in training.
            main_program(Program, default None): The program that need to run,
                if not provided, then default_main_program will be used.
            num_threads(int, default None): How many threads are used for
                training. If None, it is the number of places when using
                CUDA, otherwise min(2 * number of places, CPU count).
            allow_op_delay(bool, default False): Whether to delay and buffer
                some operators together for scheduling or not, which may
                improve performance in some cases, default False.
            share_vars_from(ParallelExecutor, default None): If provided,
                it will share variables from the specified ParallelExecutor.

        Returns:
            A ParallelExecutor object.

        Raises:
            TypeError: If share_vars_from is provided, but not ParallelExecutor
                object.

        Examples:
            .. code-block:: python

              train_exe = fluid.ParallelExecutor(
                  use_cuda=True, loss_name=loss.name)
              test_exe = fluid.ParallelExecutor(
                  use_cuda=True,
                  main_program=test_program,
                  share_vars_from=train_exe)

              train_loss, = train_exe.run([loss.name], feed_dict=feed_dict)
              test_loss, = test_exe.run([loss.name], feed_dict=feed_dict)
        """

        # _act_places keeps the concrete CUDAPlace/CPUPlace objects (used by
        # run() to place fed data); _places keeps the generic core.Place
        # wrappers that are passed to core.ParallelExecutor.
        self._places = []
        self._act_places = []
        if use_cuda:
            device_count = core.get_cuda_device_count()
            make_place = core.CUDAPlace
        else:
            device_count = multiprocessing.cpu_count()
            # NOTE(review): CPUPlace is called with an index exactly as in
            # the original code; confirm the binding accepts an argument.
            make_place = core.CPUPlace
        for i in range(device_count):
            act_place = make_place(i)
            place = core.Place()
            place.set_place(act_place)
            self._act_places.append(act_place)
            self._places.append(place)
        assert self._places, "no place for execution"

        if num_threads is None:
            num_threads = self._default_num_threads(use_cuda,
                                                    len(self._places))

        main = main_program if main_program else \
            framework.default_main_program()
        scope = executor.global_scope()

        if share_vars_from and not isinstance(share_vars_from,
                                              ParallelExecutor):
            raise TypeError("share_vars_from must be ParallelExecutor.")
        if share_vars_from:
            local_scopes = share_vars_from.executor.local_scopes()
        else:
            local_scopes = []

        # Names of all persistable variables; also used by bcast_params().
        self.persistable_vars = [
            v.name for v in main.list_vars() if v.persistable
        ]

        # Names of the parameters that do not have stop_gradient set.
        param_names = set(p.name
                          for p in main.global_block().iter_parameters()
                          if not p.stop_gradient)

        self.executor = core.ParallelExecutor(
            num_threads,
            bool(use_cuda),  # use_event
            self._places,
            param_names,
            set(self.persistable_vars),
            main.desc,
            loss_name if loss_name else '',
            scope,
            local_scopes,
            allow_op_delay)
        self.scope = scope

    @staticmethod
    def _default_num_threads(use_cuda, num_places):
        """Return the thread count used when the caller passes none.

        Args:
            use_cuda(bool): Whether the executor runs on CUDA devices.
            num_places(int): Number of places the executor will run on.

        Returns:
            int: ``num_places`` for CUDA, otherwise
            ``min(num_places * 2, multiprocessing.cpu_count())``.
        """
        if use_cuda:
            # Experiments on se-resnext shows that too many threads hurt
            # performance. Worth tuning for other models in the future.
            return num_places
        return min(num_places * 2, multiprocessing.cpu_count())

    def run(self, fetch_list, feed_dict=None):
        """
        Run the program once and return the fetched values.

        Args:
            fetch_list(list): A list of variable names that will be fetched.
            feed_dict(dict, default None): A dict mapping feed variable name
                to LoDTensor or numpy array. None means nothing is fed.

        Returns:
            list: The fetched values, one item per element of the fetched
            LoDTensorArray.

        Raises:
            TypeError: If feed_dict is neither None nor a dict.
        """
        if feed_dict is None:
            # None sentinel instead of a shared mutable default argument.
            feed_dict = {}
        if not isinstance(feed_dict, dict):
            raise TypeError("feed_dict should be a dict")

        feed_tensor_dict = {}
        for feed_name in feed_dict:
            feed_tensor = feed_dict[feed_name]
            if not isinstance(feed_tensor, core.LoDTensor):
                # Wrap anything that is not already a LoDTensor, placing the
                # data on the first place.
                feed_tensor = core.LoDTensor()
                feed_tensor.set(feed_dict[feed_name], self._act_places[0])
            feed_tensor_dict[feed_name] = feed_tensor

        # The core executor stores the fetched results in the scope under
        # this variable name; read them back from there.
        fetch_var_name = '@FETCHED_VAR_NAME@'
        self.executor.run(fetch_list, fetch_var_name, feed_tensor_dict)
        arr = self.scope.find_var(fetch_var_name).get_lod_tensor_array()
        return [arr[i] for i in range(len(arr))]

    def bcast_params(self):
        """Call the core executor's bcast_params with the persistable
        variable names collected at construction time."""
        self.executor.bcast_params(set(self.persistable_vars))