# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
# 
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# 
#     http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import paddle
from .public import *
from paddle.distributed.fleet.base.private_helper_function import wait_server_ready
from paddle.distributed.passes import new_pass, PassContext


class PsProgramBuilder(object):
    """Base class of the parameter-server program builders.

    The builder caches the attributes of a pass context and drives the
    program rewriting through ``_build_programs``.  Subclasses provide the
    mode-specific ``_build_trainer_programs`` / ``_build_pserver_programs``.
    """

    def __init__(self, pass_ctx):
        """Cache the pass context and the attributes read by the builders.

        Args:
            pass_ctx: pass context whose ``_attrs`` mapping carries the build
                configuration (loss, cloned programs, role flags, ...).

        Raises:
            KeyError: if a required entry is missing from ``pass_ctx._attrs``.
        """
        self.pass_ctx = pass_ctx
        self.attrs = self.pass_ctx._attrs
        self.loss = self.attrs['loss']
        self.cloned_main = self.attrs['cloned_main']
        self.cloned_startup = self.attrs['cloned_startup']

        self.use_ps_gpu = self.attrs['use_ps_gpu']
        self.use_heter_ps = self.attrs['is_heter_ps_mode']
        self.is_worker = self.attrs['is_worker']
        self.is_heter_worker = self.attrs['is_heter_worker']
        self.ps_mode = self.attrs['ps_mode']

        self.launch_barrier = self.attrs['launch_barrier']
        self.launch_barrier_flag = self.attrs['launch_barrier_flag']
        self.server_endpoints = self.attrs['role_maker']._get_pserver_endpoints(
        )

    def _build_trainer_desc(self):
        """Assemble the fleet options and attach them to the cloned program.

        The options of the program currently attached to the loss are reused
        (or created when absent), completed with default trainer and device
        worker names, and extended with the dense table configuration taken
        from the send/recv contexts.  The result is stored on
        ``self.cloned_main._fleet_opt``.
        """
        opt_info = self.loss.block.program._fleet_opt
        if opt_info is None:
            opt_info = {}
        opt_info["trainer"] = opt_info.get("trainer", "DistMultiTrainer")
        opt_info["device_worker"] = opt_info.get("device_worker",
                                                 "DownpourLite")

        # The per-program configuration is keyed by the id of the cloned
        # main program, rendered as a string.
        pid = str(id(self.cloned_main))
        table_lists = {
            'pull_dense': [],
            'push_dense': [],
            'pull_sparse': [],
            'push_sparse': []
        }
        program_configs = {pid: table_lists}
        dense_table_config = {}

        send_ctx = get_the_one_send_context(self.attrs)
        recv_ctx = get_the_one_recv_context(self.attrs)
        origin_program_id = id(self.loss.block.program)
        for ctx in send_ctx.values():
            # Only dense contexts that belong to the program currently
            # attached to the loss are of interest here.
            if ctx.program_id() != origin_program_id or ctx.is_sparse():
                continue
            table_id = ctx.table_id()
            if not ctx.is_tensor_table():
                table_lists['pull_dense'].append(table_id)
                table_lists['push_dense'].append(table_id)
            dense_table_config[table_id] = recv_ctx[table_id]

        opt_info['program_configs'] = program_configs
        opt_info['dense_table_config'] = dense_table_config
        self.cloned_main._fleet_opt = opt_info

    def _optimize_programs(self):
        """Placeholder hook; the base implementation does nothing."""
        pass

    def _build_trainer_programs(self):
        """Rewrite the trainer programs; must be provided by subclasses.

        Raises:
            NotImplementedError: always, in the base class.
        """
        raise NotImplementedError

    def _build_pserver_programs(self):
        """Apply ``add_lr_decay_table_pass`` when optimizer ops are present.

        Nothing is done when ``get_optimize_ops`` returns no ops for
        ``attrs['origin_main_program']``.
        """
        ops = get_optimize_ops(self.attrs['origin_main_program'])
        if len(ops) == 0:
            return
        add_lr_decay_table_pass = new_pass('add_lr_decay_table_pass',
                                           self.attrs)
        add_lr_decay_table_pass.apply([], [], self.pass_ctx)

        # The sgd/adam check currently has no effect: both outcomes return
        # without further work.  It is kept as the place where
        # optimizer-specific server handling would branch.
        is_sgd_adam = any(op.type in ("sgd", "adam") for op in ops)
        if is_sgd_adam:
            return

    def _build_programs(self):
        """Build the programs for the current role (worker or server).

        Workers get their trainer programs rewritten, the startup program
        switched to the cloned one and the loss re-attached to the cloned
        main program.  Servers get the server programs built and the loss
        attached to ``attrs['_main_server']``.  Any other role is a no-op.
        """
        if self.attrs['is_worker']:
            logger.info("start building trainer program")
            self._build_trainer_programs()
            fluid.framework.switch_startup_program(self.cloned_startup)
            # _build_trainer_desc reads self.loss.block.program (fleet
            # options and program id), so it has to run before the loss is
            # re-attached to the cloned main program below.
            self._build_trainer_desc()
            self.loss.block.program = self.cloned_main

        elif self.attrs['is_server']:
            logger.info("start building pserver program")
            self._build_pserver_programs()
            self.loss.block.program = self.attrs['_main_server']
            fluid.framework.switch_startup_program(self.attrs[
                '_startup_server'])


