# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .public import *  # noqa: F403
16 17 18
from paddle.distributed.fleet.base.private_helper_function import (
    wait_server_ready,
)
19
from paddle.distributed.passes import new_pass
Z
ziyoujiyi 已提交
20 21


22
class PsProgramBuilder:
    """Base builder that rewrites programs using the pass context attrs.

    ``_build_programs`` dispatches on ``attrs['is_worker']`` and
    ``attrs['is_server']`` to the trainer or pserver hook. The base
    ``_build_trainer_programs`` raises ``NotImplementedError``, so a
    subclass must override it before the worker branch can run.
    """

    def __init__(self, pass_ctx):
        """Cache the pass context and the attributes read by the builders.

        Args:
            pass_ctx: Object exposing an ``_attrs`` mapping. Every key read
                below must be present, otherwise ``KeyError`` is raised.
        """
        self.pass_ctx = pass_ctx
        self.attrs = self.pass_ctx._attrs
        self.loss = self.attrs['loss']
        self.origin_startup_program = self.attrs['origin_startup_program']
        # NOTE(review): this key is plural ('origin_main_programs') while the
        # rest of the file reads 'origin_main_program' -- confirm both exist.
        self.main_program = self.attrs['origin_main_programs']

        self.cloned_main = self.attrs['cloned_main']
        self.cloned_startup = self.attrs['cloned_startup']

        self.use_ps_gpu = self.attrs['use_ps_gpu']
        self.use_heter_ps = self.attrs['is_heter_ps_mode']
        self.is_worker = self.attrs['is_worker']
        self.is_heter_worker = self.attrs['is_heter_worker']
        self.is_server = self.attrs['is_server']
        self.ps_mode = self.attrs['ps_mode']

        self.launch_barrier = self.attrs['launch_barrier']
        self.launch_barrier_flag = self.attrs['launch_barrier_flag']
        self.server_endpoints = self.attrs[
            'role_maker'
        ]._get_pserver_endpoints()

    def _build_trainer_desc(self):
        """Store trainer options on ``cloned_main._fleet_opt``.

        Starts from the ``_fleet_opt`` of the loss program (or an empty dict
        when it is ``None``) and fills in "trainer" and "device_worker" only
        if they are not already set.
        """
        opt_info = self.loss.block.program._fleet_opt
        opt_info = {} if opt_info is None else opt_info
        opt_info["trainer"] = opt_info.get("trainer", "MultiTrainer")
        opt_info["device_worker"] = opt_info.get("device_worker", "Hogwild")
        self.cloned_main._fleet_opt = opt_info

    def _optimize_programs(self):
        """Hook with no behavior in the base class."""
        pass

    def _build_trainer_programs(self):
        """Build the trainer-side programs; must be overridden.

        Raises:
            NotImplementedError: Always, in the base class.
        """
        raise NotImplementedError

    def _build_pserver_programs(self):
        """Apply ``add_lr_decay_table_pass`` when optimize ops exist.

        Returns without doing anything when ``get_optimize_ops`` reports no
        ops for ``attrs['origin_main_program']``.
        """
        ops = get_optimize_ops(self.attrs['origin_main_program'])
        if len(ops) == 0:
            return
        add_lr_decay_table_pass = new_pass(
            'add_lr_decay_table_pass', self.attrs
        )
        add_lr_decay_table_pass.apply([], [], self.pass_ctx)
        # The original then scanned ``ops`` for "sgd"/"adam" and returned in
        # either case; that scan had no effect and has been removed.

    def _build_programs(self):
        """Run the trainer or pserver build depending on the role flags.

        Worker: builds trainer programs, switches the startup program to
        ``cloned_startup``, builds the trainer desc and points the loss at
        ``cloned_main``. Server: builds pserver programs and switches to the
        ``_main_server`` / ``_startup_server`` programs from ``attrs``.
        Neither flag set: nothing happens.
        """
        if self.attrs['is_worker']:
            self._build_trainer_programs()
            fluid.framework.switch_startup_program(self.cloned_startup)
            # NOTE(review): this formats the function object itself (there
            # are no call parentheses), so the program is not printed. Kept
            # unchanged; confirm whether a call was intended.
            print(
                "fluid.default_startup_program: {}".format(
                    fluid.default_startup_program
                )
            )
            # The desc must be built before the loss program is re-pointed:
            # it reads ``_fleet_opt`` from the loss's current program.
            self._build_trainer_desc()
            self.loss.block.program = self.cloned_main

        elif self.attrs['is_server']:
            self._build_pserver_programs()
            self.loss.block.program = self.attrs['_main_server']
            fluid.framework.switch_startup_program(
                self.attrs['_startup_server']
            )
