提交 49130f9b 编写于 作者: D dongdaxiang

refine downpour sgd API and adapt to pslib proto desc

上级 8eb7f3ad
...@@ -180,7 +180,7 @@ cc_library(parallel_executor SRCS parallel_executor.cc DEPS ...@@ -180,7 +180,7 @@ cc_library(parallel_executor SRCS parallel_executor.cc DEPS
graph build_strategy graph build_strategy
fast_threaded_ssa_graph_executor variable_helper) fast_threaded_ssa_graph_executor variable_helper)
cc_library(async_executor SRCS async_executor.cc data_feed.cc data_feed_factory.cc executor_thread_worker.cc DEPS op_registry device_context scope framework_proto glog lod_rank_table feed_fetch_method graph_to_program_pass async_executor_proto variable_helper) cc_library(async_executor SRCS async_executor.cc data_feed.cc data_feed_factory.cc executor_thread_worker.cc DEPS op_registry device_context scope framework_proto glog lod_rank_table feed_fetch_method graph_to_program_pass async_executor_proto variable_helper pslib)
cc_test(data_feed_test SRCS data_feed_test.cc DEPS async_executor) cc_test(data_feed_test SRCS data_feed_test.cc DEPS async_executor)
cc_library(prune SRCS prune.cc DEPS framework_proto) cc_library(prune SRCS prune.cc DEPS framework_proto)
......
...@@ -15,6 +15,24 @@ ...@@ -15,6 +15,24 @@
LOOKUP_TABLE_TYPE = "lookup_table" LOOKUP_TABLE_TYPE = "lookup_table"
def find_distributed_lookup_table_inputs(program, table_name):
local_vars = program.current_block().vars
inputs = []
for op in program.global_block().ops:
if op.type == LOOKUP_TABLE_TYPE:
if table_name == op.input("W")[0]:
inputs.extend([local_vars[name] for name in op.input("Ids")])
return inputs
def find_distributed_lookup_table_outputs(program, table_name):
local_vars = program.current_block().vars
outputs = []
for op in program.global_block().ops:
if op.type == LOOKUP_TABLE_TYPE:
if table_name == op.input("W")[0]:
outputs.extend([local_vars[name] for name in op.output("Out")])
return outputs
def find_distributed_lookup_table(program): def find_distributed_lookup_table(program):
""" """
Find distribute lookup table in program. Find distribute lookup table in program.
......
...@@ -3,6 +3,8 @@ from .node import DownpourWorker ...@@ -3,6 +3,8 @@ from .node import DownpourWorker
from ..backward import append_backward from ..backward import append_backward
import ps_pb2 as pslib import ps_pb2 as pslib
from paddle.fluid.distribute_lookup_table import find_distributed_lookup_table from paddle.fluid.distribute_lookup_table import find_distributed_lookup_table
from paddle.fluid.distribute_lookup_table import find_distributed_lookup_table_inputs
from paddle.fluid.distribute_lookup_table import find_distributed_lookup_table_outputs
from google.protobuf import text_format from google.protobuf import text_format
class DownpourSGD(object): class DownpourSGD(object):
...@@ -12,21 +14,24 @@ class DownpourSGD(object): ...@@ -12,21 +14,24 @@ class DownpourSGD(object):
self.window_ = window self.window_ = window
def minimize(self, loss, startup_program=None, def minimize(self, loss, startup_program=None,
parameter_list=None, no_grad_set=None, parameter_list=None, no_grad_set=None):
prefetch_slots=None, prefetch_slots_emb=None):
params_grads = sorted(append_backward(loss), key=lambda x:x[0].name) params_grads = sorted(append_backward(loss), key=lambda x:x[0].name)
table_name = find_distributed_lookup_table(loss.block.program) table_name = find_distributed_lookup_table(loss.block.program)
prefetch_slots = find_distributed_lookup_table_inputs(
loss.block.program, table_name)
prefetch_slots_emb = find_distributed_lookup_table_outputs(
loss.block.program, table_name)
server = DownpourServer() server = DownpourServer()
worker = DownpourWorker(self.window_) worker = DownpourWorker(self.window_)
server.add_sparse_table(0, learning_rate, server.add_sparse_table(0, self.learning_rate_,
prefetch_slots, prefetch_slots_emb) prefetch_slots, prefetch_slots_emb)
server.add_dense_table(1, learning_rate, params, grads) server.add_dense_table(1, self.learning_rate_, params_grads[0], params_grads[1])
worker.add_sparse_table(0, learning_rate, worker.add_sparse_table(0, self.learning_rate_,
prefetch_slots, prefetch_slots_emb) prefetch_slots, prefetch_slots_emb)
worker.add_dense_table(1, learning_rate, params, grads) worker.add_dense_table(1, self.learning_rate_, params_grads[0], params_grads[1])
ps_param = pslib.PSParameter() ps_param = pslib.PSParameter()
ps_param.server_param.CopyFrom(server.get_desc()) ps_param.server_param.CopyFrom(server.get_desc())
#ps_param.worker_param.CopyFrom(worker.get_desc()) #ps_param.worker_param.CopyFrom(worker.get_desc())
worker_skipped_ops = ["lookup_table", "lookup_table_grad"] worker_skipped_ops = ["lookup_table", "lookup_table_grad"]
ps_param_str = text_format.MessageToString(ps_param) ps_param_str = text_format.MessageToString(ps_param)
return [ps_param_str, worker_skipped_ops] return [ps_param_str, worker_skipped_ops, text_format.MessageToString(worker.get_desc())]
...@@ -16,25 +16,26 @@ class DownpourServer(Server): ...@@ -16,25 +16,26 @@ class DownpourServer(Server):
self.server_ = pslib.ServerParameter() self.server_ = pslib.ServerParameter()
def add_sparse_table(self, table_id, learning_rate, def add_sparse_table(self, table_id, learning_rate,
slot_key, slot_value_var, slot_grad_var): slot_key_vars, slot_value_var):
#table = self.server_.downpour_table_param.add()
table = self.server_.downpour_server_param.downpour_table_param.add() table = self.server_.downpour_server_param.downpour_table_param.add()
table.table_id = table_id table.table_id = table_id
table.type = PS_SPARSE_TABLE table.type = pslib.PS_SPARSE_TABLE
table.accessor.accessor_class = "DownpourFeatureValueAccessor" table.accessor.accessor_class = "DownpourFeatureValueAccessor"
table.accessor.dense_sgd_param.adam.learning_rate = learning_rate table.accessor.dense_sgd_param.adam.learning_rate = learning_rate
table.accessor.fea_dim = slot_value_var[0].shape[1] table.accessor.fea_dim = abs(reduce(lambda x, y: x * y,
slot_value_var[0].shape, 1))
def add_dense_table(self, table_id, learning_rate, def add_dense_table(self, table_id, learning_rate,
param_var, grad_var): param_var, grad_var):
#table = self.server_.downpour_table_param.add()
table = self.server_.downpour_server_param.downpour_table_param.add() table = self.server_.downpour_server_param.downpour_table_param.add()
table.table_id = table_id table.table_id = table_id
table.type = PS_DENSE_TABLE table.type = pslib.PS_DENSE_TABLE
table.accessor.accessor_class = "DownpourDenseValueAccessor" table.accessor.accessor_class = "DownpourDenseValueAccessor"
table.accessor.sparse_sgd_param.learning_rate = learning_rate table.accessor.sparse_sgd_param.learning_rate = learning_rate
table.accessor.fea_dim = 1 fea_dim = 0
#table.accessor.fea_dim = reduce(lambda x, y: x.shape, 1 for x in param_var) for param in param_var:
fea_dim += reduce(lambda x, y: x * y, param.shape, 1)
table.accessor.fea_dim = fea_dim
def get_desc(self): def get_desc(self):
return self.server_ return self.server_
...@@ -43,28 +44,24 @@ class DownpourServer(Server): ...@@ -43,28 +44,24 @@ class DownpourServer(Server):
class DownpourWorker(Worker): class DownpourWorker(Worker):
def __init__(self, window): def __init__(self, window):
self.window = window self.window = window
#self.worker_ = pslib.WorkerParameter().downpour_worker_param
#self.worker_ = pslib.WorkerParameter()
self.worker_ = pslib.DownpourTrainerParameter() self.worker_ = pslib.DownpourTrainerParameter()
#self.worker_.pull_dense_per_batch = window
#self.worker_.push_dense_per_batch = window
#self.worker_.downpour_worker_param.pull_dense_per_batch = window
#self.worker_.downpour_worker_param.push_dense_per_batch = window
self.worker_.pull_dense_per_batch = window self.worker_.pull_dense_per_batch = window
self.worker_.push_dense_per_batch = window self.worker_.push_dense_per_batch = window
print(self.worker_)
def add_sparse_table(self, table_id, def add_sparse_table(self, table_id, learning_rate,
slot_keys, slot_value_vars, slot_grad_vars): slot_key_vars, slot_value_vars):
#table = self.worker_.sparse_table.add() table = self.worker_.sparse_table.add()
table = self.worker_.downpour_worker_param.sparse_table.add()
table.table_id = table_id table.table_id = table_id
table.slot.extend(slot_keys) table.slot_key.extend(
self.worker_.extend([grad.name for grad in slot_grad_vars]) [var.name for var in slot_key_vars])
table.slot_value.extend(
[var.name for var in slot_value_vars])
table.slot_gradient.extend(
[var.name + "@GRAD" for var in slot_value_vars])
def add_dense_table(self, table_id, param_vars, grad_vars): def add_dense_table(self, table_id, learning_rate,
#table = self.worker_.dense_table.add() param_vars, grad_vars):
table = self.worker_.downpour_worker_param.dense_table.add() table = self.worker_.dense_table.add()
table.table_id = table_id table.table_id = table_id
table.dense_variable_name.extend([p.name for p in param_vars]) table.dense_variable_name.extend([p.name for p in param_vars])
table.dense_gradient_variable_name.extend([g.name for g in grad_vars]) table.dense_gradient_variable_name.extend([g.name for g in grad_vars])
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册