diff --git a/python/paddle/distributed/fleet/meta_optimizers/ps_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/ps_optimizer.py
index 59a92930d22a91f752421d1bf0f64e1f38f12e02..bc50bef010941a48c367046221d17c75138753c2 100755
--- a/python/paddle/distributed/fleet/meta_optimizers/ps_optimizer.py
+++ b/python/paddle/distributed/fleet/meta_optimizers/ps_optimizer.py
@@ -54,14 +54,15 @@ class ParameterServerOptimizer(MetaOptimizerBase):
         attrs['user_defined_strategy'] = self.user_defined_strategy
         attrs['trainer'] = TrainerRuntimeConfig(self.user_defined_strategy)
         attrs['ps_mode'] = attrs['trainer'].mode
-
+        logger.info("ps_mode: {}".format(attrs['ps_mode']))
         attrs['role_maker'] = self.role_maker
         attrs[
             'is_heter_ps_mode'] = self.role_maker._is_heter_parameter_server_mode
         attrs['is_worker'] = self.role_maker._is_worker()
         attrs['is_server'] = self.role_maker._is_server()
         attrs['is_heter_worker'] = self.role_maker._is_heter_worker()
-
+        logger.info("this process is heter? {}".format(attrs[
+            'is_heter_worker']))
         attrs['use_ps_gpu'] = self.user_defined_strategy.a_sync_configs[
             "use_ps_gpu"]
         attrs['lr_decay_steps'] = self.user_defined_strategy.a_sync_configs[
diff --git a/python/paddle/distributed/passes/ps_trainer_pass.py b/python/paddle/distributed/passes/ps_trainer_pass.py
index fff10a2d4684afe51295cc460f8dc3424d13c4f5..3f39db69abdb2930ec40ffb02cb34dce7be6a034 100755
--- a/python/paddle/distributed/passes/ps_trainer_pass.py
+++ b/python/paddle/distributed/passes/ps_trainer_pass.py
@@ -47,7 +47,7 @@ class AppendSendOpsPass(PassBase):  # this pass is reused by multiple modes
         if ps_mode in [DistributedMode.SYNC, DistributedMode.HALF_ASYNC]:
             dummy_output = program.global_block().create_var(
                 name=framework.generate_control_dev_var_name())
-
+        logger.info("dummy_output: {}".format(dummy_output))
         program.global_block().append_op(
             type="send",
             inputs={"X": send_input_vars},
@@ -61,7 +61,7 @@ class AppendSendOpsPass(PassBase):  # this pass is reused by multiple modes
 
         return dummy_output
 
-    def _append_barrier_op(self, program, dummys):
+    def _append_barrier_op(self, program, dummys, trainer_id):
         program.global_block().append_op(
             type="send_barrier",
             inputs={"X": dummys},
@@ -79,19 +79,24 @@ class AppendSendOpsPass(PassBase):  # this pass is reused by multiple modes
             send_ctx = get_geo_trainer_send_context(attrs)  # geo mode
         else:
             send_ctx = get_the_one_send_context(attrs)  # async, sync, and other modes
+        logger.info("send_ctx: {}".format(send_ctx))
         dummys = []
         for merged_name, send in send_ctx.items():
             if send.is_sparse() and ps_mode != DistributedMode.GEO:
                 continue
+            logger.info('merged_name, send: {}, {}'.format(merged_name, send))
             is_sparse = 1 if send.is_sparse() else 0
             is_sparse = 2 if send.is_distributed() else is_sparse
             dummys.append(
                 self._append_send_op(main_program,
                                      send.origin_varnames(), merged_name,
                                      is_sparse, send.table_id(), ps_mode))
-
+        logger.info('ps trainer pass - ps mode: {}'.format(ps_mode))
+        logger.info('dummys: {}'.format(dummys))
         if ps_mode in [DistributedMode.SYNC, DistributedMode.HALF_ASYNC]:
-            self._append_barrier_op(main_program, dummys)
+            logger.info('insert send_barrier_op')
+            trainer_id = get_role_id(attrs['role_maker'])
+            self._append_barrier_op(main_program, dummys, trainer_id)
 
 
 @register_pass("distributed_ops_pass")
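The logging added above traces AppendSendOpsPass end to end: the chosen send context, each merged gradient that gets a `send` op, and whether a `send_barrier` is appended. To read those logs against the resulting program, a small counting helper is handy; a minimal sketch, assuming `program` is the trainer's `paddle.static.Program` (the helper itself is hypothetical and not part of this PR):

```python
# Hypothetical helper: tally the communication ops AppendSendOpsPass appends
# to the trainer program ("send" per merged gradient, plus one "send_barrier"
# in SYNC/HALF_ASYNC mode).
def count_comm_ops(program):
    counts = {"send": 0, "send_barrier": 0}
    for op in program.global_block().ops:
        if op.type in counts:
            counts[op.type] += 1
    return counts
```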
diff --git a/python/paddle/distributed/ps/utils/ps_program_builder.py b/python/paddle/distributed/ps/utils/ps_program_builder.py
index d649a74e4d621bbc531ce194242fbbd07b01209a..c6afd0cb03bf3f8d164082d9cbadf8dd7c08254f 100755
--- a/python/paddle/distributed/ps/utils/ps_program_builder.py
+++ b/python/paddle/distributed/ps/utils/ps_program_builder.py
@@ -97,7 +97,7 @@ class CpuSyncPsProgramBuilder(PsProgramBuilder):
     def __init__(self, pass_ctx):
         logger.info("start building cpu-sync-ps program")
         super(CpuSyncPsProgramBuilder, self).__init__(pass_ctx)
-        if self.ps_mode == DistributedMode.GEO:
+        if self.ps_mode != DistributedMode.SYNC:
             raise ValueError("ps mode: {} not matched {}",
                              format(ps_mode, "CpuSyncPsProgramBuilder"))
 
diff --git a/python/paddle/distributed/ps/utils/public.py b/python/paddle/distributed/ps/utils/public.py
index a8587874776bb5f5586dd23ed32c1ee810ad97c0..7743db1057dd66e7467efee0cc0253c083ff335c 100755
--- a/python/paddle/distributed/ps/utils/public.py
+++ b/python/paddle/distributed/ps/utils/public.py
@@ -83,8 +83,10 @@ class DistributedMode:
 
 class TrainerRuntimeConfig(object):
     def __init__(self, valid_strategy):
-
+        self.mode = None
         k_steps = valid_strategy.a_sync_configs["k_steps"]
+        logger.info("ps mode in strategy: {}, {}".format(
+            valid_strategy.a_sync, valid_strategy.a_sync_configs["k_steps"]))
         if not valid_strategy.a_sync and k_steps == 0:
             self.mode = DistributedMode.SYNC
 
@@ -94,7 +96,6 @@ class TrainerRuntimeConfig(object):
         if valid_strategy.a_sync and k_steps > 0:
             self.mode = DistributedMode.GEO
 
-        self.mode = None
         num_threads = os.getenv("CPU_NUM", "1")
 
         self.runtime_configs = {}
@@ -161,6 +162,13 @@ def get_dist_env():
     }
 
 
+def get_role_id(role_maker):
+    try:
+        return role_maker._role_id()
+    except Exception:
+        return role_maker.role_id()
+
+
 def get_ps_endpoint(role_maker):
     try:
         return role_maker._get_pserver_endpoints()[get_role_id(role_maker)]
@@ -184,7 +192,7 @@ def get_trainer_endpoint(role_maker):
 
 def get_previous_stage_trainers(role_maker):
     try:
-        return role_maker_get_previous_trainers()
+        return role_maker._get_previous_trainers()
     except Exception:
         return role_maker.get_previous_trainers()
 
@@ -229,18 +237,11 @@ def get_sparse_tablenames(program, is_distributed):
     return list(tablenames)
 
 
-def get_role_id(role_maker):
-    try:
-        return role_maker._role_id()
-    except Exception:
-        return role_maker.role_id()
-
-
 def get_ps_endpoints(role_maker):
     try:
-        return role_maker._get_pserver_endpoints()[get_role_id(role_maker)]
+        return role_maker._get_pserver_endpoints()
     except Exception:
-        return role_maker.get_pserver_endpoints()[get_role_id(role_maker)]
+        return role_maker.get_pserver_endpoints()
 
 
 def get_trainers(role_maker):
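Note the typo fix above: `role_maker_get_previous_trainers()` (a lost dot) becomes `role_maker._get_previous_trainers()`. These public.py helpers all share one idiom: try the new role maker's private accessor, fall back to the legacy public one. A generic sketch of that pattern, for illustration only (no such helper exists in the PR):

```python
# Illustrative only: the try/except fallback used by get_role_id,
# get_ps_endpoint, get_previous_stage_trainers, etc. Prefer the private
# accessor of the new role maker; fall back to the legacy public method.
def call_with_fallback(role_maker, private_name, public_name, *args):
    try:
        return getattr(role_maker, private_name)(*args)
    except Exception:
        return getattr(role_maker, public_name)(*args)

# e.g. call_with_fallback(role_maker, "_role_id", "role_id")
```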
@@ -296,8 +297,35 @@ def get_geo_trainer_send_context(context):
     if context['ps_mode'] != DistributedMode.GEO:
         raise ValueError("ps mode: {} not matched {}",
                          format(ps_mode, "get_geo_trainer_send_context"))
-
     send_ctx = {}
+    trainer_id = get_role_id(context['role_maker'])
+    idx = 0
+
+    distibuted_varnames = get_sparse_tablenames(context['origin_main_program'],
+                                                True)
+    for merged in context['merged_sparse_pairs']:
+        param, grad = merged
+        grad_name = grad.merged_var.name
+        param_name = param.merged_var.name
+        is_distributed = True if param_name in distibuted_varnames else False
+
+        var = context['origin_main_program'].global_block().vars[
+            grad.merged_var.name]
+        var_numel = reduce(lambda x, y: x * y, var.shape[1:])
+
+        sparse_ctx = CommContext(grad_name, [grad_name], ["127.0.0.1:6071"],
+                                 [var_numel], [grad_name], trainer_id, True,
+                                 True, is_distributed, idx, False)
+        idx += 1
+        send_ctx[sparse_ctx.var_name()] = sparse_ctx
+
+    if len(send_ctx) == 0:
+        raise ValueError("GeoSGD require sparse parameters in your net.")
+
+    if len(context['tensor_table']) > 0 and context['is_worker']:
+        name, ctx = _step_ctx(idx, context['role_maker'])
+        send_ctx[name] = ctx
+
     return send_ctx
@@ -1253,6 +1281,60 @@ def find_op_input_output(program, block, op):
     return input_var_list, output_var_list
 
 
+def add_heter_send_op(program, heter_program, block, block_var_detail):
+    def _get_send_op_dict():
+        send_op_dict = {}
+        send_op_list = find_send_op(program)
+        for op in send_op_list:
+            input_list, _ = find_op_input_output(program,
+                                                 program.global_block(), op)
+            for var in input_list:
+                send_op_dict[var] = op
+        return send_op_dict
+
+    send_grad_var_list = []
+    send_op_dict = _get_send_op_dict()
+    table_dict = {}
+    for persistable_var in block_var_detail["backward"]["persistables"]:
+        if "@GRAD" not in persistable_var:
+            continue
+        if "GRAD" != persistable_var.split("@")[-1]:
+            continue
+        if persistable_var not in send_op_dict:
+            continue
+        send_op = send_op_dict[persistable_var]
+        is_sparse = send_op.attr('is_sparse')
+        table_id = send_op.attr('table_id')
+        send_varnames = send_op.attr('send_varnames')
+        send_grad_var_list.append(persistable_var)
+        if table_id not in table_dict:
+            table_dict[table_id] = {}
+            table_dict[table_id]['var_list'] = []
+            table_dict[table_id]['is_sparse'] = is_sparse
+            table_dict[table_id]['send_varnames'] = send_varnames
+        table_dict[table_id]['var_list'].append(persistable_var)
+
+    for table_id in table_dict:
+        dummy_output = block.create_var(
+            name=framework.generate_control_dev_var_name())
+        send_input_vars = [
+            block.vars[union_var]
+            for union_var in table_dict[table_id]['var_list']
+        ]
+        block.append_op(
+            type="send",
+            inputs={"X": send_input_vars},
+            outputs={"Out": dummy_output},
+            attrs={
+                "send_varnames": table_dict[table_id]['send_varnames'],
+                "is_sparse": is_sparse,
+                "table_id": table_id,
+                RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE
+            })
+
+    return send_grad_var_list
+
+
 def get_vars_name_in_block(block):
     vars_list = block.vars.keys()
     vars_name_list = [var_name for var_name in vars_list]
@@ -1302,10 +1384,6 @@ def create_backward_block(program, origin_program, bp_ops_list,
     return heter_block
 
 
-def debug_program(file, program, is_trainer):
-    if is_trainer:
-        with open(file, 'w+') as f:
-            f.write(str(program))
-    else:
-        with open(file, 'w+') as f:
-            f.write(str(program))
+def debug_program(file, program):
+    with open(file, 'w+') as f:
+        f.write(str(program))
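In `get_geo_trainer_send_context`, every merged sparse parameter gets a `CommContext` whose `var_numel` is the per-row element count of the gradient variable (all dimensions after the leading vocab dimension). A worked example of that `reduce`, using the sparse table shape implied by the test configs (`sparse_feature_number: 1000001`, `sparse_feature_dim: 10`):

```python
from functools import reduce

# var_numel is the element count of one embedding row. For a sparse table of
# shape [1000001, 10], shape[1:] is [10], so var_numel == 10. The reduce over
# shape[1:] assumes the variable has rank >= 2.
shape = [1000001, 10]
var_numel = reduce(lambda x, y: x * y, shape[1:])
assert var_numel == 10
```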
diff --git a/python/paddle/fluid/tests/unittests/distributed_passes/test_ps_trainer_pass.py b/python/paddle/fluid/tests/unittests/distributed_passes/test_ps_trainer_pass.py
index f28e99fc00d97ae13689be208bd3b10727f053ef..b186869ee9747fdc2b5c51ecc5051ab6f93f3706 100755
--- a/python/paddle/fluid/tests/unittests/distributed_passes/test_ps_trainer_pass.py
+++ b/python/paddle/fluid/tests/unittests/distributed_passes/test_ps_trainer_pass.py
@@ -28,7 +28,7 @@ from paddle.fluid.tests.unittests.ps.ps_dnn_trainer import DnnTrainer
 class TestPsTrainerPass(PsPassTestBase):
     def init(self):
         self.config = {}
-        self.config['ps_mode_config'] = "../ps/cpu_async_ps_config.yaml"
+        self.config['ps_mode_config'] = ""
         self.config['worker_num'] = "1"
         self.config['server_num'] = "1"
         self.config['run_minimize'] = "0"
@@ -47,23 +47,58 @@ class TestPsTrainerPass(PsPassTestBase):
     def check(self):
         pass
 
-    def test_ps_optimizer_minimize_cpu(self):
+    def test_ps_optimizer_minimize_cpu_async(self):
+        self.init()
+        self.config['ps_mode_config'] = "../ps/cpu_async_ps_config.yaml"
+        self.config['run_minimize'] = '1'
+
+        self.config['debug_new_minimize'] = '0'
+        self.config['log_dir'] = "/async_cpu_log_old_minimize"
+        remove_path_if_exists(self.config['log_dir'])
+        self.ps_launch(self.config)
+
+        self.config['debug_new_minimize'] = '1'
+        self.config['log_dir'] = "/async_cpu_log_new_minimize"
+        remove_path_if_exists(self.config['log_dir'])
+        self.ps_launch(self.config)
+
+        self.check()
+
+    def test_ps_optimizer_minimize_cpu_sync(self):
+        self.init()
+        self.config['ps_mode_config'] = "../ps/cpu_sync_ps_config.yaml"
+        self.config['run_minimize'] = '1'
+
+        self.config['debug_new_minimize'] = '0'
+        self.config['log_dir'] = "/sync_cpu_log_old_minimize"
+        remove_path_if_exists(self.config['log_dir'])
+        self.ps_launch(self.config)
+
+        self.config['debug_new_minimize'] = '1'
+        self.config['log_dir'] = "/sync_cpu_log_new_minimize"
+        remove_path_if_exists(self.config['log_dir'])
+        self.ps_launch(self.config)
+
+        self.check()
+
+    def test_ps_optimizer_minimize_cpu_geo(self):
         self.init()
+        self.config['ps_mode_config'] = "../ps/cpu_geo_ps_config.yaml"
         self.config['run_minimize'] = '1'
 
         self.config['debug_new_minimize'] = '0'
-        self.config['log_dir'] = "/cpu_log_old_minimize"
+        self.config['log_dir'] = "/geo_cpu_log_old_minimize"
         remove_path_if_exists(self.config['log_dir'])
         self.ps_launch(self.config)
 
         self.config['debug_new_minimize'] = '1'
-        self.config['log_dir'] = "/cpu_log_new_minimize"
+        self.config['log_dir'] = "/geo_cpu_log_new_minimize"
         remove_path_if_exists(self.config['log_dir'])
         self.ps_launch(self.config)
 
         self.check()
 
-    # heter ps: 3-stage mode, still to be tested
+    # heter ps: 2-stage mode
     def test_ps_optimizer_minimize_heter(self):
         self.init()
         self.config['worker_num'] = "2"
diff --git a/python/paddle/fluid/tests/unittests/ps/cpu_async_ps_config.yaml b/python/paddle/fluid/tests/unittests/ps/cpu_async_ps_config.yaml
index 669709ea5607e5def58204637202e839090f1197..93a13a67ce6b519fa3c1b544f05ff59072a9f77d 100755
--- a/python/paddle/fluid/tests/unittests/ps/cpu_async_ps_config.yaml
+++ b/python/paddle/fluid/tests/unittests/ps/cpu_async_ps_config.yaml
@@ -26,7 +26,6 @@ hyper_parameters:
   fc_sizes: [400, 400, 400]
 
 runner:
-  geo_step: 400
   sync_mode: "async"  # sync / async / geo / heter
   thread_num: 16
   use_gpu: 0
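The reworked tests run each CPU mode twice, once through the old minimize path (`debug_new_minimize='0'`) and once through the new one (`'1'`), dumping the built programs to separate log dirs; `check()` is still a `pass` stub. A natural follow-up, sketched here as a hypothetical helper (not in this PR), is to compare the two dumps:

```python
import filecmp

# Hypothetical check() body: the old and new minimize paths dump programs to
# *_debug:_0_* and *_debug:_1_* prototxt files, so the natural assertion is
# that the two dumps are byte-identical.
def programs_match(old_dump, new_dump):
    return filecmp.cmp(old_dump, new_dump, shallow=False)
```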
diff --git a/python/paddle/fluid/tests/unittests/ps/cpu_geo_ps_config.yaml b/python/paddle/fluid/tests/unittests/ps/cpu_geo_ps_config.yaml
new file mode 100644
index 0000000000000000000000000000000000000000..80125ae6c37faa469469f5f67bd9b8796fd079f2
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/ps/cpu_geo_ps_config.yaml
@@ -0,0 +1,36 @@
+# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# refer to PaddleRec/models/rank/dnn/benchmark.yaml
+
+hyper_parameters:
+  optimizer:
+    class: Adam
+    learning_rate: 0.0001
+    adam_lazy_mode: True
+  sparse_inputs_slots: 27
+  sparse_feature_number: 1000001
+  sparse_feature_dim: 10
+  dense_input_dim: 13
+  fc_sizes: [400, 400, 400]
+
+runner:
+  geo_step: 400
+  sync_mode: "geo"
+  thread_num: 16
+  use_gpu: 0
+
+  model_path: "../ps_dnn_model.py"
+
+
diff --git a/python/paddle/fluid/tests/unittests/ps/cpu_sync_ps_config.yaml b/python/paddle/fluid/tests/unittests/ps/cpu_sync_ps_config.yaml
new file mode 100644
index 0000000000000000000000000000000000000000..95685a488cade1219290956dab8339ee641e001f
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/ps/cpu_sync_ps_config.yaml
@@ -0,0 +1,35 @@
+# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# refer to PaddleRec/models/rank/dnn/benchmark.yaml
+
+hyper_parameters:
+  optimizer:
+    class: Adam
+    learning_rate: 0.0001
+    adam_lazy_mode: True
+  sparse_inputs_slots: 27
+  sparse_feature_number: 1000001
+  sparse_feature_dim: 10
+  dense_input_dim: 13
+  fc_sizes: [400, 400, 400]
+
+runner:
+  sync_mode: "sync"
+  thread_num: 16
+  use_gpu: 0
+
+  model_path: "../ps_dnn_model.py"
+
+
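The two new YAML files differ from the async config only in `runner.sync_mode` (plus `geo_step` for geo). Those runner settings end up on the fleet `DistributedStrategy` that the relocated mode selection in `TrainerRuntimeConfig` reads. A sketch of the mapping, assuming the standard `DistributedStrategy` API (the sync and geo branches are visible in the public.py hunk above; the async one is inferred):

```python
import paddle.distributed.fleet as fleet

# How runner.sync_mode maps onto the strategy TrainerRuntimeConfig reads:
#   sync:  a_sync=False, k_steps == 0 -> DistributedMode.SYNC
#   async: a_sync=True,  k_steps == 0 -> DistributedMode.ASYNC (inferred)
#   geo:   a_sync=True,  k_steps > 0  -> DistributedMode.GEO
strategy = fleet.DistributedStrategy()
strategy.a_sync = True
strategy.a_sync_configs = {"k_steps": 400}  # geo_step: 400 from the geo yaml
```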
diff --git a/python/paddle/fluid/tests/unittests/ps/ps_dnn_trainer.py b/python/paddle/fluid/tests/unittests/ps/ps_dnn_trainer.py
index 8f8ff65af544a1c4ddb4f1548603b418d3bf8bed..d08c1d41c89ec532f6c3124000f9bec38f9b86d7 100755
--- a/python/paddle/fluid/tests/unittests/ps/ps_dnn_trainer.py
+++ b/python/paddle/fluid/tests/unittests/ps/ps_dnn_trainer.py
@@ -329,9 +329,9 @@ class DnnTrainer(object):
         sync_mode = self.config.get("runner.sync_mode")
         inner_optimizer = paddle.optimizer.Adam(learning_rate, lazy_mode=True)
 
+        self.role_maker._generate_role()  # required
         if self.config['debug_new_minimize'] == 1:
             logger.info("entering run_minimize -- new")
-            self.role_maker._generate_role()  # required
             from paddle.distributed.fleet.meta_optimizers.ps_optimizer import ParameterServerOptimizer
             ps_optimizer = ParameterServerOptimizer(inner_optimizer)
             ps_optimizer._set_basic_info(loss, self.role_maker, inner_optimizer,
@@ -346,11 +346,16 @@ class DnnTrainer(object):
         if fleet.is_server():
             _main_file = '/' + sync_mode + '_run_minimize' + '_debug:_' + str(
                 self.config['debug_new_minimize']) + '_server_main.prototxt'
-            debug_program(_main_file, loss.block.program, 0)
+            debug_program(_main_file, loss.block.program)
         elif fleet.is_worker():
             _main_file = '/' + sync_mode + '_run_minimize' + '_debug:_' + str(
                 self.config['debug_new_minimize']) + '_worker_main.prototxt'
-            debug_program(_main_file, loss.block.program, 1)
+            debug_program(_main_file, loss.block.program)
+        elif self.role_maker._is_heter_worker():
+            _main_file = '/' + sync_mode + '_run_minimize' + '_debug:_' + str(
+                self.config[
+                    'debug_new_minimize']) + '_heter_worker_main.prototxt'
+            debug_program(_main_file, loss.block.program)
 
     def run_single_pass(self):
         self.init_fleet_with_gloo()
@@ -395,17 +400,18 @@ class DnnTrainer(object):
             _main_file = '/' + sync_mode + "_" + str(config[
                 "applied_pass_name"]) + '_debug:_' + str(self.config[
                     'debug_new_pass']) + '_server_main.prototxt'
-            debug_program(_main_file, _main, 0)
+            debug_program(_main_file, _main)
         elif fleet.is_worker():
             _main_file = '/' + sync_mode + "_" + str(config[
                 "applied_pass_name"]) + '_debug:_' + str(self.config[
                     'debug_new_pass']) + '_worker_main.prototxt'
-            debug_program(_main_file, _main, 1)
+            debug_program(_main_file, _main)
 
 
 if __name__ == "__main__":
     paddle.enable_static()
     config = parse_args()
+    logger.info(">>>>>>>>>> python process started")
    os.environ["CPU_NUM"] = str(config.get("runner.thread_num"))
     benchmark_main = DnnTrainer(config)
     if config['run_single_pass'] == 1:
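With the dead `is_trainer` flag gone (both branches of the old `debug_program` wrote the same thing), every dump call is just `debug_program(path, program)`, and the path encodes mode, stage, debug flag, and role. A hypothetical helper, only to document the naming scheme (the PR itself keeps the inline string concatenation):

```python
# Hypothetical helper documenting the dump-file naming scheme used in
# ps_dnn_trainer.py; not part of this PR.
def dump_filename(sync_mode, stage, debug_new_minimize, role):
    return '/{}_{}_debug:_{}_{}_main.prototxt'.format(
        sync_mode, stage, debug_new_minimize, role)

# dump_filename('geo', 'run_minimize', 1, 'worker')
# -> '/geo_run_minimize_debug:_1_worker_main.prototxt'
```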