提交 8de4d31a 编写于 作者: H heqiaozhi 提交者: dongdaxiang

refactor async exe

上级 24863897
...@@ -121,7 +121,9 @@ class AsyncExecutor(object): ...@@ -121,7 +121,9 @@ class AsyncExecutor(object):
with open("trainer_desc.proto", "w") as fout: with open("trainer_desc.proto", "w") as fout:
fout.write(trainer._desc()) fout.write(trainer._desc())
# define a trainer and a device_worker here # define a trainer and a device_worker here
self.executor.run_from_files(program_desc, trainer._desc(), debug) self.executor.run_from_files(program_desc,
trainer._desc(), debug,
str(id(program_desc)))
''' '''
def run(self, def run(self,
...@@ -194,7 +196,7 @@ class AsyncExecutor(object): ...@@ -194,7 +196,7 @@ class AsyncExecutor(object):
self.executor.run_from_files(program_desc, self.executor.run_from_files(program_desc,
data_feed.desc(), filelist, thread_num, data_feed.desc(), filelist, thread_num,
fetch_var_names, mode, debug) fetch_var_names, mode, debug, str(id(program_desc)))
''' '''
def download_data(self, def download_data(self,
...@@ -313,6 +315,10 @@ class AsyncExecutor(object): ...@@ -313,6 +315,10 @@ class AsyncExecutor(object):
self.dist_desc = dist_desc self.dist_desc = dist_desc
place = core.CPUPlace() place = core.CPUPlace()
executor = Executor(place) executor = Executor(place)
if isinstance(startup_program, list):
for sp in startup_program:
executor.run(sp)
else:
executor.run(startup_program) executor.run(startup_program)
self.instance.barrier_all() #wait all server start self.instance.barrier_all() #wait all server start
......
...@@ -43,9 +43,13 @@ class DownpourSGD(object): ...@@ -43,9 +43,13 @@ class DownpourSGD(object):
self.learning_rate_ = learning_rate self.learning_rate_ = learning_rate
self.window_ = window self.window_ = window
self.type = "downpour" self.type = "downpour"
self.data_norm_name = [
".batch_size", ".batch_square_sum", ".batch_sum",
".batch_size@GRAD", ".batch_square_sum@GRAD", ".batch_sum@GRAD"
]
def minimize(self, def minimize(self,
loss, losses,
startup_program=None, startup_program=None,
parameter_list=None, parameter_list=None,
no_grad_set=None): no_grad_set=None):
...@@ -65,39 +69,75 @@ class DownpourSGD(object): ...@@ -65,39 +69,75 @@ class DownpourSGD(object):
worker_skipped_ops: operator names that need worker_skipped_ops: operator names that need
to be skipped during execution to be skipped during execution
""" """
params_grads = sorted( if not isinstance(losses, list):
append_backward(loss, parameter_list, no_grad_set), raise ValueError('losses is a list, just lick [model.cost]')
key=lambda x: x[0].name) table_name = find_distributed_lookup_table(losses[0].block.program)
table_name = find_distributed_lookup_table(loss.block.program)
prefetch_slots = find_distributed_lookup_table_inputs( prefetch_slots = find_distributed_lookup_table_inputs(
loss.block.program, table_name) losses[0].block.program, table_name)
prefetch_slots_emb = find_distributed_lookup_table_outputs( prefetch_slots_emb = find_distributed_lookup_table_outputs(
loss.block.program, table_name) losses[0].block.program, table_name)
ps_param = pslib.PSParameter()
server = DownpourServer() server = DownpourServer()
# window is communication strategy
worker = DownpourWorker(self.window_) worker = DownpourWorker(self.window_)
# Todo(guru4elephant): support multiple tables definitions
# currently support one big sparse table
sparse_table_index = 0 sparse_table_index = 0
# currently merge all dense parameters into one dense table server.add_sparse_table(sparse_table_index, self.learning_rate_,
prefetch_slots, prefetch_slots_emb)
worker.add_sparse_table(sparse_table_index, self.learning_rate_,
prefetch_slots, prefetch_slots_emb)
dense_table_index = 1 dense_table_index = 1
program_configs = []
for loss_index in range(len(losses)):
program_config = ps_param.trainer_param.program_config.add()
program_config.program_id = str(
id(losses[loss_index].block.program))
program_config.pull_sparse_table_id.extend([sparse_table_index])
program_config.push_sparse_table_id.extend([sparse_table_index])
params_grads = sorted(
append_backward(losses[loss_index], parameter_list,
no_grad_set),
key=lambda x: x[0].name)
params = [] params = []
grads = [] grads = []
data_norm_params = []
data_norm_grads = []
for i in params_grads: for i in params_grads:
is_data_norm_data = False
for data_norm_name in self.data_norm_name:
if i[0].name.endswith(data_norm_name):
is_data_norm_data = True
data_norm_params.append(i[0])
if not is_data_norm_data:
params.append(i[0]) params.append(i[0])
for i in params_grads: for i in params_grads:
is_data_norm_data = False
for data_norm_grad in self.data_norm_name:
if i[0].name.endswith(data_norm_grad):
is_data_norm_data = True
data_norm_grads.append(i[1])
if not is_data_norm_data:
grads.append(i[1]) grads.append(i[1])
server.add_sparse_table(sparse_table_index, self.learning_rate_, server.add_dense_table(dense_table_index, self.learning_rate_,
prefetch_slots, prefetch_slots_emb) params, grads)
server.add_dense_table(dense_table_index, self.learning_rate_, params, worker.add_dense_table(dense_table_index, self.learning_rate_,
grads) params, grads)
worker.add_sparse_table(sparse_table_index, self.learning_rate_, program_config.pull_dense_table_id.extend([dense_table_index])
prefetch_slots, prefetch_slots_emb) program_config.push_dense_table_id.extend([dense_table_index])
worker.add_dense_table(dense_table_index, self.learning_rate_, params, if len(data_norm_params) != 0 and len(data_norm_grads) != 0:
grads) dense_table_index += 1
ps_param = pslib.PSParameter() server.add_data_norm_table(dense_table_index,
self.learning_rate_,
data_norm_params, data_norm_grads)
worker.add_dense_table(dense_table_index, self.learning_rate_,
data_norm_params, data_norm_grads)
program_config.pull_dense_table_id.extend([dense_table_index])
program_config.push_dense_table_id.extend([dense_table_index])
dense_table_index += 1
program_configs.append(program_config)
ps_param.server_param.CopyFrom(server.get_desc()) ps_param.server_param.CopyFrom(server.get_desc())
ps_param.trainer_param.CopyFrom(worker.get_desc()) ps_param.trainer_param.CopyFrom(worker.get_desc())
for program_config in program_configs:
ps_param.trainer_param.program_config.extend([program_config])
# Todo(guru4elephant): figure out how to support more sparse parameters # Todo(guru4elephant): figure out how to support more sparse parameters
# currently only support lookup_table # currently only support lookup_table
worker_skipped_ops = ["lookup_table", "lookup_table_grad"] worker_skipped_ops = ["lookup_table", "lookup_table_grad"]
......
...@@ -112,6 +112,30 @@ class DownpourServer(Server): ...@@ -112,6 +112,30 @@ class DownpourServer(Server):
fea_dim += reduce(lambda x, y: x * y, param.shape, 1) fea_dim += reduce(lambda x, y: x * y, param.shape, 1)
table.accessor.fea_dim = fea_dim table.accessor.fea_dim = fea_dim
def add_data_norm_table(self, table_id, learning_rate, param_var, grad_var):
"""
Args:
table_id(int): id of sparse params table
learning_rate(float): the learning rate used to update parameters. \
Can be a float value
param_var(list): all dense param. it is a list.
grad_var(list): all dense grad parm it is a list.
Returns:
return None
"""
table = self.server_.downpour_server_param.downpour_table_param.add()
table.table_id = table_id
table.table_class = "DownpourDenseTable"
table.type = pslib.PS_DENSE_TABLE
table.accessor.accessor_class = "DownpourDenseValueAccessor"
table.accessor.dense_sgd_param.name = "summary"
table.accessor.dense_sgd_param.summary.summary_decay_rate = 0.999999
fea_dim = 0
for param in filter(lambda x: x.name.find("embedding") == -1,
param_var):
fea_dim += reduce(lambda x, y: x * y, param.shape, 1)
table.accessor.fea_dim = fea_dim
def get_desc(self): def get_desc(self):
""" """
Return downpour server program_desc Return downpour server program_desc
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册