graph_execution_optimizer.py 8.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
#   Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and

import paddle
from paddle.fluid.framework import core
from paddle.fluid import compiler
from .meta_optimizer_base import MetaOptimizerBase
from ..base.private_helper_function import wait_server_ready
D
Dong Daxiang 已提交
19
import logging
20 21 22 23 24 25 26 27


class GraphExecutionOptimizer(MetaOptimizerBase):
    def __init__(self, optimizer):
        super(GraphExecutionOptimizer, self).__init__(optimizer)
        self.inner_opt = optimizer
        # we do not allow meta optimizer to be inner optimizer currently
        self.meta_optimizers_white_list = []
28
        self.meta_optimizers_black_list = []
29 30 31 32 33 34 35 36

    def _is_graph_out(self):
        return True

    def _can_apply(self):
        """
        Basically, this is PE, and almost all programs can be executed here
        """
D
Dong Daxiang 已提交
37 38 39 40
        if not self.role_maker._is_collective:
            # update me. currently, if parameter server is used
            # graph execution optimizer can not be applied
            return False
41 42 43 44 45 46 47 48 49 50
        return True

    def backward(self,
                 loss,
                 startup_program=None,
                 parameter_list=None,
                 no_grad_set=None,
                 callbacks=None):
        pass

51
    # should fix the variable
52
    def _setup_nccl_op(self, startup_program, main_program, build_strategy):
53
        trainer_endpoints = self.role_maker._get_trainer_endpoints()
54
        trainers = trainer_endpoints
55 56
        trainer_id = self.role_maker._worker_index()
        current_endpoint = self.role_maker._get_trainer_endpoints()[trainer_id]
57
        trainer_endpoints_env = ",".join(trainer_endpoints)
58
        trainers_num = self.role_maker._worker_num()
59 60
        nccl_id_var = startup_program.global_block().create_var(
            name="NCCLID", persistable=True, type=core.VarDesc.VarType.RAW)
61
        for i in range(1, build_strategy.nccl_comm_num):
62 63 64 65 66
            startup_program.global_block().create_var(
                name="NCCLID_{}".format(i),
                persistable=True,
                type=core.VarDesc.VarType.RAW)

67 68
        if build_strategy.use_hierarchical_allreduce:
            for i in range(0, build_strategy.nccl_comm_num):
69 70 71 72 73 74 75 76 77 78 79 80 81 82
                startup_program.global_block().create_var(
                    name="Hierarchical_inter_NCCLID_{}".format(i),
                    persistable=True,
                    type=core.VarDesc.VarType.RAW)
                startup_program.global_block().create_var(
                    name="Hierarchical_exter_NCCLID_{}".format(i),
                    persistable=True,
                    type=core.VarDesc.VarType.RAW)

        startup_program.global_block().append_op(
            type="gen_nccl_id",
            inputs={},
            outputs={"NCCLID": nccl_id_var},
            attrs={
D
Dong Daxiang 已提交
83
                "trainers": trainer_endpoints,
84
                "trainer_id": trainer_id,
85
                "nccl_comm_num": build_strategy.nccl_comm_num,
86
                "use_hierarchical_allreduce":
87
                build_strategy.use_hierarchical_allreduce,
88
                "hierarchical_allreduce_inter_ranks":
89
                build_strategy.hierarchical_allreduce_inter_nranks
90 91 92
            })

    def _try_to_compile(self, startup_program, main_program, loss):
93 94 95 96
        import copy
        dist_strategy = self.user_defined_strategy
        local_build_strategy = paddle.fluid.BuildStrategy()
        local_build_strategy.enable_sequential_execution = \
97
            dist_strategy.build_strategy.enable_sequential_execution
98
        local_build_strategy.fuse_elewise_add_act_ops = \
99
            dist_strategy.build_strategy.fuse_elewise_add_act_ops
100
        local_build_strategy.fuse_bn_act_ops = \
101
            dist_strategy.build_strategy.fuse_bn_act_ops
102
        local_build_strategy.enable_auto_fusion = \
103
            dist_strategy.build_strategy.enable_auto_fusion
104
        local_build_strategy.fuse_relu_depthwise_conv = \
105
            dist_strategy.build_strategy.fuse_relu_depthwise_conv
106
        local_build_strategy.fuse_broadcast_ops = \
107
            dist_strategy.build_strategy.fuse_broadcast_ops
108
        local_build_strategy.fuse_all_optimizer_ops = \
109
            dist_strategy.build_strategy.fuse_all_optimizer_ops
