compiler.py 11.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import multiprocessing
import os
import six
X
polish  
Xin Pan 已提交
18
import sys
19 20 21
from .. import compat as cpt

from . import core
22
from . import framework
23

X
Xin Pan 已提交
24 25
# Public names exported by this module. The inference config aliases
# (InferNativeConfig / InferAnalysisConfig) are intentionally not listed here.
__all__ = ['CompiledProgram', 'ExecutionStrategy', 'BuildStrategy']

26 27
# Module-level aliases for the strategy classes nested under
# core.ParallelExecutor; both names are exported through __all__.
ExecutionStrategy = core.ParallelExecutor.ExecutionStrategy
BuildStrategy = core.ParallelExecutor.BuildStrategy
F
flame 已提交
28 29
# Aliases for the inference config types. CompiledProgram's
# with_inference_optimize accepts an instance of either one.
InferNativeConfig = core.NativeConfig
InferAnalysisConfig = core.AnalysisConfig
30 31 32 33 34 35 36 37


def _place_obj(place):
    """Wrap ``place`` in a freshly created ``core.Place`` and return it."""
    wrapped = core.Place()
    wrapped.set_place(place)
    return wrapped


38 39
def _is_pserver_mode(main_program):
    main = main_program if main_program \
C
chengduo 已提交
40
        else framework.default_main_program()
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
    for op in main.global_block().ops:
        if op.type in ["send", "recv"]:
            return True
    return False


