未验证 提交 b12af9e1 编写于 作者: W wangguanqun 提交者: GitHub

the one ps proto (#41659)

* the one ps proto

* the one ps proto

* fix

* fix

* fix

* fix windows ci

* fix windows ci

* add dependency

* add dependency
上级 27a91b1a
add_subdirectory(collective) add_subdirectory(collective)
add_subdirectory(store) add_subdirectory(store)
if(WITH_PYTHON)
py_proto_compile(ps_py_proto SRCS the_one_ps.proto)
add_custom_target(ps_py_proto_init ALL
COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_BINARY_DIR}/python/paddle/distributed/fleet/proto)
if (NOT WIN32)
add_custom_command(TARGET ps_py_proto POST_BUILD
COMMAND mv the_one_ps_pb2.py ${PADDLE_BINARY_DIR}/python/paddle/distributed/fleet/proto/)
else(NOT WIN32)
string(REPLACE "/" "\\" fleet_proto_dstpath "${PADDLE_BINARY_DIR}/python/paddle/distributed/fleet/proto/")
add_custom_command(TARGET ps_py_proto POST_BUILD
COMMAND copy /Y the_one_ps_pb2.py ${fleet_proto_dstpath}
COMMENT "Copy generated python the_one_ps_pb2 into directory ${fleet_proto_dstpath}.")
endif(NOT WIN32)
endif()
if(NOT WITH_PSCORE) if(NOT WITH_PSCORE)
add_subdirectory(fleet_executor) add_subdirectory(fleet_executor)
return() return()
endif() endif()
proto_library(ps_framework_proto SRCS ps.proto) proto_library(ps_framework_proto SRCS the_one_ps.proto)
add_custom_command(TARGET ps_framework_proto POST_BUILD
COMMAND mv the_one_ps.pb.h ps.pb.h
COMMAND mv the_one_ps.pb.cc ps.pb.cc)
set(DISTRIBUTE_COMPILE_FLAGS "-Wno-error=unused-value -Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor -Wno-error=sign-compare -Wno-error=unused-variable -Wno-error=return-type -Wno-error=unused-but-set-variable -Wno-error=type-limits -Wno-error=unknown-pragmas -Wno-error=parentheses -Wno-error=unused-result") set(DISTRIBUTE_COMPILE_FLAGS "-Wno-error=unused-value -Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor -Wno-error=sign-compare -Wno-error=unused-variable -Wno-error=return-type -Wno-error=unused-but-set-variable -Wno-error=type-limits -Wno-error=unknown-pragmas -Wno-error=parentheses -Wno-error=unused-result")
......
...@@ -233,4 +233,4 @@ message GraphFeature { ...@@ -233,4 +233,4 @@ message GraphFeature {
repeated string name = 1; repeated string name = 1;
repeated string dtype = 2; repeated string dtype = 2;
repeated int32 shape = 3; repeated int32 shape = 3;
} }
\ No newline at end of file
...@@ -237,7 +237,6 @@ if(WITH_PYTHON) ...@@ -237,7 +237,6 @@ if(WITH_PYTHON)
py_proto_compile(trainer_py_proto SRCS trainer_desc.proto data_feed.proto) py_proto_compile(trainer_py_proto SRCS trainer_desc.proto data_feed.proto)
py_proto_compile(distributed_strategy_py_proto SRCS distributed_strategy.proto) py_proto_compile(distributed_strategy_py_proto SRCS distributed_strategy.proto)
py_proto_compile(pass_desc_py_proto SRCS pass_desc.proto) py_proto_compile(pass_desc_py_proto SRCS pass_desc.proto)
py_proto_compile(ps_py_proto SRCS the_one_ps.proto)
#Generate an empty \ #Generate an empty \
#__init__.py to make framework_py_proto as a valid python module. #__init__.py to make framework_py_proto as a valid python module.
add_custom_target(fleet_proto_init ALL add_custom_target(fleet_proto_init ALL
...@@ -245,13 +244,12 @@ if(WITH_PYTHON) ...@@ -245,13 +244,12 @@ if(WITH_PYTHON)
COMMAND ${CMAKE_COMMAND} -E touch ${PADDLE_BINARY_DIR}/python/paddle/distributed/fleet/proto/__init__.py COMMAND ${CMAKE_COMMAND} -E touch ${PADDLE_BINARY_DIR}/python/paddle/distributed/fleet/proto/__init__.py
) )
add_custom_target(framework_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py) add_custom_target(framework_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py)
add_dependencies(framework_py_proto framework_py_proto_init trainer_py_proto distributed_strategy_py_proto fleet_proto_init pass_desc_py_proto ps_py_proto) add_dependencies(framework_py_proto framework_py_proto_init trainer_py_proto distributed_strategy_py_proto fleet_proto_init pass_desc_py_proto ps_py_proto ps_py_proto_init)
if (NOT WIN32) if (NOT WIN32)
add_custom_command(TARGET framework_py_proto POST_BUILD add_custom_command(TARGET framework_py_proto POST_BUILD
COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_BINARY_DIR}/python/paddle/fluid/proto COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_BINARY_DIR}/python/paddle/fluid/proto
COMMAND cp *.py ${PADDLE_BINARY_DIR}/python/paddle/fluid/proto/ COMMAND cp *.py ${PADDLE_BINARY_DIR}/python/paddle/fluid/proto/
COMMAND cp distributed_strategy_*.py ${PADDLE_BINARY_DIR}/python/paddle/distributed/fleet/proto COMMAND cp distributed_strategy_*.py ${PADDLE_BINARY_DIR}/python/paddle/distributed/fleet/proto
COMMAND cp the_one_ps_pb2.py ${PADDLE_BINARY_DIR}/python/paddle/distributed/fleet/proto
COMMENT "Copy generated python proto into directory paddle/fluid/proto." COMMENT "Copy generated python proto into directory paddle/fluid/proto."
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
add_custom_target(fleet_executor_proto_init ALL DEPENDS fleet_proto_init fleet_executor_desc_py_proto add_custom_target(fleet_executor_proto_init ALL DEPENDS fleet_proto_init fleet_executor_desc_py_proto
...@@ -263,7 +261,6 @@ if(WITH_PYTHON) ...@@ -263,7 +261,6 @@ if(WITH_PYTHON)
add_custom_command(TARGET framework_py_proto POST_BUILD add_custom_command(TARGET framework_py_proto POST_BUILD
COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_BINARY_DIR}/python/paddle/fluid/proto COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_BINARY_DIR}/python/paddle/fluid/proto
COMMAND copy /Y *.py ${proto_dstpath} COMMAND copy /Y *.py ${proto_dstpath}
COMMAND copy /Y the_one_ps_pb2.py ${fleet_proto_dstpath}
COMMAND copy /Y distributed_strategy_*.py ${fleet_proto_dstpath} COMMAND copy /Y distributed_strategy_*.py ${fleet_proto_dstpath}
COMMENT "Copy generated python proto into directory paddle/fluid/proto." COMMENT "Copy generated python proto into directory paddle/fluid/proto."
COMMENT "Copy generated python proto into directory paddle/distributed/fleet/proto." COMMENT "Copy generated python proto into directory paddle/distributed/fleet/proto."
......
...@@ -310,7 +310,7 @@ message DistributedStrategy { ...@@ -310,7 +310,7 @@ message DistributedStrategy {
optional bool asp = 33 [ default = false ]; optional bool asp = 33 [ default = false ];
optional bool fuse_grad_merge = 34 [ default = false ]; optional bool fuse_grad_merge = 34 [ default = false ];
optional bool semi_auto = 35 [ default = false ]; optional bool semi_auto = 35 [ default = false ];
optional bool adam_d2sum = 36 [ default = true ]; optional bool adam_d2sum = 36 [ default = false ];
optional bool auto_search = 37 [ default = false ]; optional bool auto_search = 37 [ default = false ];
optional bool heter_ccl_mode = 38 [ default = false ]; optional bool heter_ccl_mode = 38 [ default = false ];
......
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto2";
package paddle.distributed;
option cc_generic_services = true;
option cc_enable_arenas = true;
message FsClientParameter {
enum FsApiType {
HDFS = 0;
AFS = 1;
}
optional FsApiType fs_type = 1 [ default = HDFS ];
optional string uri = 2; // such as afs://xxx.afs.com:9902
optional string user = 3; // user_name to access fs
optional string passwd = 4; // password
optional int32 buffer_size = 5; // buffer for read/write
optional string hadoop_bin = 51;
optional string afs_conf = 101;
}
message PSParameter {
optional string worker_class = 1;
optional string server_class = 2;
optional string instance_class = 3;
optional string init_gflags = 4 [ default = "" ];
optional WorkerParameter worker_param = 101;
optional ServerParameter server_param = 102;
repeated DownpourTrainerParameter trainer_param = 301;
optional FsClientParameter fs_client_param = 501;
}
message WorkerParameter {
optional DownpourWorkerParameter downpour_worker_param = 1;
}
message DownpourWorkerParameter {
repeated TableParameter downpour_table_param = 1;
}
message DownpourServerParameter {
repeated TableParameter downpour_table_param = 1;
optional ServerServiceParameter service_param = 2;
}
message ServerParameter {
optional DownpourServerParameter downpour_server_param = 1;
}
message DownpourTrainerParameter {
repeated DenseTableParameter dense_table = 1;
repeated SparseTableParameter sparse_table = 2;
optional int32 push_sparse_per_batch = 3;
optional int32 push_dense_per_batch = 4;
repeated string skip_op = 5;
repeated ProgramConfig program_config = 6;
}
message DenseTableParameter {
optional int32 table_id = 1;
repeated string dense_variable_name = 2;
repeated string dense_gradient_variable_name = 3;
optional int32 fea_dim = 4;
}
message SparseTableParameter {
optional int32 table_id = 1;
optional int32 feature_dim = 2;
repeated string slot_key = 3;
repeated string slot_value = 4;
repeated string slot_gradient = 5;
}
message ServerServiceParameter {
optional string server_class = 1 [ default = "BrpcPsServer" ];
optional string client_class = 2 [ default = "BrpcPsClient" ];
optional string service_class = 3 [ default = "BrpcPsService" ];
optional uint32 start_server_port = 4
[ default = 0 ]; // will find a avaliable port from it
optional uint32 server_thread_num = 5 [ default = 12 ];
}
message ProgramConfig {
required string program_id = 1;
repeated int32 push_sparse_table_id = 2;
repeated int32 push_dense_table_id = 3;
repeated int32 pull_sparse_table_id = 4;
repeated int32 pull_dense_table_id = 5;
}
enum TableType {
PS_SPARSE_TABLE = 0;
PS_DENSE_TABLE = 1;
PS_OTHER_TABLE = 2;
}
message TableParameter {
optional uint64 table_id = 1;
optional string table_class = 2;
optional uint64 shard_num = 3 [ default = 1000 ];
optional TableAccessorParameter accessor = 4;
optional TensorAccessorParameter tensor = 5;
optional CommonAccessorParameter common = 6;
optional TableType type = 7;
optional bool compress_in_save = 8 [ default = false ];
}
message TableAccessorParameter {
optional string accessor_class = 1;
optional uint32 fea_dim = 4 [ default = 11 ];
optional uint32 embedx_dim = 5 [ default = 8 ];
optional uint32 embedx_threshold = 6 [ default = 10 ];
optional CtrAccessorParameter ctr_accessor_param = 7;
repeated TableAccessorSaveParameter table_accessor_save_param = 8;
optional SparseCommonSGDRuleParameter embed_sgd_param = 10;
optional SparseCommonSGDRuleParameter embedx_sgd_param = 11;
}
message CtrAccessorParameter {
optional float nonclk_coeff = 1
[ default = 0.1 ]; // to calculate show_click_score
optional float click_coeff = 2
[ default = 1 ]; // to calculate show_click_score
optional float base_threshold = 3 [
default = 1.5
]; // show_click_score > base_threshold, this feature can be saved
optional float delta_threshold = 4
[ default =
0.25 ]; // delta_score > delta_threshold, this feature can be saved
optional float delta_keep_days = 5
[ default =
16 ]; // unseen_day < delta_keep_days, this feature can be saved
optional float show_click_decay_rate = 6 [
default = 0.98
]; // show/click will update to show/click * show_click_decay_rate after a day
optional float delete_threshold = 7
[ default = 0.8 ]; // threshold to shrink a feasign
optional float delete_after_unseen_days = 8
[ default = 30 ]; // unseen_day > delete_after_unseen_days, this feature
// will be delete in shrink_model
optional int32 ssd_unseenday_threshold = 9
[ default = 1 ]; // threshold to save ssd
}
message TensorAccessorParameter {
optional string feed_var_name = 1;
optional string fetch_var_name = 2;
optional int64 startup_program_id = 3;
optional int64 main_program_id = 4;
optional string tensor_table_class = 6;
}
message CommonAccessorParameter {
optional string name = 1;
optional string table_name = 2;
repeated string attributes = 3;
repeated string params = 4;
repeated uint32 dims = 5;
repeated string initializers = 6;
optional string entry = 7;
optional int32 trainer_num = 8;
optional bool sync = 9;
optional uint32 table_num = 10;
optional uint32 table_dim = 11;
}
message TableAccessorSaveParameter {
optional uint32 param = 1;
optional string converter = 2;
optional string deconverter = 3;
}
message SparseCommonSGDRuleParameter {
optional string name = 1;
optional SparseNaiveSGDRuleParameter naive = 2;
optional SparseAdagradSGDRuleParameter adagrad = 3;
optional SparseAdamSGDParameter adam = 4;
}
message SparseNaiveSGDRuleParameter { // SparseNaiveSGDRule
optional double learning_rate = 1 [ default = 0.05 ];
optional double initial_range = 2 [ default = 0.0001 ];
repeated float weight_bounds = 3;
}
message
SparseAdagradSGDRuleParameter { // SparseAdaGradSGDRule|StdAdaGradSGDRule
optional double learning_rate = 1 [ default = 0.05 ];
optional double initial_g2sum = 2 [ default = 3.0 ];
optional double initial_range = 3 [ default = 0.0001 ];
repeated float weight_bounds = 4;
}
message SparseAdamSGDParameter { // SparseAdamSGDRule
optional double learning_rate = 1 [ default = 0.001 ];
optional double initial_range = 2 [ default = 0.0001 ];
optional double beta1_decay_rate = 3 [ default = 0.9 ];
optional double beta2_decay_rate = 4 [ default = 0.999 ];
optional double ada_epsilon = 5 [ default = 1e-08 ];
repeated float weight_bounds = 6;
}
...@@ -404,7 +404,7 @@ class DistributedStrategy(object): ...@@ -404,7 +404,7 @@ class DistributedStrategy(object):
def adam_d2sum(self): def adam_d2sum(self):
""" """
set adam_d2sum set adam_d2sum
Default value: True Default value: False
Examples: Examples:
...@@ -415,7 +415,7 @@ class DistributedStrategy(object): ...@@ -415,7 +415,7 @@ class DistributedStrategy(object):
fleet.init(role_maker) fleet.init(role_maker)
strategy = fleet.DistributedStrategy() strategy = fleet.DistributedStrategy()
strategy.adam_d2sum = True # by default this is True strategy.adam_d2sum = True # by default this is False
# code block for defining loss and local optimizer # code block for defining loss and local optimizer
# sgd = fleet.distributed_optimizer(optimizer, strategy) # sgd = fleet.distributed_optimizer(optimizer, strategy)
......
...@@ -609,7 +609,6 @@ class SparseTable(Table): ...@@ -609,7 +609,6 @@ class SparseTable(Table):
check_embedding_dim(table_proto.accessor, self.common.table_name, check_embedding_dim(table_proto.accessor, self.common.table_name,
ctx.program_id(), self.context) ctx.program_id(), self.context)
adam_d2sum = self.context["user_defined_strategy"].adam_d2sum
self.common.parse_by_optimizer(ctx, self.context) self.common.parse_by_optimizer(ctx, self.context)
self.common.parse_entry(self.common.table_name, self.common.parse_entry(self.common.table_name,
ctx.program_id(), self.context) ctx.program_id(), self.context)
...@@ -641,7 +640,6 @@ class GeoSparseTable(SparseTable): ...@@ -641,7 +640,6 @@ class GeoSparseTable(SparseTable):
self.common.table_name = self.context['grad_name_to_param_name'][ self.common.table_name = self.context['grad_name_to_param_name'][
ctx.origin_varnames()[0]] ctx.origin_varnames()[0]]
adam_d2sum = self.context["user_defined_strategy"].adam_d2sum
self.common.parse_by_optimizer(ctx, self.context) self.common.parse_by_optimizer(ctx, self.context)
self.common.parse_entry(self.common.table_name, self.common.parse_entry(self.common.table_name,
ctx.program_id(), self.context) ctx.program_id(), self.context)
...@@ -673,7 +671,6 @@ class DenseTable(Table): ...@@ -673,7 +671,6 @@ class DenseTable(Table):
table_proto.accessor.embedx_dim = 1 table_proto.accessor.embedx_dim = 1
self.common.table_name = "MergedDense" self.common.table_name = "MergedDense"
adam_d2sum = self.context["user_defined_strategy"].adam_d2sum
self.common.parse_by_optimizer(ctx, self.context) self.common.parse_by_optimizer(ctx, self.context)
self.common.parse_entry(self.common.table_name, self.common.parse_entry(self.common.table_name,
ctx.program_id(), self.context) ctx.program_id(), self.context)
...@@ -922,11 +919,6 @@ class TheOnePSRuntime(RuntimeBase): ...@@ -922,11 +919,6 @@ class TheOnePSRuntime(RuntimeBase):
kwargs["trainer_id"] = self.role_maker._worker_index() kwargs["trainer_id"] = self.role_maker._worker_index()
return kwargs return kwargs
proto_txt = worker_desc
debug = bool(int(os.getenv("PSERVER_DEBUG", "0")))
if debug:
print("worker: \n{}".format(proto_txt))
dense_map = get_the_one_recv_context( dense_map = get_the_one_recv_context(
self.context, split_dense_table=self.is_heter_ps_mode) self.context, split_dense_table=self.is_heter_ps_mode)
send_ctx = get_the_one_send_context( send_ctx = get_the_one_send_context(
...@@ -937,6 +929,7 @@ class TheOnePSRuntime(RuntimeBase): ...@@ -937,6 +929,7 @@ class TheOnePSRuntime(RuntimeBase):
self._send_ctx = send_ctx self._send_ctx = send_ctx
trainer_config = self.context['trainer'] trainer_config = self.context['trainer']
proto_txt = worker_desc
debug = bool(int(os.getenv("PSERVER_DEBUG", "0"))) debug = bool(int(os.getenv("PSERVER_DEBUG", "0")))
if debug: if debug:
print("worker: \n{}".format(proto_txt)) print("worker: \n{}".format(proto_txt))
...@@ -1060,6 +1053,10 @@ class TheOnePSRuntime(RuntimeBase): ...@@ -1060,6 +1053,10 @@ class TheOnePSRuntime(RuntimeBase):
if self.is_heter_ps_mode: if self.is_heter_ps_mode:
trainers += len(self.role_maker._get_heter_worker_endpoints()) trainers += len(self.role_maker._get_heter_worker_endpoints())
# debug = bool(int(os.getenv("PSERVER_DEBUG", "0")))
# if debug:
# print("server: \n{}".format(server_desc))
self._server = fluid.core.DistFleetWrapper() self._server = fluid.core.DistFleetWrapper()
self._server.init_server(server_desc, self.string_hosts, role_id, self._server.init_server(server_desc, self.string_hosts, role_id,
trainers, self._server_sub_program) trainers, self._server_sub_program)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册