提交 f517d01a 编写于 作者: W Wu Yi 提交者: typhoonzero

fix dist transpile with memopt (#12974)

* fix dist transpile with memopt

* update api.spec

* polish dist transpile api

* update apispec

* update apispec
上级 a6dcadca
...@@ -55,9 +55,10 @@ paddle.fluid.Inferencer.__init__ ArgSpec(args=['self', 'infer_func', 'param_path ...@@ -55,9 +55,10 @@ paddle.fluid.Inferencer.__init__ ArgSpec(args=['self', 'infer_func', 'param_path
paddle.fluid.Inferencer.infer ArgSpec(args=['self', 'inputs', 'return_numpy'], varargs=None, keywords=None, defaults=(True,)) paddle.fluid.Inferencer.infer ArgSpec(args=['self', 'inputs', 'return_numpy'], varargs=None, keywords=None, defaults=(True,))
paddle.fluid.DistributeTranspiler.__init__ ArgSpec(args=['self', 'config'], varargs=None, keywords=None, defaults=(None,)) paddle.fluid.DistributeTranspiler.__init__ ArgSpec(args=['self', 'config'], varargs=None, keywords=None, defaults=(None,))
paddle.fluid.DistributeTranspiler.get_pserver_program ArgSpec(args=['self', 'endpoint'], varargs=None, keywords=None, defaults=None) paddle.fluid.DistributeTranspiler.get_pserver_program ArgSpec(args=['self', 'endpoint'], varargs=None, keywords=None, defaults=None)
paddle.fluid.DistributeTranspiler.get_startup_program ArgSpec(args=['self', 'endpoint', 'pserver_program', 'startup_program'], varargs=None, keywords=None, defaults=(None,)) paddle.fluid.DistributeTranspiler.get_pserver_programs ArgSpec(args=['self', 'endpoint'], varargs=None, keywords=None, defaults=None)
paddle.fluid.DistributeTranspiler.get_startup_program ArgSpec(args=['self', 'endpoint', 'pserver_program', 'startup_program'], varargs=None, keywords=None, defaults=(None, None))
paddle.fluid.DistributeTranspiler.get_trainer_program ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None) paddle.fluid.DistributeTranspiler.get_trainer_program ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None)
paddle.fluid.DistributeTranspiler.transpile ArgSpec(args=['self', 'trainer_id', 'program', 'pservers', 'trainers', 'sync_mode'], varargs=None, keywords=None, defaults=(None, '127.0.0.1:6174', 1, True)) paddle.fluid.DistributeTranspiler.transpile ArgSpec(args=['self', 'trainer_id', 'program', 'pservers', 'trainers', 'sync_mode', 'startup_program'], varargs=None, keywords=None, defaults=(None, '127.0.0.1:6174', 1, True, None))
paddle.fluid.InferenceTranspiler.__init__ paddle.fluid.InferenceTranspiler.__init__
paddle.fluid.InferenceTranspiler.transpile ArgSpec(args=['self', 'program', 'place', 'scope'], varargs=None, keywords=None, defaults=(None,)) paddle.fluid.InferenceTranspiler.transpile ArgSpec(args=['self', 'program', 'place', 'scope'], varargs=None, keywords=None, defaults=(None,))
paddle.fluid.memory_optimize ArgSpec(args=['input_program', 'skip_opt_set', 'print_log', 'level'], varargs=None, keywords=None, defaults=(None, False, 0)) paddle.fluid.memory_optimize ArgSpec(args=['input_program', 'skip_opt_set', 'print_log', 'level'], varargs=None, keywords=None, defaults=(None, False, 0))
...@@ -329,9 +330,10 @@ paddle.fluid.contrib.BeamSearchDecoder.update_array ArgSpec(args=['self', 'array ...@@ -329,9 +330,10 @@ paddle.fluid.contrib.BeamSearchDecoder.update_array ArgSpec(args=['self', 'array
paddle.fluid.contrib.memory_usage ArgSpec(args=['program', 'batch_size'], varargs=None, keywords=None, defaults=None) paddle.fluid.contrib.memory_usage ArgSpec(args=['program', 'batch_size'], varargs=None, keywords=None, defaults=None)
paddle.fluid.transpiler.DistributeTranspiler.__init__ ArgSpec(args=['self', 'config'], varargs=None, keywords=None, defaults=(None,)) paddle.fluid.transpiler.DistributeTranspiler.__init__ ArgSpec(args=['self', 'config'], varargs=None, keywords=None, defaults=(None,))
paddle.fluid.transpiler.DistributeTranspiler.get_pserver_program ArgSpec(args=['self', 'endpoint'], varargs=None, keywords=None, defaults=None) paddle.fluid.transpiler.DistributeTranspiler.get_pserver_program ArgSpec(args=['self', 'endpoint'], varargs=None, keywords=None, defaults=None)
paddle.fluid.transpiler.DistributeTranspiler.get_startup_program ArgSpec(args=['self', 'endpoint', 'pserver_program', 'startup_program'], varargs=None, keywords=None, defaults=(None,)) paddle.fluid.transpiler.DistributeTranspiler.get_pserver_programs ArgSpec(args=['self', 'endpoint'], varargs=None, keywords=None, defaults=None)
paddle.fluid.transpiler.DistributeTranspiler.get_startup_program ArgSpec(args=['self', 'endpoint', 'pserver_program', 'startup_program'], varargs=None, keywords=None, defaults=(None, None))
paddle.fluid.transpiler.DistributeTranspiler.get_trainer_program ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None) paddle.fluid.transpiler.DistributeTranspiler.get_trainer_program ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None)
paddle.fluid.transpiler.DistributeTranspiler.transpile ArgSpec(args=['self', 'trainer_id', 'program', 'pservers', 'trainers', 'sync_mode'], varargs=None, keywords=None, defaults=(None, '127.0.0.1:6174', 1, True)) paddle.fluid.transpiler.DistributeTranspiler.transpile ArgSpec(args=['self', 'trainer_id', 'program', 'pservers', 'trainers', 'sync_mode', 'startup_program'], varargs=None, keywords=None, defaults=(None, '127.0.0.1:6174', 1, True, None))
paddle.fluid.transpiler.InferenceTranspiler.__init__ paddle.fluid.transpiler.InferenceTranspiler.__init__
paddle.fluid.transpiler.InferenceTranspiler.transpile ArgSpec(args=['self', 'program', 'place', 'scope'], varargs=None, keywords=None, defaults=(None,)) paddle.fluid.transpiler.InferenceTranspiler.transpile ArgSpec(args=['self', 'program', 'place', 'scope'], varargs=None, keywords=None, defaults=(None,))
paddle.fluid.transpiler.memory_optimize ArgSpec(args=['input_program', 'skip_opt_set', 'print_log', 'level'], varargs=None, keywords=None, defaults=(None, False, 0)) paddle.fluid.transpiler.memory_optimize ArgSpec(args=['input_program', 'skip_opt_set', 'print_log', 'level'], varargs=None, keywords=None, defaults=(None, False, 0))
......
...@@ -31,7 +31,7 @@ Steps to transpile pserver: ...@@ -31,7 +31,7 @@ Steps to transpile pserver:
""" """
import math import math
import random import sys
import numpy as np import numpy as np
import collections import collections
...@@ -181,7 +181,8 @@ class DistributeTranspiler(object): ...@@ -181,7 +181,8 @@ class DistributeTranspiler(object):
program=None, program=None,
pservers="127.0.0.1:6174", pservers="127.0.0.1:6174",
trainers=1, trainers=1,
sync_mode=True): sync_mode=True,
startup_program=None):
""" """
Run the transpiler. Run the transpiler.
...@@ -194,13 +195,17 @@ class DistributeTranspiler(object): ...@@ -194,13 +195,17 @@ class DistributeTranspiler(object):
list. list.
trainers (int): number of trainers in the distributed job. trainers (int): number of trainers in the distributed job.
sync_mode (bool): Do sync training or not, default is True. sync_mode (bool): Do sync training or not, default is True.
startup_program (Program|None): startup_program to transpile,
default is fluid.default_main_program().
""" """
if program is None: if program is None:
program = default_main_program() program = default_main_program()
if startup_program is None:
startup_program = default_startup_program()
self.origin_program = program self.origin_program = program
self.origin_startup_program = default_startup_program().clone() self.startup_program = startup_program
self.origin_startup_program = self.startup_program.clone()
self.startup_program = default_startup_program()
self.trainer_num = trainers self.trainer_num = trainers
self.sync_mode = sync_mode self.sync_mode = sync_mode
self.trainer_id = trainer_id self.trainer_id = trainer_id
...@@ -369,21 +374,18 @@ class DistributeTranspiler(object): ...@@ -369,21 +374,18 @@ class DistributeTranspiler(object):
return self.origin_program return self.origin_program
def _get_trainer_startup_program(self, def _get_trainer_startup_program(self, recv_vars, eplist):
recv_vars,
eplist,
startup_program=None):
""" """
Get transpiled trainer side startup program. Get transpiled trainer side startup program.
Args: Args:
startup_program(Program): Startup program. recv_vars (list): Variable list to recv for current trainer_id
eplist (list): A list of strings indicating
Returns: Returns:
Program: trainer side startup program. Program: trainer side startup program.
""" """
if startup_program is None: startup_program = self.startup_program
startup_program = self.startup_program
# FIXME(gongwb): delete not need ops. # FIXME(gongwb): delete not need ops.
# note that: some parameter is not trainable and those ops can't be deleted. # note that: some parameter is not trainable and those ops can't be deleted.
...@@ -431,7 +433,18 @@ class DistributeTranspiler(object): ...@@ -431,7 +433,18 @@ class DistributeTranspiler(object):
#add concat ops to merge splited parameters received from parameter servers. #add concat ops to merge splited parameters received from parameter servers.
if len(splited_var) <= 1: if len(splited_var) <= 1:
continue continue
orig_param = startup_program.global_block().vars[varname] # NOTE: if enable memory optimization, origin vars maybe removed.
if startup_program.global_block().vars.has_key(varname):
orig_param = startup_program.global_block().vars[varname]
else:
origin_param_var = self.origin_program.global_block().vars[
varname]
orig_param = startup_program.global_block().create_var(
name=varname,
persistable=origin_param_var.persistable,
type=origin_param_var.type,
dtype=origin_param_var.dtype,
shape=origin_param_var.shape)
startup_program.global_block().append_op( startup_program.global_block().append_op(
type="concat", type="concat",
inputs={"X": splited_var}, inputs={"X": splited_var},
...@@ -454,7 +467,9 @@ class DistributeTranspiler(object): ...@@ -454,7 +467,9 @@ class DistributeTranspiler(object):
# NOTE: assume blocks of the same variable is not distributed # NOTE: assume blocks of the same variable is not distributed
# on the same pserver, only change param/grad varnames for # on the same pserver, only change param/grad varnames for
# trainers to fetch. # trainers to fetch.
sys.stderr.write("get_pserver_program() is deprecated, call\
get_pserver_programs() to get pserver main and startup\
in a single call.")
# step1 # step1
pserver_program = Program() pserver_program = Program()
pserver_program.random_seed = self.origin_program.random_seed pserver_program.random_seed = self.origin_program.random_seed
...@@ -638,32 +653,58 @@ class DistributeTranspiler(object): ...@@ -638,32 +653,58 @@ class DistributeTranspiler(object):
attrs=attrs) attrs=attrs)
pserver_program._sync_with_cpp() pserver_program._sync_with_cpp()
# save pserver program to generate pserver side startup relatively.
self.pserver_program = pserver_program
return pserver_program return pserver_program
def get_pserver_programs(self, endpoint):
"""
Get pserver side main program and startup program for distributed training.
Args:
endpoint (str): current pserver endpoint.
Returns:
tuple: (main_program, startup_program), of type "Program"
"""
pserver_prog = self.get_pserver_program(endpoint)
pserver_startup = self.get_startup_program(endpoint)
return pserver_prog, pserver_startup
def get_startup_program(self, def get_startup_program(self,
endpoint, endpoint,
pserver_program, pserver_program=None,
startup_program=None): startup_program=None):
""" """
**Deprecated**
Get startup program for current parameter server. Get startup program for current parameter server.
Modify operator input variables if there are variables that Modify operator input variables if there are variables that
were split to several blocks. were split to several blocks.
Args: Args:
endpoint (str): current pserver endpoint. endpoint (str): current pserver endpoint.
pserver_program (Program): call get_pserver_program first and pserver_program (Program): deprecated, call get_pserver_program first.
pass the result here. startup_program (Program): deprecated, should pass startup_program
startup_program (Program): if pass None, will use when initalizing
default_startup_program
Returns: Returns:
Program: parameter server side startup program. Program: parameter server side startup program.
""" """
sys.stderr.write("get_startup_program() is deprecated, call\
get_pserver_programs() to get pserver main and startup\
in a single call.")
if pserver_program != None:
sys.stderr.write("passing pserver_program to get_startup_program()\
is deprecated, you can use new API get_pserver_programs() to\
get both pserver main program and startup program.")
if startup_program != None:
sys.stderr.write("passing startup_program to get_startup_program()\
is deprecated, use fluid.program_guard() or pass this argument\
to transpile() call.")
s_prog = Program() s_prog = Program()
if not startup_program: orig_s_prog = self.startup_program
orig_s_prog = default_startup_program()
else:
orig_s_prog = startup_program
s_prog.random_seed = orig_s_prog.random_seed s_prog.random_seed = orig_s_prog.random_seed
params = self.param_grad_ep_mapping[endpoint]["params"] params = self.param_grad_ep_mapping[endpoint]["params"]
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册