Unverified commit 73650a83, authored by Yancey, committed by GitHub

Merge pull request #10342 from Yancey1989/refine_distribute_transpiler_api

Refine distribute transpiler api
@@ -240,8 +240,6 @@ def main():
     t = fluid.DistributeTranspiler()
     t.transpile(
-        optimize_ops,
-        params_grads,
         trainer_id=args.task_index,
         pservers=args.ps_hosts,
         trainers=trainers)
......
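The user-facing effect of this PR, repeated in every caller below, is that transpile() no longer takes optimize_ops and params_grads; the transpiler now recovers them from the program itself (see _get_optimize_pass further down). A minimal before/after sketch of a trainer script, where optimizer and avg_cost are illustrative names rather than code from this diff:

    # Before this PR: the return value of Optimizer.minimize had to be
    # threaded through to the transpiler by hand.
    optimize_ops, params_grads = optimizer.minimize(avg_cost)
    t = fluid.DistributeTranspiler()
    t.transpile(
        optimize_ops,
        params_grads,
        trainer_id=args.task_index,
        pservers=args.ps_hosts,
        trainers=trainers)

    # After this PR: minimize() still builds the optimize ops inside the
    # program, but its return value no longer needs to be forwarded.
    optimizer.minimize(avg_cost)
    t = fluid.DistributeTranspiler()
    t.transpile(
        trainer_id=args.task_index,
        pservers=args.ps_hosts,
        trainers=trainers)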
@@ -137,8 +137,6 @@ def split_dense_variable(var_list,
 class DistributeTranspiler:
     def transpile(self,
-                  optimize_ops,
-                  params_grads,
                   trainer_id,
                   program=None,
                   pservers="127.0.0.1:6174",
@@ -169,11 +167,6 @@ class DistributeTranspiler:
         4. append ops that should run on current server instance.
         5. add listen_and_serv op

-        :param optimize_ops: op list of optimization, should be the
-            return value of Optimizer.minimize
-        :type optimize_ops: list
-        :param params_grads: list of tuple(weight, gradient)
-        :type params_grads: list
         :param trainer_id: one unique id for each trainer in a job.
         :type trainer_id: int
         :param program: program to transpile, default is default_main_program
@@ -194,7 +187,6 @@ class DistributeTranspiler:
         program = default_main_program()
         self.origin_program = program
         self.trainer_num = trainers
-        self.optimize_ops = optimize_ops
         self.sync_mode = sync_mode
         # TODO(typhoonzero): currently trainer_id is fetched from cluster system
         # like Kubernetes, we should port this to use etcd later when developing
@@ -202,6 +194,7 @@ class DistributeTranspiler:
         self.trainer_id = trainer_id
         pserver_endpoints = pservers.split(",")
         self.pserver_endpoints = pserver_endpoints
+        self.optimize_ops, params_grads = self._get_optimize_pass()

         # process lookup_table_op
         # 1. check all lookup_table_op is distributed
@@ -408,10 +401,7 @@ class DistributeTranspiler:
         # HACK: optimization global ops only used to scale beta1 and beta2
         # replace it with dependency engine.
         for op in self.optimize_ops:
-            if op.type == "scale":
-                for in_name in op.input_arg_names:
-                    if in_name.startswith("beta1_pow_acc") or \
-                            in_name.startswith("beta2_pow_acc"):
-                        global_ops.append(op)
+            if self._is_adam_connected_op(op):
+                global_ops.append(op)

         def __append_optimize_op__(op, block, grad_to_block_id):
@@ -1147,3 +1137,32 @@ class DistributeTranspiler:
             # we only need to append op for once
             break
         return lr_ops
+
+    def _get_optimize_pass(self):
+        block = self.origin_program.global_block()
+        opt_ops = []
+        params_grads = []
+        for op in block.ops:
+            if self._is_opt_op(op):
+                opt_ops.append(op)
+                params_grads.append((self.origin_program.global_block().var(
+                    op.input("Param")[0]),
+                                     self.origin_program.global_block().var(
+                                         op.input("Grad")[0])))
+            elif self._is_adam_connected_op(op):
+                opt_ops.append(op)
+            else:
+                pass
+        return opt_ops, params_grads
+
+    def _is_adam_connected_op(self, op):
+        """
+        A hack function to determine whether the input operator
+        is connected to an optimize operator.
+        """
+        if op.type == "scale":
+            for in_name in op.input_arg_names:
+                if in_name.startswith("beta1_pow_acc") or \
+                        in_name.startswith("beta2_pow_acc"):
+                    return True
+        return False
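The two helpers above rebuild what the removed arguments used to carry: _get_optimize_pass walks the global block and collects each optimize op together with the (Param, Grad) variable pair read from its inputs, while _is_adam_connected_op additionally picks up the scale ops Adam emits to update its beta1_pow_acc/beta2_pow_acc accumulators, which must travel with the optimize ops even though they are not optimize ops themselves. Since the result is stored on the transpiler, one can inspect what the pass collected after transpile() returns; the helper below is a hypothetical debugging snippet, not part of this PR:

    def dump_optimize_pass(t):
        # t is a fluid.DistributeTranspiler on which transpile() has already run;
        # optimize_ops is filled in by _get_optimize_pass() during transpile().
        for op in t.optimize_ops:
            print("%s -> %s" % (op.type, ", ".join(op.input_arg_names)))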
@@ -80,12 +80,7 @@ def train(use_cuda, save_dirname, is_local):
         trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
         training_role = os.getenv("TRAINING_ROLE", "TRAINER")
         t = fluid.DistributeTranspiler()
-        t.transpile(
-            optimize_ops,
-            params_grads,
-            trainer_id,
-            pservers=pserver_endpoints,
-            trainers=trainers)
+        t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
         if training_role == "PSERVER":
             pserver_prog = t.get_pserver_program(current_endpoint)
             pserver_startup = t.get_startup_program(current_endpoint,
......
@@ -189,12 +189,7 @@ def train(net_type, use_cuda, save_dirname, is_local):
         trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
         training_role = os.getenv("TRAINING_ROLE", "TRAINER")
         t = fluid.DistributeTranspiler()
-        t.transpile(
-            optimize_ops,
-            params_grads,
-            trainer_id,
-            pservers=pserver_endpoints,
-            trainers=trainers)
+        t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
         if training_role == "PSERVER":
             pserver_prog = t.get_pserver_program(current_endpoint)
             pserver_startup = t.get_startup_program(current_endpoint,
......
@@ -259,12 +259,7 @@ def train(use_cuda, save_dirname=None, is_local=True):
         trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
         training_role = os.getenv("TRAINING_ROLE", "TRAINER")
         t = fluid.DistributeTranspiler()
-        t.transpile(
-            optimize_ops,
-            params_grads,
-            trainer_id,
-            pservers=pserver_endpoints,
-            trainers=trainers)
+        t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
         if training_role == "PSERVER":
             pserver_prog = t.get_pserver_program(current_endpoint)
             pserver_startup = t.get_startup_program(current_endpoint,
......
@@ -231,12 +231,7 @@ def train_main(use_cuda, is_sparse, is_local=True):
         trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
         training_role = os.getenv("TRAINING_ROLE", "TRAINER")
         t = fluid.DistributeTranspiler()
-        t.transpile(
-            optimize_ops,
-            params_grads,
-            trainer_id,
-            pservers=pserver_endpoints,
-            trainers=trainers)
+        t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
         if training_role == "PSERVER":
             pserver_prog = t.get_pserver_program(current_endpoint)
             pserver_startup = t.get_startup_program(current_endpoint,
......
@@ -162,12 +162,7 @@ def train(nn_type,
         trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
         training_role = os.getenv("TRAINING_ROLE", "TRAINER")
         t = fluid.DistributeTranspiler()
-        t.transpile(
-            optimize_ops,
-            params_grads,
-            trainer_id,
-            pservers=pserver_endpoints,
-            trainers=trainers)
+        t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
         if training_role == "PSERVER":
             pserver_prog = t.get_pserver_program(current_endpoint)
             pserver_startup = t.get_startup_program(current_endpoint,
......
@@ -261,12 +261,7 @@ def train(use_cuda, save_dirname, is_local=True):
         trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
         training_role = os.getenv("TRAINING_ROLE", "TRAINER")
         t = fluid.DistributeTranspiler()
-        t.transpile(
-            optimize_ops,
-            params_grads,
-            trainer_id,
-            pservers=pserver_endpoints,
-            trainers=trainers)
+        t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
         if training_role == "PSERVER":
             pserver_prog = t.get_pserver_program(current_endpoint)
             pserver_startup = t.get_startup_program(current_endpoint,
......
@@ -213,12 +213,7 @@ def train(word_dict,
         trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
         training_role = os.getenv("TRAINING_ROLE", "TRAINER")
         t = fluid.DistributeTranspiler()
-        t.transpile(
-            optimize_ops,
-            params_grads,
-            trainer_id,
-            pservers=pserver_endpoints,
-            trainers=trainers)
+        t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
         if training_role == "PSERVER":
             pserver_prog = t.get_pserver_program(current_endpoint)
             pserver_startup = t.get_startup_program(current_endpoint,
......
@@ -145,12 +145,7 @@ def train(use_cuda, is_sparse, is_parallel, save_dirname, is_local=True):
         trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
         training_role = os.getenv("TRAINING_ROLE", "TRAINER")
         t = fluid.DistributeTranspiler()
-        t.transpile(
-            optimize_ops,
-            params_grads,
-            trainer_id,
-            pservers=pserver_endpoints,
-            trainers=trainers)
+        t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
         if training_role == "PSERVER":
             pserver_prog = t.get_pserver_program(current_endpoint)
             pserver_startup = t.get_startup_program(current_endpoint,
......