class GeoPsProgramBuilder(PsProgramBuilder):  # CPU-only mode
    """Program builder used when ``ps_mode`` is ``DistributedMode.GEO``."""

    def __init__(self, pass_ctx):
        """Initialise the builder and validate the distributed mode.

        Args:
            pass_ctx: pass context forwarded to ``PsProgramBuilder``.

        Raises:
            ValueError: if ``ps_mode`` is not ``DistributedMode.GEO``.
        """
        logger.info("start building geo-ps program")
        super(GeoPsProgramBuilder, self).__init__(pass_ctx)
        if self.ps_mode != DistributedMode.GEO:
            # str.format is required here; passing the builtin format() as a
            # second exception argument never rendered the message.
            raise ValueError("ps mode: {} not matched {}".format(
                self.ps_mode, "GeoPsProgramBuilder"))

    def _build_trainer_programs(self):
        """Apply ``append_send_ops_pass`` to the cloned main program.

        Afterwards the cloned main program is recorded as
        ``attrs['origin_main_program']`` and, when both launch-barrier
        settings are truthy, ``wait_server_ready`` is called on the pserver
        endpoints.
        """
        append_send_ops_pass = new_pass("append_send_ops_pass", self.attrs)
        append_send_ops_pass.apply([self.cloned_main], [None], self.pass_ctx)

        self.attrs['origin_main_program'] = self.cloned_main

        if self.launch_barrier and self.launch_barrier_flag:
            wait_server_ready(self.server_endpoints)

    def _build_pserver_programs(self):
        """Apply ``add_listen_and_serv_pass`` to ``attrs['_main_server']``."""
        add_listen_and_serv_pass = new_pass('add_listen_and_serv_pass',
                                            self.attrs)
        add_listen_and_serv_pass.apply([self.attrs['_main_server']], [None],
                                       self.pass_ctx)


class CpuSyncPsProgramBuilder(PsProgramBuilder):
    """Program builder for the ``SYNC`` and ``ASYNC`` distributed modes."""

    def __init__(self, pass_ctx):
        """Initialise the builder and validate the distributed mode.

        Args:
            pass_ctx: pass context forwarded to ``PsProgramBuilder``.

        Raises:
            ValueError: if ``ps_mode`` is neither ``DistributedMode.SYNC``
                nor ``DistributedMode.ASYNC``.
        """
        super(CpuSyncPsProgramBuilder, self).__init__(pass_ctx)
        if self.ps_mode == DistributedMode.SYNC:
            logger.info("start building cpu-sync-ps program")
        if self.ps_mode not in (DistributedMode.SYNC, DistributedMode.ASYNC):
            # str.format is required here; passing the builtin format() as a
            # second exception argument never rendered the message.
            raise ValueError("ps mode: {} not matched {}".format(
                self.ps_mode, "PsProgramBuilder"))

    def _build_trainer_programs(self):
        """Run the trainer-side passes for the sync/async modes.

        The passes are applied in this order: ``add_lr_decay_table_pass``,
        ``distributed_ops_pass``, ``delete_optimizer_pass``,
        ``append_send_ops_pass``, ``delete_extra_optimizer_pass`` and
        ``fake_init_ops_pass``.  The cloned programs are then recorded as the
        origin programs in ``attrs`` and, when both launch-barrier settings
        are truthy, ``wait_server_ready`` is called on the pserver endpoints.
        """
        add_lr_decay_table_pass = new_pass("add_lr_decay_table_pass",
                                           self.attrs)
        add_lr_decay_table_pass.apply([], [], self.pass_ctx)

        distributed_ops_pass = new_pass("distributed_ops_pass", self.attrs)
        distributed_ops_pass.apply([self.cloned_main], [None], self.pass_ctx)

        delete_optimizer_pass = new_pass("delete_optimizer_pass", self.attrs)
        delete_optimizer_pass.apply([self.cloned_main], [None], self.pass_ctx)

        append_send_ops_pass = new_pass("append_send_ops_pass", self.attrs)
        append_send_ops_pass.apply([self.cloned_main], [None], self.pass_ctx)

        # This pass receives the origin main program, not the cloned one;
        # 'origin_main_program' is only re-pointed to the clone further down.
        delete_extra_optimizer_pass = new_pass("delete_extra_optimizer_pass",
                                               self.attrs)
        delete_extra_optimizer_pass.apply([self.attrs['origin_main_program']],
                                          [self.cloned_startup], self.pass_ctx)

        fake_init_ops_pass = new_pass("fake_init_ops_pass", self.attrs)
        fake_init_ops_pass.apply([None], [self.cloned_startup], self.pass_ctx)

        self.attrs['origin_main_program'] = self.cloned_main
        self.attrs['origin_startup_program'] = self.cloned_startup

        if self.launch_barrier and self.launch_barrier_flag:
            wait_server_ready(self.server_endpoints)


