compiler.py 4.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import multiprocessing
import os
import six
from .. import compat as cpt

from . import core

ExecutionStrategy = core.ParallelExecutor.ExecutionStrategy
BuildStrategy = core.ParallelExecutor.BuildStrategy


def _place_obj(place):
    p = core.Place()
    p.set_place(place)
    return p


class _ProgramCompiler(object):
    def __init__(self, program):
        self._program = program
        self._compiled = False
        self._is_data_parallel = False

    def _with_data_parallel(self,
                            loss_name=None,
                            build_strategy=None,
                            exec_strategy=None):
        assert not self._is_data_parallel, "Already compiled with parallel."
        self._is_data_parallel = True
        self._build_strategy = build_strategy
        self._exec_strategy = exec_strategy
        self._loss_name = loss_name
        return self

    def _compile_data_parallel(self):
        self._places = []
        self._local_scopes = []

        if self._exec_strategy is None:
            self._exec_strategy = ExecutionStrategy()
        if self._build_strategy is None:
            self._build_strategy = BuildStrategy()

        self._exec_strategy.use_cuda = isinstance(self._place, core.CUDAPlace)
        if self._exec_strategy.use_cuda:
            gpus_env = os.getenv("FLAGS_selected_gpus")
            if gpus_env:
                gpus = [int(s) for s in gpus_env.split(",")]
            else:
                gpus = [
                    i for i in six.moves.range(core.get_cuda_device_count())
                ]
            self._places = [core.CUDAPlace(i) for i in gpus]
        else:
            cpu_num = int(
                os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
            self._places = [core.CPUPlace() for _ in six.moves.range(cpu_num)]
        assert self._places, "no place for execution"

        if self._exec_strategy.num_threads == 0:
            if self._exec_strategy.use_cuda:
                # Experiments on se-resnext shows that too many threads hurt
                # performance. Worth tunning for other models in the future.
                self._exec_strategy.num_threads = len(self._places) * 4
            else:
                cpu_num = int(
                    os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
                self._exec_strategy.num_threads = cpu_num * 2

        trainers_endpoints = self._program._trainers_endpoints
        if self._build_strategy.num_trainers > 1 and trainers_endpoints:
            assert self._build_strategy.num_trainers == len(
                trainers_endpoints), "num_trainers == len(end_points)"
            self._build_strategy.trainers_endpoints = trainers_endpoints

        self._persistable_vars = set([
            cpt.to_text(v.name)
            for v in [
                var for var in self._program.list_vars()
                if var.persistable and var.type != core.VarDesc.VarType.RAW
            ]
        ])

        places = list(map(_place_obj, self._places))
        return core.ParallelExecutor(
            places, self._persistable_vars, self._program.desc,
            cpt.to_text(self._loss_name)
            if self._loss_name else six.u(''), self._scope, self._local_scopes,
            self._exec_strategy, self._build_strategy)

    def _compile(self, scope, place):
        if self._compiled:
            return self
        self._compiled = True

        self._scope = scope
        self._place = place

        if self._is_data_parallel:
            self._executor = self._compile_data_parallel()
        else:
            p = _place_obj(self._place)
            self._executor = core.Executor(p)
        return self