#   Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from paddle import fluid
from .meta_optimizer_base import MetaOptimizerBase
from paddle.fluid import core
import subprocess
import re
import os
import platform
from ..base.private_helper_function import wait_server_ready

__all__ = []


class ParameterServerOptimizer(MetaOptimizerBase):
    def __init__(self, optimizer):
        super(ParameterServerOptimizer, self).__init__(optimizer)
        self.inner_opt = optimizer
        # we do not allow meta optimizer to be inner optimizer currently
        self.meta_optimizers_white_list = []

    def _set_basic_info(
        self, loss, role_maker, user_defined_optimizer, user_defined_strategy
    ):
        super(ParameterServerOptimizer, self)._set_basic_info(
            loss, role_maker, user_defined_optimizer, user_defined_strategy
        )

        # self.micro_batch_size = user_defined_strategy.pipeline_configs[
        #    'micro_batch_size']
        self.num_microbatches = user_defined_strategy.pipeline_configs[
            'accumulate_steps'
        ]

    def _is_graph_out(self):
        return False

    def _can_apply(self):
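        """Whether this meta optimizer can be applied.

        It is skipped in collective mode; otherwise it applies whenever
        a_sync_configs["k_steps"] is non-negative.
        """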
        if self.role_maker._is_collective:
            return False

        k_steps = self.user_defined_strategy.a_sync_configs["k_steps"]
        return True if k_steps >= 0 else False

    def get_dist_env(self):
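        """Collect the trainer rank and endpoints from the PADDLE_TRAINER_*
        environment variables exported by the launcher."""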
        trainer_id = int(os.getenv('PADDLE_TRAINER_ID', '0'))
        trainer_endpoints = ''
        current_endpoint = ''
        num_trainers = 0
        if os.getenv('PADDLE_TRAINER_ENDPOINTS'):
            trainer_endpoints = os.getenv('PADDLE_TRAINER_ENDPOINTS')
            current_endpoint = trainer_endpoints.split(',')[trainer_id]
            num_trainers = len(trainer_endpoints.split(','))

        return {
            'trainer_id': trainer_id,
            'num_trainers': num_trainers,
            'current_endpoint': current_endpoint,
            'trainer_endpoints': trainer_endpoints,
        }

    def _get_distributed_strategy(self):
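        """Translate the user strategy into a StrategyFactory strategy:
        sync (a_sync off, k_steps == 0), async (a_sync on, k_steps == 0)
        or geo (a_sync on, k_steps > 0)."""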
        from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import (
            StrategyFactory,
        )

        k_steps = self.user_defined_strategy.a_sync_configs["k_steps"]
        strategy = None

        if not self.user_defined_strategy.a_sync and k_steps == 0:
            strategy = StrategyFactory.create_sync_strategy()

        if self.user_defined_strategy.a_sync and k_steps == 0:
            strategy = StrategyFactory.create_async_strategy()

        if self.user_defined_strategy.a_sync and k_steps > 0:
            strategy = StrategyFactory.create_geo_strategy(k_steps)

        if not strategy:
            raise ValueError("k_steps must be invalid value, please check")

        return strategy

    def _build_trainer_programs(self, compiled_config):
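        """Clone the origin main/startup programs and rewrite them with the
        worker-side IR passes (distributed ops, send ops, optimizer deletion,
        heter trainer split), then register the results on compiled_config."""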
        from paddle.fluid.incubate.fleet.parameter_server.ir import (
            trainer_pass as worker,
        )

        _main = compiled_config.origin_main_program.clone()
        _startup = compiled_config.origin_startup_program.clone()

        use_ps_gpu = self.user_defined_strategy.a_sync_configs["use_ps_gpu"]

        if not compiled_config.is_geo_mode():
            from paddle.fluid.incubate.fleet.parameter_server.ir.public import (
                _add_lr_decay_table_pass,
            )

            _add_lr_decay_table_pass(
                _main,
                compiled_config,
                self.user_defined_strategy.a_sync_configs["lr_decay_steps"],
            )

            # for main program
            _main = worker.distributed_ops_pass(
                _main, compiled_config, use_ps_gpu
            )
            if not use_ps_gpu:
                _main = worker.delete_optimizer_pass(_main, compiled_config)
                _main = worker.append_send_ops_pass(_main, compiled_config)
                _startup = worker.delete_extra_optimizes_pass(
                    _startup, compiled_config
                )

            # for startup program
            _startup = worker.fake_init_ops_pass(_startup, compiled_config)
            if use_ps_gpu:
                _main = worker.ps_gpu_pass(_main)
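                # in GPU-PS mode, the SingleProcessMultiThread transpiler below
                # wires the trainer endpoints together for collective communication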
                from paddle.fluid.transpiler.collective import (
                    SingleProcessMultiThread,
                )

                t = SingleProcessMultiThread()
                env = self.get_dist_env()
                t.transpile(
                    startup_program=_startup,
                    main_program=_main,
                    rank=env["trainer_id"],
                    endpoints=env["trainer_endpoints"],
                    current_endpoint=env['current_endpoint'],
                    wait_port=False,
                )

            compiled_config.set_origin_ps_main_program(_main)
            compiled_config.set_origin_ps_startup_program(_startup)
            # for heter program
            if self.role_maker._is_heter_parameter_server_mode:
                from paddle.fluid.incubate.fleet.parameter_server.ir import (
                    heter_trainer_pass as heter_worker,
                )

                if self.role_maker._is_heter_worker():
                    # for heter worker
                    stage_id = self.role_maker._get_stage_id()
                    device = self.role_maker._heter_device_type().lower()
                    _main = heter_worker.split_heter_worker_ops_pass(
                        _main, compiled_config, stage_id, device
                    )
                else:
                    # for default worker
                    _main = heter_worker.split_trainer_ops_pass(
                        _main, compiled_config
                    )
        else:
            _main = worker.append_send_ops_pass(_main, compiled_config)
            _startup = _startup
            compiled_config.set_origin_ps_main_program(_main)
            compiled_config.set_origin_ps_startup_program(_startup)

        launch_barrier = self.user_defined_strategy.a_sync_configs[
            "launch_barrier"
        ]
        launch_barrier_flag = int(os.getenv("FLAGS_LAUNCH_BARRIER", "1"))
        if launch_barrier and launch_barrier_flag:
            # for trainer wait server ready
            wait_server_ready(self.role_maker._get_pserver_endpoints())

            # for ps-heter mode, wait heter worker ready
            # if self.role_maker._is_heter_parameter_server_mode and self.role_maker._is_worker(
            # ):
            #     wait_server_ready(self.role_maker._get_heter_worker_endpoints())

        return _main, _startup

    def _build_pserver_programs(self, compiled_config):
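        """Build fresh server-side main/startup programs with the pserver IR
        passes (listen_and_serv, rpc flags, (geo-)optimizer and, in non-geo
        mode, the large-scale sparse passes)."""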
        _main = fluid.Program()
        _startup = fluid.Program()

        from paddle.fluid.incubate.fleet.parameter_server.ir import (
            pserver_pass as server,
        )

        if not compiled_config.is_geo_mode():

            from paddle.fluid.incubate.fleet.parameter_server.ir.public import (
                _get_optimize_ops,
            )

            is_sgd_adam = False

            main_program = compiled_config.get_origin_main_program()
            ops = _get_optimize_ops(main_program)

            if len(ops) == 0:
                return _main, _startup

            from paddle.fluid.incubate.fleet.parameter_server.ir.public import (
                _add_lr_decay_table_pass,
            )

            lr_decay_steps = self.user_defined_strategy.a_sync_configs[
                "lr_decay_steps"
            ]
            _add_lr_decay_table_pass(
                main_program, compiled_config, lr_decay_steps
            )

            for op in ops:
                if op.type in ["sgd", "adam"]:
                    is_sgd_adam = True
                    break

            if is_sgd_adam:
                return _main, _startup

            _main = server.add_listen_and_serv_pass(_main, compiled_config)
            _main = server.add_rpc_global_flags_pass(_main, compiled_config)
            _main = server.add_optimizer_pass(_main, compiled_config)
            _main = server.large_scale_sparse_pass(
                _main, _main, compiled_config, False
            )
            _startup = server.build_pserver_startup_program_pass(
                _startup, _main, compiled_config
            )
            _startup = server.large_scale_sparse_pass(
                _startup, _main, compiled_config, True
            )

            if not compiled_config.is_sync_mode():
                _main = server.delete_unused_in_main_pass(
                    _main, compiled_config
                )

            _startup = server.delete_unused_in_startup_pass(
                _startup, _main, compiled_config
            )
        else:
            _main = server.add_listen_and_serv_pass(_main, compiled_config)
            _main = server.add_rpc_global_flags_pass(_main, compiled_config)
            _main = server.add_geo_optimizer_pass(_main, compiled_config)
            _startup = server.build_pserver_startup_program_pass(
                _startup, _main, compiled_config
            )
            _startup = server.delete_unused_in_startup_pass(
                _startup, _main, compiled_config
            )

        return _main, _startup

    def _can_apply_geo(self, dist_strategy, program):
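        """Heuristic used by _enable_strategy: GEO is only picked for the plain
        SGD optimizer and only when the estimated memory footprint of the
        program fits into the currently free system memory."""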
        def get_sys_free_mem():
            plat = platform.system()
            if plat == "Darwin":
                vm = subprocess.Popen(
                    ['vm_stat'], stdout=subprocess.PIPE
                ).communicate()[0]
                # Process vm_stat output; Popen returns bytes, so decode first
                vmLines = vm.decode().split('\n')
                sep = re.compile(r':[\s]+')
                vmStats = {}
                for row in range(1, len(vmLines) - 2):
                    rowText = vmLines[row].strip()
                    rowElements = sep.split(rowText)
                    vmStats[(rowElements[0])] = (
                        int(rowElements[1].strip(r'\.')) * 4096
                    )
                return vmStats["Pages free"]
            elif plat == "Linux":
                mems = {}
                with open('/proc/meminfo', 'rb') as f:
                    for line in f:
                        fields = line.split()
                        mems[fields[0]] = int(fields[1]) * 1024
                free = mems[b'MemFree:']
                return free
            else:
                raise ValueError(
                    "%s platform is unsupported in parameter server optimizer"
                    % plat
                )

        if not isinstance(self.inner_opt, fluid.optimizer.SGDOptimizer):
            return False

        free = get_sys_free_mem()

        from paddle.fluid.incubate.fleet.parameter_server.ir import (
            vars_metatools,
        )

        processed_var_names = set(["@EMPTY@"])
        param_memory_size = 0
        for varname in program.global_block().vars:
            var = program.global_block().vars[varname]
            if (
                not var.persistable
                or var.desc.type() != core.VarDesc.VarType.LOD_TENSOR
            ):
                continue
            param = vars_metatools.create_var_struct(var)
            param_memory_size += param.m_size
            processed_var_names.add(varname)

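        # heuristic upper bound: assume roughly 5x the parameter memory is
        # needed at runtime (a rough allowance for gradients, optimizer states
        # and communication buffers)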
        upper_mem_use = param_memory_size * 5.0

        program_tmp_vars = dict()
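        # when a temporary var has one negative (batch) dimension, its size is
        # estimated with this nominal batch size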
        eval_batch_size = 1024
        for op in program.global_block().ops:
            for var_name in op.output_arg_names:
                if var_name in processed_var_names:
                    continue
                processed_var_names.add(var_name)
                var = program.global_block().vars[var_name]

                if var.desc.type() != core.VarDesc.VarType.LOD_TENSOR:
                    continue

                data_count = 1
                neg_dim_count = 0
                for x in var.shape:
                    if x < 0:
                        if neg_dim_count >= 1:
                            raise ValueError(
                                "Var %s has more than one negative dim."
                                % (var_name)
                            )
                        neg_dim_count += 1
                        data_count *= -x
                    else:
                        data_count *= x
                program_tmp_vars[var_name] = (
                    data_count,
                    neg_dim_count,
                    vars_metatools.dtype_to_size[var.dtype],
                )

        for varname in program_tmp_vars:
            data_count, neg_dim_count, type_size = program_tmp_vars[varname]
            if neg_dim_count == 1:
                data_count *= eval_batch_size
            var_memory = data_count * type_size
            upper_mem_use += var_memory

        if upper_mem_use < free:
            return True
        else:
            return False

    def minimize_impl(
        self, loss, startup_program=None, parameter_list=None, no_grad_set=None
    ):
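        """Run the inner optimizer first, then rewrite the programs for the
        current role: trainer / heter-worker programs via _build_trainer_programs,
        or server programs via _build_pserver_programs."""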
        self.inner_opt.minimize(
            loss, startup_program, parameter_list, no_grad_set
        )
        strategy = self._get_distributed_strategy()

        _origin_main_program = loss.block.program
        _origin_startup_program = startup_program
        from paddle.fluid.incubate.fleet.parameter_server.ir import (
            public as public,
        )

        compiled_config = public.CompileTimeStrategy(
            _origin_main_program,
            _origin_startup_program,
            strategy,
            self.role_maker,
        )
        compiled_config.strategy = strategy

        if self.role_maker._is_worker() or self.role_maker._is_heter_worker():
            main_program, startup_program = self._build_trainer_programs(
                compiled_config
            )
            if self.role_maker._is_heter_parameter_server_mode:
                _origin_startup_program._heter_pipeline_opt = {
                    "startup_program": startup_program,
                    "pipeline_stage": int(self.role_maker._get_stage_id()) - 1,
                    "heter_place": self.role_maker._heter_device(),
                }

                loss.block.program._heter_pipeline_opt = {
                    "trainer": "HeterPipelineTrainer",
                    "device_worker": "HeterSection",
                    "trainers": self.role_maker._get_stage_trainers(),  ## trainer num in each stage
                    "trainer_id": int(self.role_maker._role_id()),
                    "pipeline_stage": int(self.role_maker._get_stage_id()) - 1,
                    "num_pipeline_stages": int(
                        self.role_maker._get_num_stage()
                    ),
                    "section_program": main_program,
                    "num_microbatches": self.num_microbatches,
                    "heter_place": self.role_maker._heter_device(),
                }
            else:
                loss.block.program = main_program
                fluid.framework.switch_startup_program(startup_program)

        elif self.role_maker._is_server():
            main_program, startup_program = self._build_pserver_programs(
                compiled_config
            )
            loss.block.program = main_program
            fluid.framework.switch_startup_program(startup_program)
        return None, None

    def _disable_strategy(self, dist_strategy):
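        # a_sync is switched off and k_steps is set to -1 so that _can_apply
        # treats the parameter-server strategy as disabled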
        # if self.role_maker._is_heter_parameter_server_mode:
421 422 423 424 425
        #    dist_strategy.pipeline = False
        #    dist_strategy.pipeline_configs = {
        #        "micro_batch_size": 1,
        #        "accumulate_steps": 1,
        #    }
        dist_strategy.a_sync = False
        a_sync_configs = dist_strategy.a_sync_configs
        a_sync_configs["k_steps"] = -1
        dist_strategy.a_sync_configs = a_sync_configs

    def _enable_strategy(self, dist_strategy, context):
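        # respect an explicit user setting (k_steps >= 0); otherwise fall back
        # to GEO (k_steps = 800) when _can_apply_geo allows it, or pure async
        # (k_steps = 0)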
        # if self.role_maker._is_heter_parameter_server_mode:
433 434 435 436 437
        #    dist_strategy.pipeline = True
        #    dist_strategy.pipeline_configs = {
        #        "micro_batch_size": 1,
        #        "accumulate_steps": 1,
        #    }
        a_sync_configs = dist_strategy.a_sync_configs
        if a_sync_configs["k_steps"] >= 0:
            return

        dist_strategy.a_sync = True
        a_sync_configs = dist_strategy.a_sync_configs

        is_geo = self._can_apply_geo(
            dist_strategy, context["origin_main_program"]
        )

        if is_geo:
            a_sync_configs["k_steps"] = 800
        else:
            a_sync_configs["k_steps"] = 0
        dist_strategy.a_sync_configs = a_sync_configs