Z
ziyoujiyi 已提交
100 101 102 103


class GeoPsProgramBuilder(PsProgramBuilder):  # CPU mode only
    """Builder that requires ``ps_mode`` to be ``DistributedMode.GEO``."""

    def __init__(self, pass_ctx):
        """Initialise the base builder and validate the PS mode.

        Raises:
            ValueError: If ``self.ps_mode`` is not ``DistributedMode.GEO``.
        """
        super().__init__(pass_ctx)
        if self.ps_mode != DistributedMode.GEO:
            # The original wrote ``"...", format(...)`` with a comma, which
            # called the builtin ``format`` with the class name as a format
            # spec instead of filling in the message template.
            raise ValueError(
                "ps mode: {} not matched {}".format(
                    self.ps_mode, "GeoPsProgramBuilder"
                )
            )

    def _build_trainer_programs(self):
        """Apply ``append_send_ops_pass`` to ``cloned_main`` and publish it.

        Afterwards ``attrs['origin_main_program']`` refers to
        ``cloned_main``. When both launch-barrier flags are truthy, waits on
        the server endpoints via ``wait_server_ready``.
        """
        append_send_ops_pass = new_pass("append_send_ops_pass", self.attrs)
        append_send_ops_pass.apply([self.cloned_main], [None], self.pass_ctx)

        self.attrs['origin_main_program'] = self.cloned_main

        if self.launch_barrier and self.launch_barrier_flag:
            wait_server_ready(self.server_endpoints)

    def _build_pserver_programs(self):
        """Apply ``add_listen_and_serv_pass`` to ``attrs['_main_server']``."""
        add_listen_and_serv_pass = new_pass(
            'add_listen_and_serv_pass', self.attrs
        )
        add_listen_and_serv_pass.apply(
            [self.attrs['_main_server']], [None], self.pass_ctx
        )

Z
ziyoujiyi 已提交
129

130 131
class NuPsProgramBuilder(PsProgramBuilder):
    """Builder that requires ``attrs['local_sparse']`` to be truthy."""

    def __init__(self, pass_ctx):
        """Initialise the base builder and check for local sparse params.

        Raises:
            ValueError: If ``attrs['local_sparse']`` is falsy.
        """
        super().__init__(pass_ctx)
        if not self.attrs['local_sparse']:
            raise ValueError("No local sparse params")

    def _build_trainer_programs(self):
        """Run the trainer pass pipeline on the cloned programs.

        The passes are applied in the order written; afterwards
        ``attrs['origin_main_program']`` and
        ``attrs['origin_startup_program']`` refer to the cloned programs.
        When both launch-barrier flags are truthy, waits on the server
        endpoints via ``wait_server_ready``.
        """
        add_lr_decay_table_pass = new_pass(
            "add_lr_decay_table_pass", self.attrs
        )
        add_lr_decay_table_pass.apply([], [], self.pass_ctx)

        distributed_ops_pass = new_pass("distributed_ops_pass", self.attrs)
        distributed_ops_pass.apply([self.cloned_main], [None], self.pass_ctx)

        delete_optimizer_pass = new_pass("delete_optimizer_pass", self.attrs)
        delete_optimizer_pass.apply([self.cloned_main], [None], self.pass_ctx)

        # First of two send-op insertions (original tag:
        # fleet->PushDenseVarsAsync).
        append_send_ops_pass = new_pass("append_send_ops_pass", self.attrs)
        append_send_ops_pass.apply([self.cloned_main], [None], self.pass_ctx)

        # This pass is given attrs['origin_main_program'], not cloned_main.
        delete_extra_optimizer_pass = new_pass(
            "delete_extra_optimizer_pass", self.attrs
        )
        delete_extra_optimizer_pass.apply(
            [self.attrs['origin_main_program']],
            [self.cloned_startup],
            self.pass_ctx,
        )

        fake_init_ops_pass = new_pass("fake_init_ops_pass", self.attrs)
        fake_init_ops_pass.apply([None], [self.cloned_startup], self.pass_ctx)

        # Second send-op insertion (original tag: communicator->Send).
        append_send_ops_pass = new_pass("append_send_ops_pass", self.attrs)
        append_send_ops_pass.apply([self.cloned_main], [None], self.pass_ctx)

        self.attrs['origin_main_program'] = self.cloned_main
        self.attrs['origin_startup_program'] = self.cloned_startup

        if self.launch_barrier and self.launch_barrier_flag:
            wait_server_ready(self.server_endpoints)


