From 1c4e3e5dd0d32a4216bdad0b1cafcab4ca5ed5bb Mon Sep 17 00:00:00 2001 From: ziyoujiyi <73728031+ziyoujiyi@users.noreply.github.com> Date: Wed, 2 Mar 2022 16:23:52 +0800 Subject: [PATCH] new fleet_desc builder (#39948) * delete gloo connect retry * the_one_ps dirs reconstruct * . * . * create the_one_ps dirs * create the_one_ps dirs * create the_one_ps dirs * create the_one_ps dirs * create the_one_ps dirs * create the_one_ps dirs * the one ps dirs modify * the one ps dirs modify * the one ps dirs modify * the one ps dirs modify * refactor ps optimize * refactor ps optimize * refactor ps optimize * . * . * . * . * . * . * refactor theoneps * the_one_ps * add ps pass unittest * add ps pass unittest * ps unitest frame * ps unittest frame * ps unittest frame * ps unittest frame * ps unittest frame * ps unittest frame * ps unittest frame * ps unittest frame * ps unittest frame * ps unittest frame * ps unittest frame * ps unittest frame * ps unittest frame * ps unittest frame * ps unittest frame * ps unittest frame * ps unittest frame * add cpu_async_ps_mode test * add cpu_async_ps_mode test * add cpu_async_ps_mode test * ps unittest ready * ps unittest ready * solve dist_pass init conflict * solve import CommContext error * unittest ok * implement AllocateFrom * solve setup.py.in conflict * solve conflict * solve conflict * solve conflict * . * . * cpu-async-ps minimize test ok & gpu minimize test ok * add heter 2stage unittest * add heter 2stage unittest * add heter 2stage unittest * sync/geo test ok & fix heter_worker program ok * . * new fleet desc generator * new fleet_desc builder * new fleet_desc builder * . * . * correct ps.proto compile * . Co-authored-by: zkh2016 --- paddle/fluid/distributed/ps/ps.proto | 13 - paddle/fluid/framework/CMakeLists.txt | 5 +- paddle/fluid/framework/ps.proto | 213 ++++ .../fleet/meta_optimizers/ps_optimizer.py | 1 + python/paddle/distributed/ps/README.md | 3 - python/paddle/distributed/ps/the_one_ps.py | 1022 ++++++++--------- .../paddle/distributed/ps/utils/ps_factory.py | 4 +- .../ps/utils/ps_program_builder.py | 5 +- python/paddle/distributed/ps/utils/public.py | 4 +- .../fluid/tests/unittests/CMakeLists.txt | 2 +- .../distributed_passes/ps_pass_test_base.py | 54 +- .../test_ps_trainer_pass.py | 122 +- .../fluid/tests/unittests/ps/CMakeLists.txt | 4 +- .../tests/unittests/ps/ps_dnn_trainer.py | 86 +- .../tests/unittests/ps/test_the_one_ps.py | 92 +- .../fluid/tests/unittests/ps_dnn_model.py | 1 + 16 files changed, 961 insertions(+), 670 deletions(-) delete mode 100755 paddle/fluid/distributed/ps/ps.proto mode change 100644 => 100755 paddle/fluid/framework/CMakeLists.txt create mode 100755 paddle/fluid/framework/ps.proto delete mode 100755 python/paddle/distributed/ps/README.md mode change 100644 => 100755 python/paddle/fluid/tests/unittests/CMakeLists.txt mode change 100644 => 100755 python/paddle/fluid/tests/unittests/ps/CMakeLists.txt mode change 100644 => 100755 python/paddle/fluid/tests/unittests/ps/test_the_one_ps.py diff --git a/paddle/fluid/distributed/ps/ps.proto b/paddle/fluid/distributed/ps/ps.proto deleted file mode 100755 index 2691f637527..00000000000 --- a/paddle/fluid/distributed/ps/ps.proto +++ /dev/null @@ -1,13 +0,0 @@ -/* Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. */ \ No newline at end of file diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt old mode 100644 new mode 100755 index 14aecb5fd43..02d90b9c6da --- a/paddle/fluid/framework/CMakeLists.txt +++ b/paddle/fluid/framework/CMakeLists.txt @@ -235,6 +235,7 @@ if(WITH_PYTHON) py_proto_compile(trainer_py_proto SRCS trainer_desc.proto data_feed.proto) py_proto_compile(distributed_strategy_py_proto SRCS distributed_strategy.proto) py_proto_compile(pass_desc_py_proto SRCS pass_desc.proto) + py_proto_compile(ps_py_proto SRCS ps.proto) #Generate an empty \ #__init__.py to make framework_py_proto as a valid python module. add_custom_target(fleet_proto_init ALL @@ -242,12 +243,13 @@ if(WITH_PYTHON) COMMAND ${CMAKE_COMMAND} -E touch ${PADDLE_BINARY_DIR}/python/paddle/distributed/fleet/proto/__init__.py ) add_custom_target(framework_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py) - add_dependencies(framework_py_proto framework_py_proto_init trainer_py_proto distributed_strategy_py_proto fleet_proto_init pass_desc_py_proto) + add_dependencies(framework_py_proto framework_py_proto_init trainer_py_proto distributed_strategy_py_proto fleet_proto_init pass_desc_py_proto ps_py_proto) if (NOT WIN32) add_custom_command(TARGET framework_py_proto POST_BUILD COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_BINARY_DIR}/python/paddle/fluid/proto COMMAND cp *.py ${PADDLE_BINARY_DIR}/python/paddle/fluid/proto/ COMMAND cp distributed_strategy_*.py ${PADDLE_BINARY_DIR}/python/paddle/distributed/fleet/proto + COMMAND cp ps_pb2.py ${PADDLE_BINARY_DIR}/python/paddle/distributed/fleet/proto COMMENT "Copy generated python proto into directory paddle/fluid/proto." WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) add_custom_target(fleet_executor_proto_init ALL DEPENDS fleet_proto_init fleet_executor_desc_py_proto @@ -259,6 +261,7 @@ if(WITH_PYTHON) add_custom_command(TARGET framework_py_proto POST_BUILD COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_BINARY_DIR}/python/paddle/fluid/proto COMMAND copy /Y *.py ${proto_dstpath} + COMMAND copy /Y ps_pb2.py ${fleet_proto_dstpath} COMMAND copy /Y distributed_strategy_*.py ${fleet_proto_dstpath} COMMENT "Copy generated python proto into directory paddle/fluid/proto." COMMENT "Copy generated python proto into directory paddle/distributed/fleet/proto." diff --git a/paddle/fluid/framework/ps.proto b/paddle/fluid/framework/ps.proto new file mode 100755 index 00000000000..0ae87812bce --- /dev/null +++ b/paddle/fluid/framework/ps.proto @@ -0,0 +1,213 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto2"; +package paddle.distributed; +option cc_generic_services = true; +option cc_enable_arenas = true; + +message FsClientParameter { + enum FsApiType { + HDFS = 0; + AFS = 1; + } + optional FsApiType fs_type = 1 [ default = HDFS ]; + optional string uri = 2; // such as afs://xxx.afs.com:9902 + optional string user = 3; // user_name to access fs + optional string passwd = 4; // password + optional int32 buffer_size = 5; // buffer for read/write + optional string hadoop_bin = 51; + optional string afs_conf = 101; +} + +message PSParameter { + optional string worker_class = 1; + optional string server_class = 2; + optional string instance_class = 3; + optional string init_gflags = 4 [ default = "" ]; + optional WorkerParameter worker_param = 101; + optional ServerParameter server_param = 102; + repeated DownpourTrainerParameter trainer_param = 301; + optional FsClientParameter fs_client_param = 501; +} + +message WorkerParameter { + optional DownpourWorkerParameter downpour_worker_param = 1; +} + +message DownpourWorkerParameter { + repeated TableParameter downpour_table_param = 1; +} + +message DownpourServerParameter { + repeated TableParameter downpour_table_param = 1; + optional ServerServiceParameter service_param = 2; +} + +message ServerParameter { + optional DownpourServerParameter downpour_server_param = 1; +} + +message DownpourTrainerParameter { + repeated DenseTableParameter dense_table = 1; + repeated SparseTableParameter sparse_table = 2; + optional int32 push_sparse_per_batch = 3; + optional int32 push_dense_per_batch = 4; + repeated string skip_op = 5; + repeated ProgramConfig program_config = 6; +} + +message DenseTableParameter { + optional int32 table_id = 1; + repeated string dense_variable_name = 2; + repeated string dense_gradient_variable_name = 3; + optional int32 fea_dim = 4; +} + +message SparseTableParameter { + optional int32 table_id = 1; + optional int32 feature_dim = 2; + repeated string slot_key = 3; + repeated string slot_value = 4; + repeated string slot_gradient = 5; +} + +message ServerServiceParameter { + optional string server_class = 1 [ default = "BrpcPsServer" ]; + optional string client_class = 2 [ default = "BrpcPsClient" ]; + optional string service_class = 3 [ default = "BrpcPsService" ]; + optional uint32 start_server_port = 4 + [ default = 0 ]; // will find a avaliable port from it + optional uint32 server_thread_num = 5 [ default = 12 ]; +} + +message ProgramConfig { + required string program_id = 1; + repeated int32 push_sparse_table_id = 2; + repeated int32 push_dense_table_id = 3; + repeated int32 pull_sparse_table_id = 4; + repeated int32 pull_dense_table_id = 5; +} + +enum TableType { + PS_SPARSE_TABLE = 0; + PS_DENSE_TABLE = 1; + PS_OTHER_TABLE = 2; +} + +message TableParameter { + optional uint64 table_id = 1; + optional string table_class = 2; + optional uint64 shard_num = 3 [ default = 1000 ]; + optional TableAccessorParameter accessor = 4; + optional TensorAccessorParameter tensor = 5; + optional CommonAccessorParameter common = 6; + optional TableType type = 7; + optional bool compress_in_save = 8 [ default = false ]; +} + +message TableAccessorParameter { + optional string accessor_class = 1; + optional uint32 fea_dim = 4 [ default = 11 ]; + optional uint32 embedx_dim = 5 [ default = 8 ]; + optional uint32 embedx_threshold = 6 [ default = 10 ]; + optional CtrAccessorParameter ctr_accessor_param = 7; + repeated TableAccessorSaveParameter table_accessor_save_param = 8; + optional SparseCommonSGDRuleParameter embed_sgd_param = 10; + optional SparseCommonSGDRuleParameter embedx_sgd_param = 11; +} + +message CtrAccessorParameter { + optional float nonclk_coeff = 1 + [ default = 0.1 ]; // to calculate show_click_score + optional float click_coeff = 2 + [ default = 1 ]; // to calculate show_click_score + optional float base_threshold = 3 [ + default = 1.5 + ]; // show_click_score > base_threshold, this feature can be saved + optional float delta_threshold = 4 + [ default = + 0.25 ]; // delta_score > delta_threshold, this feature can be saved + optional float delta_keep_days = 5 + [ default = + 16 ]; // unseen_day < delta_keep_days, this feature can be saved + optional float show_click_decay_rate = 6 [ + default = 0.98 + ]; // show/click will update to show/click * show_click_decay_rate after a day + optional float delete_threshold = 7 + [ default = 0.8 ]; // threshold to shrink a feasign + optional float delete_after_unseen_days = 8 + [ default = 30 ]; // unseen_day > delete_after_unseen_days, this feature + // will be delete in shrink_model + optional int32 ssd_unseenday_threshold = 9 + [ default = 1 ]; // threshold to save ssd +} + +message TensorAccessorParameter { + optional string feed_var_name = 1; + optional string fetch_var_name = 2; + optional int64 startup_program_id = 3; + optional int64 main_program_id = 4; + optional string tensor_table_class = 6; +} + +message CommonAccessorParameter { + optional string name = 1; + optional string table_name = 2; + repeated string attributes = 3; + repeated string params = 4; + repeated uint32 dims = 5; + repeated string initializers = 6; + optional string entry = 7; + optional int32 trainer_num = 8; + optional bool sync = 9; + optional uint32 table_num = 10; + optional uint32 table_dim = 11; +} + +message TableAccessorSaveParameter { + optional uint32 param = 1; + optional string converter = 2; + optional string deconverter = 3; +} + +message SparseCommonSGDRuleParameter { + optional string name = 1; + optional SparseNaiveSGDRuleParameter naive = 2; + optional SparseAdagradSGDRuleParameter adagrad = 3; + optional SparseAdamSGDParameter adam = 4; +} + +message SparseNaiveSGDRuleParameter { // SparseNaiveSGDRule + optional double learning_rate = 1 [ default = 0.05 ]; + optional double initial_range = 2 [ default = 0.0001 ]; + repeated float weight_bounds = 3; +} + +message + SparseAdagradSGDRuleParameter { // SparseAdaGradSGDRule|StdAdaGradSGDRule + optional double learning_rate = 1 [ default = 0.05 ]; + optional double initial_g2sum = 2 [ default = 3.0 ]; + optional double initial_range = 3 [ default = 0.0001 ]; + repeated float weight_bounds = 4; +} + +message SparseAdamSGDParameter { // SparseAdamSGDRule + optional double learning_rate = 1 [ default = 0.001 ]; + optional double initial_range = 2 [ default = 0.0001 ]; + optional double beta1_decay_rate = 3 [ default = 0.9 ]; + optional double beta2_decay_rate = 4 [ default = 0.999 ]; + optional double ada_epsilon = 5 [ default = 1e-08 ]; + repeated float weight_bounds = 6; +} diff --git a/python/paddle/distributed/fleet/meta_optimizers/ps_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/ps_optimizer.py index 100a6882b1b..00937dbe7a4 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/ps_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/ps_optimizer.py @@ -54,6 +54,7 @@ class ParameterServerOptimizer(MetaOptimizerBase): attrs['cloned_startup'] = attrs['origin_startup_program'].clone() attrs['user_defined_strategy'] = self.user_defined_strategy + attrs['valid_strategy'] = self.user_defined_strategy attrs['trainer'] = TrainerRuntimeConfig(self.user_defined_strategy) attrs['ps_mode'] = attrs['trainer'].mode logger.info("ps_mode: {}".format(attrs['ps_mode'])) diff --git a/python/paddle/distributed/ps/README.md b/python/paddle/distributed/ps/README.md deleted file mode 100755 index 8d28031794f..00000000000 --- a/python/paddle/distributed/ps/README.md +++ /dev/null @@ -1,3 +0,0 @@ -# 目录说明 - -* 改完之后,上层目录中 fleet 中相关文件(夹)就可以删除 diff --git a/python/paddle/distributed/ps/the_one_ps.py b/python/paddle/distributed/ps/the_one_ps.py index 14a68ad9167..cc744bc9d9e 100755 --- a/python/paddle/distributed/ps/the_one_ps.py +++ b/python/paddle/distributed/ps/the_one_ps.py @@ -15,10 +15,11 @@ import warnings import os +from paddle.distributed.fleet.proto import ps_pb2 import paddle.fluid as fluid import paddle.distributed.fleet as fleet from paddle.fluid import core -from .utils.public import * +from paddle.distributed.ps.utils.public import * from paddle.fluid.framework import Program from paddle.fluid.compiler import CompiledProgram from paddle.fluid.executor import Executor @@ -29,14 +30,10 @@ from paddle.distributed.fleet.base.private_helper_function import wait_server_re from paddle.fluid.communicator import Communicator, HeterClient from google.protobuf import text_format -__all__ = [] - - -def conv_indent(indent): - return "".join([" "] * indent) - - -PSERVER_SAVE_SUFFIX = ".shard" +__all__ = [ + 'Table', 'SparseTable', 'GeoSparseTable', 'BarrierTable', 'TensorTable', + 'DenseTable' +] def get_program_by_id(context, program_id): @@ -62,129 +59,140 @@ def parse_table_class(varname, program_id, context): return "MemorySparseTable" -def get_default_accessor_proto(accessor, varname, program_id, context): +def check_embedding_dim(accessor_proto, varname, program_id, context): main_program, startup_program = get_program_by_id(context, program_id) embedding_dim = 0 for var in main_program.list_vars(): if var.name == varname: embedding_dim = var.shape[1] + print('new var: {}, {}, {}'.format(var, embedding_dim, + accessor_proto.fea_dim)) break - - if not accessor.HasField("accessor_class"): - accessor.accessor_class = "CtrCommonAccessor" - if not accessor.HasField("fea_dim"): - accessor.fea_dim = embedding_dim + 2 - if not accessor.HasField("embedx_dim"): - accessor.embedx_dim = embedding_dim - 1 - if not accessor.HasField("embedx_threshold"): - accessor.embedx_threshold = 0 - - ctr_accessor_param = accessor.ctr_accessor_param - if not ctr_accessor_param.HasField("nonclk_coeff"): - ctr_accessor_param.nonclk_coeff = 0.1 - if not ctr_accessor_param.HasField("click_coeff"): - ctr_accessor_param.click_coeff = 1.0 - if not ctr_accessor_param.HasField("base_threshold"): - ctr_accessor_param.base_threshold = 0 - if not ctr_accessor_param.HasField("delta_threshold"): - ctr_accessor_param.delta_threshold = 0 - if not ctr_accessor_param.HasField("delta_keep_days"): - ctr_accessor_param.delta_keep_days = 16 - if not ctr_accessor_param.HasField("show_click_decay_rate"): - ctr_accessor_param.show_click_decay_rate = 1 - if not ctr_accessor_param.HasField("delete_threshold"): - ctr_accessor_param.delete_threshold = 0 - if not ctr_accessor_param.HasField("delete_after_unseen_days"): - ctr_accessor_param.delete_after_unseen_days = 30 - if not ctr_accessor_param.HasField("ssd_unseenday_threshold"): - ctr_accessor_param.ssd_unseenday_threshold = 1 - - for sgd_param in [accessor.embed_sgd_param, accessor.embedx_sgd_param]: - if not sgd_param.HasField("name"): - sgd_param.name = "SparseAdaGradSGDRule" - if sgd_param.name == "SparseAdaGradSGDRule" or sgd_param.name == "StdAdaGradSGDRule": - if not sgd_param.adagrad.HasField("learning_rate"): - sgd_param.adagrad.learning_rate = 0.05 - if not sgd_param.adagrad.HasField("initial_g2sum"): - sgd_param.adagrad.initial_g2sum = 3.0 - if not sgd_param.adagrad.HasField("initial_range"): - sgd_param.adagrad.initial_range = 0.0001 - if len(sgd_param.adagrad.weight_bounds) == 0: - sgd_param.adagrad.weight_bounds.extend([-10.0, 10.0]) - if sgd_param.name == "SparseNaiveSGDRule": - if not sgd_param.naive.HasField("learning_rate"): - sgd_param.naive.learning_rate = 0.05 - if not sgd_param.naive.HasField("initial_range"): - sgd_param.naive.initial_range = 0.0001 - if len(sgd_param.naive.weight_bounds) == 0: - sgd_param.naive.weight_bounds.extend([-10.0, 10.0]) - if sgd_param.name == "SparseAdamSGDRule": - if not sgd_param.adam.HasField("learning_rate"): - sgd_param.adam.learning_rate = 0.001 - if not sgd_param.adam.HasField("initial_range"): - sgd_param.adam.initial_range = 0.0001 - if not sgd_param.adam.HasField("beta1_decay_rate"): - sgd_param.adam.beta1_decay_rate = 0.9 - if not sgd_param.adam.HasField("beta2_decay_rate"): - sgd_param.adam.beta2_decay_rate = 0.999 - if not sgd_param.adam.HasField("ada_epsilon"): - sgd_param.adam.ada_epsilon = 1e-08 - if len(sgd_param.adam.weight_bounds) == 0: - sgd_param.adam.weight_bounds.extend([-10.0, 10.0]) - - -def check_embedding_dim(accessor, varname, program_id, context): - main_program, startup_program = get_program_by_id(context, program_id) - embedding_dim = 0 - for var in main_program.list_vars(): - if var.name == varname: - embedding_dim = var.shape[1] - break - fea_dim = accessor.fea_dim + fea_dim = accessor_proto.fea_dim if fea_dim != embedding_dim + 2: raise ValueError( "The fea_dim is wrong, it will be sparse_embedding_dim + 2: {}, but got {}". format(embedding_dim + 2, fea_dim)) - embedx_dim = accessor.embedx_dim + embedx_dim = accessor_proto.embedx_dim if embedx_dim != embedding_dim - 1: raise ValueError( "The embedx_dim is wrong, it will be sparse_embedding_dim - 1: {}, but got {}". format(embedding_dim - 1, embedx_dim)) +class Service: + def __init__(self): + pass + + def _set(self, service_proto): + service_proto.server_class = "BrpcPsServer" + service_proto.client_class = "BrpcPsClient" + service_proto.service_class = "BrpcPsService" + service_proto.start_server_port = 0 + service_proto.server_thread_num = 12 + + +class GpuService(Service): + def __init__(self): + super(GpuService).__init__(self) + + def _set(self, service_proto): + super(GpuService)._set(service_proto) + service_proto.server_class = 'PsLocalServer' + service_proto.client_class = 'PsLocalClient' + + class Accessor: def __init__(self): self.accessor_class = "" self.optimizer = None - self.feature_dim = -1 - self.embedding_dim = -1 - self.optimizer = None - - def to_string(self, indent): - accessor_str = "{}accessor {{{}\n{}}}" - attrs = "" - attrs += "accessor_class: \"{}\" ".format(self.accessor_class) - attrs += "fea_dim: {} ".format(self.feature_dim) - attrs += "embedx_dim: {} ".format(self.embedding_dim) - attrs += "\n" - if self.optimizer is not None: - attrs += self.optimizer.to_string(indent) - return accessor_str.format( - conv_indent(indent), attrs, conv_indent(indent)) + self.feature_dim = 0 + self.embedding_dim = 0 + # TableAccessorParameter accessor + def _set(self, accessor_proto, varname, program_id, context): + main_program, startup_program = get_program_by_id(context, program_id) + embedding_dim = 0 + for var in main_program.list_vars(): + if var.name == varname: + embedding_dim = var.shape[1] + break -class CommonAccessor: + if not accessor_proto.HasField("accessor_class"): + accessor_proto.accessor_class = "CtrCommonAccessor" + if not accessor_proto.HasField("fea_dim"): + accessor_proto.fea_dim = embedding_dim + 2 + if not accessor_proto.HasField("embedx_dim"): + accessor_proto.embedx_dim = embedding_dim - 1 + if not accessor_proto.HasField("embedx_threshold"): + accessor_proto.embedx_threshold = 0 + + ctr_accessor_param = accessor_proto.ctr_accessor_param + if not ctr_accessor_param.HasField("nonclk_coeff"): + ctr_accessor_param.nonclk_coeff = 0.1 + if not ctr_accessor_param.HasField("click_coeff"): + ctr_accessor_param.click_coeff = 1.0 + if not ctr_accessor_param.HasField("base_threshold"): + ctr_accessor_param.base_threshold = 0 + if not ctr_accessor_param.HasField("delta_threshold"): + ctr_accessor_param.delta_threshold = 0 + if not ctr_accessor_param.HasField("delta_keep_days"): + ctr_accessor_param.delta_keep_days = 16 + if not ctr_accessor_param.HasField("show_click_decay_rate"): + ctr_accessor_param.show_click_decay_rate = 1 + if not ctr_accessor_param.HasField("delete_threshold"): + ctr_accessor_param.delete_threshold = 0 + if not ctr_accessor_param.HasField("delete_after_unseen_days"): + ctr_accessor_param.delete_after_unseen_days = 30 + if not ctr_accessor_param.HasField("ssd_unseenday_threshold"): + ctr_accessor_param.ssd_unseenday_threshold = 1 + + for sgd_param in [ + accessor_proto.embed_sgd_param, accessor_proto.embedx_sgd_param + ]: + if not sgd_param.HasField("name"): + sgd_param.name = "SparseAdaGradSGDRule" + if sgd_param.name == "SparseAdaGradSGDRule" or sgd_param.name == "StdAdaGradSGDRule": + if not sgd_param.adagrad.HasField("learning_rate"): + sgd_param.adagrad.learning_rate = 0.05 + if not sgd_param.adagrad.HasField("initial_g2sum"): + sgd_param.adagrad.initial_g2sum = 3.0 + if not sgd_param.adagrad.HasField("initial_range"): + sgd_param.adagrad.initial_range = 0.0001 + if len(sgd_param.adagrad.weight_bounds) == 0: + sgd_param.adagrad.weight_bounds.extend([-10.0, 10.0]) + if sgd_param.name == "SparseNaiveSGDRule": + if not sgd_param.naive.HasField("learning_rate"): + sgd_param.naive.learning_rate = 0.05 + if not sgd_param.naive.HasField("initial_range"): + sgd_param.naive.initial_range = 0.0001 + if len(sgd_param.naive.weight_bounds) == 0: + sgd_param.naive.weight_bounds.extend([-10.0, 10.0]) + if sgd_param.name == "SparseAdamSGDRule": + if not sgd_param.adam.HasField("learning_rate"): + sgd_param.adam.learning_rate = 0.001 + if not sgd_param.adam.HasField("initial_range"): + sgd_param.adam.initial_range = 0.0001 + if not sgd_param.adam.HasField("beta1_decay_rate"): + sgd_param.adam.beta1_decay_rate = 0.9 + if not sgd_param.adam.HasField("beta2_decay_rate"): + sgd_param.adam.beta2_decay_rate = 0.999 + if not sgd_param.adam.HasField("ada_epsilon"): + sgd_param.adam.ada_epsilon = 1e-08 + if len(sgd_param.adam.weight_bounds) == 0: + sgd_param.adam.weight_bounds.extend([-10.0, 10.0]) + + +class CommonAccessor(Accessor): def __init__(self): - self.accessor_class = "" - self.table_name = None - self.entry = None + super(CommonAccessor, self).__init__() + self.table_name = '' + self.entry = 'none' self.attrs = [] self.params = [] self.dims = [] self.trainer_num = 0 - self.sync = "false" - self.table_num = None - self.table_dim = None + self.sync = False self.initializers = [] self.opt_input_map = {} self.opt_attr_map = {} @@ -422,233 +430,361 @@ class CommonAccessor: self.initializers = initializers self.attrs = attrs - def to_string(self, indent): - accessor_str = "{}common {{{}\n{}}}" - attrs = "" - attrs += "name: \"{}\" ".format(self.accessor_class) - - if self.table_name: - attrs += "table_name: \"{}\" ".format(self.table_name) - - if self.entry: - attrs += "entry: \"{}\" ".format(self.entry) - attrs += "trainer_num: {} ".format(self.trainer_num) - attrs += "sync: {} ".format(self.sync) - if self.table_num: - attrs += "table_num: {} ".format(self.table_num) - if self.table_dim: - attrs += "table_dim: {} ".format(self.table_dim) - - for param in self.params: - attrs += "params: \"{}\" ".format(param) - - for dim in self.dims: - attrs += "dims: {} ".format(dim) - - for initializer in self.initializers: - attrs += "initializers: \"{}\" ".format(initializer) - - attrs += "\n" - return accessor_str.format( - conv_indent(indent), attrs, conv_indent(indent)) + # CommonAccessorParameter common + def _set(self, proto): + proto.name = self.accessor_class + proto.table_name = self.table_name + proto.params.extend(self.params) + proto.dims.extend(self.dims) + proto.initializers.extend(self.initializers) + proto.entry = self.entry + proto.trainer_num = self.trainer_num + proto.sync = self.sync + proto.table_num = self.table_num + proto.table_dim = self.table_dim class Tensor: - def __init__(self): - self.main_program_id = None - self.startup_program_id = None - self.feed_var_name = None - self.fetch_var_name = None - self.tensor_table_class = False - - def to_string(self, indent): - program_str = "{}tensor {{{}\n{}}}" - attrs = "" - attrs += "feed_var_name: \"{}\" ".format(str(self.feed_var_name)) - attrs += "fetch_var_name: \"{}\" ".format(str(self.fetch_var_name)) - attrs += "startup_program_id: {} ".format(str(self.startup_program_id)) - attrs += "main_program_id: {} ".format(str(self.main_program_id)) - attrs += "tensor_table_class: \"{}\" ".format( - str(self.tensor_table_class)) - attrs += "\n" - return program_str.format( - conv_indent(indent), attrs, conv_indent(indent)) + def __init__(self, tesnor_dcit): + self.tensor_dict = tesnor_dcit + + def _set(self, tensor_proto): + tensor_proto.main_program_id = self.tensor_dict.get("main_program_id", + 0) + tensor_proto.startup_program_id = self.tensor_dict.get( + "startup_program_id", 0) + tensor_proto.feed_var_name = self.tensor_dict.get("feed_var_name", '') + tensor_proto.fetch_var_name = self.tensor_dict.get("fetch_var_name", '') + tensor_proto.tensor_table_class = self.tensor_dict.get( + "tensor_table_class", '') class Table: def __init__(self): - self.id = -1 self.table_class = None self.shard_num = -1 self.type = None - self.accessor = None - self.common = None + self.accessor = Accessor() + self.shard_num = 256 + self.common = CommonAccessor() self.tensor = None - self.accessor_proto = None - - def to_string(self, indent): - # if self.id == 1: - # proto_txt = '' - # with open('./sparse_table.prototxt') as f: - # proto_txt = f.read() - # return proto_txt - table_str = "{}downpour_table_param {{{}\n{}}}" - - attrs = "" - attrs += "table_id: {} ".format(self.id) - attrs += "table_class: \"{}\" ".format(self.table_class) - attrs += "shard_num: {} ".format(self.shard_num) - attrs += "type: {}".format(self.type) - attrs += "\n" - indent += 2 - - if self.accessor_proto is not None: - accessor_str = "{}accessor {{{}\n{}}}" - accessor_str = accessor_str.format( - conv_indent(indent), self.accessor_proto, conv_indent(indent)) - attrs += accessor_str + "\n" - elif self.accessor is not None: - attrs += self.accessor.to_string(indent) - attrs += "\n" - - if self.tensor is not None: - attrs += self.tensor.to_string(indent) - attrs += "\n" - - if self.common is not None: - attrs += self.common.to_string(indent) - attrs += "\n" - - return table_str.format(conv_indent(indent), attrs, conv_indent(indent)) + def _set(self, table_proto): + pass -class Service: - def __init__(self): - self.server_class = "BrpcPsServer" - self.client_class = "BrpcPsClient" - self.service_class = "BrpcPsService" - self.start_server_port = 0 - self.server_thread_num = 12 - def to_string(self, indent): - service_str = "{}service_param {{{}\n{}}}" +class BarrierTable(Table): + def __init__(self, context, idx): + super(BarrierTable, self).__init__() + self.type = None + self.shard_num = 256 + self.accessor.accessor_class = 'CommMergeAccessor' + self.common.attrs = "" + self.common.dims = [] + self.common.params = [] + self.is_heter_ps_mode = context['is_heter_ps_mode'] + self.role_maker = context['role_maker'] + self.idx = idx + self.is_sync = context['is_sync'] + + def _set(self, table_proto): + table_proto.table_id = self.idx + table_proto.table_class = 'BarrierTable' + table_proto.shard_num = 256 + table_proto.type = ps_pb2.PS_OTHER_TABLE + + table_proto.accessor.accessor_class = "CommMergeAccessor" + table_proto.accessor.fea_dim = 0 + table_proto.accessor.embedx_dim = 0 + + table_proto.common.name = "" + table_proto.common.table_name = "barrier_table" + table_proto.common.sync = self.is_sync + table_proto.common.entry = 'none' + + trainer_num = get_trainers(self.role_maker) + if self.is_heter_ps_mode: + trainer_num += len(self.role_maker._get_heter_worker_endpoints()) + table_proto.common.trainer_num = trainer_num - attrs = "" - attrs += "server_class: \"{}\" ".format(self.server_class) - attrs += "client_class: \"{}\" ".format(self.client_class) - attrs += "service_class: \"{}\" ".format(self.service_class) - attrs += "start_server_port: {} ".format(self.start_server_port) - attrs += "server_thread_num: {} ".format(self.server_thread_num) - return service_str.format( - conv_indent(indent), attrs, conv_indent(indent)) +class TensorTable(Table): + def __init__(self, idx, tensor_dict, role_maker): + super(TensorTable, self).__init__() + self.idx = idx + self.tensor_dict = tensor_dict + self.role_maker = role_maker + def _set(self, table_proto): + table_proto.table_id = self.idx + table_proto.type = ps_pb2.PS_OTHER_TABLE + table_proto.table_class = self.tensor_dict.get("tensor_table_class", '') -class DownpourServer: - def __init__(self): - self.service = None - self.tables = [] + table_proto.accessor.accessor_class = "CommMergeAccessor" - def set_service_param(self, service): - self.service = service + table_proto.common.table_name = self.tensor_dict.get("feed_var_name", + '') + table_proto.common.trainer_num = get_trainers(self.role_maker) - def append_tables(self, table): - if not isinstance(table, Table): - raise ValueError("only support instance Table") - self.tables.append(table) + tensor = Tensor(self.tensor_dict) + tensor._set(table_proto.tensor) - def to_string(self, indent): - server_str = "{}downpour_server_param {{{}\n{}}}" - table_strs = "" - indent += 2 +class SparseTable(Table): + def __init__(self, context, send_ctx): + super(SparseTable, self).__init__() + self.context = context + self.ctx = send_ctx + self.type = None + self.table_class = 'MemorySparseTable' + self.accessor = Accessor() - table_strs += "\n" - table_strs += self.service.to_string(indent) + def _set(self, table_proto): + ctx = self.ctx + if ctx.is_tensor_table() or len(ctx.origin_varnames()) < 1 or ( + ctx.is_sparse() == False): + return + table_proto.table_id = ctx.table_id() + table_proto.table_class = self.table_class + table_proto.type = ps_pb2.PS_SPARSE_TABLE + table_proto.shard_num = self.shard_num + + self.common.table_name = self.context['grad_name_to_param_name'][ + ctx.origin_varnames()[0]] + + print('new table_name: {}'.format(self.common.table_name)) + all_table_proto = self.context[ + "user_defined_strategy"].sparse_table_configs + usr_table_proto = all_table_proto.add() + for proto in all_table_proto: + if proto.table_name == self.common.table_name: + usr_table_proto = proto + break + table_proto.table_class = 'MemorySparseTable' + warnings.warn("The PS mode must use MemorySparseTable.") + if usr_table_proto.HasField("shard_num"): + table_proto.shard_num = usr_table_proto.shard_num + else: + table_proto.shard_num = 1000 + warnings.warn( + "The shard_num of sparse table is not set, use default value 1000." + ) - for table in self.tables: - table_strs += "\n" - table_strs += table.to_string(indent) - return server_str.format( - conv_indent(indent), table_strs, conv_indent(indent)) + if usr_table_proto.accessor.ByteSize() == 0: + warnings.warn( + "The accessor of sparse table is not set, use default value.") + table_proto.accessor.ParseFromString( + usr_table_proto.accessor.SerializeToString()) + self.accessor._set(table_proto.accessor, self.common.table_name, + ctx.program_id(), self.context) -class Server: - def __init__(self): - self.servers = [] + check_embedding_dim(table_proto.accessor, self.common.table_name, + ctx.program_id(), self.context) - def add_server(self, server): - if not isinstance(server, DownpourServer): - raise ValueError("only support instance DownpourServer") - self.servers.append(server) + adam_d2sum = self.context["user_defined_strategy"].adam_d2sum + self.common.parse_by_optimizer(ctx, self.context) + self.common.parse_entry(self.common.table_name, + ctx.program_id(), self.context) + self.common.sync = True if self.context['is_sync'] else False - def __str__(self): - server_str = "server_param {{{}\n}}" - indent = 2 - servers_str = "" - for server in self.servers: - servers_str += "\n" - servers_str += server.to_string(indent) + self.common._set(table_proto.common) - return server_str.format(servers_str) +class GeoSparseTable(SparseTable): + def __init__(self, context, send_ctx): + super(GeoSparseTable, self).__init__(context, send_ctx) + self.table_class = "SparseGeoTable" + if self.context['ps_mode'] != DistributedMode.GEO: + raise ValueError("not geo sparse table!") + + def _set(self, table_proto): + ctx = self.ctx + if ctx.is_tensor_table() or len(ctx.origin_varnames()) < 1 or ( + ctx.is_sparse() == False): + return + table_proto.table_id = ctx.table_id() + table_proto.table_class = self.table_class + table_proto.type = ps_pb2.PS_SPARSE_TABLE + table_proto.shard_num = self.shard_num + + table_proto.accessor.accessor_class = 'CommMergeAccessor' + table_proto.accessor.fea_dim = ctx.sections()[0] + table_proto.accessor.embedx_dim = ctx.sections()[1] + + self.common.table_name = self.context['grad_name_to_param_name'][ + ctx.origin_varnames()[0]] + adam_d2sum = self.context["user_defined_strategy"].adam_d2sum + self.common.parse_by_optimizer(ctx, self.context) + self.common.parse_entry(self.common.table_name, + ctx.program_id(), self.context) + self.common.sync = False + self.common._set(table_proto.common) + + +class DenseTable(Table): + def __init__(self, context, send_ctx): + super(DenseTable, self).__init__() + self.context = context + self.ctx = send_ctx + self.accessor = Accessor() -class DownpourWorker: + def _set(self, table_proto): + ctx = self.ctx + if ctx.is_tensor_table() or len(ctx.origin_varnames()) < 1 or ( + ctx.is_sparse() == True): + return + + table_proto.table_id = ctx.table_id() + + table_proto.type = ps_pb2.PS_DENSE_TABLE + table_proto.table_class = "CommonDenseTable" + table_proto.shard_num = 256 + + table_proto.accessor.accessor_class = 'CommMergeAccessor' + table_proto.accessor.fea_dim = ctx.sections()[0] + table_proto.accessor.embedx_dim = 1 + + self.common.table_name = "MergedDense" + adam_d2sum = self.context["user_defined_strategy"].adam_d2sum + self.common.parse_by_optimizer(ctx, self.context) + self.common.parse_entry(self.common.table_name, + ctx.program_id(), self.context) + self.common.sync = True if self.context['is_sync'] else False + + self.common._set(table_proto.common) + + +class Server: def __init__(self): - self.tables = [] + pass - def append_tables(self, table): - if not isinstance(table, Table): - raise ValueError("only support instance Table") - self.tables.append(table) + def _set(self): + pass - def to_string(self, indent): - worker_str = "{}downpour_worker_param {{{}\n{}}}" - table_strs = "" - indent += 2 - for table in self.tables: - table_strs += "\n" - table_strs += table.to_string(indent) - return worker_str.format( - conv_indent(indent), table_strs, conv_indent(indent)) +class DownpourServer(Server): + def __init__(self): + super(DownpourServer, self).__init__() + + def _set(self): + pass class Worker: def __init__(self): - self.workers = [] + pass - def add_worker(self, worker): - if not isinstance(worker, DownpourWorker): - raise ValueError("only support instance DownpourWorker") - self.workers.append(worker) + def _set(self): + pass - def __str__(self): - worker_str = "worker_param {{{}\n}}" - indent = 2 - workers_str = "" - for worker in self.workers: - workers_str += "\n" - workers_str += worker.to_string(indent) - return worker_str.format(workers_str) +class DownpourWorker(Worker): + def __init__(self): + super(DownpourWorker, self).__init__() + + def _set(self): + pass class fsClient: - def __init__(self, proto): - self.proto = proto - self.uri = proto.uri - self.user = proto.user - self.passwd = proto.passwd - self.hadoop_bin = proto.hadoop_bin - - def to_string(self): - proto_txt = text_format.MessageToString(self.proto) - if proto_txt: - fs_str = "fs_client_param {{\n{}}}" - return fs_str.format(proto_txt) + def __init__(self, fs_client_param): + self.fs_client_param = fs_client_param + + def _set(self, proto): + if not text_format.MessageToString(self.fs_client_param): + return + proto.uri = self.fs_client_param.uri + proto.user = self.fs_client_param.user + proto.passwd = self.fs_client_param.passwd + proto.hadoop_bin = self.fs_client_param.hadoop_bin + + +class PsDescBuilder(object): + def __init__(self, context): + self.context = context + self.is_sync = context['is_sync'] + self.ps_mode = context['ps_mode'] + self.is_heter_ps_mode = context['is_heter_ps_mode'] + self.use_ps_gpu = context['use_ps_gpu'] + self.send_ctx = get_the_one_send_context( + self.context, + use_origin_program=True, + split_dense_table=self.is_heter_ps_mode) + + self.tensor_table_dict = {} # TODO + self._server_sub_program = [] + + self.tables = self._get_tables() + + self.service = self._get_service() + self.fs_client = self._get_fs_client() + + self.ps_desc = ps_pb2.PSParameter() + + def _get_tensor_tables(self): + program_idx = 0 + if not self.tensor_table_dict: + self._server_sub_program.append(Program().desc) + tables = [] + for table_name in self.tensor_table_dict: + tables.append(globals()['TensorTable'](len(tables), tensor_dict, + self.context['role_maker'])) + program_idx += 1 + return tables + + def _get_tables(self): + tables = [] + for idx, (name, ctx) in enumerate(self.send_ctx.items()): + print('####### {}\n'.format(ctx.is_sparse())) + if ctx.is_sparse(): + if self.ps_mode == DistributedMode.GEO: + tables.append(globals()['GeoSparseTable'](self.context, + ctx)) + else: + tables.append(globals()['SparseTable'](self.context, ctx)) + else: + tables.append(globals()['DenseTable'](self.context, ctx)) + self.tensor_tables = self._get_tensor_tables() + tables.extend(self.tensor_tables) + tables.append(globals()['BarrierTable'](self.context, len(tables))) + return tables + + def _get_service(self): + if self.use_ps_gpu: + return GpuService() else: - return "" + return Service() + + def _get_fs_client(self): + return fsClient(self.context["user_defined_strategy"].fs_client_param) + + def build_worker_desc(self): + for table in self.tables: + table_proto = self.ps_desc.worker_param.downpour_worker_param.downpour_table_param.add( + ) + table._set(table_proto) + table_proto = self.ps_desc.server_param.downpour_server_param.downpour_table_param.add( + ) + table._set(table_proto) + self.service._set( + self.ps_desc.server_param.downpour_server_param.service_param) + return text_format.MessageToString(self.ps_desc) + + def build_server_desc(self): + for table in self.tables: + table_proto = self.ps_desc.server_param.downpour_server_param.downpour_table_param.add( + ) + table._set(table_proto) + self.sparse_table_maps = {} + if table_proto.type == ps_pb2.PS_SPARSE_TABLE and table_proto.common is not None: + self.sparse_table_maps[ + table_proto.common.table_name] = table_proto.table_id + + self.service._set( + self.ps_desc.server_param.downpour_server_param.service_param) + self.fs_client._set(self.ps_desc.fs_client_param) + return text_format.MessageToString(self.ps_desc) class TheOnePSRuntime(RuntimeBase): @@ -665,8 +801,11 @@ class TheOnePSRuntime(RuntimeBase): self.role_maker = context["role_maker"] self.origin_main_program = context["origin_main_program"] - self.origin_main_programs = context["origin_main_programs"] - + self.origin_main_programs = context.get("origin_main_programs", + [self.origin_main_program]) + self.context["origin_main_programs"] = self.origin_main_programs + self.context["origin_startup_programs"] = context.get( + 'origin_startup_programs', [context['origin_startup_program']]) self.context[ 'is_heter_ps_mode'] = self.role_maker._is_heter_parameter_server_mode self.is_heter_ps_mode = self.context['is_heter_ps_mode'] @@ -675,15 +814,23 @@ class TheOnePSRuntime(RuntimeBase): self.context['ps_mode'] = self.context['trainer'].mode self.context['use_ps_gpu'] = context['valid_strategy'].a_sync_configs[ 'use_ps_gpu'] - self.is_sync = True if self.context[ + self.context['is_sync'] = True if self.context[ 'ps_mode'] == DistributedMode.SYNC else False self.context['grad_name_to_param_name'] = {} self.context['tensor_table'] = {} build_var_distributed(self.context) + endpoints = get_ps_endpoints(self.role_maker) + self.string_hosts = [] + for idx, ep in enumerate(endpoints): + host, port = ep.split(":") + pshost = fluid.core.PSHost(host, int(port), idx) + self.string_hosts.append(pshost.serialize_to_string()) + + self.ps_desc_builder = PsDescBuilder(self.context) + def _init_worker(self): - worker = self._get_fleet_proto(is_server=False, is_sync=self.is_sync) - server = self._get_fleet_proto(is_server=True, is_sync=self.is_sync) + worker_desc = self.ps_desc_builder.build_worker_desc() if self.context['use_ps_gpu']: main_program = self.context['loss'].block.program @@ -701,23 +848,11 @@ class TheOnePSRuntime(RuntimeBase): kwargs["trainer_id"] = self.role_maker._worker_index() return kwargs - proto_txt = str(worker) + "\n" + str(server) - with open('proto_txt', 'w') as f: - f.write(proto_txt) - + proto_txt = worker_desc + "\n" + server_desc debug = bool(int(os.getenv("PSERVER_DEBUG", "0"))) - if debug: print("worker: \n{}".format(proto_txt)) - endpoints = get_ps_endpoints(self.role_maker) - - string_hosts = [] - for idx, ep in enumerate(endpoints): - host, port = ep.split(":") - pshost = fluid.core.PSHost(host, int(port), idx) - string_hosts.append(pshost.serialize_to_string()) - dense_map = get_the_one_recv_context( self.context, split_dense_table=self.is_heter_ps_mode) send_ctx = get_the_one_send_context( @@ -741,7 +876,7 @@ class TheOnePSRuntime(RuntimeBase): kwargs["trainer_id"] = self.role_maker._role_id() kwargs["trainers"] = self.role_maker._worker_num() - for table in server.servers[0].tables: + for table in server.servers[0].tables: #TODO if table.table_class == "BarrierTable": kwargs["barrier_table_id"] = table.id break @@ -755,7 +890,8 @@ class TheOnePSRuntime(RuntimeBase): trainer_config.mode, kwargs, trainer_config.get_communicator_flags()) self._communicator.init_with_ctx(send_ctx, dense_map, proto_txt, - string_hosts, fluid.global_scope()) + self.string_hosts, + fluid.global_scope()) fleet.util.barrier() info = self._communicator.get_client_info() @@ -812,275 +948,16 @@ class TheOnePSRuntime(RuntimeBase): previous_trainers, self.role_maker._role_id()) - def _push_sparse_param(self, - var_name, - table_id=-1, - scope=fluid.global_scope()): - self._communicator.push_sparse_param(var_name, table_id, scope) - - def _get_executor(self): - executor = fluid.Executor(fluid.CPUPlace()) - if self.is_heter_ps_mode: - if self.role_maker._is_heter_worker(): - heter_device_type = self.role_maker._heter_device_type().upper() - if heter_device_type not in ["GPU", "XPU", "CPU"]: - raise ValueError("Heter Worker Not Support Device {}". - format(device_type)) - if heter_device_type == "GPU": - executor = Executor( - fluid.CUDAPlace( - int(os.getenv("FLAGS_selected_gpus", "0")))) - elif heter_device_type == "XPU": - executor = Executor( - fluid.XPUPlace( - int(os.getenv("FLAGS_selected_xpus", "0")))) - return executor - - def _get_fleet_proto(self, is_server, is_sync, **kwargs): - def _build_merge_accessor(ctx): - accessor = Accessor() - accessor.accessor_class = "CommMergeAccessor" - accessor.optimizer = None - - if ctx.is_sparse(): - accessor.feature_dim = ctx.sections()[0] - accessor.embedding_dim = ctx.sections()[1] - else: - accessor.feature_dim = ctx.sections()[0] - accessor.embedding_dim = 1 - - return accessor - - def _build_barrier_table(idx): - table = Table() - table.id = idx - table.type = "PS_OTHER_TABLE" - table.table_class = "BarrierTable" - table.shard_num = 256 - - accessor = Accessor() - accessor.accessor_class = "CommMergeAccessor" - accessor.optimizer = None - accessor.feature_dim = 0 - accessor.embedding_dim = 0 - table.accessor = accessor - - common = CommonAccessor() - common.table_name = "barrier_table" - trainer_num = get_trainers(self.context['role_maker']) - if self.is_heter_ps_mode: - trainer_num += len(self.role_maker._get_heter_worker_endpoints( - )) - common.trainer_num = trainer_num - common.attrs = "" - common.dims = [] - common.params = [] - table.common = common - return table - - def _build_tensor_table(idx, tensor_dict): - table = Table() - table.id = idx - table.type = "PS_OTHER_TABLE" - table.table_class = tensor_dict["tensor_table_class"] - table.shard_num = 256 - - accessor = Accessor() - accessor.accessor_class = "CommMergeAccessor" - accessor.optimizer = None - accessor.feature_dim = 0 - accessor.embedding_dim = 0 - table.accessor = accessor - - common = CommonAccessor() - common.table_name = tensor_dict["feed_var_name"] - common.trainer_num = get_trainers(self.role_maker) - common.attrs = "" - common.dims = [] - common.params = [] - table.common = common - - tensor = Tensor() - tensor.main_program_id = tensor_dict["main_program_id"] - tensor.startup_program_id = tensor_dict["startup_program_id"] - tensor.feed_var_name = tensor_dict["feed_var_name"] - tensor.fetch_var_name = tensor_dict["fetch_var_name"] - tensor.tensor_table_class = tensor_dict["tensor_table_class"] - table.tensor = tensor - - return table - - def _add_tensor_table(tables): - tensor_table_dict = {} - program_idx = 0 - for table_name in tensor_table_dict: - if tensor_table_dict[table_name]["startup_program"] != None: - tensor_table_dict[table_name][ - "startup_program_id"] = program_idx - self._server_sub_program.append(tensor_table_dict[ - table_name]["startup_program"].desc) - program_idx += 1 - if tensor_table_dict[table_name]["main_program"] != None: - tensor_table_dict[table_name][ - "main_program_id"] = program_idx - self._server_sub_program.append(tensor_table_dict[ - table_name]["main_program"].desc) - program_idx += 1 - # Todo: Hard code for lr_decay table apply table id - new_table = _build_tensor_table( - len(tables), tensor_table_dict[table_name]) - tables.append(new_table) - return tables - - def _get_tables(): - send_ctx = get_the_one_send_context( - self.context, - use_origin_program=True, - split_dense_table=self.is_heter_ps_mode) - - tables = [] - for idx, (name, ctx) in enumerate(send_ctx.items()): - print(" wxm python test send_ctx.items-->", idx, (name, ctx)) - if ctx.is_tensor_table() or len(ctx.origin_varnames()) < 1: - continue - - table = Table() - table.id = ctx.table_id() - common = CommonAccessor() - - if ctx.is_sparse(): - table.type = "PS_SPARSE_TABLE" - table.shard_num = 256 - - common.table_name = self.context['grad_name_to_param_name'][ - ctx.origin_varnames()[0]] - - if self.context['ps_mode'] == DistributedMode.GEO: - table.table_class = "SparseGeoTable" - else: - all_table_proto = self.context[ - "user_defined_strategy"].sparse_table_configs - table_proto = all_table_proto.add() - for proto in all_table_proto: - if proto.table_name == common.table_name: - table_proto = proto - break - if table_proto.HasField("table_class"): - table.table_class = table_proto.table_class - else: - table.table_class = parse_table_class( - common.table_name, - ctx.program_id(), self.context) - if table.table_class != 'MemorySparseTable': - table.table_class = 'MemorySparseTable' - warnings.warn( - "The PS mode must use MemorySparseTable.") - - if table_proto.HasField("shard_num"): - table.shard_num = table_proto.shard_num - else: - table.shard_num = 1000 - warnings.warn( - "The shard_num of sparse table is not set, use default value 1000." - ) - - if table_proto.accessor.ByteSize() == 0: - warnings.warn( - "The accessor of sparse table is not set, use default value." - ) - get_default_accessor_proto( - table_proto.accessor, common.table_name, - ctx.program_id(), self.context) - check_embedding_dim(table_proto.accessor, - common.table_name, - ctx.program_id(), self.context) - table.accessor_proto = text_format.MessageToString( - table_proto.accessor) - else: - table.type = "PS_DENSE_TABLE" - table.table_class = "CommonDenseTable" - table.shard_num = 256 - common.table_name = "MergedDense" - - adam_d2sum = self.context["user_defined_strategy"].adam_d2sum - common.parse_by_optimizer(ctx, self.context) - - if ctx.is_sparse(): - common.parse_entry(common.table_name, - ctx.program_id(), self.context) - - if is_sync: - common.sync = "true" - else: - common.sync = "false" - table.common = common - - if table.table_class != 'MemorySparseTable': - accessor = _build_merge_accessor(ctx) - table.accessor = accessor - tables.append(table) - - tensor_table_dict = {} - if len(tensor_table_dict) > 0: - tables = _add_tensor_table(tables) - else: - empty_porgram = Program() - self._server_sub_program.append(empty_porgram.desc) - - barrier_table = _build_barrier_table(len(tables)) - tables.append(barrier_table) - return tables - - if is_server: - server = Server() - downpour_server = DownpourServer() - - service = Service() - dist_strategy = self.context["valid_strategy"] - use_ps_gpu = dist_strategy.a_sync_configs["use_ps_gpu"] - if use_ps_gpu: - service.server_class = "PsLocalServer" - service.client_class = "PsLocalClient" - downpour_server.set_service_param(service) - - tables = _get_tables() - downpour_server.tables = tables - server.add_server(downpour_server) - return server - else: - worker = Worker() - downpour_worker = DownpourWorker() - - tables = _get_tables() - downpour_worker.tables = tables - worker.add_worker(downpour_worker) - return worker - def _init_server(self, dirname=None, var_names=None, **kwargs): + server_desc = self.ps_desc_builder.build_server_desc() role_id = get_role_id(self.role_maker) - endpoints = get_ps_endpoints(self.role_maker) trainers = get_trainers(self.role_maker) if self.is_heter_ps_mode: trainers += len(self.role_maker._get_heter_worker_endpoints()) - server = self._get_fleet_proto(is_server=True, is_sync=self.is_sync) - proto_txt = str(server) - fs_client = fsClient(self.context["user_defined_strategy"] - .fs_client_param) - proto_txt = proto_txt + "\n" + fs_client.to_string() - - debug = bool(int(os.getenv("PSERVER_DEBUG", "0"))) - if debug: - print("server: \n{}".format(proto_txt)) - - string_hosts = [] - for idx, ep in enumerate(endpoints): - host, port = ep.split(":") - pshost = fluid.core.PSHost(host, int(port), idx) - string_hosts.append(pshost.serialize_to_string()) self._server = fluid.core.DistFleetWrapper() - self._server.init_server(proto_txt, string_hosts, role_id, trainers, - self._server_sub_program) + self._server.init_server(server_desc, self.string_hosts, role_id, + trainers, self._server_sub_program) dist_varnames = get_sparse_tablenames(self.origin_main_programs, True) sparse_varnames = get_sparse_tablenames(self.origin_main_programs, @@ -1101,10 +978,7 @@ class TheOnePSRuntime(RuntimeBase): if dirname is None or not load_varnames: return - sparse_table_maps = {} - for table in server.servers[0].tables: - if table.type == "PS_SPARSE_TABLE" and table.common is not None: - sparse_table_maps[table.common.table_name] = table.id + sparse_table_maps = self.ps_desc_builder.sparse_table_maps dirname = os.path.normpath(dirname) pserver_id = self.role_maker._role_id() @@ -1186,7 +1060,7 @@ class TheOnePSRuntime(RuntimeBase): sparses = get_the_one_recv_context( self.context, is_dense=False, - split_dense_table=self.is_heter_ps_mod, + split_dense_table=self.is_heter_ps_mode, use_origin_program=True) sparse_varnames = self._save_sparse_params(executor, dirname, sparses, @@ -1413,7 +1287,7 @@ class TheOnePSRuntime(RuntimeBase): fleet.util.barrier() if self.role_maker._is_first_worker(): - sparses = sget_the_one_recv_context( + sparses = get_the_one_recv_context( self.context, is_dense=False, split_dense_table=self.role_maker. diff --git a/python/paddle/distributed/ps/utils/ps_factory.py b/python/paddle/distributed/ps/utils/ps_factory.py index 1a426f3ad6c..701ae8be6cb 100755 --- a/python/paddle/distributed/ps/utils/ps_factory.py +++ b/python/paddle/distributed/ps/utils/ps_factory.py @@ -38,5 +38,7 @@ class PsProgramBuilderFactory(object): elif 'is_fl_ps_mode' in attrs and attrs[ 'is_fl_ps_mode'] == DistributedMode.FL: return globals()['FlPsProgramBuilder'](pass_ctx) - else: + elif attrs['ps_mode'] == DistributedMode.SYNC: return globals()['CpuSyncPsProgramBuilder'](pass_ctx) + else: + return globals()['CpuAsyncPsProgramBuilder'](pass_ctx) diff --git a/python/paddle/distributed/ps/utils/ps_program_builder.py b/python/paddle/distributed/ps/utils/ps_program_builder.py index 25e4dc28bdc..d737542f323 100755 --- a/python/paddle/distributed/ps/utils/ps_program_builder.py +++ b/python/paddle/distributed/ps/utils/ps_program_builder.py @@ -95,11 +95,12 @@ class GeoPsProgramBuilder(PsProgramBuilder): # 仅 CPU 模式 class CpuSyncPsProgramBuilder(PsProgramBuilder): def __init__(self, pass_ctx): - logger.info("start building cpu-sync-ps program") super(CpuSyncPsProgramBuilder, self).__init__(pass_ctx) + if self.ps_mode == DistributedMode.SYNC: + logger.info("start building cpu-sync-ps program") if self.ps_mode != DistributedMode.SYNC and self.ps_mode != DistributedMode.ASYNC: raise ValueError("ps mode: {} not matched {}", - format(self.ps_mode, "CpuSyncPsProgramBuilder")) + format(self.ps_mode, "PsProgramBuilder")) def _build_trainer_programs(self): add_lr_decay_table_pass = new_pass("add_lr_decay_table_pass", diff --git a/python/paddle/distributed/ps/utils/public.py b/python/paddle/distributed/ps/utils/public.py index ebec6900e38..ab5bd7da09d 100755 --- a/python/paddle/distributed/ps/utils/public.py +++ b/python/paddle/distributed/ps/utils/public.py @@ -73,7 +73,9 @@ def logger_config(log_path, logging_name): return logger -logger = logger_config(log_path='/ps_log', logging_name='ps_log') +ps_log_root_dir = '/ps_log/' +logger = logger_config( + log_path='/ps_usr_print_log', logging_name='ps_usr_print_log') class DistributedMode: diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt old mode 100644 new mode 100755 index 2f6df075478..1443eebf293 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -627,7 +627,7 @@ set_tests_properties(test_norm_nn_grad PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE") set_tests_properties(test_nn_grad PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE") if(WITH_DISTRIBUTE) add_subdirectory(distributed_passes) - + add_subdirectory(ps) add_subdirectory(auto_parallel) # FIXME(typhoonzero): add these tests back diff --git a/python/paddle/fluid/tests/unittests/distributed_passes/ps_pass_test_base.py b/python/paddle/fluid/tests/unittests/distributed_passes/ps_pass_test_base.py index 63dd4b8e21e..93a0044a5e4 100755 --- a/python/paddle/fluid/tests/unittests/distributed_passes/ps_pass_test_base.py +++ b/python/paddle/fluid/tests/unittests/distributed_passes/ps_pass_test_base.py @@ -23,13 +23,24 @@ import unittest import numpy as np from collections import OrderedDict from paddle.distributed.ps.utils.public import logger -from dist_pass_test_base import prepare_python_path_and_return_module, remove_path_if_exists +from paddle.fluid.tests.unittests.distributed_passes.dist_pass_test_base import prepare_python_path_and_return_module, remove_path_if_exists import paddle.distributed.fleet as fleet class PsPassTestBase(unittest.TestCase): def init(self): - raise NotImplementedError + self.config = {} + self.config['ps_mode_config'] = "" + self.config['worker_num'] = "1" + self.config['server_num'] = "1" + self.config['run_minimize'] = "0" + self.config['run_single_pass'] = "0" + self.config['run_the_one_ps'] = '0' + self.config['debug_new_minimize'] = "0" + self.config['debug_new_pass'] = "0" + self.config['debug_the_one_ps'] = '0' + self.config['log_dir'] = "" + self.config['applied_pass_name'] = "" def setUp(self): print('Ps setUp...') @@ -37,7 +48,7 @@ class PsPassTestBase(unittest.TestCase): def tearDown(self): print('Ps tearDown...') - def ps_launch(self, config, ps_mode="cpu-ps"): + def ps_launch(self, ps_mode="cpu-ps"): if ps_mode == "cpu-ps" or ps_mode == 'heter-ps': os.environ['WITH_DISTRIBUTE'] = 'ON' @@ -45,23 +56,26 @@ class PsPassTestBase(unittest.TestCase): sys.executable, "-u", ] + [ - "-m", "launch", "--log_dir", config['log_dir'], "--worker_num", - config['worker_num'], "--server_num", config['server_num'] + "-m", "launch", "--log_dir", self.config['log_dir'], + "--worker_num", self.config['worker_num'], "--server_num", + self.config['server_num'] ] if ps_mode == 'heter-ps': os.environ['FLAGS_START_PORT'] = '12004' cmd += [ - '--heter_worker_num', config['heter_worker_num'], - '--heter_devices', config['heter_devices'] + '--heter_worker_num', self.config['heter_worker_num'], + '--heter_devices', self.config['heter_devices'] ] cmd += [ - "../ps/ps_dnn_trainer.py", "-m", config['ps_mode_config'], - "--run_minimize", config['run_minimize'], "--run_single_pass", - config['run_single_pass'], "--debug_new_pass", - config['debug_new_pass'], "--debug_new_minimize", - config['debug_new_minimize'], "--applied_pass_name", - config['applied_pass_name'] + "../ps/ps_dnn_trainer.py", "-m", self.config['ps_mode_config'], + "--run_minimize", self.config['run_minimize'], + "--run_single_pass", self.config['run_single_pass'], + "--run_the_one_ps", self.config['run_the_one_ps'], + "--debug_new_pass", self.config['debug_new_pass'], + "--debug_new_minimize", self.config['debug_new_minimize'], + "--applied_pass_name", self.config['applied_pass_name'], + "--debug_the_one_ps", self.config['debug_the_one_ps'] ] elif ps_mode == "gpu-ps": os.environ['FLAGS_LAUNCH_BARRIER'] = '0' @@ -80,12 +94,14 @@ class PsPassTestBase(unittest.TestCase): cmd = [ sys.executable, "-u", "../ps/ps_dnn_trainer.py", "-m", - config['ps_mode_config'], "--run_minimize", - config['run_minimize'], "--run_single_pass", - config['run_single_pass'], "--debug_new_pass", - config['debug_new_pass'], "--debug_new_minimize", - config['debug_new_minimize'], "--applied_pass_name", - config['applied_pass_name'] + self.config['ps_mode_config'], "--run_minimize", + self.config['run_minimize'], "--run_single_pass", + self.config['run_single_pass'], "--run_the_one_ps", + self.config['run_the_one_ps'], "--debug_new_pass", + self.config['debug_new_pass'], "--debug_new_minimize", + self.config['debug_new_minimize'], "--applied_pass_name", + self.config['applied_pass_name'], "--debug_the_one_ps", + self.config['debug_the_one_ps'] ] cmd = [shlex.quote(c) for c in cmd] diff --git a/python/paddle/fluid/tests/unittests/distributed_passes/test_ps_trainer_pass.py b/python/paddle/fluid/tests/unittests/distributed_passes/test_ps_trainer_pass.py index b186869ee97..fd558ef0403 100755 --- a/python/paddle/fluid/tests/unittests/distributed_passes/test_ps_trainer_pass.py +++ b/python/paddle/fluid/tests/unittests/distributed_passes/test_ps_trainer_pass.py @@ -21,31 +21,26 @@ import numpy as np import paddle from ps_pass_test_base import * -from paddle.distributed.ps.utils.public import logger +from paddle.distributed.ps.utils.public import logger, ps_log_root_dir from paddle.fluid.tests.unittests.ps.ps_dnn_trainer import DnnTrainer class TestPsTrainerPass(PsPassTestBase): - def init(self): - self.config = {} - self.config['ps_mode_config'] = "" - self.config['worker_num'] = "1" - self.config['server_num'] = "1" - self.config['run_minimize'] = "0" - self.config['run_single_pass'] = "0" - self.config['debug_new_minimize'] = "0" - self.config['debug_new_pass'] = "0" - self.config['log_dir'] = "" - self.config['applied_pass_name'] = "" - def setUp(self): pass def tearDown(self): pass - def check(self): - pass + def check(self, file1, file2): + with open(file1, 'r', encoding='utf-8') as f: + text1 = f.read() + with open(file2, 'r', encoding='utf-8') as f: + text2 = f.read() + if text1 == text2: + return True + else: + return False def test_ps_optimizer_minimize_cpu_async(self): self.init() @@ -53,16 +48,21 @@ class TestPsTrainerPass(PsPassTestBase): self.config['run_minimize'] = '1' self.config['debug_new_minimize'] = '0' - self.config['log_dir'] = "/async_cpu_log_old_minimize" + self.config['log_dir'] = ps_log_root_dir + "async_cpu_log_old_minimize" remove_path_if_exists(self.config['log_dir']) - self.ps_launch(self.config) + self.ps_launch() self.config['debug_new_minimize'] = '1' - self.config['log_dir'] = "/async_cpu_log_new_minimize" + self.config['log_dir'] = ps_log_root_dir + "async_cpu_log_new_minimize" remove_path_if_exists(self.config['log_dir']) - self.ps_launch(self.config) + self.ps_launch() - self.check() + file1 = '/ps_log/async_run_minimize_debug:_0_worker_main.prototxt' + file2 = '/ps_log/async_run_minimize_debug:_1_worker_main.prototxt' + if self.check(file1, file2): + logger.info('test_ps_optimizer_minimize_cpu_async passed!') + else: + logger.error('test_ps_optimizer_minimize_cpu_async failed!') def test_ps_optimizer_minimize_cpu_sync(self): self.init() @@ -70,16 +70,22 @@ class TestPsTrainerPass(PsPassTestBase): self.config['run_minimize'] = '1' self.config['debug_new_minimize'] = '0' - self.config['log_dir'] = "/sync_cpu_log_old_minimize" + self.config['log_dir'] = ps_log_root_dir + "sync_cpu_log_old_minimize" remove_path_if_exists(self.config['log_dir']) - self.ps_launch(self.config) + self.ps_launch() self.config['debug_new_minimize'] = '1' - self.config['log_dir'] = "/sync_cpu_log_new_minimize" + self.config['log_dir'] = ps_log_root_dir + "sync_cpu_log_new_minimize" remove_path_if_exists(self.config['log_dir']) - self.ps_launch(self.config) - - self.check() + self.ps_launch() + ''' + file1 = '/ps_log/sync_run_minimize_debug:_0_worker_main.prototxt' + file2 = '/ps_log/sync_run_minimize_debug:_1_worker_main.prototxt' + if self.check(file1, file2): + logger.info('test_ps_optimizer_minimize_cpu_sync passed!') + else: + logger.error('test_ps_optimizer_minimize_cpu_sync failed!') + ''' def test_ps_optimizer_minimize_cpu_geo(self): self.init() @@ -87,16 +93,21 @@ class TestPsTrainerPass(PsPassTestBase): self.config['run_minimize'] = '1' self.config['debug_new_minimize'] = '0' - self.config['log_dir'] = "/geo_cpu_log_old_minimize" + self.config['log_dir'] = ps_log_root_dir + "geo_cpu_log_old_minimize" remove_path_if_exists(self.config['log_dir']) - self.ps_launch(self.config) + self.ps_launch() self.config['debug_new_minimize'] = '1' - self.config['log_dir'] = "/geo_cpu_log_new_minimize" + self.config['log_dir'] = ps_log_root_dir + "geo_cpu_log_new_minimize" remove_path_if_exists(self.config['log_dir']) - self.ps_launch(self.config) + self.ps_launch() - self.check() + file1 = '/ps_log/geo_run_minimize_debug:_0_worker_main.prototxt' + file2 = '/ps_log/geo_run_minimize_debug:_1_worker_main.prototxt' + if self.check(file1, file2): + logger.info('test_ps_optimizer_minimize_cpu_geo passed!') + else: + logger.error('test_ps_optimizer_minimize_cpu_geo failed!') # heter ps 二阶段 def test_ps_optimizer_minimize_heter(self): @@ -110,14 +121,24 @@ class TestPsTrainerPass(PsPassTestBase): self.config['ps_mode_config'] = "../ps/heter_ps_config.yaml" self.config['debug_new_minimize'] = '0' - self.config['log_dir'] = "/heter_log_old_minimize" + self.config['log_dir'] = ps_log_root_dir + "heter_log_old_minimize" remove_path_if_exists(self.config['log_dir']) - self.ps_launch(self.config, 'heter-ps') + self.ps_launch('heter-ps') self.config['debug_new_minimize'] = '1' - self.config['log_dir'] = "/heter_log_new_minimize" + self.config['log_dir'] = ps_log_root_dir + "heter_log_new_minimize" remove_path_if_exists(self.config['log_dir']) - self.ps_launch(self.config, 'heter-ps') + self.ps_launch('heter-ps') + ''' + file1 = '/ps_log/heter_run_minimize_debug:_0_worker_main.prototxt' + file2 = '/ps_log/heter_run_minimize_debug:_1_worker_main.prototxt' + file3 = '/ps_log/heter_run_minimize_debug:_0_heter_worker_main.prototxt' + file4 = '/ps_log/heter_run_minimize_debug:_1_heter_worker_main.prototxt' + if self.check(file1, file2) and self.check(file3, file4): + logger.info('test_ps_optimizer_minimize_heter passed!') + else: + logger.error('test_ps_optimizer_minimize_heter failed!') + ''' def test_ps_optimizer_minimize_gpu(self): self.init() @@ -125,29 +146,42 @@ class TestPsTrainerPass(PsPassTestBase): self.config['ps_mode_config'] = "../ps/gpu_ps_config.yaml" self.config['debug_new_minimize'] = '0' - self.ps_launch(self.config, "gpu-ps") + self.ps_launch("gpu-ps") self.config['debug_new_minimize'] = '1' - self.ps_launch(self.config, "gpu-ps") + self.ps_launch("gpu-ps") - self.check() + file1 = '/ps_log/gpubox_run_minimize_debug:_0_worker_main.prototxt' + file2 = '/ps_log/gpubox_run_minimize_debug:_1_worker_main.prototxt' + if self.check(file1, file2): + logger.info('test_ps_optimizer_minimize_gpu passed!') + else: + logger.error('test_ps_optimizer_minimize_gpu failed!') def test_append_send_ops_pass(self): self.init() self.config['run_single_pass'] = '1' + self.config['ps_mode_config'] = "../ps/cpu_async_ps_config.yaml" self.config['applied_pass_name'] = "append_send_ops_pass" self.config['debug_new_pass'] = '0' - self.config['log_dir'] = "/log_old_" + self.config['applied_pass_name'] + self.config['log_dir'] = ps_log_root_dir + "log_old_" + self.config[ + 'applied_pass_name'] remove_path_if_exists(self.config['log_dir']) - self.ps_launch(self.config, "cpu-ps") + self.ps_launch("cpu-ps") self.config['debug_new_pass'] = '1' - self.config['log_dir'] = "/log_new_" + self.config['applied_pass_name'] + self.config['log_dir'] = ps_log_root_dir + "log_new_" + self.config[ + 'applied_pass_name'] remove_path_if_exists(self.config['log_dir']) - self.ps_launch(self.config, "cpu-ps") - - self.check() + self.ps_launch("cpu-ps") + + file1 = '/ps_log/async_append_send_ops_pass_debug:_0_worker_main.prototxt' + file2 = '/ps_log/async_append_send_ops_pass_debug:_1_worker_main.prototxt' + if self.check(file1, file2): + logger.info('test_append_send_ops_pass passed!') + else: + logger.info('test_append_send_ops_pass failed!') def test_distributed_ops_pass(self): pass diff --git a/python/paddle/fluid/tests/unittests/ps/CMakeLists.txt b/python/paddle/fluid/tests/unittests/ps/CMakeLists.txt old mode 100644 new mode 100755 index 3aef3283b82..9af32a8aca7 --- a/python/paddle/fluid/tests/unittests/ps/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/ps/CMakeLists.txt @@ -3,6 +3,6 @@ string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}") foreach(TEST_OP ${TEST_OPS}) py_test_modules(${TEST_OP} MODULES ${TEST_OP}) + list(APPEND TEST_OPS ${TEST_OP}) + set_tests_properties(${TEST_OP} PROPERTIES TIMEOUT 50) endforeach(TEST_OP) - -set_tests_properties(test_the_one_ps PROPERTIES TIMEOUT 50) diff --git a/python/paddle/fluid/tests/unittests/ps/ps_dnn_trainer.py b/python/paddle/fluid/tests/unittests/ps/ps_dnn_trainer.py index d08c1d41c89..bc87fc255a5 100755 --- a/python/paddle/fluid/tests/unittests/ps/ps_dnn_trainer.py +++ b/python/paddle/fluid/tests/unittests/ps/ps_dnn_trainer.py @@ -264,12 +264,16 @@ def parse_args(): '--run_minimize', type=int, default=0, help="test single pass") parser.add_argument( '--run_single_pass', type=int, default=0, help="test single pass") + parser.add_argument( + '--run_the_one_ps', type=int, default=0, help="test the_one_ps") parser.add_argument( '--debug_new_minimize', type=int, default=0, help="test single pass") parser.add_argument( '--debug_new_pass', type=int, default=0, help="test single pass") parser.add_argument( '--applied_pass_name', type=str, default="", help="test single pass") + parser.add_argument( + '--debug_the_one_ps', type=int, default=0, help="test the_one_ps") args = parser.parse_args() args.abs_dir = os.path.dirname(os.path.abspath(args.config_yaml)) @@ -280,9 +284,11 @@ def parse_args(): config["pure_bf16"] = args.pure_bf16 config['run_minimize'] = args.run_minimize config['run_single_pass'] = args.run_single_pass + config['run_the_one_ps'] = args.run_the_one_ps config['debug_new_minimize'] = args.debug_new_minimize config['debug_new_pass'] = args.debug_new_pass config['applied_pass_name'] = args.applied_pass_name + config['debug_the_one_ps'] = args.debug_the_one_ps yaml_helper.print_yaml(config) return config @@ -344,15 +350,15 @@ class DnnTrainer(object): fleet_obj.minimize(loss) if fleet.is_server(): - _main_file = '/' + sync_mode + '_run_minimize' + '_debug:_' + str( + _main_file = ps_log_root_dir + sync_mode + '_run_minimize' + '_debug:_' + str( self.config['debug_new_minimize']) + '_server_main.prototxt' debug_program(_main_file, loss.block.program) elif fleet.is_worker(): - _main_file = '/' + sync_mode + '_run_minimize' + '_debug:_' + str( + _main_file = ps_log_root_dir + sync_mode + '_run_minimize' + '_debug:_' + str( self.config['debug_new_minimize']) + '_worker_main.prototxt' debug_program(_main_file, loss.block.program) elif self.role_maker._is_heter_worker(): - _main_file = '/' + sync_mode + '_run_minimize' + '_debug:_' + str( + _main_file = ps_log_root_dir + sync_mode + '_run_minimize' + '_debug:_' + str( self.config[ 'debug_new_minimize']) + '_heter_worker_main.prototxt' debug_program(_main_file, loss.block.program) @@ -397,16 +403,84 @@ class DnnTrainer(object): _main = worker.append_send_ops_pass(_main, compiled_config) if fleet.is_server(): - _main_file = '/' + sync_mode + "_" + str(config[ + _main_file = ps_log_root_dir + sync_mode + "_" + str(config[ "applied_pass_name"]) + '_debug:_' + str(self.config[ 'debug_new_pass']) + '_server_main.prototxt' debug_program(_main_file, _main) elif fleet.is_worker(): - _main_file = '/' + sync_mode + "_" + str(config[ + _main_file = ps_log_root_dir + sync_mode + "_" + str(config[ "applied_pass_name"]) + '_debug:_' + str(self.config[ 'debug_new_pass']) + '_worker_main.prototxt' debug_program(_main_file, _main) + def run_the_one_ps(self): + self.init_fleet_with_gloo() + self.model = get_model(self.config) + self.input_data = self.model.create_feeds() + self.metrics = self.model.net(self.input_data) + loss = self.model._cost + user_defined_strategy = get_user_defined_strategy(self.config) + learning_rate = self.config.get( + "hyper_parameters.optimizer.learning_rate") + sync_mode = self.config.get("runner.sync_mode") + inner_optimizer = paddle.optimizer.Adam(learning_rate, lazy_mode=True) + + self.role_maker._generate_role() # 必要 + if self.config['debug_the_one_ps'] == 1: + logger.info("entering run_the_one_ps -- new") + + from paddle.distributed.fleet.meta_optimizers.ps_optimizer import ParameterServerOptimizer + ps_optimizer = ParameterServerOptimizer(inner_optimizer) + ps_optimizer._set_basic_info(loss, self.role_maker, inner_optimizer, + user_defined_strategy) + ps_optimizer.minimize_impl(loss) + + from paddle.distributed.ps.the_one_ps import TheOnePSRuntime + _runtime_handle = TheOnePSRuntime() # ps 目录下重构版的 TheOnePSRuntime + _runtime_handle._set_basic_info(ps_optimizer.pass_ctx._attrs) + if fleet.is_worker(): + worker_desc = _runtime_handle.ps_desc_builder.build_worker_desc( + ) + with open(ps_log_root_dir + sync_mode + '_' + + 'new_worker_ps_desc', 'w') as f: + f.write(worker_desc) + if fleet.is_server(): + server_desc = _runtime_handle.ps_desc_builder.build_server_desc( + ) + with open(ps_log_root_dir + sync_mode + '_' + + 'new_server_ps_desc', 'w') as f: + f.write(server_desc) + + else: + pass + ''' + logger.info("entering run_the_one_ps -- old") + fleet_obj = fleet.distributed_optimizer( + inner_optimizer, user_defined_strategy) + fleet_obj.minimize(loss) + if fleet.is_worker(): + worker_desc = fleet_obj._runtime_handle._get_fleet_proto(is_server=False, is_sync=False) + server_desc = fleet_obj._runtime_handle._get_fleet_proto(is_server=True, is_sync=False) + with open(ps_log_root_dir + sync_mode + '_' + 'worker_ps_desc', 'w') as f: + f.write(str(worker_desc) + str(server_desc)) + if fleet.is_server(): + server_desc = fleet_obj._runtime_handle._get_fleet_proto(is_server=True, is_sync=False) + with open(ps_log_root_dir + sync_mode + '_' + 'server_ps_desc', 'w') as f: + f.write(str(server_desc) + str(fleet_obj._runtime_handle._get_fs_client_desc().to_string())) + ''' + if fleet.is_server(): + _main_file = ps_log_root_dir + sync_mode + '_run_the_one_ps' + '_debug:_' + str( + self.config['debug_the_one_ps']) + '_server_main.prototxt' + debug_program(_main_file, loss.block.program) + elif fleet.is_worker(): + _main_file = ps_log_root_dir + sync_mode + '_run_the_one_ps' + '_debug:_' + str( + self.config['debug_the_one_ps']) + '_worker_main.prototxt' + debug_program(_main_file, loss.block.program) + elif self.role_maker._is_heter_worker(): + _main_file = ps_log_root_dir + sync_mode + '_run_the_one_ps' + '_debug:_' + str( + self.config['debug_the_one_ps']) + '_heter_worker_main.prototxt' + debug_program(_main_file, loss.block.program) + if __name__ == "__main__": paddle.enable_static() @@ -418,3 +492,5 @@ if __name__ == "__main__": benchmark_main.run_single_pass() elif config['run_minimize'] == 1: benchmark_main.run_minimize() + elif config['run_the_one_ps'] == 1: + benchmark_main.run_the_one_ps() diff --git a/python/paddle/fluid/tests/unittests/ps/test_the_one_ps.py b/python/paddle/fluid/tests/unittests/ps/test_the_one_ps.py old mode 100644 new mode 100755 index 78bae0e50c5..8dddc6abd4c --- a/python/paddle/fluid/tests/unittests/ps/test_the_one_ps.py +++ b/python/paddle/fluid/tests/unittests/ps/test_the_one_ps.py @@ -22,16 +22,100 @@ import numpy as np import paddle import paddle.fluid as fluid +import paddle +from paddle.fluid.tests.unittests.distributed_passes.ps_pass_test_base import * +from paddle.distributed.ps.utils.public import logger, ps_log_root_dir +from ps_dnn_trainer import DnnTrainer +from paddle.distributed.fleet.proto import ps_pb2 +from google.protobuf import text_format + -class TestTheOnePs(unittest.TestCase): +class TestTheOnePs(PsPassTestBase): def setUp(self): - print('setUp...') + pass def tearDown(self): - print('tearDown...') + pass - def test_main(self): + def check(self, file1, file2): pass + ''' + f = open(file1, "rb") + ps_desc_1 = ps_pb2.PSParameter() + text_format.Parse(f.read(), ps_desc_1) + f.close() + + f = open(file2, "rb") + ps_desc_2 = ps_pb2.PSParameter() + text_format.Parse(f.read(), ps_desc_2) + f.close() + str1 = text_format.MessageToString(ps_desc_1) + str2 = text_format.MessageToString(ps_desc_2) + #logger.info('### msg10: {}'.format(str1)) + #logger.info('### msg20: {}'.format(str2)) + if str1 == str2: + return True + else: + return False + ''' + + def test_ps_cpu_async(self): + self.init() + self.config['ps_mode_config'] = "../ps/cpu_async_ps_config.yaml" + self.config['run_the_one_ps'] = '1' + + self.config['debug_the_one_ps'] = '0' + self.config[ + 'log_dir'] = ps_log_root_dir + "async_cpu_log_old_the_one_ps" + remove_path_if_exists(self.config['log_dir']) + self.ps_launch() + + self.config['debug_the_one_ps'] = '1' + self.config[ + 'log_dir'] = ps_log_root_dir + "async_cpu_log_new_the_one_ps" + remove_path_if_exists(self.config['log_dir']) + self.ps_launch() + + desc1 = '/ps_desc_baseline/async_worker_ps_desc' + desc2 = '/ps_log/async_new_worker_ps_desc' + desc3 = '/ps_desc_baseline/async_server_ps_desc' + desc4 = '/ps_log/async_new_server_ps_desc' + if self.check(desc1, desc2): + logger.info('test_ps_cpu_async ps_desc: worker passed!') + else: + logger.info('test_ps_cpu_async ps_desc: worker failed!') + if self.check(desc3, desc4): + logger.info('test_ps_cpu_async ps_desc: server passed!') + else: + logger.info('test_ps_cpu_async ps_desc: server failed!') + + def test_ps_cpu_geo(self): + self.init() + self.config['ps_mode_config'] = "../ps/cpu_geo_ps_config.yaml" + self.config['run_the_one_ps'] = '1' + + self.config['debug_the_one_ps'] = '0' + self.config['log_dir'] = ps_log_root_dir + "geo_cpu_log_old_the_one_ps" + remove_path_if_exists(self.config['log_dir']) + self.ps_launch() + + self.config['debug_the_one_ps'] = '1' + self.config['log_dir'] = ps_log_root_dir + "geo_cpu_log_new_the_one_ps" + remove_path_if_exists(self.config['log_dir']) + self.ps_launch() + + desc1 = '/ps_desc_baseline/geo_worker_ps_desc' + desc2 = '/ps_log/geo_new_worker_ps_desc' + desc3 = '/ps_desc_baseline/geo_server_ps_desc' + desc4 = '/ps_log/geo_new_server_ps_desc' + if self.check(desc1, desc2): + logger.info('test_ps_cpu_geo ps_desc: worker passed!') + else: + logger.info('test_ps_cpu_geo ps_desc: worker failed!') + if self.check(desc3, desc4): + logger.info('test_ps_cpu_geo ps_desc: server passed!') + else: + logger.info('test_ps_cpu_geo ps_desc: server failed!') if __name__ == '__main__': diff --git a/python/paddle/fluid/tests/unittests/ps_dnn_model.py b/python/paddle/fluid/tests/unittests/ps_dnn_model.py index 0a147334dab..8d91e0f4678 100755 --- a/python/paddle/fluid/tests/unittests/ps_dnn_model.py +++ b/python/paddle/fluid/tests/unittests/ps_dnn_model.py @@ -74,6 +74,7 @@ class DNNLayer(nn.Layer): else: emb = self.embedding(s_input) emb = paddle.reshape(emb, shape=[-1, self.sparse_feature_dim]) + # emb.stop_gradient = True sparse_embs.append(emb) y_dnn = paddle.concat(x=sparse_embs + [dense_inputs], axis=1) -- GitLab