From 49130f9b8f41cda0ac50e5c57f4b033c260c7541 Mon Sep 17 00:00:00 2001 From: dongdaxiang Date: Tue, 4 Dec 2018 20:15:52 +0800 Subject: [PATCH] refine downpour sgd API and adapt to pslib proto desc --- paddle/fluid/framework/CMakeLists.txt | 2 +- .../paddle/fluid/distribute_lookup_table.py | 18 ++++++++ python/paddle/fluid/distributed/downpour.py | 19 +++++--- python/paddle/fluid/distributed/node.py | 45 +++++++++---------- 4 files changed, 52 insertions(+), 32 deletions(-) diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt index 9f5631b87cb..8556dcbc36c 100644 --- a/paddle/fluid/framework/CMakeLists.txt +++ b/paddle/fluid/framework/CMakeLists.txt @@ -180,7 +180,7 @@ cc_library(parallel_executor SRCS parallel_executor.cc DEPS graph build_strategy fast_threaded_ssa_graph_executor variable_helper) -cc_library(async_executor SRCS async_executor.cc data_feed.cc data_feed_factory.cc executor_thread_worker.cc DEPS op_registry device_context scope framework_proto glog lod_rank_table feed_fetch_method graph_to_program_pass async_executor_proto variable_helper) +cc_library(async_executor SRCS async_executor.cc data_feed.cc data_feed_factory.cc executor_thread_worker.cc DEPS op_registry device_context scope framework_proto glog lod_rank_table feed_fetch_method graph_to_program_pass async_executor_proto variable_helper pslib) cc_test(data_feed_test SRCS data_feed_test.cc DEPS async_executor) cc_library(prune SRCS prune.cc DEPS framework_proto) diff --git a/python/paddle/fluid/distribute_lookup_table.py b/python/paddle/fluid/distribute_lookup_table.py index 52d9ce75f8d..a903257fa94 100644 --- a/python/paddle/fluid/distribute_lookup_table.py +++ b/python/paddle/fluid/distribute_lookup_table.py @@ -15,6 +15,24 @@ LOOKUP_TABLE_TYPE = "lookup_table" +def find_distributed_lookup_table_inputs(program, table_name): + local_vars = program.current_block().vars + inputs = [] + for op in program.global_block().ops: + if op.type == LOOKUP_TABLE_TYPE: + if table_name == op.input("W")[0]: + inputs.extend([local_vars[name] for name in op.input("Ids")]) + return inputs + +def find_distributed_lookup_table_outputs(program, table_name): + local_vars = program.current_block().vars + outputs = [] + for op in program.global_block().ops: + if op.type == LOOKUP_TABLE_TYPE: + if table_name == op.input("W")[0]: + outputs.extend([local_vars[name] for name in op.output("Out")]) + return outputs + def find_distributed_lookup_table(program): """ Find distribute lookup table in program. diff --git a/python/paddle/fluid/distributed/downpour.py b/python/paddle/fluid/distributed/downpour.py index 551a4714950..3fe4afdbffb 100644 --- a/python/paddle/fluid/distributed/downpour.py +++ b/python/paddle/fluid/distributed/downpour.py @@ -3,6 +3,8 @@ from .node import DownpourWorker from ..backward import append_backward import ps_pb2 as pslib from paddle.fluid.distribute_lookup_table import find_distributed_lookup_table +from paddle.fluid.distribute_lookup_table import find_distributed_lookup_table_inputs +from paddle.fluid.distribute_lookup_table import find_distributed_lookup_table_outputs from google.protobuf import text_format class DownpourSGD(object): @@ -12,21 +14,24 @@ class DownpourSGD(object): self.window_ = window def minimize(self, loss, startup_program=None, - parameter_list=None, no_grad_set=None, - prefetch_slots=None, prefetch_slots_emb=None): + parameter_list=None, no_grad_set=None): params_grads = sorted(append_backward(loss), key=lambda x:x[0].name) table_name = find_distributed_lookup_table(loss.block.program) + prefetch_slots = find_distributed_lookup_table_inputs( + loss.block.program, table_name) + prefetch_slots_emb = find_distributed_lookup_table_outputs( + loss.block.program, table_name) server = DownpourServer() worker = DownpourWorker(self.window_) - server.add_sparse_table(0, learning_rate, + server.add_sparse_table(0, self.learning_rate_, prefetch_slots, prefetch_slots_emb) - server.add_dense_table(1, learning_rate, params, grads) - worker.add_sparse_table(0, learning_rate, + server.add_dense_table(1, self.learning_rate_, params_grads[0], params_grads[1]) + worker.add_sparse_table(0, self.learning_rate_, prefetch_slots, prefetch_slots_emb) - worker.add_dense_table(1, learning_rate, params, grads) + worker.add_dense_table(1, self.learning_rate_, params_grads[0], params_grads[1]) ps_param = pslib.PSParameter() ps_param.server_param.CopyFrom(server.get_desc()) #ps_param.worker_param.CopyFrom(worker.get_desc()) worker_skipped_ops = ["lookup_table", "lookup_table_grad"] ps_param_str = text_format.MessageToString(ps_param) - return [ps_param_str, worker_skipped_ops] + return [ps_param_str, worker_skipped_ops, text_format.MessageToString(worker.get_desc())] diff --git a/python/paddle/fluid/distributed/node.py b/python/paddle/fluid/distributed/node.py index 3344bba137e..7c9a76efb69 100644 --- a/python/paddle/fluid/distributed/node.py +++ b/python/paddle/fluid/distributed/node.py @@ -16,25 +16,26 @@ class DownpourServer(Server): self.server_ = pslib.ServerParameter() def add_sparse_table(self, table_id, learning_rate, - slot_key, slot_value_var, slot_grad_var): - #table = self.server_.downpour_table_param.add() + slot_key_vars, slot_value_var): table = self.server_.downpour_server_param.downpour_table_param.add() table.table_id = table_id - table.type = PS_SPARSE_TABLE + table.type = pslib.PS_SPARSE_TABLE table.accessor.accessor_class = "DownpourFeatureValueAccessor" table.accessor.dense_sgd_param.adam.learning_rate = learning_rate - table.accessor.fea_dim = slot_value_var[0].shape[1] + table.accessor.fea_dim = abs(reduce(lambda x, y: x * y, + slot_value_var[0].shape, 1)) def add_dense_table(self, table_id, learning_rate, param_var, grad_var): - #table = self.server_.downpour_table_param.add() table = self.server_.downpour_server_param.downpour_table_param.add() table.table_id = table_id - table.type = PS_DENSE_TABLE + table.type = pslib.PS_DENSE_TABLE table.accessor.accessor_class = "DownpourDenseValueAccessor" table.accessor.sparse_sgd_param.learning_rate = learning_rate - table.accessor.fea_dim = 1 - #table.accessor.fea_dim = reduce(lambda x, y: x.shape, 1 for x in param_var) + fea_dim = 0 + for param in param_var: + fea_dim += reduce(lambda x, y: x * y, param.shape, 1) + table.accessor.fea_dim = fea_dim def get_desc(self): return self.server_ @@ -43,28 +44,24 @@ class DownpourServer(Server): class DownpourWorker(Worker): def __init__(self, window): self.window = window - #self.worker_ = pslib.WorkerParameter().downpour_worker_param - #self.worker_ = pslib.WorkerParameter() self.worker_ = pslib.DownpourTrainerParameter() - #self.worker_.pull_dense_per_batch = window - #self.worker_.push_dense_per_batch = window - #self.worker_.downpour_worker_param.pull_dense_per_batch = window - #self.worker_.downpour_worker_param.push_dense_per_batch = window self.worker_.pull_dense_per_batch = window self.worker_.push_dense_per_batch = window - print(self.worker_) - def add_sparse_table(self, table_id, - slot_keys, slot_value_vars, slot_grad_vars): - #table = self.worker_.sparse_table.add() - table = self.worker_.downpour_worker_param.sparse_table.add() + def add_sparse_table(self, table_id, learning_rate, + slot_key_vars, slot_value_vars): + table = self.worker_.sparse_table.add() table.table_id = table_id - table.slot.extend(slot_keys) - self.worker_.extend([grad.name for grad in slot_grad_vars]) + table.slot_key.extend( + [var.name for var in slot_key_vars]) + table.slot_value.extend( + [var.name for var in slot_value_vars]) + table.slot_gradient.extend( + [var.name + "@GRAD" for var in slot_value_vars]) - def add_dense_table(self, table_id, param_vars, grad_vars): - #table = self.worker_.dense_table.add() - table = self.worker_.downpour_worker_param.dense_table.add() + def add_dense_table(self, table_id, learning_rate, + param_vars, grad_vars): + table = self.worker_.dense_table.add() table.table_id = table_id table.dense_variable_name.extend([p.name for p in param_vars]) table.dense_gradient_variable_name.extend([g.name for g in grad_vars]) -- GitLab