parallel_executor.py 5.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import core
import multiprocessing
import framework
import executor

__all__ = ['ParallelExecutor']


class ParallelExecutor(object):
X
Xin Pan 已提交
24 25
    def __init__(self,
                 use_cuda,
26 27
                 loss_name=None,
                 main_program=None,
X
Xin Pan 已提交
28
                 num_threads=None,
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
                 allow_op_delay=False,
                 share_vars_from=None):
        """
        ParallelExecutor can run program in parallel.

        Args:
            use_cuda(bool): Whether to use CUDA or not.
            loss_name(str, default None): The loss name must set in training.
            main_program(Program, default None): The program that need to run,
                if not provided, then default_main_program will be used.
            num_threads(int, default None): How many threads are used for
                training.
            allow_op_delay(bool, default False): Whether to delay and buffer
                some operators together for scheduling or not, which may
                improve performance in some cases, defalut False.
            share_vars_from(ParallelExecutor, default None): If provied,
                it will share variables from the specified ParallelExecutor.

        Returns:
            A ParallelExecutor object.

        Raises:
            TypeError: If share_vars_from is provided, but not ParallelExecutor
                object.

        Examples:
            .. code-block:: python

              train_exe = fluid.ParallelExecutor(
                  use_cuda=True, loss_name=loss.name)
              test_exe = fluid.ParallelExecutor(
                  use_cuda=True,
                  main_program=test_program,
                  share_vars_from=train_exe)

              train_loss, = train_exe.run([loss.name], feed_dict=feed_dict)
              test_loss, = test_exe.run([loss.name], feed_dict=feed_dict)
        """

X
Xin Pan 已提交
68 69
        self._places = []
        self._act_places = []
70 71 72
        if use_cuda:
            for i in xrange(core.get_cuda_device_count()):
                p = core.Place()
X
Xin Pan 已提交
73 74 75
                self._act_places.append(core.CUDAPlace(i))
                p.set_place(self._act_places[-1])
                self._places.append(p)
76 77 78
        else:
            for i in xrange(multiprocessing.cpu_count()):
                p = core.Place()
X
Xin Pan 已提交
79 80 81 82
                self._act_places.append(core.CPUPlace(i))
                p.set_place(self._act_places[-1])
                self._places.append(p)
        assert self._places, "no place for execution"
83 84

        if num_threads is None:
X
Xin Pan 已提交
85 86 87
            if use_cuda:
                # Experiments on se-resnext shows that too many threads hurt
                # performance. Worth tunning for other models in the future.
X
Xin Pan 已提交
88
                num_threads = len(self._places)
X
Xin Pan 已提交
89
            else:
90 91
                num_threads = min(
                    len(self._places) * 2, multiprocessing.cpu_count())
92

93 94
        main = main_program
        main = main if main else framework.default_main_program()
95 96
        scope = executor.global_scope()

97 98 99 100 101 102 103 104 105 106 107
        if share_vars_from and not isinstance(share_vars_from,
                                              ParallelExecutor):
            raise TypeError("share_vars_from must be ParallelExecutor.")
        local_scopes = share_vars_from.executor.local_scopes(
        ) if share_vars_from else []

        persistable_vars = [
            v.name
            for v in filter(lambda var: var.persistable, main.list_vars())
        ]

108 109 110
        self.executor = core.ParallelExecutor(
            num_threads,
            True if use_cuda else False,  # use_event
X
Xin Pan 已提交
111
            self._places,
112 113 114 115
            set([
                p.name for p in main.global_block().iter_parameters()
                if not p.stop_gradient
            ]),
116
            set(persistable_vars),
117
            main.desc,
118
            loss_name if loss_name else '',
X
Xin Pan 已提交
119
            scope,
120
            local_scopes,
X
Xin Pan 已提交
121
            allow_op_delay)
122 123
        self.scope = scope

X
Xin Pan 已提交
124 125 126 127 128 129 130
    def run(self, fetch_list, feed_dict={}):
        """
        :param fetch_list: A list of variable names that will be fetched.
        :param feed_dict: A dict mapping for feed variable name to LoDTensor
          or numpy array.
        :return: fetched value list.
        """
X
Xin Pan 已提交
131 132 133
        if not isinstance(feed_dict, dict):
            raise TypeError("feed_dict should be a dict")

X
Xin Pan 已提交
134 135 136 137 138 139 140 141
        feed_tensor_dict = {}
        for i, feed_name in enumerate(feed_dict):
            feed_tensor = feed_dict[feed_name]
            if not isinstance(feed_tensor, core.LoDTensor):
                feed_tensor = core.LoDTensor()
                feed_tensor.set(feed_dict[feed_name], self._act_places[0])
            feed_tensor_dict[feed_name] = feed_tensor

142
        fetch_var_name = '@FETCHED_VAR_NAME@'
X
Xin Pan 已提交
143
        self.executor.run(fetch_list, fetch_var_name, feed_tensor_dict)
144 145
        arr = self.scope.find_var(fetch_var_name).get_lod_tensor_array()
        return [arr[i] for i in range(len(arr))]