class CpuAsyncPsProgramBuilder(CpuSyncPsProgramBuilder):
    """Async flavour of the CPU builder; reuses the sync builder's passes."""

    def __init__(self, pass_ctx):
        """Log the build start and defer to ``CpuSyncPsProgramBuilder``.

        Args:
            pass_ctx: pass context forwarded to the parent builder.
        """
        logger.info("start building cpu-async-ps program")
        super(CpuAsyncPsProgramBuilder, self).__init__(pass_ctx)


class GpuPsProgramBuilder(PsProgramBuilder):
    """Program builder that applies the GPU parameter-server passes."""

    def __init__(self, pass_ctx):
        """Log the build start and defer to ``PsProgramBuilder``.

        Args:
            pass_ctx: pass context forwarded to the parent builder.
        """
        logger.info("start building gpu-ps program")
        super(GpuPsProgramBuilder, self).__init__(pass_ctx)

    def _build_trainer_programs(self):
        """Run the trainer-side passes for the GPU parameter server.

        The passes are applied in this order: ``add_lr_decay_table_pass``,
        ``distributed_ops_pass``, ``fake_init_ops_pass``, ``ps_gpu_pass`` and
        ``ps_transpile_pass``.  The cloned programs are then recorded as the
        origin programs in ``attrs`` and, when both launch-barrier settings
        are truthy, ``wait_server_ready`` is called on the pserver endpoints.
        """
        add_lr_decay_table_pass = new_pass("add_lr_decay_table_pass",
                                           self.attrs)
        add_lr_decay_table_pass.apply([], [], self.pass_ctx)

        distributed_ops_pass = new_pass("distributed_ops_pass", self.attrs)
        distributed_ops_pass.apply([self.cloned_main], [None], self.pass_ctx)

        fake_init_ops_pass = new_pass("fake_init_ops_pass", self.attrs)
        fake_init_ops_pass.apply([None], [self.cloned_startup], self.pass_ctx)

        ps_gpu_pass = new_pass("ps_gpu_pass", self.attrs)
        ps_gpu_pass.apply([self.cloned_main], [None], self.pass_ctx)

        ps_transpile_pass = new_pass("ps_transpile_pass", self.attrs)
        ps_transpile_pass.apply([self.cloned_main], [self.cloned_startup],
                                self.pass_ctx)

        self.attrs['origin_main_program'] = self.cloned_main
        self.attrs['origin_startup_program'] = self.cloned_startup

        if self.launch_barrier and self.launch_barrier_flag:
            wait_server_ready(self.server_endpoints)