Z
ziyoujiyi 已提交
179 180
class CpuSyncPsProgramBuilder(PsProgramBuilder):
    """Builder that accepts ``DistributedMode.SYNC`` or ``ASYNC`` only."""

    def __init__(self, pass_ctx):
        """Initialise the base builder and validate the PS mode.

        Raises:
            ValueError: If ``self.ps_mode`` is neither
                ``DistributedMode.SYNC`` nor ``DistributedMode.ASYNC``.
        """
        super().__init__(pass_ctx)
        if (
            self.ps_mode != DistributedMode.SYNC
            and self.ps_mode != DistributedMode.ASYNC
        ):
            # The original wrote ``"...", format(...)`` with a comma, which
            # called the builtin ``format`` with the class name as a format
            # spec instead of filling in the message template.
            raise ValueError(
                "ps mode: {} not matched {}".format(
                    self.ps_mode, "PsProgramBuilder"
                )
            )

    def _build_trainer_programs(self):
        """Run the trainer pass pipeline on the cloned programs.

        The passes are applied in the order written; afterwards
        ``attrs['origin_main_program']`` and
        ``attrs['origin_startup_program']`` refer to the cloned programs.
        When both launch-barrier flags are truthy, waits on the server
        endpoints via ``wait_server_ready``.
        """
        add_lr_decay_table_pass = new_pass(
            "add_lr_decay_table_pass", self.attrs
        )
        add_lr_decay_table_pass.apply([], [], self.pass_ctx)

        distributed_ops_pass = new_pass("distributed_ops_pass", self.attrs)
        distributed_ops_pass.apply([self.cloned_main], [None], self.pass_ctx)

        delete_optimizer_pass = new_pass("delete_optimizer_pass", self.attrs)
        delete_optimizer_pass.apply([self.cloned_main], [None], self.pass_ctx)

        append_send_ops_pass = new_pass("append_send_ops_pass", self.attrs)
        append_send_ops_pass.apply([self.cloned_main], [None], self.pass_ctx)

        # This pass is given attrs['origin_main_program'], not cloned_main.
        delete_extra_optimizer_pass = new_pass(
            "delete_extra_optimizer_pass", self.attrs
        )
        delete_extra_optimizer_pass.apply(
            [self.attrs['origin_main_program']],
            [self.cloned_startup],
            self.pass_ctx,
        )

        fake_init_ops_pass = new_pass("fake_init_ops_pass", self.attrs)
        fake_init_ops_pass.apply([None], [self.cloned_startup], self.pass_ctx)

        self.attrs['origin_main_program'] = self.cloned_main
        self.attrs['origin_startup_program'] = self.cloned_startup

        if self.launch_barrier and self.launch_barrier_flag:
            wait_server_ready(self.server_endpoints)


