Unverified commit 1c4e3e5d, authored by ziyoujiyi, committed by GitHub

new fleet_desc builder (#39948)

* delete gloo connect retry

* the_one_ps dirs reconstruct

* .

* .

* create the_one_ps dirs

* create the_one_ps dirs

* create the_one_ps dirs

* create the_one_ps dirs

* create the_one_ps dirs

* create the_one_ps dirs

* the one ps dirs modify

* the one ps dirs modify

* the one ps dirs modify

* the one ps dirs modify

* refactor ps optimize

* refactor ps optimize

* refactor ps optimize

* .

* .

* .

* .

* .

* .

* refactor theoneps

* the_one_ps

* add ps pass unittest

* add ps pass unittest

* ps unitest frame

* ps unittest frame

* ps unittest frame

* ps unittest frame

* ps unittest frame

* ps unittest frame

* ps unittest frame

* ps unittest frame

* ps unittest frame

* ps unittest frame

* ps unittest frame

* ps unittest frame

* ps unittest frame

* ps unittest frame

* ps unittest frame

* ps unittest frame

* ps unittest frame

* add cpu_async_ps_mode test

* add cpu_async_ps_mode test

* add cpu_async_ps_mode test

* ps unittest ready

* ps unittest ready

* solve dist_pass init conflict

* solve import CommContext error

* unittest ok

* implement AllocateFrom

* solve setup.py.in conflict

* solve conflict

* solve conflict

* solve conflict

* .

* .

* cpu-async-ps minimize test ok & gpu minimize test ok

* add heter 2stage unittest

* add heter 2stage unittest

* add heter 2stage unittest

* sync/geo test ok & fix heter_worker program ok

* .

* new fleet desc generator

* new fleet_desc builder

* new fleet_desc builder

* .

* .

* correct ps.proto compile

* .
Co-authored-by: zkh2016 <zhangkaihuo@baidu.com>
Parent f30b3f81
/* Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
\ No newline at end of file
...@@ -235,6 +235,7 @@ if(WITH_PYTHON) ...@@ -235,6 +235,7 @@ if(WITH_PYTHON)
py_proto_compile(trainer_py_proto SRCS trainer_desc.proto data_feed.proto) py_proto_compile(trainer_py_proto SRCS trainer_desc.proto data_feed.proto)
py_proto_compile(distributed_strategy_py_proto SRCS distributed_strategy.proto) py_proto_compile(distributed_strategy_py_proto SRCS distributed_strategy.proto)
py_proto_compile(pass_desc_py_proto SRCS pass_desc.proto) py_proto_compile(pass_desc_py_proto SRCS pass_desc.proto)
py_proto_compile(ps_py_proto SRCS ps.proto)
#Generate an empty \ #Generate an empty \
#__init__.py to make framework_py_proto as a valid python module. #__init__.py to make framework_py_proto as a valid python module.
add_custom_target(fleet_proto_init ALL add_custom_target(fleet_proto_init ALL
...@@ -242,12 +243,13 @@ if(WITH_PYTHON) ...@@ -242,12 +243,13 @@ if(WITH_PYTHON)
COMMAND ${CMAKE_COMMAND} -E touch ${PADDLE_BINARY_DIR}/python/paddle/distributed/fleet/proto/__init__.py COMMAND ${CMAKE_COMMAND} -E touch ${PADDLE_BINARY_DIR}/python/paddle/distributed/fleet/proto/__init__.py
) )
add_custom_target(framework_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py) add_custom_target(framework_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py)
add_dependencies(framework_py_proto framework_py_proto_init trainer_py_proto distributed_strategy_py_proto fleet_proto_init pass_desc_py_proto) add_dependencies(framework_py_proto framework_py_proto_init trainer_py_proto distributed_strategy_py_proto fleet_proto_init pass_desc_py_proto ps_py_proto)
if (NOT WIN32) if (NOT WIN32)
add_custom_command(TARGET framework_py_proto POST_BUILD add_custom_command(TARGET framework_py_proto POST_BUILD
COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_BINARY_DIR}/python/paddle/fluid/proto COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_BINARY_DIR}/python/paddle/fluid/proto
COMMAND cp *.py ${PADDLE_BINARY_DIR}/python/paddle/fluid/proto/ COMMAND cp *.py ${PADDLE_BINARY_DIR}/python/paddle/fluid/proto/
COMMAND cp distributed_strategy_*.py ${PADDLE_BINARY_DIR}/python/paddle/distributed/fleet/proto COMMAND cp distributed_strategy_*.py ${PADDLE_BINARY_DIR}/python/paddle/distributed/fleet/proto
COMMAND cp ps_pb2.py ${PADDLE_BINARY_DIR}/python/paddle/distributed/fleet/proto
COMMENT "Copy generated python proto into directory paddle/fluid/proto." COMMENT "Copy generated python proto into directory paddle/fluid/proto."
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
add_custom_target(fleet_executor_proto_init ALL DEPENDS fleet_proto_init fleet_executor_desc_py_proto add_custom_target(fleet_executor_proto_init ALL DEPENDS fleet_proto_init fleet_executor_desc_py_proto
...@@ -259,6 +261,7 @@ if(WITH_PYTHON) ...@@ -259,6 +261,7 @@ if(WITH_PYTHON)
add_custom_command(TARGET framework_py_proto POST_BUILD add_custom_command(TARGET framework_py_proto POST_BUILD
COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_BINARY_DIR}/python/paddle/fluid/proto COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_BINARY_DIR}/python/paddle/fluid/proto
COMMAND copy /Y *.py ${proto_dstpath} COMMAND copy /Y *.py ${proto_dstpath}
COMMAND copy /Y ps_pb2.py ${fleet_proto_dstpath}
COMMAND copy /Y distributed_strategy_*.py ${fleet_proto_dstpath} COMMAND copy /Y distributed_strategy_*.py ${fleet_proto_dstpath}
COMMENT "Copy generated python proto into directory paddle/fluid/proto." COMMENT "Copy generated python proto into directory paddle/fluid/proto."
COMMENT "Copy generated python proto into directory paddle/distributed/fleet/proto." COMMENT "Copy generated python proto into directory paddle/distributed/fleet/proto."
......
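With these CMake changes, ps.proto is compiled by the new ps_py_proto target and the generated ps_pb2.py is copied next to the other fleet protos, which is what lets the refactored runtime import it (see the the_one_ps.py hunk below). A minimal sanity check, assuming a Paddle build that includes this target:

from paddle.distributed.fleet.proto import ps_pb2

desc = ps_pb2.PSParameter()      # top-level descriptor message defined in ps.proto
print(desc.server_param)         # nested ServerParameter submessage generated by protoc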
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto2";
package paddle.distributed;
option cc_generic_services = true;
option cc_enable_arenas = true;
message FsClientParameter {
enum FsApiType {
HDFS = 0;
AFS = 1;
}
optional FsApiType fs_type = 1 [ default = HDFS ];
optional string uri = 2; // such as afs://xxx.afs.com:9902
optional string user = 3; // user_name to access fs
optional string passwd = 4; // password
optional int32 buffer_size = 5; // buffer for read/write
optional string hadoop_bin = 51;
optional string afs_conf = 101;
}
message PSParameter {
optional string worker_class = 1;
optional string server_class = 2;
optional string instance_class = 3;
optional string init_gflags = 4 [ default = "" ];
optional WorkerParameter worker_param = 101;
optional ServerParameter server_param = 102;
repeated DownpourTrainerParameter trainer_param = 301;
optional FsClientParameter fs_client_param = 501;
}
message WorkerParameter {
optional DownpourWorkerParameter downpour_worker_param = 1;
}
message DownpourWorkerParameter {
repeated TableParameter downpour_table_param = 1;
}
message DownpourServerParameter {
repeated TableParameter downpour_table_param = 1;
optional ServerServiceParameter service_param = 2;
}
message ServerParameter {
optional DownpourServerParameter downpour_server_param = 1;
}
message DownpourTrainerParameter {
repeated DenseTableParameter dense_table = 1;
repeated SparseTableParameter sparse_table = 2;
optional int32 push_sparse_per_batch = 3;
optional int32 push_dense_per_batch = 4;
repeated string skip_op = 5;
repeated ProgramConfig program_config = 6;
}
message DenseTableParameter {
optional int32 table_id = 1;
repeated string dense_variable_name = 2;
repeated string dense_gradient_variable_name = 3;
optional int32 fea_dim = 4;
}
message SparseTableParameter {
optional int32 table_id = 1;
optional int32 feature_dim = 2;
repeated string slot_key = 3;
repeated string slot_value = 4;
repeated string slot_gradient = 5;
}
message ServerServiceParameter {
optional string server_class = 1 [ default = "BrpcPsServer" ];
optional string client_class = 2 [ default = "BrpcPsClient" ];
optional string service_class = 3 [ default = "BrpcPsService" ];
optional uint32 start_server_port = 4
[ default = 0 ]; // will find an available port from it
optional uint32 server_thread_num = 5 [ default = 12 ];
}
message ProgramConfig {
required string program_id = 1;
repeated int32 push_sparse_table_id = 2;
repeated int32 push_dense_table_id = 3;
repeated int32 pull_sparse_table_id = 4;
repeated int32 pull_dense_table_id = 5;
}
enum TableType {
PS_SPARSE_TABLE = 0;
PS_DENSE_TABLE = 1;
PS_OTHER_TABLE = 2;
}
message TableParameter {
optional uint64 table_id = 1;
optional string table_class = 2;
optional uint64 shard_num = 3 [ default = 1000 ];
optional TableAccessorParameter accessor = 4;
optional TensorAccessorParameter tensor = 5;
optional CommonAccessorParameter common = 6;
optional TableType type = 7;
optional bool compress_in_save = 8 [ default = false ];
}
message TableAccessorParameter {
optional string accessor_class = 1;
optional uint32 fea_dim = 4 [ default = 11 ];
optional uint32 embedx_dim = 5 [ default = 8 ];
optional uint32 embedx_threshold = 6 [ default = 10 ];
optional CtrAccessorParameter ctr_accessor_param = 7;
repeated TableAccessorSaveParameter table_accessor_save_param = 8;
optional SparseCommonSGDRuleParameter embed_sgd_param = 10;
optional SparseCommonSGDRuleParameter embedx_sgd_param = 11;
}
message CtrAccessorParameter {
optional float nonclk_coeff = 1
[ default = 0.1 ]; // to calculate show_click_score
optional float click_coeff = 2
[ default = 1 ]; // to calculate show_click_score
optional float base_threshold = 3 [
default = 1.5
]; // show_click_score > base_threshold, this feature can be saved
optional float delta_threshold = 4
[ default =
0.25 ]; // delta_score > delta_threshold, this feature can be saved
optional float delta_keep_days = 5
[ default =
16 ]; // unseen_day < delta_keep_days, this feature can be saved
optional float show_click_decay_rate = 6 [
default = 0.98
]; // show/click will be updated to show/click * show_click_decay_rate after a day
optional float delete_threshold = 7
[ default = 0.8 ]; // threshold to shrink a feasign
optional float delete_after_unseen_days = 8
[ default = 30 ]; // unseen_day > delete_after_unseen_days, this feature
// will be deleted in shrink_model
optional int32 ssd_unseenday_threshold = 9
[ default = 1 ]; // threshold to save ssd
}
message TensorAccessorParameter {
optional string feed_var_name = 1;
optional string fetch_var_name = 2;
optional int64 startup_program_id = 3;
optional int64 main_program_id = 4;
optional string tensor_table_class = 6;
}
message CommonAccessorParameter {
optional string name = 1;
optional string table_name = 2;
repeated string attributes = 3;
repeated string params = 4;
repeated uint32 dims = 5;
repeated string initializers = 6;
optional string entry = 7;
optional int32 trainer_num = 8;
optional bool sync = 9;
optional uint32 table_num = 10;
optional uint32 table_dim = 11;
}
message TableAccessorSaveParameter {
optional uint32 param = 1;
optional string converter = 2;
optional string deconverter = 3;
}
message SparseCommonSGDRuleParameter {
optional string name = 1;
optional SparseNaiveSGDRuleParameter naive = 2;
optional SparseAdagradSGDRuleParameter adagrad = 3;
optional SparseAdamSGDParameter adam = 4;
}
message SparseNaiveSGDRuleParameter { // SparseNaiveSGDRule
optional double learning_rate = 1 [ default = 0.05 ];
optional double initial_range = 2 [ default = 0.0001 ];
repeated float weight_bounds = 3;
}
message
SparseAdagradSGDRuleParameter { // SparseAdaGradSGDRule|StdAdaGradSGDRule
optional double learning_rate = 1 [ default = 0.05 ];
optional double initial_g2sum = 2 [ default = 3.0 ];
optional double initial_range = 3 [ default = 0.0001 ];
repeated float weight_bounds = 4;
}
message SparseAdamSGDParameter { // SparseAdamSGDRule
optional double learning_rate = 1 [ default = 0.001 ];
optional double initial_range = 2 [ default = 0.0001 ];
optional double beta1_decay_rate = 3 [ default = 0.9 ];
optional double beta2_decay_rate = 4 [ default = 0.999 ];
optional double ada_epsilon = 5 [ default = 1e-08 ];
repeated float weight_bounds = 6;
}
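The new descriptor path populates these messages programmatically and serializes them with protobuf's text format, rather than concatenating prototxt strings by hand as the old to_string helpers did. A minimal sketch, not part of the patch, of composing a sparse TableParameter inside a PSParameter using only the fields defined above:

from google.protobuf import text_format
from paddle.distributed.fleet.proto import ps_pb2

ps_desc = ps_pb2.PSParameter()
table = ps_desc.server_param.downpour_server_param.downpour_table_param.add()
table.table_id = 0
table.table_class = "MemorySparseTable"
table.type = ps_pb2.PS_SPARSE_TABLE
table.shard_num = 1000
table.accessor.accessor_class = "CtrCommonAccessor"

print(text_format.MessageToString(ps_desc))   # same text representation PsDescBuilder returns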
...@@ -54,6 +54,7 @@ class ParameterServerOptimizer(MetaOptimizerBase): ...@@ -54,6 +54,7 @@ class ParameterServerOptimizer(MetaOptimizerBase):
attrs['cloned_startup'] = attrs['origin_startup_program'].clone() attrs['cloned_startup'] = attrs['origin_startup_program'].clone()
attrs['user_defined_strategy'] = self.user_defined_strategy attrs['user_defined_strategy'] = self.user_defined_strategy
attrs['valid_strategy'] = self.user_defined_strategy
attrs['trainer'] = TrainerRuntimeConfig(self.user_defined_strategy) attrs['trainer'] = TrainerRuntimeConfig(self.user_defined_strategy)
attrs['ps_mode'] = attrs['trainer'].mode attrs['ps_mode'] = attrs['trainer'].mode
logger.info("ps_mode: {}".format(attrs['ps_mode'])) logger.info("ps_mode: {}".format(attrs['ps_mode']))
......
# Directory notes
* Once this refactor is done, the related files (and folders) under fleet in the upper-level directory can be deleted.
...@@ -15,10 +15,11 @@ ...@@ -15,10 +15,11 @@
import warnings import warnings
import os import os
from paddle.distributed.fleet.proto import ps_pb2
import paddle.fluid as fluid import paddle.fluid as fluid
import paddle.distributed.fleet as fleet import paddle.distributed.fleet as fleet
from paddle.fluid import core from paddle.fluid import core
from .utils.public import * from paddle.distributed.ps.utils.public import *
from paddle.fluid.framework import Program from paddle.fluid.framework import Program
from paddle.fluid.compiler import CompiledProgram from paddle.fluid.compiler import CompiledProgram
from paddle.fluid.executor import Executor from paddle.fluid.executor import Executor
...@@ -29,14 +30,10 @@ from paddle.distributed.fleet.base.private_helper_function import wait_server_re ...@@ -29,14 +30,10 @@ from paddle.distributed.fleet.base.private_helper_function import wait_server_re
from paddle.fluid.communicator import Communicator, HeterClient from paddle.fluid.communicator import Communicator, HeterClient
from google.protobuf import text_format from google.protobuf import text_format
__all__ = [] __all__ = [
'Table', 'SparseTable', 'GeoSparseTable', 'BarrierTable', 'TensorTable',
'DenseTable'
def conv_indent(indent): ]
return "".join([" "] * indent)
PSERVER_SAVE_SUFFIX = ".shard"
def get_program_by_id(context, program_id): def get_program_by_id(context, program_id):
...@@ -62,7 +59,58 @@ def parse_table_class(varname, program_id, context): ...@@ -62,7 +59,58 @@ def parse_table_class(varname, program_id, context):
return "MemorySparseTable" return "MemorySparseTable"
def get_default_accessor_proto(accessor, varname, program_id, context): def check_embedding_dim(accessor_proto, varname, program_id, context):
main_program, startup_program = get_program_by_id(context, program_id)
embedding_dim = 0
for var in main_program.list_vars():
if var.name == varname:
embedding_dim = var.shape[1]
print('new var: {}, {}, {}'.format(var, embedding_dim,
accessor_proto.fea_dim))
break
fea_dim = accessor_proto.fea_dim
if fea_dim != embedding_dim + 2:
raise ValueError(
"The fea_dim is wrong, it will be sparse_embedding_dim + 2: {}, but got {}".
format(embedding_dim + 2, fea_dim))
embedx_dim = accessor_proto.embedx_dim
if embedx_dim != embedding_dim - 1:
raise ValueError(
"The embedx_dim is wrong, it will be sparse_embedding_dim - 1: {}, but got {}".
format(embedding_dim - 1, embedx_dim))
class Service:
def __init__(self):
pass
def _set(self, service_proto):
service_proto.server_class = "BrpcPsServer"
service_proto.client_class = "BrpcPsClient"
service_proto.service_class = "BrpcPsService"
service_proto.start_server_port = 0
service_proto.server_thread_num = 12
class GpuService(Service):
def __init__(self):
super(GpuService, self).__init__()
def _set(self, service_proto):
super(GpuService, self)._set(service_proto)
service_proto.server_class = 'PsLocalServer'
service_proto.client_class = 'PsLocalClient'
class Accessor:
def __init__(self):
self.accessor_class = ""
self.optimizer = None
self.feature_dim = 0
self.embedding_dim = 0
# TableAccessorParameter accessor
def _set(self, accessor_proto, varname, program_id, context):
main_program, startup_program = get_program_by_id(context, program_id) main_program, startup_program = get_program_by_id(context, program_id)
embedding_dim = 0 embedding_dim = 0
for var in main_program.list_vars(): for var in main_program.list_vars():
...@@ -70,16 +118,16 @@ def get_default_accessor_proto(accessor, varname, program_id, context): ...@@ -70,16 +118,16 @@ def get_default_accessor_proto(accessor, varname, program_id, context):
embedding_dim = var.shape[1] embedding_dim = var.shape[1]
break break
if not accessor.HasField("accessor_class"): if not accessor_proto.HasField("accessor_class"):
accessor.accessor_class = "CtrCommonAccessor" accessor_proto.accessor_class = "CtrCommonAccessor"
if not accessor.HasField("fea_dim"): if not accessor_proto.HasField("fea_dim"):
accessor.fea_dim = embedding_dim + 2 accessor_proto.fea_dim = embedding_dim + 2
if not accessor.HasField("embedx_dim"): if not accessor_proto.HasField("embedx_dim"):
accessor.embedx_dim = embedding_dim - 1 accessor_proto.embedx_dim = embedding_dim - 1
if not accessor.HasField("embedx_threshold"): if not accessor_proto.HasField("embedx_threshold"):
accessor.embedx_threshold = 0 accessor_proto.embedx_threshold = 0
ctr_accessor_param = accessor.ctr_accessor_param ctr_accessor_param = accessor_proto.ctr_accessor_param
if not ctr_accessor_param.HasField("nonclk_coeff"): if not ctr_accessor_param.HasField("nonclk_coeff"):
ctr_accessor_param.nonclk_coeff = 0.1 ctr_accessor_param.nonclk_coeff = 0.1
if not ctr_accessor_param.HasField("click_coeff"): if not ctr_accessor_param.HasField("click_coeff"):
...@@ -99,7 +147,9 @@ def get_default_accessor_proto(accessor, varname, program_id, context): ...@@ -99,7 +147,9 @@ def get_default_accessor_proto(accessor, varname, program_id, context):
if not ctr_accessor_param.HasField("ssd_unseenday_threshold"): if not ctr_accessor_param.HasField("ssd_unseenday_threshold"):
ctr_accessor_param.ssd_unseenday_threshold = 1 ctr_accessor_param.ssd_unseenday_threshold = 1
for sgd_param in [accessor.embed_sgd_param, accessor.embedx_sgd_param]: for sgd_param in [
accessor_proto.embed_sgd_param, accessor_proto.embedx_sgd_param
]:
if not sgd_param.HasField("name"): if not sgd_param.HasField("name"):
sgd_param.name = "SparseAdaGradSGDRule" sgd_param.name = "SparseAdaGradSGDRule"
if sgd_param.name == "SparseAdaGradSGDRule" or sgd_param.name == "StdAdaGradSGDRule": if sgd_param.name == "SparseAdaGradSGDRule" or sgd_param.name == "StdAdaGradSGDRule":
...@@ -133,58 +183,16 @@ def get_default_accessor_proto(accessor, varname, program_id, context): ...@@ -133,58 +183,16 @@ def get_default_accessor_proto(accessor, varname, program_id, context):
sgd_param.adam.weight_bounds.extend([-10.0, 10.0]) sgd_param.adam.weight_bounds.extend([-10.0, 10.0])
def check_embedding_dim(accessor, varname, program_id, context): class CommonAccessor(Accessor):
main_program, startup_program = get_program_by_id(context, program_id)
embedding_dim = 0
for var in main_program.list_vars():
if var.name == varname:
embedding_dim = var.shape[1]
break
fea_dim = accessor.fea_dim
if fea_dim != embedding_dim + 2:
raise ValueError(
"The fea_dim is wrong, it will be sparse_embedding_dim + 2: {}, but got {}".
format(embedding_dim + 2, fea_dim))
embedx_dim = accessor.embedx_dim
if embedx_dim != embedding_dim - 1:
raise ValueError(
"The embedx_dim is wrong, it will be sparse_embedding_dim - 1: {}, but got {}".
format(embedding_dim - 1, embedx_dim))
class Accessor:
def __init__(self): def __init__(self):
self.accessor_class = "" super(CommonAccessor, self).__init__()
self.optimizer = None self.table_name = ''
self.feature_dim = -1 self.entry = 'none'
self.embedding_dim = -1
self.optimizer = None
def to_string(self, indent):
accessor_str = "{}accessor {{{}\n{}}}"
attrs = ""
attrs += "accessor_class: \"{}\" ".format(self.accessor_class)
attrs += "fea_dim: {} ".format(self.feature_dim)
attrs += "embedx_dim: {} ".format(self.embedding_dim)
attrs += "\n"
if self.optimizer is not None:
attrs += self.optimizer.to_string(indent)
return accessor_str.format(
conv_indent(indent), attrs, conv_indent(indent))
class CommonAccessor:
def __init__(self):
self.accessor_class = ""
self.table_name = None
self.entry = None
self.attrs = [] self.attrs = []
self.params = [] self.params = []
self.dims = [] self.dims = []
self.trainer_num = 0 self.trainer_num = 0
self.sync = "false" self.sync = False
self.table_num = None
self.table_dim = None
self.initializers = [] self.initializers = []
self.opt_input_map = {} self.opt_input_map = {}
self.opt_attr_map = {} self.opt_attr_map = {}
...@@ -422,233 +430,361 @@ class CommonAccessor: ...@@ -422,233 +430,361 @@ class CommonAccessor:
self.initializers = initializers self.initializers = initializers
self.attrs = attrs self.attrs = attrs
def to_string(self, indent): # CommonAccessorParameter common
accessor_str = "{}common {{{}\n{}}}" def _set(self, proto):
attrs = "" proto.name = self.accessor_class
attrs += "name: \"{}\" ".format(self.accessor_class) proto.table_name = self.table_name
proto.params.extend(self.params)
if self.table_name: proto.dims.extend(self.dims)
attrs += "table_name: \"{}\" ".format(self.table_name) proto.initializers.extend(self.initializers)
proto.entry = self.entry
if self.entry: proto.trainer_num = self.trainer_num
attrs += "entry: \"{}\" ".format(self.entry) proto.sync = self.sync
attrs += "trainer_num: {} ".format(self.trainer_num) proto.table_num = self.table_num
attrs += "sync: {} ".format(self.sync) proto.table_dim = self.table_dim
if self.table_num:
attrs += "table_num: {} ".format(self.table_num)
if self.table_dim:
attrs += "table_dim: {} ".format(self.table_dim)
for param in self.params:
attrs += "params: \"{}\" ".format(param)
for dim in self.dims:
attrs += "dims: {} ".format(dim)
for initializer in self.initializers:
attrs += "initializers: \"{}\" ".format(initializer)
attrs += "\n"
return accessor_str.format(
conv_indent(indent), attrs, conv_indent(indent))
class Tensor: class Tensor:
def __init__(self): def __init__(self, tensor_dict):
self.main_program_id = None self.tensor_dict = tensor_dict
self.startup_program_id = None
self.feed_var_name = None def _set(self, tensor_proto):
self.fetch_var_name = None tensor_proto.main_program_id = self.tensor_dict.get("main_program_id",
self.tensor_table_class = False 0)
tensor_proto.startup_program_id = self.tensor_dict.get(
def to_string(self, indent): "startup_program_id", 0)
program_str = "{}tensor {{{}\n{}}}" tensor_proto.feed_var_name = self.tensor_dict.get("feed_var_name", '')
attrs = "" tensor_proto.fetch_var_name = self.tensor_dict.get("fetch_var_name", '')
attrs += "feed_var_name: \"{}\" ".format(str(self.feed_var_name)) tensor_proto.tensor_table_class = self.tensor_dict.get(
attrs += "fetch_var_name: \"{}\" ".format(str(self.fetch_var_name)) "tensor_table_class", '')
attrs += "startup_program_id: {} ".format(str(self.startup_program_id))
attrs += "main_program_id: {} ".format(str(self.main_program_id))
attrs += "tensor_table_class: \"{}\" ".format(
str(self.tensor_table_class))
attrs += "\n"
return program_str.format(
conv_indent(indent), attrs, conv_indent(indent))
class Table: class Table:
def __init__(self): def __init__(self):
self.id = -1
self.table_class = None self.table_class = None
self.shard_num = -1 self.shard_num = -1
self.type = None self.type = None
self.accessor = None self.accessor = Accessor()
self.common = None self.shard_num = 256
self.common = CommonAccessor()
self.tensor = None self.tensor = None
self.accessor_proto = None
def to_string(self, indent):
# if self.id == 1:
# proto_txt = ''
# with open('./sparse_table.prototxt') as f:
# proto_txt = f.read()
# return proto_txt
table_str = "{}downpour_table_param {{{}\n{}}}"
attrs = ""
attrs += "table_id: {} ".format(self.id)
attrs += "table_class: \"{}\" ".format(self.table_class)
attrs += "shard_num: {} ".format(self.shard_num)
attrs += "type: {}".format(self.type)
attrs += "\n"
indent += 2
if self.accessor_proto is not None:
accessor_str = "{}accessor {{{}\n{}}}"
accessor_str = accessor_str.format(
conv_indent(indent), self.accessor_proto, conv_indent(indent))
attrs += accessor_str + "\n"
elif self.accessor is not None:
attrs += self.accessor.to_string(indent)
attrs += "\n"
if self.tensor is not None:
attrs += self.tensor.to_string(indent)
attrs += "\n"
if self.common is not None:
attrs += self.common.to_string(indent)
attrs += "\n"
return table_str.format(conv_indent(indent), attrs, conv_indent(indent))
def _set(self, table_proto):
pass
class Service:
def __init__(self):
self.server_class = "BrpcPsServer"
self.client_class = "BrpcPsClient"
self.service_class = "BrpcPsService"
self.start_server_port = 0
self.server_thread_num = 12
def to_string(self, indent): class BarrierTable(Table):
service_str = "{}service_param {{{}\n{}}}" def __init__(self, context, idx):
super(BarrierTable, self).__init__()
self.type = None
self.shard_num = 256
self.accessor.accessor_class = 'CommMergeAccessor'
self.common.attrs = ""
self.common.dims = []
self.common.params = []
self.is_heter_ps_mode = context['is_heter_ps_mode']
self.role_maker = context['role_maker']
self.idx = idx
self.is_sync = context['is_sync']
def _set(self, table_proto):
table_proto.table_id = self.idx
table_proto.table_class = 'BarrierTable'
table_proto.shard_num = 256
table_proto.type = ps_pb2.PS_OTHER_TABLE
table_proto.accessor.accessor_class = "CommMergeAccessor"
table_proto.accessor.fea_dim = 0
table_proto.accessor.embedx_dim = 0
table_proto.common.name = ""
table_proto.common.table_name = "barrier_table"
table_proto.common.sync = self.is_sync
table_proto.common.entry = 'none'
trainer_num = get_trainers(self.role_maker)
if self.is_heter_ps_mode:
trainer_num += len(self.role_maker._get_heter_worker_endpoints())
table_proto.common.trainer_num = trainer_num
attrs = ""
attrs += "server_class: \"{}\" ".format(self.server_class)
attrs += "client_class: \"{}\" ".format(self.client_class)
attrs += "service_class: \"{}\" ".format(self.service_class)
attrs += "start_server_port: {} ".format(self.start_server_port)
attrs += "server_thread_num: {} ".format(self.server_thread_num)
return service_str.format( class TensorTable(Table):
conv_indent(indent), attrs, conv_indent(indent)) def __init__(self, idx, tensor_dict, role_maker):
super(TensorTable, self).__init__()
self.idx = idx
self.tensor_dict = tensor_dict
self.role_maker = role_maker
def _set(self, table_proto):
table_proto.table_id = self.idx
table_proto.type = ps_pb2.PS_OTHER_TABLE
table_proto.table_class = self.tensor_dict.get("tensor_table_class", '')
class DownpourServer: table_proto.accessor.accessor_class = "CommMergeAccessor"
def __init__(self):
self.service = None
self.tables = []
def set_service_param(self, service): table_proto.common.table_name = self.tensor_dict.get("feed_var_name",
self.service = service '')
table_proto.common.trainer_num = get_trainers(self.role_maker)
def append_tables(self, table): tensor = Tensor(self.tensor_dict)
if not isinstance(table, Table): tensor._set(table_proto.tensor)
raise ValueError("only support instance Table")
self.tables.append(table)
def to_string(self, indent):
server_str = "{}downpour_server_param {{{}\n{}}}"
table_strs = "" class SparseTable(Table):
indent += 2 def __init__(self, context, send_ctx):
super(SparseTable, self).__init__()
self.context = context
self.ctx = send_ctx
self.type = None
self.table_class = 'MemorySparseTable'
self.accessor = Accessor()
table_strs += "\n" def _set(self, table_proto):
table_strs += self.service.to_string(indent) ctx = self.ctx
if ctx.is_tensor_table() or len(ctx.origin_varnames()) < 1 or (
ctx.is_sparse() == False):
return
table_proto.table_id = ctx.table_id()
table_proto.table_class = self.table_class
table_proto.type = ps_pb2.PS_SPARSE_TABLE
table_proto.shard_num = self.shard_num
for table in self.tables: self.common.table_name = self.context['grad_name_to_param_name'][
table_strs += "\n" ctx.origin_varnames()[0]]
table_strs += table.to_string(indent)
return server_str.format(
conv_indent(indent), table_strs, conv_indent(indent))
print('new table_name: {}'.format(self.common.table_name))
all_table_proto = self.context[
"user_defined_strategy"].sparse_table_configs
usr_table_proto = all_table_proto.add()
for proto in all_table_proto:
if proto.table_name == self.common.table_name:
usr_table_proto = proto
break
table_proto.table_class = 'MemorySparseTable'
warnings.warn("The PS mode must use MemorySparseTable.")
if usr_table_proto.HasField("shard_num"):
table_proto.shard_num = usr_table_proto.shard_num
else:
table_proto.shard_num = 1000
warnings.warn(
"The shard_num of sparse table is not set, use default value 1000."
)
class Server: if usr_table_proto.accessor.ByteSize() == 0:
def __init__(self): warnings.warn(
self.servers = [] "The accessor of sparse table is not set, use default value.")
table_proto.accessor.ParseFromString(
usr_table_proto.accessor.SerializeToString())
self.accessor._set(table_proto.accessor, self.common.table_name,
ctx.program_id(), self.context)
check_embedding_dim(table_proto.accessor, self.common.table_name,
ctx.program_id(), self.context)
adam_d2sum = self.context["user_defined_strategy"].adam_d2sum
self.common.parse_by_optimizer(ctx, self.context)
self.common.parse_entry(self.common.table_name,
ctx.program_id(), self.context)
self.common.sync = True if self.context['is_sync'] else False
self.common._set(table_proto.common)
class GeoSparseTable(SparseTable):
def __init__(self, context, send_ctx):
super(GeoSparseTable, self).__init__(context, send_ctx)
self.table_class = "SparseGeoTable"
if self.context['ps_mode'] != DistributedMode.GEO:
raise ValueError("not geo sparse table!")
def _set(self, table_proto):
ctx = self.ctx
if ctx.is_tensor_table() or len(ctx.origin_varnames()) < 1 or (
ctx.is_sparse() == False):
return
table_proto.table_id = ctx.table_id()
table_proto.table_class = self.table_class
table_proto.type = ps_pb2.PS_SPARSE_TABLE
table_proto.shard_num = self.shard_num
table_proto.accessor.accessor_class = 'CommMergeAccessor'
table_proto.accessor.fea_dim = ctx.sections()[0]
table_proto.accessor.embedx_dim = ctx.sections()[1]
self.common.table_name = self.context['grad_name_to_param_name'][
ctx.origin_varnames()[0]]
adam_d2sum = self.context["user_defined_strategy"].adam_d2sum
self.common.parse_by_optimizer(ctx, self.context)
self.common.parse_entry(self.common.table_name,
ctx.program_id(), self.context)
self.common.sync = False
self.common._set(table_proto.common)
def add_server(self, server):
if not isinstance(server, DownpourServer):
raise ValueError("only support instance DownpourServer")
self.servers.append(server)
def __str__(self): class DenseTable(Table):
server_str = "server_param {{{}\n}}" def __init__(self, context, send_ctx):
indent = 2 super(DenseTable, self).__init__()
servers_str = "" self.context = context
for server in self.servers: self.ctx = send_ctx
servers_str += "\n" self.accessor = Accessor()
servers_str += server.to_string(indent)
return server_str.format(servers_str) def _set(self, table_proto):
ctx = self.ctx
if ctx.is_tensor_table() or len(ctx.origin_varnames()) < 1 or (
ctx.is_sparse() == True):
return
table_proto.table_id = ctx.table_id()
class DownpourWorker: table_proto.type = ps_pb2.PS_DENSE_TABLE
table_proto.table_class = "CommonDenseTable"
table_proto.shard_num = 256
table_proto.accessor.accessor_class = 'CommMergeAccessor'
table_proto.accessor.fea_dim = ctx.sections()[0]
table_proto.accessor.embedx_dim = 1
self.common.table_name = "MergedDense"
adam_d2sum = self.context["user_defined_strategy"].adam_d2sum
self.common.parse_by_optimizer(ctx, self.context)
self.common.parse_entry(self.common.table_name,
ctx.program_id(), self.context)
self.common.sync = True if self.context['is_sync'] else False
self.common._set(table_proto.common)
class Server:
def __init__(self): def __init__(self):
self.tables = [] pass
def append_tables(self, table): def _set(self):
if not isinstance(table, Table): pass
raise ValueError("only support instance Table")
self.tables.append(table)
def to_string(self, indent):
worker_str = "{}downpour_worker_param {{{}\n{}}}"
table_strs = ""
indent += 2
for table in self.tables:
table_strs += "\n"
table_strs += table.to_string(indent)
return worker_str.format( class DownpourServer(Server):
conv_indent(indent), table_strs, conv_indent(indent)) def __init__(self):
super(DownpourServer, self).__init__()
def _set(self):
pass
class Worker: class Worker:
def __init__(self): def __init__(self):
self.workers = [] pass
def _set(self):
pass
def add_worker(self, worker):
if not isinstance(worker, DownpourWorker):
raise ValueError("only support instance DownpourWorker")
self.workers.append(worker)
def __str__(self): class DownpourWorker(Worker):
worker_str = "worker_param {{{}\n}}" def __init__(self):
indent = 2 super(DownpourWorker, self).__init__()
workers_str = ""
for worker in self.workers:
workers_str += "\n"
workers_str += worker.to_string(indent)
return worker_str.format(workers_str) def _set(self):
pass
class fsClient: class fsClient:
def __init__(self, proto): def __init__(self, fs_client_param):
self.proto = proto self.fs_client_param = fs_client_param
self.uri = proto.uri
self.user = proto.user def _set(self, proto):
self.passwd = proto.passwd if not text_format.MessageToString(self.fs_client_param):
self.hadoop_bin = proto.hadoop_bin return
proto.uri = self.fs_client_param.uri
def to_string(self): proto.user = self.fs_client_param.user
proto_txt = text_format.MessageToString(self.proto) proto.passwd = self.fs_client_param.passwd
if proto_txt: proto.hadoop_bin = self.fs_client_param.hadoop_bin
fs_str = "fs_client_param {{\n{}}}"
return fs_str.format(proto_txt)
class PsDescBuilder(object):
def __init__(self, context):
self.context = context
self.is_sync = context['is_sync']
self.ps_mode = context['ps_mode']
self.is_heter_ps_mode = context['is_heter_ps_mode']
self.use_ps_gpu = context['use_ps_gpu']
self.send_ctx = get_the_one_send_context(
self.context,
use_origin_program=True,
split_dense_table=self.is_heter_ps_mode)
self.tensor_table_dict = {} # TODO
self._server_sub_program = []
self.tables = self._get_tables()
self.service = self._get_service()
self.fs_client = self._get_fs_client()
self.ps_desc = ps_pb2.PSParameter()
def _get_tensor_tables(self):
program_idx = 0
if not self.tensor_table_dict:
self._server_sub_program.append(Program().desc)
tables = []
for table_name in self.tensor_table_dict:
tables.append(globals()['TensorTable'](len(tables), tensor_dict,
self.context['role_maker']))
program_idx += 1
return tables
def _get_tables(self):
tables = []
for idx, (name, ctx) in enumerate(self.send_ctx.items()):
print('####### {}\n'.format(ctx.is_sparse()))
if ctx.is_sparse():
if self.ps_mode == DistributedMode.GEO:
tables.append(globals()['GeoSparseTable'](self.context,
ctx))
else:
tables.append(globals()['SparseTable'](self.context, ctx))
else: else:
return "" tables.append(globals()['DenseTable'](self.context, ctx))
self.tensor_tables = self._get_tensor_tables()
tables.extend(self.tensor_tables)
tables.append(globals()['BarrierTable'](self.context, len(tables)))
return tables
def _get_service(self):
if self.use_ps_gpu:
return GpuService()
else:
return Service()
def _get_fs_client(self):
return fsClient(self.context["user_defined_strategy"].fs_client_param)
def build_worker_desc(self):
for table in self.tables:
table_proto = self.ps_desc.worker_param.downpour_worker_param.downpour_table_param.add(
)
table._set(table_proto)
table_proto = self.ps_desc.server_param.downpour_server_param.downpour_table_param.add(
)
table._set(table_proto)
self.service._set(
self.ps_desc.server_param.downpour_server_param.service_param)
return text_format.MessageToString(self.ps_desc)
def build_server_desc(self):
for table in self.tables:
table_proto = self.ps_desc.server_param.downpour_server_param.downpour_table_param.add(
)
table._set(table_proto)
self.sparse_table_maps = {}
if table_proto.type == ps_pb2.PS_SPARSE_TABLE and table_proto.common is not None:
self.sparse_table_maps[
table_proto.common.table_name] = table_proto.table_id
self.service._set(
self.ps_desc.server_param.downpour_server_param.service_param)
self.fs_client._set(self.ps_desc.fs_client_param)
return text_format.MessageToString(self.ps_desc)
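For orientation, the builder above is consumed by TheOnePSRuntime in the hunks that follow: build_worker_desc() in _init_worker() and build_server_desc() in _init_server(). A hedged usage sketch, assuming the fully populated context dict the runtime assembles (role_maker, ps_mode, is_sync, user_defined_strategy, send contexts, ...):

ps_desc_builder = PsDescBuilder(context)

worker_desc = ps_desc_builder.build_worker_desc()   # text-format PSParameter for the worker side
server_desc = ps_desc_builder.build_server_desc()   # text-format PSParameter for the server side

# build_server_desc() also records sparse tables in ps_desc_builder.sparse_table_maps
# (common.table_name -> table_id), which _init_server() uses when loading saved
# sparse parameters.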
class TheOnePSRuntime(RuntimeBase): class TheOnePSRuntime(RuntimeBase):
...@@ -665,8 +801,11 @@ class TheOnePSRuntime(RuntimeBase): ...@@ -665,8 +801,11 @@ class TheOnePSRuntime(RuntimeBase):
self.role_maker = context["role_maker"] self.role_maker = context["role_maker"]
self.origin_main_program = context["origin_main_program"] self.origin_main_program = context["origin_main_program"]
self.origin_main_programs = context["origin_main_programs"] self.origin_main_programs = context.get("origin_main_programs",
[self.origin_main_program])
self.context["origin_main_programs"] = self.origin_main_programs
self.context["origin_startup_programs"] = context.get(
'origin_startup_programs', [context['origin_startup_program']])
self.context[ self.context[
'is_heter_ps_mode'] = self.role_maker._is_heter_parameter_server_mode 'is_heter_ps_mode'] = self.role_maker._is_heter_parameter_server_mode
self.is_heter_ps_mode = self.context['is_heter_ps_mode'] self.is_heter_ps_mode = self.context['is_heter_ps_mode']
...@@ -675,15 +814,23 @@ class TheOnePSRuntime(RuntimeBase): ...@@ -675,15 +814,23 @@ class TheOnePSRuntime(RuntimeBase):
self.context['ps_mode'] = self.context['trainer'].mode self.context['ps_mode'] = self.context['trainer'].mode
self.context['use_ps_gpu'] = context['valid_strategy'].a_sync_configs[ self.context['use_ps_gpu'] = context['valid_strategy'].a_sync_configs[
'use_ps_gpu'] 'use_ps_gpu']
self.is_sync = True if self.context[ self.context['is_sync'] = True if self.context[
'ps_mode'] == DistributedMode.SYNC else False 'ps_mode'] == DistributedMode.SYNC else False
self.context['grad_name_to_param_name'] = {} self.context['grad_name_to_param_name'] = {}
self.context['tensor_table'] = {} self.context['tensor_table'] = {}
build_var_distributed(self.context) build_var_distributed(self.context)
endpoints = get_ps_endpoints(self.role_maker)
self.string_hosts = []
for idx, ep in enumerate(endpoints):
host, port = ep.split(":")
pshost = fluid.core.PSHost(host, int(port), idx)
self.string_hosts.append(pshost.serialize_to_string())
self.ps_desc_builder = PsDescBuilder(self.context)
def _init_worker(self): def _init_worker(self):
worker = self._get_fleet_proto(is_server=False, is_sync=self.is_sync) worker_desc = self.ps_desc_builder.build_worker_desc()
server = self._get_fleet_proto(is_server=True, is_sync=self.is_sync)
if self.context['use_ps_gpu']: if self.context['use_ps_gpu']:
main_program = self.context['loss'].block.program main_program = self.context['loss'].block.program
...@@ -701,23 +848,11 @@ class TheOnePSRuntime(RuntimeBase): ...@@ -701,23 +848,11 @@ class TheOnePSRuntime(RuntimeBase):
kwargs["trainer_id"] = self.role_maker._worker_index() kwargs["trainer_id"] = self.role_maker._worker_index()
return kwargs return kwargs
proto_txt = str(worker) + "\n" + str(server) proto_txt = worker_desc + "\n" + server_desc
with open('proto_txt', 'w') as f:
f.write(proto_txt)
debug = bool(int(os.getenv("PSERVER_DEBUG", "0"))) debug = bool(int(os.getenv("PSERVER_DEBUG", "0")))
if debug: if debug:
print("worker: \n{}".format(proto_txt)) print("worker: \n{}".format(proto_txt))
endpoints = get_ps_endpoints(self.role_maker)
string_hosts = []
for idx, ep in enumerate(endpoints):
host, port = ep.split(":")
pshost = fluid.core.PSHost(host, int(port), idx)
string_hosts.append(pshost.serialize_to_string())
dense_map = get_the_one_recv_context( dense_map = get_the_one_recv_context(
self.context, split_dense_table=self.is_heter_ps_mode) self.context, split_dense_table=self.is_heter_ps_mode)
send_ctx = get_the_one_send_context( send_ctx = get_the_one_send_context(
...@@ -741,7 +876,7 @@ class TheOnePSRuntime(RuntimeBase): ...@@ -741,7 +876,7 @@ class TheOnePSRuntime(RuntimeBase):
kwargs["trainer_id"] = self.role_maker._role_id() kwargs["trainer_id"] = self.role_maker._role_id()
kwargs["trainers"] = self.role_maker._worker_num() kwargs["trainers"] = self.role_maker._worker_num()
for table in server.servers[0].tables: for table in server.servers[0].tables: #TODO
if table.table_class == "BarrierTable": if table.table_class == "BarrierTable":
kwargs["barrier_table_id"] = table.id kwargs["barrier_table_id"] = table.id
break break
...@@ -755,7 +890,8 @@ class TheOnePSRuntime(RuntimeBase): ...@@ -755,7 +890,8 @@ class TheOnePSRuntime(RuntimeBase):
trainer_config.mode, kwargs, trainer_config.mode, kwargs,
trainer_config.get_communicator_flags()) trainer_config.get_communicator_flags())
self._communicator.init_with_ctx(send_ctx, dense_map, proto_txt, self._communicator.init_with_ctx(send_ctx, dense_map, proto_txt,
string_hosts, fluid.global_scope()) self.string_hosts,
fluid.global_scope())
fleet.util.barrier() fleet.util.barrier()
info = self._communicator.get_client_info() info = self._communicator.get_client_info()
...@@ -812,275 +948,16 @@ class TheOnePSRuntime(RuntimeBase): ...@@ -812,275 +948,16 @@ class TheOnePSRuntime(RuntimeBase):
previous_trainers, previous_trainers,
self.role_maker._role_id()) self.role_maker._role_id())
def _push_sparse_param(self,
var_name,
table_id=-1,
scope=fluid.global_scope()):
self._communicator.push_sparse_param(var_name, table_id, scope)
def _get_executor(self):
executor = fluid.Executor(fluid.CPUPlace())
if self.is_heter_ps_mode:
if self.role_maker._is_heter_worker():
heter_device_type = self.role_maker._heter_device_type().upper()
if heter_device_type not in ["GPU", "XPU", "CPU"]:
raise ValueError("Heter Worker Not Support Device {}".
format(device_type))
if heter_device_type == "GPU":
executor = Executor(
fluid.CUDAPlace(
int(os.getenv("FLAGS_selected_gpus", "0"))))
elif heter_device_type == "XPU":
executor = Executor(
fluid.XPUPlace(
int(os.getenv("FLAGS_selected_xpus", "0"))))
return executor
def _get_fleet_proto(self, is_server, is_sync, **kwargs):
def _build_merge_accessor(ctx):
accessor = Accessor()
accessor.accessor_class = "CommMergeAccessor"
accessor.optimizer = None
if ctx.is_sparse():
accessor.feature_dim = ctx.sections()[0]
accessor.embedding_dim = ctx.sections()[1]
else:
accessor.feature_dim = ctx.sections()[0]
accessor.embedding_dim = 1
return accessor
def _build_barrier_table(idx):
table = Table()
table.id = idx
table.type = "PS_OTHER_TABLE"
table.table_class = "BarrierTable"
table.shard_num = 256
accessor = Accessor()
accessor.accessor_class = "CommMergeAccessor"
accessor.optimizer = None
accessor.feature_dim = 0
accessor.embedding_dim = 0
table.accessor = accessor
common = CommonAccessor()
common.table_name = "barrier_table"
trainer_num = get_trainers(self.context['role_maker'])
if self.is_heter_ps_mode:
trainer_num += len(self.role_maker._get_heter_worker_endpoints(
))
common.trainer_num = trainer_num
common.attrs = ""
common.dims = []
common.params = []
table.common = common
return table
def _build_tensor_table(idx, tensor_dict):
table = Table()
table.id = idx
table.type = "PS_OTHER_TABLE"
table.table_class = tensor_dict["tensor_table_class"]
table.shard_num = 256
accessor = Accessor()
accessor.accessor_class = "CommMergeAccessor"
accessor.optimizer = None
accessor.feature_dim = 0
accessor.embedding_dim = 0
table.accessor = accessor
common = CommonAccessor()
common.table_name = tensor_dict["feed_var_name"]
common.trainer_num = get_trainers(self.role_maker)
common.attrs = ""
common.dims = []
common.params = []
table.common = common
tensor = Tensor()
tensor.main_program_id = tensor_dict["main_program_id"]
tensor.startup_program_id = tensor_dict["startup_program_id"]
tensor.feed_var_name = tensor_dict["feed_var_name"]
tensor.fetch_var_name = tensor_dict["fetch_var_name"]
tensor.tensor_table_class = tensor_dict["tensor_table_class"]
table.tensor = tensor
return table
def _add_tensor_table(tables):
tensor_table_dict = {}
program_idx = 0
for table_name in tensor_table_dict:
if tensor_table_dict[table_name]["startup_program"] != None:
tensor_table_dict[table_name][
"startup_program_id"] = program_idx
self._server_sub_program.append(tensor_table_dict[
table_name]["startup_program"].desc)
program_idx += 1
if tensor_table_dict[table_name]["main_program"] != None:
tensor_table_dict[table_name][
"main_program_id"] = program_idx
self._server_sub_program.append(tensor_table_dict[
table_name]["main_program"].desc)
program_idx += 1
# Todo: Hard code for lr_decay table apply table id
new_table = _build_tensor_table(
len(tables), tensor_table_dict[table_name])
tables.append(new_table)
return tables
def _get_tables():
send_ctx = get_the_one_send_context(
self.context,
use_origin_program=True,
split_dense_table=self.is_heter_ps_mode)
tables = []
for idx, (name, ctx) in enumerate(send_ctx.items()):
print(" wxm python test send_ctx.items-->", idx, (name, ctx))
if ctx.is_tensor_table() or len(ctx.origin_varnames()) < 1:
continue
table = Table()
table.id = ctx.table_id()
common = CommonAccessor()
if ctx.is_sparse():
table.type = "PS_SPARSE_TABLE"
table.shard_num = 256
common.table_name = self.context['grad_name_to_param_name'][
ctx.origin_varnames()[0]]
if self.context['ps_mode'] == DistributedMode.GEO:
table.table_class = "SparseGeoTable"
else:
all_table_proto = self.context[
"user_defined_strategy"].sparse_table_configs
table_proto = all_table_proto.add()
for proto in all_table_proto:
if proto.table_name == common.table_name:
table_proto = proto
break
if table_proto.HasField("table_class"):
table.table_class = table_proto.table_class
else:
table.table_class = parse_table_class(
common.table_name,
ctx.program_id(), self.context)
if table.table_class != 'MemorySparseTable':
table.table_class = 'MemorySparseTable'
warnings.warn(
"The PS mode must use MemorySparseTable.")
if table_proto.HasField("shard_num"):
table.shard_num = table_proto.shard_num
else:
table.shard_num = 1000
warnings.warn(
"The shard_num of sparse table is not set, use default value 1000."
)
if table_proto.accessor.ByteSize() == 0:
warnings.warn(
"The accessor of sparse table is not set, use default value."
)
get_default_accessor_proto(
table_proto.accessor, common.table_name,
ctx.program_id(), self.context)
check_embedding_dim(table_proto.accessor,
common.table_name,
ctx.program_id(), self.context)
table.accessor_proto = text_format.MessageToString(
table_proto.accessor)
else:
table.type = "PS_DENSE_TABLE"
table.table_class = "CommonDenseTable"
table.shard_num = 256
common.table_name = "MergedDense"
adam_d2sum = self.context["user_defined_strategy"].adam_d2sum
common.parse_by_optimizer(ctx, self.context)
if ctx.is_sparse():
common.parse_entry(common.table_name,
ctx.program_id(), self.context)
if is_sync:
common.sync = "true"
else:
common.sync = "false"
table.common = common
if table.table_class != 'MemorySparseTable':
accessor = _build_merge_accessor(ctx)
table.accessor = accessor
tables.append(table)
tensor_table_dict = {}
if len(tensor_table_dict) > 0:
tables = _add_tensor_table(tables)
else:
empty_porgram = Program()
self._server_sub_program.append(empty_porgram.desc)
barrier_table = _build_barrier_table(len(tables))
tables.append(barrier_table)
return tables
if is_server:
server = Server()
downpour_server = DownpourServer()
service = Service()
dist_strategy = self.context["valid_strategy"]
use_ps_gpu = dist_strategy.a_sync_configs["use_ps_gpu"]
if use_ps_gpu:
service.server_class = "PsLocalServer"
service.client_class = "PsLocalClient"
downpour_server.set_service_param(service)
tables = _get_tables()
downpour_server.tables = tables
server.add_server(downpour_server)
return server
else:
worker = Worker()
downpour_worker = DownpourWorker()
tables = _get_tables()
downpour_worker.tables = tables
worker.add_worker(downpour_worker)
return worker
def _init_server(self, dirname=None, var_names=None, **kwargs): def _init_server(self, dirname=None, var_names=None, **kwargs):
server_desc = self.ps_desc_builder.build_server_desc()
role_id = get_role_id(self.role_maker) role_id = get_role_id(self.role_maker)
endpoints = get_ps_endpoints(self.role_maker)
trainers = get_trainers(self.role_maker) trainers = get_trainers(self.role_maker)
if self.is_heter_ps_mode: if self.is_heter_ps_mode:
trainers += len(self.role_maker._get_heter_worker_endpoints()) trainers += len(self.role_maker._get_heter_worker_endpoints())
server = self._get_fleet_proto(is_server=True, is_sync=self.is_sync)
proto_txt = str(server)
fs_client = fsClient(self.context["user_defined_strategy"]
.fs_client_param)
proto_txt = proto_txt + "\n" + fs_client.to_string()
debug = bool(int(os.getenv("PSERVER_DEBUG", "0")))
if debug:
print("server: \n{}".format(proto_txt))
string_hosts = []
for idx, ep in enumerate(endpoints):
host, port = ep.split(":")
pshost = fluid.core.PSHost(host, int(port), idx)
string_hosts.append(pshost.serialize_to_string())
self._server = fluid.core.DistFleetWrapper() self._server = fluid.core.DistFleetWrapper()
self._server.init_server(proto_txt, string_hosts, role_id, trainers, self._server.init_server(server_desc, self.string_hosts, role_id,
self._server_sub_program) trainers, self._server_sub_program)
dist_varnames = get_sparse_tablenames(self.origin_main_programs, True) dist_varnames = get_sparse_tablenames(self.origin_main_programs, True)
sparse_varnames = get_sparse_tablenames(self.origin_main_programs, sparse_varnames = get_sparse_tablenames(self.origin_main_programs,
...@@ -1101,10 +978,7 @@ class TheOnePSRuntime(RuntimeBase): ...@@ -1101,10 +978,7 @@ class TheOnePSRuntime(RuntimeBase):
if dirname is None or not load_varnames: if dirname is None or not load_varnames:
return return
sparse_table_maps = {} sparse_table_maps = self.ps_desc_builder.sparse_table_maps
for table in server.servers[0].tables:
if table.type == "PS_SPARSE_TABLE" and table.common is not None:
sparse_table_maps[table.common.table_name] = table.id
dirname = os.path.normpath(dirname) dirname = os.path.normpath(dirname)
pserver_id = self.role_maker._role_id() pserver_id = self.role_maker._role_id()
...@@ -1186,7 +1060,7 @@ class TheOnePSRuntime(RuntimeBase): ...@@ -1186,7 +1060,7 @@ class TheOnePSRuntime(RuntimeBase):
sparses = get_the_one_recv_context( sparses = get_the_one_recv_context(
self.context, self.context,
is_dense=False, is_dense=False,
split_dense_table=self.is_heter_ps_mod, split_dense_table=self.is_heter_ps_mode,
use_origin_program=True) use_origin_program=True)
sparse_varnames = self._save_sparse_params(executor, dirname, sparses, sparse_varnames = self._save_sparse_params(executor, dirname, sparses,
...@@ -1413,7 +1287,7 @@ class TheOnePSRuntime(RuntimeBase): ...@@ -1413,7 +1287,7 @@ class TheOnePSRuntime(RuntimeBase):
fleet.util.barrier() fleet.util.barrier()
if self.role_maker._is_first_worker(): if self.role_maker._is_first_worker():
sparses = sget_the_one_recv_context( sparses = get_the_one_recv_context(
self.context, self.context,
is_dense=False, is_dense=False,
split_dense_table=self.role_maker. split_dense_table=self.role_maker.
......
...@@ -38,5 +38,7 @@ class PsProgramBuilderFactory(object): ...@@ -38,5 +38,7 @@ class PsProgramBuilderFactory(object):
elif 'is_fl_ps_mode' in attrs and attrs[ elif 'is_fl_ps_mode' in attrs and attrs[
'is_fl_ps_mode'] == DistributedMode.FL: 'is_fl_ps_mode'] == DistributedMode.FL:
return globals()['FlPsProgramBuilder'](pass_ctx) return globals()['FlPsProgramBuilder'](pass_ctx)
else: elif attrs['ps_mode'] == DistributedMode.SYNC:
return globals()['CpuSyncPsProgramBuilder'](pass_ctx) return globals()['CpuSyncPsProgramBuilder'](pass_ctx)
else:
return globals()['CpuAsyncPsProgramBuilder'](pass_ctx)
...@@ -95,11 +95,12 @@ class GeoPsProgramBuilder(PsProgramBuilder): # CPU mode only ...@@ -95,11 +95,12 @@
class CpuSyncPsProgramBuilder(PsProgramBuilder): class CpuSyncPsProgramBuilder(PsProgramBuilder):
def __init__(self, pass_ctx): def __init__(self, pass_ctx):
logger.info("start building cpu-sync-ps program")
super(CpuSyncPsProgramBuilder, self).__init__(pass_ctx) super(CpuSyncPsProgramBuilder, self).__init__(pass_ctx)
if self.ps_mode == DistributedMode.SYNC:
logger.info("start building cpu-sync-ps program")
if self.ps_mode != DistributedMode.SYNC and self.ps_mode != DistributedMode.ASYNC: if self.ps_mode != DistributedMode.SYNC and self.ps_mode != DistributedMode.ASYNC:
raise ValueError("ps mode: {} not matched {}", raise ValueError("ps mode: {} not matched {}",
format(self.ps_mode, "CpuSyncPsProgramBuilder")) format(self.ps_mode, "PsProgramBuilder"))
def _build_trainer_programs(self): def _build_trainer_programs(self):
add_lr_decay_table_pass = new_pass("add_lr_decay_table_pass", add_lr_decay_table_pass = new_pass("add_lr_decay_table_pass",
......
...@@ -73,7 +73,9 @@ def logger_config(log_path, logging_name): ...@@ -73,7 +73,9 @@ def logger_config(log_path, logging_name):
return logger return logger
logger = logger_config(log_path='/ps_log', logging_name='ps_log') ps_log_root_dir = '/ps_log/'
logger = logger_config(
log_path='/ps_usr_print_log', logging_name='ps_usr_print_log')
class DistributedMode: class DistributedMode:
......
...@@ -627,7 +627,7 @@ set_tests_properties(test_norm_nn_grad PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE") ...@@ -627,7 +627,7 @@ set_tests_properties(test_norm_nn_grad PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE")
set_tests_properties(test_nn_grad PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE") set_tests_properties(test_nn_grad PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE")
if(WITH_DISTRIBUTE) if(WITH_DISTRIBUTE)
add_subdirectory(distributed_passes) add_subdirectory(distributed_passes)
add_subdirectory(ps)
add_subdirectory(auto_parallel) add_subdirectory(auto_parallel)
# FIXME(typhoonzero): add these tests back # FIXME(typhoonzero): add these tests back
......
...@@ -23,13 +23,24 @@ import unittest ...@@ -23,13 +23,24 @@ import unittest
import numpy as np import numpy as np
from collections import OrderedDict from collections import OrderedDict
from paddle.distributed.ps.utils.public import logger from paddle.distributed.ps.utils.public import logger
from dist_pass_test_base import prepare_python_path_and_return_module, remove_path_if_exists from paddle.fluid.tests.unittests.distributed_passes.dist_pass_test_base import prepare_python_path_and_return_module, remove_path_if_exists
import paddle.distributed.fleet as fleet import paddle.distributed.fleet as fleet
class PsPassTestBase(unittest.TestCase): class PsPassTestBase(unittest.TestCase):
def init(self): def init(self):
raise NotImplementedError self.config = {}
self.config['ps_mode_config'] = ""
self.config['worker_num'] = "1"
self.config['server_num'] = "1"
self.config['run_minimize'] = "0"
self.config['run_single_pass'] = "0"
self.config['run_the_one_ps'] = '0'
self.config['debug_new_minimize'] = "0"
self.config['debug_new_pass'] = "0"
self.config['debug_the_one_ps'] = '0'
self.config['log_dir'] = ""
self.config['applied_pass_name'] = ""
def setUp(self): def setUp(self):
print('Ps setUp...') print('Ps setUp...')
...@@ -37,7 +48,7 @@ class PsPassTestBase(unittest.TestCase): ...@@ -37,7 +48,7 @@ class PsPassTestBase(unittest.TestCase):
def tearDown(self): def tearDown(self):
print('Ps tearDown...') print('Ps tearDown...')
def ps_launch(self, config, ps_mode="cpu-ps"): def ps_launch(self, ps_mode="cpu-ps"):
if ps_mode == "cpu-ps" or ps_mode == 'heter-ps': if ps_mode == "cpu-ps" or ps_mode == 'heter-ps':
os.environ['WITH_DISTRIBUTE'] = 'ON' os.environ['WITH_DISTRIBUTE'] = 'ON'
@@ -45,23 +56,26 @@ class PsPassTestBase(unittest.TestCase):
                 sys.executable,
                 "-u",
             ] + [
-                "-m", "launch", "--log_dir", config['log_dir'], "--worker_num",
-                config['worker_num'], "--server_num", config['server_num']
+                "-m", "launch", "--log_dir", self.config['log_dir'],
+                "--worker_num", self.config['worker_num'], "--server_num",
+                self.config['server_num']
             ]
             if ps_mode == 'heter-ps':
                 os.environ['FLAGS_START_PORT'] = '12004'
                 cmd += [
-                    '--heter_worker_num', config['heter_worker_num'],
-                    '--heter_devices', config['heter_devices']
+                    '--heter_worker_num', self.config['heter_worker_num'],
+                    '--heter_devices', self.config['heter_devices']
                 ]
             cmd += [
-                "../ps/ps_dnn_trainer.py", "-m", config['ps_mode_config'],
-                "--run_minimize", config['run_minimize'], "--run_single_pass",
-                config['run_single_pass'], "--debug_new_pass",
-                config['debug_new_pass'], "--debug_new_minimize",
-                config['debug_new_minimize'], "--applied_pass_name",
-                config['applied_pass_name']
+                "../ps/ps_dnn_trainer.py", "-m", self.config['ps_mode_config'],
+                "--run_minimize", self.config['run_minimize'],
+                "--run_single_pass", self.config['run_single_pass'],
+                "--run_the_one_ps", self.config['run_the_one_ps'],
+                "--debug_new_pass", self.config['debug_new_pass'],
+                "--debug_new_minimize", self.config['debug_new_minimize'],
+                "--applied_pass_name", self.config['applied_pass_name'],
+                "--debug_the_one_ps", self.config['debug_the_one_ps']
             ]
         elif ps_mode == "gpu-ps":
             os.environ['FLAGS_LAUNCH_BARRIER'] = '0'
@@ -80,12 +94,14 @@ class PsPassTestBase(unittest.TestCase):
             cmd = [
                 sys.executable, "-u", "../ps/ps_dnn_trainer.py", "-m",
-                config['ps_mode_config'], "--run_minimize",
-                config['run_minimize'], "--run_single_pass",
-                config['run_single_pass'], "--debug_new_pass",
-                config['debug_new_pass'], "--debug_new_minimize",
-                config['debug_new_minimize'], "--applied_pass_name",
-                config['applied_pass_name']
+                self.config['ps_mode_config'], "--run_minimize",
+                self.config['run_minimize'], "--run_single_pass",
+                self.config['run_single_pass'], "--run_the_one_ps",
+                self.config['run_the_one_ps'], "--debug_new_pass",
+                self.config['debug_new_pass'], "--debug_new_minimize",
+                self.config['debug_new_minimize'], "--applied_pass_name",
+                self.config['applied_pass_name'], "--debug_the_one_ps",
+                self.config['debug_the_one_ps']
             ]
         cmd = [shlex.quote(c) for c in cmd]
......
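
Since ps_launch() now reads everything from self.config, a derived test only fills the dict and calls the launcher. A hedged sketch of the intended usage, assembled from the pieces above; the test class name and log directory are made up for illustration, and ps_log_root_dir is assumed to be imported as in test_ps_trainer_pass.py:

    # Sketch only: a subclass drives a run purely through self.config.
    class DemoPsTest(PsPassTestBase):
        def test_cpu_async_minimize_demo(self):
            self.init()  # fills self.config with the defaults shown above
            self.config['ps_mode_config'] = "../ps/cpu_async_ps_config.yaml"
            self.config['run_minimize'] = '1'
            self.config['log_dir'] = ps_log_root_dir + "demo_async_cpu_log"
            remove_path_if_exists(self.config['log_dir'])
            self.ps_launch()  # "cpu-ps" is the default mode
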
@@ -21,31 +21,26 @@ import numpy as np
 import paddle
 from ps_pass_test_base import *
-from paddle.distributed.ps.utils.public import logger
+from paddle.distributed.ps.utils.public import logger, ps_log_root_dir
 from paddle.fluid.tests.unittests.ps.ps_dnn_trainer import DnnTrainer
 class TestPsTrainerPass(PsPassTestBase):
-    def init(self):
-        self.config = {}
-        self.config['ps_mode_config'] = ""
-        self.config['worker_num'] = "1"
-        self.config['server_num'] = "1"
-        self.config['run_minimize'] = "0"
-        self.config['run_single_pass'] = "0"
-        self.config['debug_new_minimize'] = "0"
-        self.config['debug_new_pass'] = "0"
-        self.config['log_dir'] = ""
-        self.config['applied_pass_name'] = ""
     def setUp(self):
         pass
     def tearDown(self):
         pass
-    def check(self):
-        pass
+    def check(self, file1, file2):
+        with open(file1, 'r', encoding='utf-8') as f:
+            text1 = f.read()
+        with open(file2, 'r', encoding='utf-8') as f:
+            text2 = f.read()
+        if text1 == text2:
+            return True
+        else:
+            return False
     def test_ps_optimizer_minimize_cpu_async(self):
         self.init()
@@ -53,16 +48,21 @@ class TestPsTrainerPass(PsPassTestBase):
         self.config['run_minimize'] = '1'
         self.config['debug_new_minimize'] = '0'
-        self.config['log_dir'] = "/async_cpu_log_old_minimize"
+        self.config['log_dir'] = ps_log_root_dir + "async_cpu_log_old_minimize"
         remove_path_if_exists(self.config['log_dir'])
-        self.ps_launch(self.config)
+        self.ps_launch()
         self.config['debug_new_minimize'] = '1'
-        self.config['log_dir'] = "/async_cpu_log_new_minimize"
+        self.config['log_dir'] = ps_log_root_dir + "async_cpu_log_new_minimize"
         remove_path_if_exists(self.config['log_dir'])
-        self.ps_launch(self.config)
+        self.ps_launch()
-        self.check()
+        file1 = '/ps_log/async_run_minimize_debug:_0_worker_main.prototxt'
+        file2 = '/ps_log/async_run_minimize_debug:_1_worker_main.prototxt'
+        if self.check(file1, file2):
+            logger.info('test_ps_optimizer_minimize_cpu_async passed!')
+        else:
+            logger.error('test_ps_optimizer_minimize_cpu_async failed!')
     def test_ps_optimizer_minimize_cpu_sync(self):
         self.init()
@@ -70,16 +70,22 @@ class TestPsTrainerPass(PsPassTestBase):
         self.config['run_minimize'] = '1'
         self.config['debug_new_minimize'] = '0'
-        self.config['log_dir'] = "/sync_cpu_log_old_minimize"
+        self.config['log_dir'] = ps_log_root_dir + "sync_cpu_log_old_minimize"
         remove_path_if_exists(self.config['log_dir'])
-        self.ps_launch(self.config)
+        self.ps_launch()
         self.config['debug_new_minimize'] = '1'
-        self.config['log_dir'] = "/sync_cpu_log_new_minimize"
+        self.config['log_dir'] = ps_log_root_dir + "sync_cpu_log_new_minimize"
         remove_path_if_exists(self.config['log_dir'])
-        self.ps_launch(self.config)
+        self.ps_launch()
+        '''
-        self.check()
+        file1 = '/ps_log/sync_run_minimize_debug:_0_worker_main.prototxt'
+        file2 = '/ps_log/sync_run_minimize_debug:_1_worker_main.prototxt'
+        if self.check(file1, file2):
+            logger.info('test_ps_optimizer_minimize_cpu_sync passed!')
+        else:
+            logger.error('test_ps_optimizer_minimize_cpu_sync failed!')
+        '''
     def test_ps_optimizer_minimize_cpu_geo(self):
         self.init()
@@ -87,16 +93,21 @@ class TestPsTrainerPass(PsPassTestBase):
         self.config['run_minimize'] = '1'
         self.config['debug_new_minimize'] = '0'
-        self.config['log_dir'] = "/geo_cpu_log_old_minimize"
+        self.config['log_dir'] = ps_log_root_dir + "geo_cpu_log_old_minimize"
         remove_path_if_exists(self.config['log_dir'])
-        self.ps_launch(self.config)
+        self.ps_launch()
         self.config['debug_new_minimize'] = '1'
-        self.config['log_dir'] = "/geo_cpu_log_new_minimize"
+        self.config['log_dir'] = ps_log_root_dir + "geo_cpu_log_new_minimize"
         remove_path_if_exists(self.config['log_dir'])
-        self.ps_launch(self.config)
+        self.ps_launch()
-        self.check()
+        file1 = '/ps_log/geo_run_minimize_debug:_0_worker_main.prototxt'
+        file2 = '/ps_log/geo_run_minimize_debug:_1_worker_main.prototxt'
+        if self.check(file1, file2):
+            logger.info('test_ps_optimizer_minimize_cpu_geo passed!')
+        else:
+            logger.error('test_ps_optimizer_minimize_cpu_geo failed!')
     # heter ps two-stage
     def test_ps_optimizer_minimize_heter(self):
@@ -110,14 +121,24 @@ class TestPsTrainerPass(PsPassTestBase):
         self.config['ps_mode_config'] = "../ps/heter_ps_config.yaml"
         self.config['debug_new_minimize'] = '0'
-        self.config['log_dir'] = "/heter_log_old_minimize"
+        self.config['log_dir'] = ps_log_root_dir + "heter_log_old_minimize"
         remove_path_if_exists(self.config['log_dir'])
-        self.ps_launch(self.config, 'heter-ps')
+        self.ps_launch('heter-ps')
         self.config['debug_new_minimize'] = '1'
-        self.config['log_dir'] = "/heter_log_new_minimize"
+        self.config['log_dir'] = ps_log_root_dir + "heter_log_new_minimize"
         remove_path_if_exists(self.config['log_dir'])
-        self.ps_launch(self.config, 'heter-ps')
+        self.ps_launch('heter-ps')
+        '''
+        file1 = '/ps_log/heter_run_minimize_debug:_0_worker_main.prototxt'
+        file2 = '/ps_log/heter_run_minimize_debug:_1_worker_main.prototxt'
+        file3 = '/ps_log/heter_run_minimize_debug:_0_heter_worker_main.prototxt'
+        file4 = '/ps_log/heter_run_minimize_debug:_1_heter_worker_main.prototxt'
+        if self.check(file1, file2) and self.check(file3, file4):
+            logger.info('test_ps_optimizer_minimize_heter passed!')
+        else:
+            logger.error('test_ps_optimizer_minimize_heter failed!')
+        '''
     def test_ps_optimizer_minimize_gpu(self):
         self.init()
@@ -125,29 +146,42 @@ class TestPsTrainerPass(PsPassTestBase):
         self.config['ps_mode_config'] = "../ps/gpu_ps_config.yaml"
         self.config['debug_new_minimize'] = '0'
-        self.ps_launch(self.config, "gpu-ps")
+        self.ps_launch("gpu-ps")
         self.config['debug_new_minimize'] = '1'
-        self.ps_launch(self.config, "gpu-ps")
+        self.ps_launch("gpu-ps")
-        self.check()
+        file1 = '/ps_log/gpubox_run_minimize_debug:_0_worker_main.prototxt'
+        file2 = '/ps_log/gpubox_run_minimize_debug:_1_worker_main.prototxt'
+        if self.check(file1, file2):
+            logger.info('test_ps_optimizer_minimize_gpu passed!')
+        else:
+            logger.error('test_ps_optimizer_minimize_gpu failed!')
     def test_append_send_ops_pass(self):
         self.init()
         self.config['run_single_pass'] = '1'
+        self.config['ps_mode_config'] = "../ps/cpu_async_ps_config.yaml"
         self.config['applied_pass_name'] = "append_send_ops_pass"
         self.config['debug_new_pass'] = '0'
-        self.config['log_dir'] = "/log_old_" + self.config['applied_pass_name']
+        self.config['log_dir'] = ps_log_root_dir + "log_old_" + self.config[
+            'applied_pass_name']
         remove_path_if_exists(self.config['log_dir'])
-        self.ps_launch(self.config, "cpu-ps")
+        self.ps_launch("cpu-ps")
         self.config['debug_new_pass'] = '1'
-        self.config['log_dir'] = "/log_new_" + self.config['applied_pass_name']
+        self.config['log_dir'] = ps_log_root_dir + "log_new_" + self.config[
+            'applied_pass_name']
         remove_path_if_exists(self.config['log_dir'])
-        self.ps_launch(self.config, "cpu-ps")
+        self.ps_launch("cpu-ps")
-        self.check()
+        file1 = '/ps_log/async_append_send_ops_pass_debug:_0_worker_main.prototxt'
+        file2 = '/ps_log/async_append_send_ops_pass_debug:_1_worker_main.prototxt'
+        if self.check(file1, file2):
+            logger.info('test_append_send_ops_pass passed!')
+        else:
+            logger.info('test_append_send_ops_pass failed!')
     def test_distributed_ops_pass(self):
         pass
......
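
The new check() compares the two dumped prototxt files as raw text, so the old and new code paths must produce byte-identical programs; any nondeterministic ordering in the dump will surface as a failure. If that ever becomes noisy, a line-by-line variant that reports the first divergence could help debugging; the helper below is only an illustration, not part of the PR:

    # Illustrative helper: same contract as check(file1, file2), but it also
    # reports where the two dumped programs first differ.
    def check_with_report(file1, file2):
        with open(file1, 'r', encoding='utf-8') as f:
            lines1 = f.readlines()
        with open(file2, 'r', encoding='utf-8') as f:
            lines2 = f.readlines()
        for i, (l1, l2) in enumerate(zip(lines1, lines2)):
            if l1 != l2:
                print('first mismatch at line {}: {!r} vs {!r}'.format(i + 1, l1, l2))
                return False
        return len(lines1) == len(lines2)
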
@@ -3,6 +3,6 @@ string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}")
 foreach(TEST_OP ${TEST_OPS})
     py_test_modules(${TEST_OP} MODULES ${TEST_OP})
+    list(APPEND TEST_OPS ${TEST_OP})
+    set_tests_properties(${TEST_OP} PROPERTIES TIMEOUT 50)
 endforeach(TEST_OP)
-set_tests_properties(test_the_one_ps PROPERTIES TIMEOUT 50)
@@ -264,12 +264,16 @@ def parse_args():
         '--run_minimize', type=int, default=0, help="test single pass")
     parser.add_argument(
         '--run_single_pass', type=int, default=0, help="test single pass")
+    parser.add_argument(
+        '--run_the_one_ps', type=int, default=0, help="test the_one_ps")
     parser.add_argument(
         '--debug_new_minimize', type=int, default=0, help="test single pass")
     parser.add_argument(
         '--debug_new_pass', type=int, default=0, help="test single pass")
     parser.add_argument(
         '--applied_pass_name', type=str, default="", help="test single pass")
+    parser.add_argument(
+        '--debug_the_one_ps', type=int, default=0, help="test the_one_ps")
     args = parser.parse_args()
     args.abs_dir = os.path.dirname(os.path.abspath(args.config_yaml))
@@ -280,9 +284,11 @@ def parse_args():
     config["pure_bf16"] = args.pure_bf16
     config['run_minimize'] = args.run_minimize
     config['run_single_pass'] = args.run_single_pass
+    config['run_the_one_ps'] = args.run_the_one_ps
     config['debug_new_minimize'] = args.debug_new_minimize
     config['debug_new_pass'] = args.debug_new_pass
     config['applied_pass_name'] = args.applied_pass_name
+    config['debug_the_one_ps'] = args.debug_the_one_ps
     yaml_helper.print_yaml(config)
     return config
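
The two new flags follow the same pattern as the existing ones: argparse defines them, and parse_args() copies them into the config dict under identical keys. A minimal, self-contained round trip of just the new flags (the flag values are examples only):

    # Minimal sketch of the new flags; the real parse_args() defines many more.
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--run_the_one_ps', type=int, default=0, help="test the_one_ps")
    parser.add_argument('--debug_the_one_ps', type=int, default=0, help="test the_one_ps")
    args = parser.parse_args(['--run_the_one_ps', '1', '--debug_the_one_ps', '1'])

    config = {}
    config['run_the_one_ps'] = args.run_the_one_ps      # 1
    config['debug_the_one_ps'] = args.debug_the_one_ps  # 1
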
@@ -344,15 +350,15 @@ class DnnTrainer(object):
             fleet_obj.minimize(loss)
         if fleet.is_server():
-            _main_file = '/' + sync_mode + '_run_minimize' + '_debug:_' + str(
+            _main_file = ps_log_root_dir + sync_mode + '_run_minimize' + '_debug:_' + str(
                 self.config['debug_new_minimize']) + '_server_main.prototxt'
             debug_program(_main_file, loss.block.program)
         elif fleet.is_worker():
-            _main_file = '/' + sync_mode + '_run_minimize' + '_debug:_' + str(
+            _main_file = ps_log_root_dir + sync_mode + '_run_minimize' + '_debug:_' + str(
                 self.config['debug_new_minimize']) + '_worker_main.prototxt'
             debug_program(_main_file, loss.block.program)
         elif self.role_maker._is_heter_worker():
-            _main_file = '/' + sync_mode + '_run_minimize' + '_debug:_' + str(
+            _main_file = ps_log_root_dir + sync_mode + '_run_minimize' + '_debug:_' + str(
                 self.config[
                     'debug_new_minimize']) + '_heter_worker_main.prototxt'
             debug_program(_main_file, loss.block.program)
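
Each branch above only swaps the hard-coded '/' prefix for ps_log_root_dir; the file naming itself is unchanged, so the prototxt names compared in test_ps_trainer_pass.py line up with these dumps. Assuming debug_program simply serializes the given Program to the given path (an assumption about the helper, not verified here), its behaviour amounts to the sketch below:

    # Assumed behaviour of debug_program, sketched for readability only.
    def debug_program_sketch(main_file, program):
        with open(main_file, 'w') as f:
            f.write(str(program))  # text form of the program desc
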
@@ -397,16 +403,84 @@ class DnnTrainer(object):
             _main = worker.append_send_ops_pass(_main, compiled_config)
         if fleet.is_server():
-            _main_file = '/' + sync_mode + "_" + str(config[
+            _main_file = ps_log_root_dir + sync_mode + "_" + str(config[
                 "applied_pass_name"]) + '_debug:_' + str(self.config[
                     'debug_new_pass']) + '_server_main.prototxt'
             debug_program(_main_file, _main)
         elif fleet.is_worker():
-            _main_file = '/' + sync_mode + "_" + str(config[
+            _main_file = ps_log_root_dir + sync_mode + "_" + str(config[
                 "applied_pass_name"]) + '_debug:_' + str(self.config[
                     'debug_new_pass']) + '_worker_main.prototxt'
             debug_program(_main_file, _main)
+    def run_the_one_ps(self):
+        self.init_fleet_with_gloo()
+        self.model = get_model(self.config)
+        self.input_data = self.model.create_feeds()
+        self.metrics = self.model.net(self.input_data)
+        loss = self.model._cost
+        user_defined_strategy = get_user_defined_strategy(self.config)
+        learning_rate = self.config.get(
+            "hyper_parameters.optimizer.learning_rate")
+        sync_mode = self.config.get("runner.sync_mode")
+        inner_optimizer = paddle.optimizer.Adam(learning_rate, lazy_mode=True)
+        self.role_maker._generate_role()  # necessary
+        if self.config['debug_the_one_ps'] == 1:
+            logger.info("entering run_the_one_ps -- new")
+            from paddle.distributed.fleet.meta_optimizers.ps_optimizer import ParameterServerOptimizer
+            ps_optimizer = ParameterServerOptimizer(inner_optimizer)
+            ps_optimizer._set_basic_info(loss, self.role_maker, inner_optimizer,
+                                         user_defined_strategy)
+            ps_optimizer.minimize_impl(loss)
+            from paddle.distributed.ps.the_one_ps import TheOnePSRuntime
+            _runtime_handle = TheOnePSRuntime()  # the refactored TheOnePSRuntime under the ps directory
+            _runtime_handle._set_basic_info(ps_optimizer.pass_ctx._attrs)
+            if fleet.is_worker():
+                worker_desc = _runtime_handle.ps_desc_builder.build_worker_desc(
+                )
+                with open(ps_log_root_dir + sync_mode + '_' +
+                          'new_worker_ps_desc', 'w') as f:
+                    f.write(worker_desc)
+            if fleet.is_server():
+                server_desc = _runtime_handle.ps_desc_builder.build_server_desc(
+                )
+                with open(ps_log_root_dir + sync_mode + '_' +
+                          'new_server_ps_desc', 'w') as f:
+                    f.write(server_desc)
+        else:
+            pass
+            '''
+            logger.info("entering run_the_one_ps -- old")
+            fleet_obj = fleet.distributed_optimizer(
+                inner_optimizer, user_defined_strategy)
+            fleet_obj.minimize(loss)
+            if fleet.is_worker():
+                worker_desc = fleet_obj._runtime_handle._get_fleet_proto(is_server=False, is_sync=False)
+                server_desc = fleet_obj._runtime_handle._get_fleet_proto(is_server=True, is_sync=False)
+                with open(ps_log_root_dir + sync_mode + '_' + 'worker_ps_desc', 'w') as f:
+                    f.write(str(worker_desc) + str(server_desc))
+            if fleet.is_server():
+                server_desc = fleet_obj._runtime_handle._get_fleet_proto(is_server=True, is_sync=False)
+                with open(ps_log_root_dir + sync_mode + '_' + 'server_ps_desc', 'w') as f:
+                    f.write(str(server_desc) + str(fleet_obj._runtime_handle._get_fs_client_desc().to_string()))
+            '''
+        if fleet.is_server():
+            _main_file = ps_log_root_dir + sync_mode + '_run_the_one_ps' + '_debug:_' + str(
+                self.config['debug_the_one_ps']) + '_server_main.prototxt'
+            debug_program(_main_file, loss.block.program)
+        elif fleet.is_worker():
+            _main_file = ps_log_root_dir + sync_mode + '_run_the_one_ps' + '_debug:_' + str(
+                self.config['debug_the_one_ps']) + '_worker_main.prototxt'
+            debug_program(_main_file, loss.block.program)
+        elif self.role_maker._is_heter_worker():
+            _main_file = ps_log_root_dir + sync_mode + '_run_the_one_ps' + '_debug:_' + str(
+                self.config['debug_the_one_ps']) + '_heter_worker_main.prototxt'
+            debug_program(_main_file, loss.block.program)
 if __name__ == "__main__":
     paddle.enable_static()
@@ -418,3 +492,5 @@ if __name__ == "__main__":
         benchmark_main.run_single_pass()
     elif config['run_minimize'] == 1:
         benchmark_main.run_minimize()
+    elif config['run_the_one_ps'] == 1:
+        benchmark_main.run_the_one_ps()
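
run_the_one_ps dumps the descriptors produced by the refactored ps_desc_builder so they can be diffed offline against a baseline generated by the old runtime. A hedged sketch of that comparison, assuming both dumps are text-format PSParameter messages as the (currently commented-out) check in test_the_one_ps.py below expects; the function name and example paths are illustrative only:

    # Sketch: parse two text-format PSParameter dumps and compare their
    # canonical string forms, mirroring the commented-out check() below.
    from paddle.distributed.fleet.proto import ps_pb2
    from google.protobuf import text_format

    def same_ps_desc(baseline_path, new_path):
        descs = []
        for path in (baseline_path, new_path):
            with open(path, 'r') as f:
                desc = ps_pb2.PSParameter()
                text_format.Parse(f.read(), desc)
                descs.append(desc)
        return (text_format.MessageToString(descs[0]) ==
                text_format.MessageToString(descs[1]))

    # e.g. same_ps_desc('/ps_desc_baseline/async_worker_ps_desc',
    #                   '/ps_log/async_new_worker_ps_desc')
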
@@ -22,16 +22,100 @@ import numpy as np
 import paddle
 import paddle.fluid as fluid
+import paddle
+from paddle.fluid.tests.unittests.distributed_passes.ps_pass_test_base import *
+from paddle.distributed.ps.utils.public import logger, ps_log_root_dir
+from ps_dnn_trainer import DnnTrainer
+from paddle.distributed.fleet.proto import ps_pb2
+from google.protobuf import text_format
-class TestTheOnePs(unittest.TestCase):
+class TestTheOnePs(PsPassTestBase):
     def setUp(self):
-        print('setUp...')
+        pass
     def tearDown(self):
-        print('tearDown...')
+        pass
-    def test_main(self):
+    def check(self, file1, file2):
         pass
+        '''
+        f = open(file1, "rb")
+        ps_desc_1 = ps_pb2.PSParameter()
+        text_format.Parse(f.read(), ps_desc_1)
+        f.close()
+        f = open(file2, "rb")
+        ps_desc_2 = ps_pb2.PSParameter()
+        text_format.Parse(f.read(), ps_desc_2)
+        f.close()
+        str1 = text_format.MessageToString(ps_desc_1)
+        str2 = text_format.MessageToString(ps_desc_2)
+        #logger.info('### msg10: {}'.format(str1))
+        #logger.info('### msg20: {}'.format(str2))
+        if str1 == str2:
+            return True
+        else:
+            return False
+        '''
+    def test_ps_cpu_async(self):
+        self.init()
+        self.config['ps_mode_config'] = "../ps/cpu_async_ps_config.yaml"
+        self.config['run_the_one_ps'] = '1'
+        self.config['debug_the_one_ps'] = '0'
+        self.config[
+            'log_dir'] = ps_log_root_dir + "async_cpu_log_old_the_one_ps"
+        remove_path_if_exists(self.config['log_dir'])
+        self.ps_launch()
+        self.config['debug_the_one_ps'] = '1'
+        self.config[
+            'log_dir'] = ps_log_root_dir + "async_cpu_log_new_the_one_ps"
+        remove_path_if_exists(self.config['log_dir'])
+        self.ps_launch()
+        desc1 = '/ps_desc_baseline/async_worker_ps_desc'
+        desc2 = '/ps_log/async_new_worker_ps_desc'
+        desc3 = '/ps_desc_baseline/async_server_ps_desc'
+        desc4 = '/ps_log/async_new_server_ps_desc'
+        if self.check(desc1, desc2):
+            logger.info('test_ps_cpu_async ps_desc: worker passed!')
+        else:
+            logger.info('test_ps_cpu_async ps_desc: worker failed!')
+        if self.check(desc3, desc4):
+            logger.info('test_ps_cpu_async ps_desc: server passed!')
+        else:
+            logger.info('test_ps_cpu_async ps_desc: server failed!')
+    def test_ps_cpu_geo(self):
+        self.init()
+        self.config['ps_mode_config'] = "../ps/cpu_geo_ps_config.yaml"
+        self.config['run_the_one_ps'] = '1'
+        self.config['debug_the_one_ps'] = '0'
+        self.config['log_dir'] = ps_log_root_dir + "geo_cpu_log_old_the_one_ps"
+        remove_path_if_exists(self.config['log_dir'])
+        self.ps_launch()
+        self.config['debug_the_one_ps'] = '1'
+        self.config['log_dir'] = ps_log_root_dir + "geo_cpu_log_new_the_one_ps"
+        remove_path_if_exists(self.config['log_dir'])
+        self.ps_launch()
+        desc1 = '/ps_desc_baseline/geo_worker_ps_desc'
+        desc2 = '/ps_log/geo_new_worker_ps_desc'
+        desc3 = '/ps_desc_baseline/geo_server_ps_desc'
+        desc4 = '/ps_log/geo_new_server_ps_desc'
+        if self.check(desc1, desc2):
+            logger.info('test_ps_cpu_geo ps_desc: worker passed!')
+        else:
+            logger.info('test_ps_cpu_geo ps_desc: worker failed!')
+        if self.check(desc3, desc4):
+            logger.info('test_ps_cpu_geo ps_desc: server passed!')
+        else:
+            logger.info('test_ps_cpu_geo ps_desc: server failed!')
 if __name__ == '__main__':
......
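
Note that check() in TestTheOnePs is currently a no-op (the protobuf comparison is commented out), so both branches of each if/else above only log a message and the test can never fail on a mismatch. Once the files under /ps_desc_baseline are stable, the logging could be tightened into a real assertion; a possible shape, not part of the PR:

    # Illustrative only: turn the logged comparison into a hard test failure.
    def assert_same_desc(self, baseline, generated):
        self.assertTrue(
            self.check(baseline, generated),
            msg='ps_desc mismatch: {} vs {}'.format(baseline, generated))
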
@@ -74,6 +74,7 @@ class DNNLayer(nn.Layer):
             else:
                 emb = self.embedding(s_input)
                 emb = paddle.reshape(emb, shape=[-1, self.sparse_feature_dim])
+                # emb.stop_gradient = True
                 sparse_embs.append(emb)
         y_dnn = paddle.concat(x=sparse_embs + [dense_inputs], axis=1)
......
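
The commented-out emb.stop_gradient = True would stop gradients from flowing back through the embedding lookup, effectively freezing the sparse embedding table; leaving it commented keeps the embeddings trainable. A tiny standalone illustration of the flag in dygraph mode, unrelated to DNNLayer itself:

    # Standalone illustration of stop_gradient, not part of the PR's model code.
    import paddle

    frozen = paddle.to_tensor([[1.0, 2.0]])                          # stop_gradient defaults to True
    trainable = paddle.to_tensor([[3.0, 4.0]], stop_gradient=False)
    loss = (frozen * trainable).sum()
    loss.backward()
    print(frozen.grad)     # None: no gradient accumulated for the frozen tensor
    print(trainable.grad)  # [[1., 2.]]: d(loss)/d(trainable) equals the frozen values
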