110
        local_build_strategy.enable_inplace = \
111
            dist_strategy.build_strategy.enable_inplace
112
        local_build_strategy.use_hierarchical_allreduce = \
113
            dist_strategy.use_hierarchical_allreduce
114
        local_build_strategy.hierarchical_allreduce_inter_nranks = \
115
            dist_strategy.hierarchical_allreduce_inter_nranks
116
        local_build_strategy.sync_batch_norm = \
117
            dist_strategy.sync_batch_norm
118
        local_build_strategy.fuse_all_reduce_ops = \
119
            dist_strategy.fuse_all_reduce_ops
120
        local_build_strategy.nccl_comm_num = \
121
            dist_strategy.nccl_comm_num
122

123 124 125 126 127 128
        if self.user_defined_strategy.recompute == True:
            logging.warn(
                "set enable_sequential_execution=True since you have enable the recompute strategy"
            )
            local_build_strategy.enable_sequential_execution = True

129
        exe_strategy = self.user_defined_strategy.execution_strategy
130 131
        worker_num = self.role_maker._worker_num()
        node_num = self.role_maker._node_num()
132

133
        if self.role_maker._is_collective:
134
            assert worker_num >= 1, "nccl2 worker_num must >= 1, now:{}" % worker_num
135

136
        if worker_num <= 1:
137
            # local mode
138
            if local_build_strategy.nccl_comm_num > 1:
139
                logging.warn("set nccl_comm_num=1 since you only have 1 node.")
140
            local_build_strategy.nccl_comm_num = 1
141

142
        if node_num <= 1:
143
            if local_build_strategy.use_hierarchical_allreduce:
144 145 146
                logging.warn(
                    "set hierachical_allreduce=False since you only have 1 node."
                )
147
            local_build_strategy.use_hierarchical_allreduce = False
148

149
        sync_allreduce = dist_strategy.sync_nccl_allreduce
150
        if sync_allreduce:
151 152 153
            exe_strategy.num_threads = local_build_strategy.nccl_comm_num + 1
            if local_build_strategy.use_hierarchical_allreduce:
                exe_strategy.num_threads = 2 * local_build_strategy.nccl_comm_num + 1
154 155 156
            if exe_strategy.num_threads > 4:
                logging.warn(
                    "if you use hierachical_allreduce or "
157
                    "with multi nccl comm, please set distributed_strategy.sync_nccl_allreduce=False"
158 159
                )

160
        sync_batch_norm = local_build_strategy.sync_batch_norm
161
        if sync_batch_norm:
162 163
            local_build_strategy.nccl_comm_num = 1
            local_build_strategy.use_hierarchical_allreduce = False
164 165 166 167 168 169 170
            exe_strategy.num_threads = 1
            logging.warn(
                "use sync_batch_norm will hang when set num_threads > 1, so "
                "set num_threads=1, nccl_comm_num=1, hierachical_allreduce=False."
            )

        # TODO(guru4elephant): should be an independent optimizer
171
        self._setup_nccl_op(startup_program, main_program, local_build_strategy)
172

173 174 175
        local_build_strategy.num_trainers = self.role_maker._worker_num()
        local_build_strategy.trainer_id = self.role_maker._worker_index()
        local_build_strategy.trainers_endpoints = self.role_maker._get_trainer_endpoints(
176
        )
177
        local_build_strategy.enable_backward_optimizer_op_deps = True
178 179 180 181 182

        self._compiled_program = compiler.CompiledProgram(main_program)

        self._compiled_program.with_data_parallel(
            loss_name=loss.name,
183
            build_strategy=local_build_strategy,
184 185 186 187 188
            exec_strategy=exe_strategy,
            share_vars_from=None)

        return self._compiled_program

D
Dong Daxiang 已提交
189 190
    def _disable_strategy(self, dist_strategy):
        # TODO(guru4elephant): should close all PE related flags here
191 192
        return

193
    def _enable_strategy(self, dist_strategy, context):
194 195
        # by default, graph execution strategy is enabled
        return
D
Dong Daxiang 已提交
196

197 198 199 200 201 202
    def minimize(self,
                 loss,
                 startup_program=None,
                 parameter_list=None,
                 no_grad_set=None):
        if startup_program == None:
203
            startup_program = paddle.static.default_startup_program()
204 205
        compiled_program = self._try_to_compile(startup_program,
                                                loss.block.program, loss)
206
        loss.block.program._graph = compiled_program
207 208 209

        # just return self.optimizer_ops and self.param_grads
        return None, None