提交 bc290b57 编写于 作者: X xzl

Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into fix_maxpool_with_mask_layer

......@@ -240,8 +240,6 @@ def main():
t = fluid.DistributeTranspiler()
t.transpile(
optimize_ops,
params_grads,
trainer_id=args.task_index,
pservers=args.ps_hosts,
trainers=trainers)
......
......@@ -8,28 +8,28 @@ The user's cluster environment is not the same. To facilitate everyone's deploym
.. toctree::
:maxdepth: 1
k8s_cn.md
k8s_distributed_cn.md
k8s_en.md
k8s_distributed_en.md
`OpenMPI <https://www.open-mpi.org>`_ is a mature high-performance parallel computing framework, which is widely used in the field of HPC. The following guide describes how to use OpenMPI to build PaddlePaddle's cluster training task:
.. toctree::
:maxdepth: 1
openmpi_cn.md
openmpi_en.md
`Fabric <http://www.fabfile.org>`_ is a convenient tool for program deployment and management. We provide a way to deploy and manage with Fabric. If you want to know more about it, please read the following guidelines:
.. toctree::
:maxdepth: 1
fabric_cn.md
fabric_en.md
We also support the deployment of PaddlePaddle on AWS. Learn more about:
.. toctree::
:maxdepth: 1
k8s_aws_cn.md
k8s_aws_en.md
The examples can be found under `cluster_train_v2 <https://github.com/PaddlePaddle/Paddle/tree/develop/paddle/scripts/cluster_train_v2>`_ .
\ No newline at end of file
The examples can be found under `cluster_train_v2 <https://github.com/PaddlePaddle/Paddle/tree/develop/paddle/scripts/cluster_train_v2>`_ .
......@@ -21,14 +21,15 @@ import executor
from executor import *
import trainer
from trainer import *
from trainer import Trainer
from trainer import BeginEpochEvent
from trainer import EndEpochEvent
from trainer import BeginStepEvent
from trainer import EndStepEvent
import inferencer
from inferencer import Inferencer
import params
from params import Params
import io
import evaluator
import initializer
......@@ -57,7 +58,7 @@ from parallel_executor import ParallelExecutor
Tensor = LoDTensor
__all__ = framework.__all__ + executor.__all__ + concurrency.__all__ +\
trainer.__all__ + inferencer.__all__ + params.__all__ + [
trainer.__all__ + inferencer.__all__ + [
'io',
'initializer',
'layers',
......
......@@ -137,8 +137,6 @@ def split_dense_variable(var_list,
class DistributeTranspiler:
def transpile(self,
optimize_ops,
params_grads,
trainer_id,
program=None,
pservers="127.0.0.1:6174",
......@@ -169,11 +167,6 @@ class DistributeTranspiler:
4. append ops that should run on current server instance.
5. add listen_and_serv op
:param optimize_ops: op list of optimization, should be the
return value of Optimizer.minimize
:type optimize_ops: list
:param params_grads: list of tuple(weight, gradient)
:type params_grads: list
:param trainer_id: one unique id for each trainer in a job.
:type trainer_id: int
:param program: program to transpile, default is default_main_program
......@@ -194,7 +187,6 @@ class DistributeTranspiler:
program = default_main_program()
self.origin_program = program
self.trainer_num = trainers
self.optimize_ops = optimize_ops
self.sync_mode = sync_mode
# TODO(typhoonzero): currently trainer_id is fetched from cluster system
# like Kubernetes, we should port this to use etcd later when developing
......@@ -202,6 +194,7 @@ class DistributeTranspiler:
self.trainer_id = trainer_id
pserver_endpoints = pservers.split(",")
self.pserver_endpoints = pserver_endpoints
self.optimize_ops, params_grads = self._get_optimize_pass()
# process lookup_table_op
# 1. check all lookup_table_op is distributed
......@@ -408,11 +401,8 @@ class DistributeTranspiler:
# HACK: optimization global ops only used to scale beta1 and beta2
# replace it with dependency engine.
for op in self.optimize_ops:
if op.type == "scale":
for in_name in op.input_arg_names:
if in_name.startswith("beta1_pow_acc") or \
in_name.startswith("beta2_pow_acc"):
global_ops.append(op)
if self._is_adam_connected_op(op):
global_ops.append(op)
def __append_optimize_op__(op, block, grad_to_block_id):
if self._is_opt_op(op):
......@@ -1147,3 +1137,32 @@ class DistributeTranspiler:
# we only need to append op for once
break
return lr_ops
def _get_optimize_pass(self):
block = self.origin_program.global_block()
opt_ops = []
params_grads = []
for op in block.ops:
if self._is_opt_op(op):
opt_ops.append(op)
params_grads.append((self.origin_program.global_block().var(
op.input("Param")[0]),
self.origin_program.global_block().var(
op.input("Grad")[0])))
elif self._is_adam_connected_op(op):
opt_ops.append(op)
else:
pass
return opt_ops, params_grads
def _is_adam_connected_op(self, op):
"""
A hack function to determinate whether the input operator
is connected to optimize operator.
"""
if op.type == "scale":
for in_name in op.input_arg_names:
if in_name.startswith("beta1_pow_acc") or \
in_name.startswith("beta2_pow_acc"):
return True
return False
......@@ -12,18 +12,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import core
__all__ = ['Inferencer', ]
class Inferencer(object):
def __init__(self, network_func, params, place=None):
def __init__(self, network_func, param_path=None, place=None):
# 1. we need to generate a framework.Program by calling
# network_func. Reference: fluid.program_guard in test_word2vec.py
# 2. move the default_main_program to self.program.
# 3. run the default_startup program.
self.params = params
# 4. load params from param_path into scope
self.scope = core.Scope()
self.place = place
def infer(self, inputs):
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from . import core
__all__ = ['Params', ]
class Params(object):
def __init__(self, path=None):
self.scope = core.Scope()
if path:
self._load(path)
def _load(self, path):
# reference: load_persistables in io.py
pass
def save(self, path):
# reference: save_persistables in io.py
pass
def add_params(self, scope):
# take the keys from the scope,
# if not already exists in self.scope,
# add the key and value into self.scope.
pass
......@@ -80,12 +80,7 @@ def train(use_cuda, save_dirname, is_local):
trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
training_role = os.getenv("TRAINING_ROLE", "TRAINER")
t = fluid.DistributeTranspiler()
t.transpile(
optimize_ops,
params_grads,
trainer_id,
pservers=pserver_endpoints,
trainers=trainers)
t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
if training_role == "PSERVER":
pserver_prog = t.get_pserver_program(current_endpoint)
pserver_startup = t.get_startup_program(current_endpoint,
......
......@@ -189,12 +189,7 @@ def train(net_type, use_cuda, save_dirname, is_local):
trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
training_role = os.getenv("TRAINING_ROLE", "TRAINER")
t = fluid.DistributeTranspiler()
t.transpile(
optimize_ops,
params_grads,
trainer_id,
pservers=pserver_endpoints,
trainers=trainers)
t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
if training_role == "PSERVER":
pserver_prog = t.get_pserver_program(current_endpoint)
pserver_startup = t.get_startup_program(current_endpoint,
......
......@@ -259,12 +259,7 @@ def train(use_cuda, save_dirname=None, is_local=True):
trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
training_role = os.getenv("TRAINING_ROLE", "TRAINER")
t = fluid.DistributeTranspiler()
t.transpile(
optimize_ops,
params_grads,
trainer_id,
pservers=pserver_endpoints,
trainers=trainers)
t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
if training_role == "PSERVER":
pserver_prog = t.get_pserver_program(current_endpoint)
pserver_startup = t.get_startup_program(current_endpoint,
......
......@@ -231,12 +231,7 @@ def train_main(use_cuda, is_sparse, is_local=True):
trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
training_role = os.getenv("TRAINING_ROLE", "TRAINER")
t = fluid.DistributeTranspiler()
t.transpile(
optimize_ops,
params_grads,
trainer_id,
pservers=pserver_endpoints,
trainers=trainers)
t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
if training_role == "PSERVER":
pserver_prog = t.get_pserver_program(current_endpoint)
pserver_startup = t.get_startup_program(current_endpoint,
......
......@@ -162,12 +162,7 @@ def train(nn_type,
trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
training_role = os.getenv("TRAINING_ROLE", "TRAINER")
t = fluid.DistributeTranspiler()
t.transpile(
optimize_ops,
params_grads,
trainer_id,
pservers=pserver_endpoints,
trainers=trainers)
t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
if training_role == "PSERVER":
pserver_prog = t.get_pserver_program(current_endpoint)
pserver_startup = t.get_startup_program(current_endpoint,
......
......@@ -261,12 +261,7 @@ def train(use_cuda, save_dirname, is_local=True):
trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
training_role = os.getenv("TRAINING_ROLE", "TRAINER")
t = fluid.DistributeTranspiler()
t.transpile(
optimize_ops,
params_grads,
trainer_id,
pservers=pserver_endpoints,
trainers=trainers)
t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
if training_role == "PSERVER":
pserver_prog = t.get_pserver_program(current_endpoint)
pserver_startup = t.get_startup_program(current_endpoint,
......
......@@ -213,12 +213,7 @@ def train(word_dict,
trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
training_role = os.getenv("TRAINING_ROLE", "TRAINER")
t = fluid.DistributeTranspiler()
t.transpile(
optimize_ops,
params_grads,
trainer_id,
pservers=pserver_endpoints,
trainers=trainers)
t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
if training_role == "PSERVER":
pserver_prog = t.get_pserver_program(current_endpoint)
pserver_startup = t.get_startup_program(current_endpoint,
......
......@@ -145,12 +145,7 @@ def train(use_cuda, is_sparse, is_parallel, save_dirname, is_local=True):
trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
training_role = os.getenv("TRAINING_ROLE", "TRAINER")
t = fluid.DistributeTranspiler()
t.transpile(
optimize_ops,
params_grads,
trainer_id,
pservers=pserver_endpoints,
trainers=trainers)
t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
if training_role == "PSERVER":
pserver_prog = t.get_pserver_program(current_endpoint)
pserver_startup = t.get_startup_program(current_endpoint,
......
......@@ -39,7 +39,7 @@ word_dict = paddle.dataset.imikolov.build_dict()
dict_size = len(word_dict)
def inference_network(is_sparse):
def inference_program(is_sparse):
first_word = fluid.layers.data(name='firstw', shape=[1], dtype='int64')
second_word = fluid.layers.data(name='secondw', shape=[1], dtype='int64')
third_word = fluid.layers.data(name='thirdw', shape=[1], dtype='int64')
......@@ -79,9 +79,9 @@ def inference_network(is_sparse):
return predict_word
def train_network(is_sparse):
def train_program(is_sparse):
next_word = fluid.layers.data(name='nextw', shape=[1], dtype='int64')
predict_word = inference_network(is_sparse)
predict_word = inference_program(is_sparse)
cost = fluid.layers.cross_entropy(input=predict_word, label=next_word)
avg_cost = fluid.layers.mean(cost)
return avg_cost
......@@ -100,23 +100,25 @@ def train(use_cuda, is_sparse, save_path):
word_dict, N))
if avg_cost < 5.0:
trainer.params.save(save_path)
trainer.save_params(save_path)
return
if math.isnan(avg_cost):
sys.exit("got NaN loss, training failed.")
trainer = fluid.Trainer(
partial(train_network, is_sparse),
partial(train_program, is_sparse),
fluid.optimizer.SGD(learning_rate=0.001),
place=place)
trainer.train(
reader=train_reader, num_epochs=100, event_handler=event_handler)
def infer(use_cuda, save_path):
params = fluid.Params(save_path)
def infer(use_cuda, is_sparse, save_path):
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
inferencer = fluid.Inferencer(inference_network, params, place=place)
inferencer = fluid.Inferencer(
partial(inference_program, is_sparse),
param_path=save_path,
place=place)
lod = [0, 1]
first_word = create_random_lodtensor(lod, place, low=0, high=dict_size - 1)
......@@ -138,7 +140,7 @@ def main(use_cuda, is_sparse):
save_path = "word2vec.inference.model"
train(use_cuda, is_sparse, save_path)
infer(use_cuda, save_path)
infer(use_cuda, is_sparse, save_path)
if __name__ == '__main__':
......
......@@ -56,23 +56,22 @@ class Trainer(object):
"""
Args:
network_func(callable): A function which will return loss. The loss must be a scaler.
program_func(callable): A function which will return loss. The loss must be a scaler.
optimizer(optimizer.Optimizer): The optimizer should be an instance of Optimizer
params:
place: The device place of this trainer.
"""
def __init__(self, network_func, optimizer, params=None, place=None):
def __init__(self, program_func, optimizer, param_path=None, place=None):
# 1. we need to generate a framework.Program by calling
# network_func. Reference: fluid.program_guard in
# program_func. Reference: fluid.program_guard in
# test_word2vec.py
self.scope = self._get_scope_from_params(params)
self.scope = core.Scope()
self.startup_program = framework.Program()
self.train_program = framework.Program()
with framework.program_guard(self.train_program, self.startup_program):
loss = network_func()
loss = program_func()
if not isinstance(optimizer, opt_module.Optimizer):
raise TypeError(
"The optimizer should be an instance of Optimizer")
......@@ -84,14 +83,13 @@ class Trainer(object):
# 2. move the default_main_program to self.program and run the
# default_startup program on an empty core.Scope()
# Run startup program
if params is None:
exe = executor.Executor(place)
exe.run(self.startup_program, scope=self.scope)
exe = executor.Executor(place)
exe.run(self.startup_program, scope=self.scope)
# 3. call self.params.add_vars with the initialized scope, it
# will add the new vars of the initialized scope into
# self.params.
# TODO(yuyang): This depends on parameters implementation.
if param_path:
# load params from param_path into scope
# TODO(yuyang): This depends on parameters implementation.
pass
# TODO(helin): support distributed training
......@@ -124,19 +122,9 @@ class Trainer(object):
def test(self, reader):
pass
def _get_scope_from_params(self, params):
"""
Get Scope from parameter object.
Args:
params(Parameter|None): The parameter object instance. Could be None.
Returns: New scope if params is None. Or params.scope()
NOTE: This method is WIP. Not fully implemented.
"""
if params is None:
return core.Scope() # new scope when params is None
else:
raise NotImplementedError("Not implemented right now.")
def save_params(self, param_path):
# reference: save_persistables in io.py
pass
@staticmethod
def _check_and_get_place(place):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册