From b12af9e1d9980935f90ac3264797110f9671589e Mon Sep 17 00:00:00 2001 From: wangguanqun Date: Wed, 13 Apr 2022 23:32:14 +0800 Subject: [PATCH] the one ps proto (#41659) * the one ps proto * the one ps proto * fix * fix * fix * fix windows ci * fix windows ci * add dependency * add dependency --- paddle/fluid/distributed/CMakeLists.txt | 20 +- .../{ps.proto => the_one_ps.proto} | 2 +- paddle/fluid/framework/CMakeLists.txt | 5 +- .../framework/distributed_strategy.proto | 2 +- paddle/fluid/framework/the_one_ps.proto | 213 ------------------ .../fleet/base/distributed_strategy.py | 4 +- python/paddle/distributed/ps/the_one_ps.py | 13 +- 7 files changed, 29 insertions(+), 230 deletions(-) rename paddle/fluid/distributed/{ps.proto => the_one_ps.proto} (99%) delete mode 100755 paddle/fluid/framework/the_one_ps.proto diff --git a/paddle/fluid/distributed/CMakeLists.txt b/paddle/fluid/distributed/CMakeLists.txt index 06b0583eddf..0091c14bfd1 100644 --- a/paddle/fluid/distributed/CMakeLists.txt +++ b/paddle/fluid/distributed/CMakeLists.txt @@ -1,11 +1,29 @@ add_subdirectory(collective) add_subdirectory(store) +if(WITH_PYTHON) + py_proto_compile(ps_py_proto SRCS the_one_ps.proto) + add_custom_target(ps_py_proto_init ALL + COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_BINARY_DIR}/python/paddle/distributed/fleet/proto) + if (NOT WIN32) + add_custom_command(TARGET ps_py_proto POST_BUILD + COMMAND mv the_one_ps_pb2.py ${PADDLE_BINARY_DIR}/python/paddle/distributed/fleet/proto/) + else(NOT WIN32) + string(REPLACE "/" "\\" fleet_proto_dstpath "${PADDLE_BINARY_DIR}/python/paddle/distributed/fleet/proto/") + add_custom_command(TARGET ps_py_proto POST_BUILD + COMMAND copy /Y the_one_ps_pb2.py ${fleet_proto_dstpath} + COMMENT "Copy generated python the_one_ps_pb2 into directory ${fleet_proto_dstpath}.") + endif(NOT WIN32) +endif() + if(NOT WITH_PSCORE) add_subdirectory(fleet_executor) return() endif() -proto_library(ps_framework_proto SRCS ps.proto) +proto_library(ps_framework_proto SRCS the_one_ps.proto) +add_custom_command(TARGET ps_framework_proto POST_BUILD + COMMAND mv the_one_ps.pb.h ps.pb.h + COMMAND mv the_one_ps.pb.cc ps.pb.cc) set(DISTRIBUTE_COMPILE_FLAGS "-Wno-error=unused-value -Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor -Wno-error=sign-compare -Wno-error=unused-variable -Wno-error=return-type -Wno-error=unused-but-set-variable -Wno-error=type-limits -Wno-error=unknown-pragmas -Wno-error=parentheses -Wno-error=unused-result") diff --git a/paddle/fluid/distributed/ps.proto b/paddle/fluid/distributed/the_one_ps.proto similarity index 99% rename from paddle/fluid/distributed/ps.proto rename to paddle/fluid/distributed/the_one_ps.proto index 9bfa2c05efa..34b11dfd1c5 100644 --- a/paddle/fluid/distributed/ps.proto +++ b/paddle/fluid/distributed/the_one_ps.proto @@ -233,4 +233,4 @@ message GraphFeature { repeated string name = 1; repeated string dtype = 2; repeated int32 shape = 3; -} \ No newline at end of file +} diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt index 1b9943df1b0..ad9f37b98bd 100755 --- a/paddle/fluid/framework/CMakeLists.txt +++ b/paddle/fluid/framework/CMakeLists.txt @@ -237,7 +237,6 @@ if(WITH_PYTHON) py_proto_compile(trainer_py_proto SRCS trainer_desc.proto data_feed.proto) py_proto_compile(distributed_strategy_py_proto SRCS distributed_strategy.proto) py_proto_compile(pass_desc_py_proto SRCS pass_desc.proto) - py_proto_compile(ps_py_proto SRCS the_one_ps.proto) #Generate an empty \ #__init__.py to make framework_py_proto as a valid python module. add_custom_target(fleet_proto_init ALL @@ -245,13 +244,12 @@ if(WITH_PYTHON) COMMAND ${CMAKE_COMMAND} -E touch ${PADDLE_BINARY_DIR}/python/paddle/distributed/fleet/proto/__init__.py ) add_custom_target(framework_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py) - add_dependencies(framework_py_proto framework_py_proto_init trainer_py_proto distributed_strategy_py_proto fleet_proto_init pass_desc_py_proto ps_py_proto) + add_dependencies(framework_py_proto framework_py_proto_init trainer_py_proto distributed_strategy_py_proto fleet_proto_init pass_desc_py_proto ps_py_proto ps_py_proto_init) if (NOT WIN32) add_custom_command(TARGET framework_py_proto POST_BUILD COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_BINARY_DIR}/python/paddle/fluid/proto COMMAND cp *.py ${PADDLE_BINARY_DIR}/python/paddle/fluid/proto/ COMMAND cp distributed_strategy_*.py ${PADDLE_BINARY_DIR}/python/paddle/distributed/fleet/proto - COMMAND cp the_one_ps_pb2.py ${PADDLE_BINARY_DIR}/python/paddle/distributed/fleet/proto COMMENT "Copy generated python proto into directory paddle/fluid/proto." WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) add_custom_target(fleet_executor_proto_init ALL DEPENDS fleet_proto_init fleet_executor_desc_py_proto @@ -263,7 +261,6 @@ if(WITH_PYTHON) add_custom_command(TARGET framework_py_proto POST_BUILD COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_BINARY_DIR}/python/paddle/fluid/proto COMMAND copy /Y *.py ${proto_dstpath} - COMMAND copy /Y the_one_ps_pb2.py ${fleet_proto_dstpath} COMMAND copy /Y distributed_strategy_*.py ${fleet_proto_dstpath} COMMENT "Copy generated python proto into directory paddle/fluid/proto." COMMENT "Copy generated python proto into directory paddle/distributed/fleet/proto." diff --git a/paddle/fluid/framework/distributed_strategy.proto b/paddle/fluid/framework/distributed_strategy.proto index c94a344f74d..9b0a033856d 100644 --- a/paddle/fluid/framework/distributed_strategy.proto +++ b/paddle/fluid/framework/distributed_strategy.proto @@ -310,7 +310,7 @@ message DistributedStrategy { optional bool asp = 33 [ default = false ]; optional bool fuse_grad_merge = 34 [ default = false ]; optional bool semi_auto = 35 [ default = false ]; - optional bool adam_d2sum = 36 [ default = true ]; + optional bool adam_d2sum = 36 [ default = false ]; optional bool auto_search = 37 [ default = false ]; optional bool heter_ccl_mode = 38 [ default = false ]; diff --git a/paddle/fluid/framework/the_one_ps.proto b/paddle/fluid/framework/the_one_ps.proto deleted file mode 100755 index 0ae87812bce..00000000000 --- a/paddle/fluid/framework/the_one_ps.proto +++ /dev/null @@ -1,213 +0,0 @@ -// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -syntax = "proto2"; -package paddle.distributed; -option cc_generic_services = true; -option cc_enable_arenas = true; - -message FsClientParameter { - enum FsApiType { - HDFS = 0; - AFS = 1; - } - optional FsApiType fs_type = 1 [ default = HDFS ]; - optional string uri = 2; // such as afs://xxx.afs.com:9902 - optional string user = 3; // user_name to access fs - optional string passwd = 4; // password - optional int32 buffer_size = 5; // buffer for read/write - optional string hadoop_bin = 51; - optional string afs_conf = 101; -} - -message PSParameter { - optional string worker_class = 1; - optional string server_class = 2; - optional string instance_class = 3; - optional string init_gflags = 4 [ default = "" ]; - optional WorkerParameter worker_param = 101; - optional ServerParameter server_param = 102; - repeated DownpourTrainerParameter trainer_param = 301; - optional FsClientParameter fs_client_param = 501; -} - -message WorkerParameter { - optional DownpourWorkerParameter downpour_worker_param = 1; -} - -message DownpourWorkerParameter { - repeated TableParameter downpour_table_param = 1; -} - -message DownpourServerParameter { - repeated TableParameter downpour_table_param = 1; - optional ServerServiceParameter service_param = 2; -} - -message ServerParameter { - optional DownpourServerParameter downpour_server_param = 1; -} - -message DownpourTrainerParameter { - repeated DenseTableParameter dense_table = 1; - repeated SparseTableParameter sparse_table = 2; - optional int32 push_sparse_per_batch = 3; - optional int32 push_dense_per_batch = 4; - repeated string skip_op = 5; - repeated ProgramConfig program_config = 6; -} - -message DenseTableParameter { - optional int32 table_id = 1; - repeated string dense_variable_name = 2; - repeated string dense_gradient_variable_name = 3; - optional int32 fea_dim = 4; -} - -message SparseTableParameter { - optional int32 table_id = 1; - optional int32 feature_dim = 2; - repeated string slot_key = 3; - repeated string slot_value = 4; - repeated string slot_gradient = 5; -} - -message ServerServiceParameter { - optional string server_class = 1 [ default = "BrpcPsServer" ]; - optional string client_class = 2 [ default = "BrpcPsClient" ]; - optional string service_class = 3 [ default = "BrpcPsService" ]; - optional uint32 start_server_port = 4 - [ default = 0 ]; // will find a avaliable port from it - optional uint32 server_thread_num = 5 [ default = 12 ]; -} - -message ProgramConfig { - required string program_id = 1; - repeated int32 push_sparse_table_id = 2; - repeated int32 push_dense_table_id = 3; - repeated int32 pull_sparse_table_id = 4; - repeated int32 pull_dense_table_id = 5; -} - -enum TableType { - PS_SPARSE_TABLE = 0; - PS_DENSE_TABLE = 1; - PS_OTHER_TABLE = 2; -} - -message TableParameter { - optional uint64 table_id = 1; - optional string table_class = 2; - optional uint64 shard_num = 3 [ default = 1000 ]; - optional TableAccessorParameter accessor = 4; - optional TensorAccessorParameter tensor = 5; - optional CommonAccessorParameter common = 6; - optional TableType type = 7; - optional bool compress_in_save = 8 [ default = false ]; -} - -message TableAccessorParameter { - optional string accessor_class = 1; - optional uint32 fea_dim = 4 [ default = 11 ]; - optional uint32 embedx_dim = 5 [ default = 8 ]; - optional uint32 embedx_threshold = 6 [ default = 10 ]; - optional CtrAccessorParameter ctr_accessor_param = 7; - repeated TableAccessorSaveParameter table_accessor_save_param = 8; - optional SparseCommonSGDRuleParameter embed_sgd_param = 10; - optional SparseCommonSGDRuleParameter embedx_sgd_param = 11; -} - -message CtrAccessorParameter { - optional float nonclk_coeff = 1 - [ default = 0.1 ]; // to calculate show_click_score - optional float click_coeff = 2 - [ default = 1 ]; // to calculate show_click_score - optional float base_threshold = 3 [ - default = 1.5 - ]; // show_click_score > base_threshold, this feature can be saved - optional float delta_threshold = 4 - [ default = - 0.25 ]; // delta_score > delta_threshold, this feature can be saved - optional float delta_keep_days = 5 - [ default = - 16 ]; // unseen_day < delta_keep_days, this feature can be saved - optional float show_click_decay_rate = 6 [ - default = 0.98 - ]; // show/click will update to show/click * show_click_decay_rate after a day - optional float delete_threshold = 7 - [ default = 0.8 ]; // threshold to shrink a feasign - optional float delete_after_unseen_days = 8 - [ default = 30 ]; // unseen_day > delete_after_unseen_days, this feature - // will be delete in shrink_model - optional int32 ssd_unseenday_threshold = 9 - [ default = 1 ]; // threshold to save ssd -} - -message TensorAccessorParameter { - optional string feed_var_name = 1; - optional string fetch_var_name = 2; - optional int64 startup_program_id = 3; - optional int64 main_program_id = 4; - optional string tensor_table_class = 6; -} - -message CommonAccessorParameter { - optional string name = 1; - optional string table_name = 2; - repeated string attributes = 3; - repeated string params = 4; - repeated uint32 dims = 5; - repeated string initializers = 6; - optional string entry = 7; - optional int32 trainer_num = 8; - optional bool sync = 9; - optional uint32 table_num = 10; - optional uint32 table_dim = 11; -} - -message TableAccessorSaveParameter { - optional uint32 param = 1; - optional string converter = 2; - optional string deconverter = 3; -} - -message SparseCommonSGDRuleParameter { - optional string name = 1; - optional SparseNaiveSGDRuleParameter naive = 2; - optional SparseAdagradSGDRuleParameter adagrad = 3; - optional SparseAdamSGDParameter adam = 4; -} - -message SparseNaiveSGDRuleParameter { // SparseNaiveSGDRule - optional double learning_rate = 1 [ default = 0.05 ]; - optional double initial_range = 2 [ default = 0.0001 ]; - repeated float weight_bounds = 3; -} - -message - SparseAdagradSGDRuleParameter { // SparseAdaGradSGDRule|StdAdaGradSGDRule - optional double learning_rate = 1 [ default = 0.05 ]; - optional double initial_g2sum = 2 [ default = 3.0 ]; - optional double initial_range = 3 [ default = 0.0001 ]; - repeated float weight_bounds = 4; -} - -message SparseAdamSGDParameter { // SparseAdamSGDRule - optional double learning_rate = 1 [ default = 0.001 ]; - optional double initial_range = 2 [ default = 0.0001 ]; - optional double beta1_decay_rate = 3 [ default = 0.9 ]; - optional double beta2_decay_rate = 4 [ default = 0.999 ]; - optional double ada_epsilon = 5 [ default = 1e-08 ]; - repeated float weight_bounds = 6; -} diff --git a/python/paddle/distributed/fleet/base/distributed_strategy.py b/python/paddle/distributed/fleet/base/distributed_strategy.py index 199418ab779..c46b6eeb048 100644 --- a/python/paddle/distributed/fleet/base/distributed_strategy.py +++ b/python/paddle/distributed/fleet/base/distributed_strategy.py @@ -404,7 +404,7 @@ class DistributedStrategy(object): def adam_d2sum(self): """ set adam_d2sum - Default value: True + Default value: False Examples: @@ -415,7 +415,7 @@ class DistributedStrategy(object): fleet.init(role_maker) strategy = fleet.DistributedStrategy() - strategy.adam_d2sum = True # by default this is True + strategy.adam_d2sum = True # by default this is False # code block for defining loss and local optimizer # sgd = fleet.distributed_optimizer(optimizer, strategy) diff --git a/python/paddle/distributed/ps/the_one_ps.py b/python/paddle/distributed/ps/the_one_ps.py index 1d23567b72a..5be739785ff 100755 --- a/python/paddle/distributed/ps/the_one_ps.py +++ b/python/paddle/distributed/ps/the_one_ps.py @@ -609,7 +609,6 @@ class SparseTable(Table): check_embedding_dim(table_proto.accessor, self.common.table_name, ctx.program_id(), self.context) - adam_d2sum = self.context["user_defined_strategy"].adam_d2sum self.common.parse_by_optimizer(ctx, self.context) self.common.parse_entry(self.common.table_name, ctx.program_id(), self.context) @@ -641,7 +640,6 @@ class GeoSparseTable(SparseTable): self.common.table_name = self.context['grad_name_to_param_name'][ ctx.origin_varnames()[0]] - adam_d2sum = self.context["user_defined_strategy"].adam_d2sum self.common.parse_by_optimizer(ctx, self.context) self.common.parse_entry(self.common.table_name, ctx.program_id(), self.context) @@ -673,7 +671,6 @@ class DenseTable(Table): table_proto.accessor.embedx_dim = 1 self.common.table_name = "MergedDense" - adam_d2sum = self.context["user_defined_strategy"].adam_d2sum self.common.parse_by_optimizer(ctx, self.context) self.common.parse_entry(self.common.table_name, ctx.program_id(), self.context) @@ -922,11 +919,6 @@ class TheOnePSRuntime(RuntimeBase): kwargs["trainer_id"] = self.role_maker._worker_index() return kwargs - proto_txt = worker_desc - debug = bool(int(os.getenv("PSERVER_DEBUG", "0"))) - if debug: - print("worker: \n{}".format(proto_txt)) - dense_map = get_the_one_recv_context( self.context, split_dense_table=self.is_heter_ps_mode) send_ctx = get_the_one_send_context( @@ -937,6 +929,7 @@ class TheOnePSRuntime(RuntimeBase): self._send_ctx = send_ctx trainer_config = self.context['trainer'] + proto_txt = worker_desc debug = bool(int(os.getenv("PSERVER_DEBUG", "0"))) if debug: print("worker: \n{}".format(proto_txt)) @@ -1060,6 +1053,10 @@ class TheOnePSRuntime(RuntimeBase): if self.is_heter_ps_mode: trainers += len(self.role_maker._get_heter_worker_endpoints()) + # debug = bool(int(os.getenv("PSERVER_DEBUG", "0"))) + # if debug: + # print("server: \n{}".format(server_desc)) + self._server = fluid.core.DistFleetWrapper() self._server.init_server(server_desc, self.string_hosts, role_id, trainers, self._server_sub_program) -- GitLab