def get_available_places(use_cuda):
    """Build the list of places that execution may run on.

    Args:
        use_cuda (bool): When true, return one ``core.CUDAPlace`` per device
            id listed in the ``FLAGS_selected_gpus`` environment variable
            (comma separated), or per device reported by
            ``core.get_cuda_device_count()`` if that variable is unset or
            empty. Otherwise return ``CPU_NUM`` ``core.CPUPlace`` objects,
            where ``CPU_NUM`` defaults to ``multiprocessing.cpu_count()``.

    Returns:
        list: A non-empty list of place objects.

    Raises:
        AssertionError: If the resulting list is empty.
    """
    if not use_cuda:
        cpu_num = int(os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
        places = [core.CPUPlace() for _ in six.moves.range(cpu_num)]
    else:
        selected = os.getenv("FLAGS_selected_gpus")
        if selected:
            device_ids = [int(token) for token in selected.split(",")]
        else:
            device_ids = list(six.moves.range(core.get_cuda_device_count()))
        places = [core.CUDAPlace(device_id) for device_id in device_ids]
    assert places, "no place for execution"
    return places


X
polish  
Xin Pan 已提交
62
class CompiledProgram(object):
    """
    Compiles to Graph for execution.

    1. Users first create the program with layers.
    2. Optionally, users use CompiledProgram to optimize the program before run.
    3. The original program or CompiledProgram is run by executor.

    The CompiledProgram is used to transform a program for various
    optimizations, for example.
      * Pre-compute some logic once so that each run is faster.
      * Transform the program so that it can run in multiple devices.
      * TODO: transform the program for optimized inference or distributed
              training.

    Example:
        .. code-block:: python

            place = fluid.CUDAPlace(0) if use_gpu else fluid.CPUPlace()
            exe = fluid.Executor(place)
            exe.run(startup)
            compiled_prog = compiler.CompiledProgram(main).with_data_parallel(
                loss_name=loss.name)
            for i in range(5):
                test_loss, = exe.run(compiled_prog,
                                     feed=feed_dict,
                                     fetch_list=[loss.name])

    Args:
        program_or_graph (Graph|Program): If it's Program, it will be first
            lowered to a graph for further optimizations. If it's a graph
            (potentially optimized before), it will be directly used for
            further optimizations. Note: graph is only supported when compiled
            with with_data_parallel option.

    Raises:
        ValueError: If program_or_graph is neither a core.Graph nor a
            framework.Program.
    """

    def __init__(self, program_or_graph):
        if isinstance(program_or_graph, core.Graph):
            self._graph = program_or_graph
            self._program = None
        elif isinstance(program_or_graph, framework.Program):
            self._graph = core.Graph(program_or_graph.desc)
            self._program = program_or_graph
        else:
            raise ValueError("Wrong program_or_graph type: %s" %
                             type(program_or_graph))

        self._program_desc = self._graph.origin_program_desc()
        # Filled in by _compile().
        self._scope = None
        self._place = None
        self._executor = None
        self._compiled = False
        # At most one of these two modes may be enabled.
        self._is_data_parallel = False
        self._is_inference = False

    def with_data_parallel(self,
                           loss_name=None,
                           build_strategy=None,
                           exec_strategy=None,
                           share_vars_from=None):
        """Configs the program to run in data parallel way.

        Args:
            loss_name (str): The loss name must set in training. Default None.
            build_strategy(BuildStrategy): build_strategy is used to
                build the graph so it can run on multiple devices/cores with
                optimized topology.
                For more information, please refer to fluid.BuildStrategy.
                Default None.
            exec_strategy(ExecutionStrategy): exec_strategy is used to
                select a way to execute the graph, for example how many
                threads are used, how many iterations to clean up the temp
                variables. For more information, please refer
                to fluid.ExecutionStrategy. Default None.
            share_vars_from(CompiledProgram): If provided, this
                CompiledProgram will share variables from `share_vars_from`.
                `share_vars_from` must be run by the executor before this
                CompiledProgram so that vars are ready.
        Returns:
            self
        """
        assert not self._is_data_parallel, "Already compiled with parallel."
        assert not self._is_inference, \
            "Cannot compile both data parallel and inference"
        self._is_data_parallel = True
        self._loss_name = loss_name
        self._share_vars_from = share_vars_from
        # Fall back to default-constructed strategies when none are given.
        self._exec_strategy = exec_strategy
        if self._exec_strategy is None:
            self._exec_strategy = ExecutionStrategy()
        self._build_strategy = build_strategy
        if self._build_strategy is None:
            self._build_strategy = BuildStrategy()
        self._build_strategy.is_distribution = _is_pserver_mode(self._program)
        return self

    def with_inference_optimize(self, config):
        """ Add inference optimize

        Args:
            config: instance of `NativeConfig` or `AnalysisConfig` to create
                predictor
        Returns:
            self
        """
        assert not self._is_data_parallel, \
            "Cannot compile both data parallel and inference"
        assert not self._is_inference, "Already compiled with inference"

        assert isinstance(config, (InferNativeConfig, InferAnalysisConfig)), \
            "config must be an instance of NativeConfig or AnalysisConfig"
        self._is_inference = True
        self._infer_config = config
        return self

    def _with_distributed(self):
        """Placeholder for distributed compilation; not implemented."""
        raise NotImplementedError()

    def _compile_data_parallel(self, use_cuda=False, scope=None):
        """Create the core.ParallelExecutor for the data parallel mode.

        Args:
            use_cuda (bool): Written to ``exec_strategy.use_cuda`` and used to
                choose the places via ``get_available_places``.
            scope: Must not be None unless ``share_vars_from`` was set, in
                which case it is ignored (a notice is written to stderr when
                it is truthy). The executor itself is created with
                ``self._scope``.

        Returns:
            The newly created core.ParallelExecutor.

        Raises:
            ValueError: If ``share_vars_from`` is not data parallel or has no
                executor yet.
        """
        if self._share_vars_from:
            if scope:
                sys.stderr.write("share_vars_from is set, scope is ignored.\n")
            if not self._share_vars_from._is_data_parallel:
                raise ValueError("share_vars_from is not data parallel. Cannot "
                                 "share vars from it.")
            if self._share_vars_from._executor is None:
                raise ValueError(
                    "share_vars_from is not compiled and run, so there is no "
                    "var to share.")
            self._local_scopes = self._share_vars_from._executor.local_scopes()
        else:
            assert scope is not None, \
                "scope must be provided when share_vars_from is not set"
            self._local_scopes = []

        self._exec_strategy.use_cuda = use_cuda
        self._places = get_available_places(self._exec_strategy.use_cuda)

        # num_threads == 0 means "not set": pick a default for the device.
        if self._exec_strategy.num_threads == 0:
            if self._exec_strategy.use_cuda:
                # Experiments on se-resnext shows that too many threads hurt
                # performance. Worth tuning for other models in the future.
                self._exec_strategy.num_threads = len(self._places) * 4
            else:
                cpu_num = int(
                    os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
                self._exec_strategy.num_threads = cpu_num * 2

        # FIXME(dzhwinter): enable_inplace should be after memory_optimize
        # if turn on python memory optimize, turn off the inplace_pass.
        # memory_optimize and enable_inplace default are True, but we can
        # disable them on purpose
        if self._program and self._program._is_mem_optimized:
            self._build_strategy.memory_optimize = False
            self._build_strategy.enable_inplace = False

        # TODO(wuyi): trainer endpoints should be passed in through
        # build_strategy, not program.xxx.
        if self._program and self._build_strategy.num_trainers > 1 and \
                self._program._trainers_endpoints:
            tps = self._program._trainers_endpoints

            assert self._build_strategy.num_trainers == len(
                tps), "num_trainers == len(end_points)"
            self._build_strategy.trainers_endpoints = tps

        # Collect the names of persistable, non-RAW variables in the graph.
        persistable_names = set()
        for node in self._graph.nodes():
            if not node.is_var():
                continue
            var = node.var()
            if var is not None and var.persistable() and \
                    var.type() != core.VarDesc.VarType.RAW:
                persistable_names.add(cpt.to_text(node.name()))

        # ParallelExecutor would broadcast all the parameters during
        # initializing. The parameters of each process should be in the same
        # order for the data-parallelism distributed training to keep the
        # broadcast correct, hence the de-duplicated, sorted list.
        self._persistable_vars = sorted(persistable_names)

        places = list(map(_place_obj, self._places))

        return core.ParallelExecutor(
            places, self._persistable_vars,
            cpt.to_text(self._loss_name)
            if self._loss_name else six.u(''), self._scope, self._local_scopes,
            self._exec_strategy, self._build_strategy, self._graph)

    def _compile_inference(self):
        """Create a predictor from the stored inference config."""
        return core.create_paddle_predictor(self._infer_config)

    def _compile(self, scope, place):
        """Compile the program based on the configs.

        The first successful call creates the executor; later calls only
        check that they agree with the scope and place used then.

        Args:
            scope: The variables (resources) that are associated with
               this compiled program.
            place: The location that the compiled program will be run on.

        Returns:
            self

        Raises:
            ValueError: If already compiled and a different scope or place
                is given.
        """
        if self._compiled:
            if scope and self._scope != scope:
                raise ValueError("Cannot compile with different scope")
            if place and not self._place._equals(place):
                raise ValueError("Cannot compile with different place")
            return self

        self._scope = scope
        self._place = place
        if self._is_data_parallel:
            self._executor = self._compile_data_parallel(
                use_cuda=isinstance(self._place, core.CUDAPlace),
                scope=self._scope)
        elif self._is_inference:
            self._executor = self._compile_inference()
        else:
            p = _place_obj(self._place)
            self._executor = core.Executor(p)
        # Mark as compiled only once the executor exists, so that a failed
        # compilation is not mistaken for a finished one on the next call.
        self._compiled = True
        return self