#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import core
import multiprocessing
import framework
import executor

__all__ = ['ParallelExecutor']


class ParallelExecutor(object):
    def __init__(self,
                 loss_name,
                 use_cuda,
                 num_threads=None,
                 allow_op_delay=False):
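        """
        ParallelExecutor runs a program on multiple GPU or CPU places.

        :param loss_name: Name of the loss variable that drives the backward
          pass.
        :param use_cuda: Whether to run on CUDA devices. If False, one place
          per CPU core is created.
        :param num_threads: Number of threads used to run operators. If None,
          a heuristic based on the number of places is used.
        :param allow_op_delay: Whether some operators may be delayed and
          batched during scheduling, which can improve performance.
        """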
        self._places = []
        self._act_places = []
        if use_cuda:
            for i in xrange(core.get_cuda_device_count()):
                p = core.Place()
                self._act_places.append(core.CUDAPlace(i))
                p.set_place(self._act_places[-1])
                self._places.append(p)
        else:
            for i in xrange(multiprocessing.cpu_count()):
                p = core.Place()
                self._act_places.append(core.CPUPlace())
                p.set_place(self._act_places[-1])
                self._places.append(p)
        assert self._places, "no place for execution"

        if num_threads is None:
            if use_cuda:
                # Experiments on se-resnext show that too many threads hurt
                # performance. Worth tuning for other models in the future.
                num_threads = len(self._places)
            else:
                num_threads = min(
                    len(self._places) * 2, multiprocessing.cpu_count())

        startup = framework.default_startup_program()
        main = framework.default_main_program()
        scope = executor.global_scope()

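        # Construct the underlying C++ ParallelExecutor. The set passed below
        # holds the names of trainable parameters (those not marked
        # stop_gradient).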
        self.executor = core.ParallelExecutor(
            num_threads,
            True if use_cuda else False,  # use_event
            self._places,
            set([
                p.name for p in main.global_block().iter_parameters()
                if not p.stop_gradient
            ]),
            startup.desc,
            main.desc,
            loss_name,
            scope,
            allow_op_delay)
        self.scope = scope

    def run(self, fetch_list, feed_dict=None):
        """
        :param fetch_list: A list of variable names that will be fetched.
        :param feed_dict: A dict mapping feed variable names to LoDTensors
          or numpy arrays.
        :return: fetched value list.
        """
        if feed_dict is None:
            feed_dict = {}
        if not isinstance(feed_dict, dict):
            raise TypeError("feed_dict should be a dict")

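        # Convert any raw numpy arrays in feed_dict into LoDTensors, placing
        # them on the first execution place.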
        feed_tensor_dict = {}
        for feed_name in feed_dict:
            feed_tensor = feed_dict[feed_name]
            if not isinstance(feed_tensor, core.LoDTensor):
                feed_tensor = core.LoDTensor()
                feed_tensor.set(feed_dict[feed_name], self._act_places[0])
            feed_tensor_dict[feed_name] = feed_tensor

        fetch_var_name = '@FETCHED_VAR_NAME@'
        self.executor.run(fetch_list, fetch_var_name, feed_tensor_dict)
        arr = self.scope.find_var(fetch_var_name).get_lod_tensor_array()
        return [arr[i] for i in range(len(arr))]
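

# Illustrative usage sketch (not part of the module). It assumes the caller
# has already built a fluid program whose loss variable is `avg_loss` and
# that feeds named 'image' and 'label' exist; these names are hypothetical.
#
#   pe = ParallelExecutor(loss_name=avg_loss.name, use_cuda=True)
#   results = pe.run(fetch_list=[avg_loss.name],
#                    feed_dict={'image': image_np, 'label': label_np})
#   # `results` holds one fetched LoDTensor per entry in fetch_list.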