class HeterAsyncPsProgramBuilder(PsProgramBuilder):
    """Program builder for the heterogeneous parameter-server mode."""

    def __init__(self, pass_ctx):
        """Initialise the builder and validate the configuration.

        Args:
            pass_ctx: pass context forwarded to ``PsProgramBuilder``.

        Raises:
            ValueError: if ``use_ps_gpu`` is set, ``ps_mode`` is
                ``DistributedMode.GEO`` or ``attrs['is_heter_ps_mode']``
                equals ``False``.
        """
        logger.info("start building heter-async-ps program")
        super(HeterAsyncPsProgramBuilder, self).__init__(pass_ctx)
        # The explicit comparison with False is kept on purpose: switching to
        # `not ...` would also reject other falsy values such as None.
        if self.use_ps_gpu or self.ps_mode == DistributedMode.GEO or self.attrs[
                'is_heter_ps_mode'] == False:
            # str.format is required here; passing the builtin format() as a
            # second exception argument never rendered the message.
            raise ValueError("ps mode: {} not matched {}".format(
                self.ps_mode, "HeterAsyncPsProgramBuilder"))

    def _build_trainer_programs(self):
        """Run the trainer-side passes for the heterogeneous mode.

        The passes are applied in this order: ``add_lr_decay_table_pass``,
        ``distributed_ops_pass``, ``delete_optimizer_pass``,
        ``append_send_ops_pass``, ``delete_extra_optimizer_pass``,
        ``fake_init_ops_pass``, then ``split_heter_worker_ops_pass`` for
        heter workers or ``split_trainer_ops_pass`` otherwise, and finally
        ``set_heter_pipeline_opt_pass``.  When both launch-barrier settings
        are truthy, ``wait_server_ready`` is called on the pserver endpoints.
        """
        add_lr_decay_table_pass = new_pass("add_lr_decay_table_pass",
                                           self.attrs)
        add_lr_decay_table_pass.apply([], [], self.pass_ctx)

        distributed_ops_pass = new_pass("distributed_ops_pass", self.attrs)
        distributed_ops_pass.apply([self.cloned_main], [None], self.pass_ctx)

        delete_optimizer_pass = new_pass("delete_optimizer_pass", self.attrs)
        delete_optimizer_pass.apply([self.cloned_main], [None], self.pass_ctx)

        append_send_ops_pass = new_pass("append_send_ops_pass", self.attrs)
        append_send_ops_pass.apply([self.cloned_main], [None], self.pass_ctx)

        delete_extra_optimizer_pass = new_pass("delete_extra_optimizer_pass",
                                               self.attrs)
        delete_extra_optimizer_pass.apply([self.attrs['origin_main_program']],
                                          [self.cloned_startup], self.pass_ctx)

        fake_init_ops_pass = new_pass("fake_init_ops_pass", self.attrs)
        fake_init_ops_pass.apply([None], [self.cloned_startup], self.pass_ctx)

        if self.is_heter_worker:
            split_heter_worker_ops_pass = new_pass(
                "split_heter_worker_ops_pass", self.attrs)
            split_heter_worker_ops_pass.apply([self.cloned_main], [None],
                                              self.pass_ctx)
        else:
            split_trainer_ops_pass = new_pass("split_trainer_ops_pass",
                                              self.attrs)
            split_trainer_ops_pass.apply([self.cloned_main], [None],
                                         self.pass_ctx)

        set_heter_pipeline_opt_pass = new_pass('set_heter_pipeline_opt_pass',
                                               self.attrs)
        set_heter_pipeline_opt_pass.apply([self.cloned_main],
                                          [self.cloned_startup], self.pass_ctx)

        if self.launch_barrier and self.launch_barrier_flag:
            wait_server_ready(self.server_endpoints)

    def _build_programs(self):
        """Build the programs for the current role.

        Workers and heter workers get the trainer programs rewritten;
        servers get the server programs built, the loss attached to
        ``attrs['_main_server']`` and the startup program switched to
        ``attrs['_startup_server']``.  Any other role is a no-op.
        """
        if self.attrs['is_worker'] or self.attrs['is_heter_worker']:
            self._build_trainer_programs()
            # NOTE(review): set_heter_pipeline_opt_pass is already applied at
            # the end of _build_trainer_programs; confirm that this second
            # application is intended.
            ps_set_heter_pipeline_opt_pass = new_pass(
                "set_heter_pipeline_opt_pass", self.attrs)
            ps_set_heter_pipeline_opt_pass.apply(
                [self.cloned_main], [self.cloned_startup], self.pass_ctx)

        elif self.attrs['is_server']:
            self._build_pserver_programs()
            self.loss.block.program = self.attrs['_main_server']
            fluid.framework.switch_startup_program(self.attrs[
                '_startup_server'])


class FlPsProgramBuilder(PsProgramBuilder):
    """Builder whose build steps are all no-op placeholders."""

    def __init__(self, pass_ctx):
        """Defer entirely to ``PsProgramBuilder`` for initialisation."""
        super(FlPsProgramBuilder, self).__init__(pass_ctx)

    def _build_trainer_programs(self):
        """Do nothing; no trainer-side rewriting is performed."""

    def _build_pserver_programs(self):
        """Do nothing; no server-side rewriting is performed."""

    def _build_programs(self):
        """Do nothing; neither trainer nor server programs are built."""