class CpuAsyncPsProgramBuilder(CpuSyncPsProgramBuilder):
    """Variant of ``CpuSyncPsProgramBuilder`` with its own trainer desc."""

    def __init__(self, pass_ctx):
        """Delegate entirely to the parent initialiser."""
        super().__init__(pass_ctx)

    def _build_trainer_desc(self):
        """Store trainer options and dense-table configs on ``cloned_main``.

        Starts from the loss program's ``_fleet_opt`` (or an empty dict when
        it is ``None``), defaults "trainer" to "DistMultiTrainer" and
        "device_worker" to "DownpourLite", then adds 'program_configs' and
        'dense_table_config' built from the send/recv contexts.
        """
        opt_info = self.loss.block.program._fleet_opt
        opt_info = {} if opt_info is None else opt_info
        opt_info["trainer"] = opt_info.get("trainer", "DistMultiTrainer")
        opt_info["device_worker"] = opt_info.get(
            "device_worker", "DownpourLite"
        )

        # NOTE(review): the config is keyed by id(cloned_main), while the
        # loop below filters contexts by id(loss.block.program) -- confirm
        # that the two are meant to differ.
        pid = str(id(self.cloned_main))
        program_configs = {
            pid: {
                'pull_dense': [],
                'push_dense': [],
                'pull_sparse': [],
                'push_sparse': [],
            }
        }
        dense_table_config = {}
        send_ctx = get_the_one_send_context(self.attrs)
        recv_ctx = get_the_one_recv_context(self.attrs)
        for name, ctx in send_ctx.items():
            # Only dense contexts belonging to the loss program are used.
            if ctx.program_id() != id(self.loss.block.program):
                continue
            if ctx.is_sparse():
                continue
            if not ctx.is_tensor_table():
                program_configs[pid]['pull_dense'].append(ctx.table_id())
                program_configs[pid]['push_dense'].append(ctx.table_id())
            # Recorded for every remaining context, tensor table or not.
            dense_table_config[ctx.table_id()] = recv_ctx[ctx.table_id()]
        opt_info['program_configs'] = program_configs
        opt_info['dense_table_config'] = dense_table_config
        self.cloned_main._fleet_opt = opt_info

Z
ziyoujiyi 已提交
267

Z
ziyoujiyi 已提交
268
class GpuPsProgramBuilder(PsProgramBuilder):
    """Builder whose trainer pipeline includes the ``ps_gpu_pass``."""

    def __init__(self, pass_ctx):
        """Delegate entirely to the parent initialiser."""
        super().__init__(pass_ctx)

    def _build_trainer_programs(self):
        """Run the trainer pass pipeline on the cloned programs.

        The passes are applied in the order written; afterwards
        ``attrs['origin_main_program']`` and
        ``attrs['origin_startup_program']`` refer to the cloned programs.
        When both launch-barrier flags are truthy, waits on the server
        endpoints via ``wait_server_ready``.
        """
        add_lr_decay_table_pass = new_pass(
            "add_lr_decay_table_pass", self.attrs
        )
        add_lr_decay_table_pass.apply([], [], self.pass_ctx)

        distributed_ops_pass = new_pass("distributed_ops_pass", self.attrs)
        distributed_ops_pass.apply([self.cloned_main], [None], self.pass_ctx)

        fake_init_ops_pass = new_pass("fake_init_ops_pass", self.attrs)
        fake_init_ops_pass.apply([None], [self.cloned_startup], self.pass_ctx)

        ps_gpu_pass = new_pass("ps_gpu_pass", self.attrs)
        ps_gpu_pass.apply([self.cloned_main], [None], self.pass_ctx)

        ps_transpile_pass = new_pass("ps_transpile_pass", self.attrs)
        ps_transpile_pass.apply(
            [self.cloned_main], [self.cloned_startup], self.pass_ctx
        )

        self.attrs['origin_main_program'] = self.cloned_main
        self.attrs['origin_startup_program'] = self.cloned_startup

        if self.launch_barrier and self.launch_barrier_flag:
            wait_server_ready(self.server_endpoints)


class HeterAsyncPsProgramBuilder(PsProgramBuilder):
    """Builder that splits ops by role (heter worker vs. trainer)."""

    def __init__(self, pass_ctx):
        """Delegate entirely to the parent initialiser."""
        super().__init__(pass_ctx)

    def _build_trainer_programs(self):
        """Run the trainer pass pipeline, then split ops by role.

        Applies ``split_heter_worker_ops_pass`` when ``is_heter_worker`` is
        truthy and ``split_trainer_ops_pass`` otherwise, followed by
        ``set_heter_pipeline_opt_pass``. When both launch-barrier flags are
        truthy, waits on the server endpoints via ``wait_server_ready``.
        """
        add_lr_decay_table_pass = new_pass(
            "add_lr_decay_table_pass", self.attrs
        )
        add_lr_decay_table_pass.apply([], [], self.pass_ctx)

        distributed_ops_pass = new_pass("distributed_ops_pass", self.attrs)
        distributed_ops_pass.apply([self.cloned_main], [None], self.pass_ctx)

        delete_optimizer_pass = new_pass("delete_optimizer_pass", self.attrs)
        delete_optimizer_pass.apply([self.cloned_main], [None], self.pass_ctx)

        append_send_ops_pass = new_pass("append_send_ops_pass", self.attrs)
        append_send_ops_pass.apply([self.cloned_main], [None], self.pass_ctx)

        # This pass is given attrs['origin_main_program'], not cloned_main.
        delete_extra_optimizer_pass = new_pass(
            "delete_extra_optimizer_pass", self.attrs
        )
        delete_extra_optimizer_pass.apply(
            [self.attrs['origin_main_program']],
            [self.cloned_startup],
            self.pass_ctx,
        )

        fake_init_ops_pass = new_pass("fake_init_ops_pass", self.attrs)
        fake_init_ops_pass.apply([None], [self.cloned_startup], self.pass_ctx)

        if self.is_heter_worker:
            split_heter_worker_ops_pass = new_pass(
                "split_heter_worker_ops_pass", self.attrs
            )
            split_heter_worker_ops_pass.apply(
                [self.cloned_main], [None], self.pass_ctx
            )
        else:
            split_trainer_ops_pass = new_pass(
                "split_trainer_ops_pass", self.attrs
            )
            split_trainer_ops_pass.apply(
                [self.cloned_main], [None], self.pass_ctx
            )

        set_heter_pipeline_opt_pass = new_pass(
            'set_heter_pipeline_opt_pass', self.attrs
        )
        set_heter_pipeline_opt_pass.apply(
            [self.cloned_main], [self.cloned_startup], self.pass_ctx
        )

        if self.launch_barrier and self.launch_barrier_flag:
            wait_server_ready(self.server_endpoints)

    def _build_programs(self):
        """Run the trainer or pserver build depending on the role flags.

        Workers and heter workers build trainer programs; servers build
        pserver programs and switch to the ``_main_server`` /
        ``_startup_server`` programs from ``attrs``.
        """
        if self.attrs['is_worker'] or self.attrs['is_heter_worker']:
            self._build_trainer_programs()
            # NOTE(review): _build_trainer_programs above already applies
            # set_heter_pipeline_opt_pass with the same arguments. The second
            # application is kept as in the original; confirm it is needed.
            ps_set_heter_pipeline_opt_pass = new_pass(
                "set_heter_pipeline_opt_pass", self.attrs
            )
            ps_set_heter_pipeline_opt_pass.apply(
                [self.cloned_main], [self.cloned_startup], self.pass_ctx
            )

        elif self.attrs['is_server']:
            self._build_pserver_programs()
            self.loss.block.program = self.attrs['_main_server']
            fluid.framework.switch_startup_program(
                self.attrs['_startup_server']
            )
Z
ziyoujiyi 已提交
376 377


378
class FlPsProgramBuilder(HeterAsyncPsProgramBuilder):
    """Builder that uses ``split_fl_ops_pass`` to pick a part A/B program."""

    def __init__(self, pass_ctx):
        """Delegate entirely to the parent initialiser."""
        super().__init__(pass_ctx)

    def _build_trainer_programs(self):
        """Run the pass pipeline and select the part A or part B program.

        After ``split_fl_ops_pass``, ``cloned_main`` is replaced by
        ``pass_ctx._attrs['part_a_main_program']`` when ``is_heter_worker``
        is falsy and by ``'part_b_main_program'`` otherwise. The selected
        program and its ``_heter_pipeline_opt['section_program']`` are
        handed to ``debug_program`` with paths under ``ps_log_root_dir``.
        """
        distributed_ops_pass = new_pass("distributed_ops_pass", self.attrs)
        distributed_ops_pass.apply([self.cloned_main], [None], self.pass_ctx)

        delete_optimizer_pass = new_pass("delete_optimizer_pass", self.attrs)
        delete_optimizer_pass.apply([self.cloned_main], [None], self.pass_ctx)

        append_send_ops_pass = new_pass("append_send_ops_pass", self.attrs)
        append_send_ops_pass.apply([self.cloned_main], [None], self.pass_ctx)

        # This pass is given attrs['origin_main_program'], not cloned_main.
        delete_extra_optimizer_pass = new_pass(
            "delete_extra_optimizer_pass", self.attrs
        )
        delete_extra_optimizer_pass.apply(
            [self.attrs['origin_main_program']],
            [self.cloned_startup],
            self.pass_ctx,
        )

        # NOTE: fake_init_ops_pass is not applied in this builder; it was
        # already disabled (commented out) in the original.

        split_fl_ops_pass = new_pass("split_fl_ops_pass", self.attrs)
        split_fl_ops_pass.apply([self.cloned_main], [None], self.pass_ctx)

        if not self.is_heter_worker:
            self.part_a_program = self.pass_ctx._attrs['part_a_main_program']
            self.cloned_main = self.part_a_program
            _main_file = ps_log_root_dir + '8_fl_A_main_program.prototxt'
            debug_program(_main_file, self.cloned_main)
        else:
            self.part_b_program = self.pass_ctx._attrs['part_b_main_program']
            self.cloned_main = self.part_b_program
            _main_file = ps_log_root_dir + '8_fl_B_main_program.prototxt'
            debug_program(_main_file, self.cloned_main)

        set_heter_pipeline_opt_pass = new_pass(
            'set_heter_pipeline_opt_pass', self.attrs
        )
        set_heter_pipeline_opt_pass.apply(
            [self.cloned_main], [self.cloned_startup], self.pass_ctx
        )

        self.attrs['origin_startup_program'] = self.cloned_startup
        self.attrs['origin_main_program'] = self.cloned_main

        # The two branches differ only in the output file name.
        if not self.is_heter_worker:
            _main_file = ps_log_root_dir + 'final_fl_A_main_program.prototxt'
        else:
            _main_file = ps_log_root_dir + 'final_fl_B_main_program.prototxt'
        debug_program(
            _main_file,
            self.attrs['origin_main_program']._heter_pipeline_opt[
                'section_program'
            ],
        )

    def _build_pserver_programs(self):
        """Point the loss at ``attrs['_main_server']``; no passes are run."""
        self.loss.block.program = self.attrs['_main_server']

    def _build_programs(self):
        """Build for the current role and switch the default programs.

        Any non-server role takes the trainer path and switches to the
        cloned startup/main programs. Servers switch to the
        ``_startup_server`` / ``_main_server`` programs from ``attrs``.
        """
        if not self.is_server:
            self._build_trainer_programs()
            fluid.framework.switch_startup_program(self.cloned_startup)
            fluid.framework.switch_main_program(self.cloned_main)
            print(
                "fluid.default_startup_program: {}".format(
                    fluid.default_startup_program()._heter_pipeline_opt
                )
            )
        else:
            self._build_pserver_programs()
            fluid.framework.switch_startup_program(
                self.attrs['_startup_server']
            )
            fluid.framework.switch_main_program(self.attrs['_main_server'])