未验证 提交 c89a1fb2 编写于 作者: X Xin Pan 提交者: GitHub

Merge pull request #14879 from panyx0718/clean

clean parallel do
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <vector>
#include "paddle/fluid/framework/executor.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/threadpool.h"
#include "paddle/fluid/operators/detail/safe_ref.h"
namespace paddle {
namespace operators {
static constexpr char kInputs[] = "inputs";
static constexpr char kParameters[] = "parameters";
static constexpr char kPlaces[] = "places";
static constexpr char kOutputs[] = "outputs";
static constexpr char kParallelScopes[] = "parallel_scopes";
static constexpr char kParallelBlock[] = "sub_block";
static constexpr char kUseNCCL[] = "use_nccl";
using LoDTensor = framework::LoDTensor;
using SelectedRows = framework::SelectedRows;
static void SplitTensorAndMoveTensorToScopes(
const framework::Scope &scope, std::vector<framework::Scope *> *sub_scopes,
const std::vector<platform::Place> &places,
const std::vector<std::string> &names) {
size_t num_sub_scopes = 0;
for (auto &argu : names) {
const auto &tensor =
detail::Ref(scope.FindVar(argu),
"Cannot find variable %s in the parent scope", argu)
.Get<LoDTensor>();
auto lod_tensors = tensor.SplitLoDTensor(places);
for (auto &lod : lod_tensors) {
VLOG(3) << lod.dims();
}
if (num_sub_scopes == 0) {
num_sub_scopes = lod_tensors.size();
} else {
PADDLE_ENFORCE_EQ(num_sub_scopes, lod_tensors.size());
}
PADDLE_ENFORCE_NE(num_sub_scopes, 0);
if (sub_scopes->size() == 0) {
sub_scopes->reserve(num_sub_scopes);
for (size_t i = 0; i < num_sub_scopes; ++i) {
sub_scopes->emplace_back(&scope.NewScope());
}
}
for (size_t i = 0; i < lod_tensors.size(); ++i) {
*detail::Ref(sub_scopes->at(i)->Var(argu),
"Cannot find variable in the sub-scope", argu)
.GetMutable<LoDTensor>() = lod_tensors[i];
}
}
}
inline void CopyOrShare(const framework::Variable &src,
const platform::Place &dst_place,
framework::Variable *dst) {
if (src.IsType<LoDTensor>()) {
if (src.Get<LoDTensor>().place() == dst_place) {
dst->GetMutable<LoDTensor>()->ShareDataWith(src.Get<LoDTensor>());
dst->GetMutable<LoDTensor>()->set_lod(src.Get<LoDTensor>().lod());
} else {
TensorCopy(src.Get<LoDTensor>(), dst_place, dst->GetMutable<LoDTensor>());
}
} else if (src.IsType<SelectedRows>()) {
auto &src_sr = src.Get<SelectedRows>();
auto *dst_sr = dst->GetMutable<SelectedRows>();
dst_sr->set_height(src_sr.height());
if (src_sr.value().place() == dst_place) {
dst_sr->mutable_value()->ShareDataWith(src_sr.value());
dst_sr->set_rows(src_sr.rows());
} else {
TensorCopy(src_sr.value(), dst_place, dst_sr->mutable_value());
}
} else {
PADDLE_THROW("Expect LoDTensor/SelectedRows, get %s", src.Type().name());
}
}
void WaitOnPlace(const platform::Place place) {
platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
auto &dev_ctx = *pool.Get(place);
dev_ctx.Wait();
}
void WaitOnPlaces(const std::vector<platform::Place> places) {
platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
for (auto &place : places) {
auto &dev_ctx = *pool.Get(place);
dev_ctx.Wait();
}
}
class ParallelDoOp : public framework::OperatorBase {
public:
ParallelDoOp(const std::string &type,
const framework::VariableNameMap &inputs,
const framework::VariableNameMap &outputs,
const framework::AttributeMap &attrs)
: framework::OperatorBase(type, inputs, outputs, attrs) {}
private:
void RunImpl(const framework::Scope &scope,
const platform::Place &place) const override {
// get device context from pool
platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
auto &dev_ctx = *pool.Get(place);
auto *block = Attr<framework::BlockDesc *>(kParallelBlock);
auto *program = block->Program();
auto &places = scope.FindVar(Input(kPlaces))->Get<platform::PlaceList>();
auto &sub_scopes = *scope.FindVar(Output(kParallelScopes))
->GetMutable<std::vector<framework::Scope *>>();
// split input
SplitTensorAndMoveTensorToScopes(scope, &sub_scopes, places,
Inputs(kInputs));
// copy parameter
for (auto &param : Inputs(kParameters)) {
PADDLE_ENFORCE(scope.FindVar(param)->IsType<LoDTensor>(),
"Only support parameter type as LoDTensor");
auto &src = scope.FindVar(param)->Get<LoDTensor>();
auto *sub_scope0 = sub_scopes[0];
auto *dst0 = sub_scope0->Var(param)->GetMutable<LoDTensor>();
dst0->ShareDataWith(src);
for (size_t i = 1; i < sub_scopes.size(); ++i) {
auto &place = places[i];
auto *sub_scope = sub_scopes[i];
auto *dst = sub_scope->Var(param)->GetMutable<LoDTensor>();
framework::TensorCopy(src, place, dst);
}
}
WaitOnPlaces(places);
std::vector<std::future<void>> workers;
workers.reserve(places.size());
for (size_t place_idx = 0; place_idx < sub_scopes.size(); ++place_idx) {
auto &place = places[place_idx];
auto *cur_scope = sub_scopes[place_idx];
workers.emplace_back(framework::Async([program, cur_scope, place, block] {
framework::Executor executor(place);
executor.Run(*program, cur_scope, block->ID(),
false /*create_local_scope*/);
}));
}
for (auto &worker : workers) {
worker.wait();
}
WaitOnPlaces(places);
// merge output
for (auto &o_name : Outputs(kOutputs)) {
std::vector<const framework::LoDTensor *> lod_tensors;
lod_tensors.reserve(sub_scopes.size());
for (auto *sub_scope : sub_scopes) {
lod_tensors.emplace_back(&sub_scope->FindVar(o_name)->Get<LoDTensor>());
}
auto *lod_tensor_to_be_merged =
scope.FindVar(o_name)->GetMutable<LoDTensor>();
lod_tensor_to_be_merged->MergeLoDTensor(lod_tensors, dev_ctx.GetPlace());
}
WaitOnPlaces(places);
}
};
class ParallelDoOpProtoMaker : public framework::OpProtoAndCheckerMaker {
public:
void Make() override {
AddInput(kInputs, "").AsDuplicable();
AddInput(kParameters, "").AsDuplicable();
AddInput(kPlaces, "");
AddOutput(kOutputs, "").AsDuplicable();
AddOutput(kParallelScopes, "");
AddAttr<framework::BlockDesc *>(kParallelBlock, "");
AddAttr<bool>(kUseNCCL, "true if we use nccl on backward")
.SetDefault(false);
AddComment(R"DOC(
ParallelDo Operator.
)DOC");
}
};
class ParallelDoGradOp : public framework::OperatorBase {
public:
ParallelDoGradOp(const std::string &type,
const framework::VariableNameMap &inputs,
const framework::VariableNameMap &outputs,
const framework::AttributeMap &attrs)
: framework::OperatorBase(type, inputs, outputs, attrs) {}
private:
void RunImpl(const framework::Scope &scope,
const platform::Place &place) const override {
auto *block = Attr<framework::BlockDesc *>(kParallelBlock);
auto *program = block->Program();
auto &sub_scopes = scope.FindVar(Input(kParallelScopes))
->Get<std::vector<framework::Scope *>>();
auto &places = scope.FindVar(Input(kPlaces))->Get<platform::PlaceList>();
// feed output@grad
SplitTensorAndMoveTensorToScopes(
scope, const_cast<std::vector<framework::Scope *> *>(&sub_scopes),
places, Inputs(framework::GradVarName(kOutputs)));
WaitOnPlaces(places);
// exe run
std::vector<std::future<void>> workers;
for (size_t i = 0; i < sub_scopes.size(); ++i) {
auto &place = places[i];
auto *cur_scope = sub_scopes[i];
// execute
workers.emplace_back(framework::Async([program, cur_scope, place, block] {
framework::Executor executor(place);
executor.Run(*program, cur_scope, block->ID(),
false /*create_local_scope*/);
}));
}
for (auto &worker : workers) {
worker.wait();
}
WaitOnPlaces(places);
// NCCL allreduce op will be added by backward,
// so no need to explicitly accumulate grad
if (!(Attr<bool>(kUseNCCL))) {
AccumulateGrad(scope, place, sub_scopes, places);
} else {
for (auto &place : places) {
PADDLE_ENFORCE(platform::is_gpu_place(place),
"NCCL only supports cuda place");
}
}
for (auto &s : Outputs(framework::GradVarName(kParameters))) {
if (s == framework::kEmptyVarName) {
continue;
}
VLOG(3) << "Moving " << s;
CopyOrShare(*sub_scopes[0]->FindVar(s), place, scope.FindVar(s));
}
WaitOnPlaces(places);
}
void AccumulateGrad(const framework::Scope &scope,
const platform::Place &place,
const std::vector<framework::Scope *> &sub_scopes,
const platform::PlaceList &places) const {
for (auto &s : Outputs(framework::GradVarName(kParameters))) {
if (s == framework::kEmptyVarName) {
continue;
}
VLOG(3) << "Accumulating " << s;
if (s == framework::kEmptyVarName) continue;
std::string tmp_name;
auto *tmp = sub_scopes[0]->Var(&tmp_name);
for (size_t i = 1; i < sub_scopes.size(); ++i) {
CopyOrShare(*sub_scopes[i]->FindVar(s), places[0], tmp);
WaitOnPlaces(places);
auto sum_op = framework::OpRegistry::CreateOp(
"sum", {{"X", {s, tmp_name}}}, {{"Out", {s}}},
framework::AttributeMap{{"use_mkldnn", {false}}});
VLOG(10) << sum_op->DebugStringEx(sub_scopes[0]);
sum_op->Run(*sub_scopes[0], places[0]);
WaitOnPlace(places[0]);
}
CopyOrShare(*sub_scopes[0]->FindVar(s), place, scope.FindVar(s));
}
WaitOnPlaces(places);
}
};
std::ostream &operator<<(std::ostream &sout,
const std::vector<std::string> &strs) {
std::copy(strs.begin(), strs.end(),
std::ostream_iterator<std::string>(sout, ","));
return sout;
}
class ParallelDoGradOpDescMaker : public framework::SingleGradOpDescMaker {
public:
using framework::SingleGradOpDescMaker::SingleGradOpDescMaker;
protected:
virtual std::unique_ptr<framework::OpDesc> Apply() const {
auto *grad = new framework::OpDesc();
grad->SetType("parallel_do_grad");
for (auto &input_param : this->InputNames()) {
VLOG(3) << input_param;
grad->SetInput(input_param, this->Input(input_param));
if (input_param != kPlaces) {
grad->SetOutput(framework::GradVarName(input_param),
this->InputGrad(input_param, false));
}
}
auto *g_block = this->grad_block_[0];
// All variable name that needed by gradient operators
std::unordered_set<std::string> all_inputs_in_grad_blocks;
for (size_t i = 0; i < g_block->OpSize(); ++i) {
auto *op = g_block->Op(i);
for (auto &var_name : op->InputArgumentNames()) {
all_inputs_in_grad_blocks.insert(var_name);
}
}
for (auto &output_param : this->OutputNames()) {
if (output_param == kParallelScopes) {
grad->SetInput(output_param, this->Output(output_param));
grad->SetInput(framework::GradVarName(output_param),
this->Output(output_param));
} else {
grad->SetInput(output_param, this->Output(output_param));
std::vector<std::string> og_names;
for (auto &og_name : this->OutputGrad(output_param)) {
if (all_inputs_in_grad_blocks.count(og_name) != 0) {
// there are some gradient operators who need the OG. So make this
// OG as an input of parallel.do
og_names.push_back(og_name);
}
// else, there is no operator who need the OG. Do not use this OG as
// an input
}
grad->SetInput(framework::GradVarName(output_param), og_names);
}
}
grad->SetInput("Communicator", {"nccl_com__do_not_change_"});
grad->SetAttrMap(this->Attrs());
grad->SetBlockAttr(kParallelBlock, grad_block_[0]);
return std::unique_ptr<framework::OpDesc>(grad);
}
};
class ParallelDoGradOpShapeInference : public framework::InferShapeBase {
public:
void operator()(framework::InferShapeContext *ctx) const override {
PADDLE_ENFORCE(ctx->HasInputs(kParameters));
PADDLE_ENFORCE(ctx->HasInputs(kInputs));
PADDLE_ENFORCE(ctx->HasInputs(kOutputs));
ctx->SetOutputsDim(framework::GradVarName(kParameters),
ctx->GetInputsDim(kParameters));
auto i_dims = ctx->GetInputsDim(kInputs);
auto ig_names = ctx->Outputs(framework::GradVarName(kInputs));
for (size_t i = 0; i < ig_names.size(); ++i) {
auto &ig_name = ig_names[i];
if (ig_name == framework::kEmptyVarName) {
continue;
}
ctx->SetDims({ig_name}, {i_dims[i]});
}
auto p_dims = ctx->GetInputsDim(kParameters);
auto pg_names = ctx->Outputs(framework::GradVarName(kParameters));
for (size_t i = 0; i < pg_names.size(); ++i) {
auto &pg_name = pg_names[i];
if (pg_name == framework::kEmptyVarName) {
continue;
}
ctx->SetDims({pg_name}, {p_dims[i]});
}
}
};
class ParallelDoGradOpVarTypeInference : public framework::VarTypeInference {
public:
void operator()(const framework::OpDesc &op_desc,
framework::BlockDesc *block) const override {
framework::BlockDesc *sub_block =
boost::get<framework::BlockDesc *>(op_desc.GetAttr(kParallelBlock));
for (auto &out_vars : op_desc.Outputs()) {
for (auto &out_var : out_vars.second) {
auto &var = block->FindRecursiveOrCreateVar(out_var);
auto sub_var = sub_block->FindRecursiveOrCreateVar(out_var);
if (sub_var.GetType() != var.GetType()) {
var.SetType(sub_var.GetType());
}
}
}
}
};
} // namespace operators
} // namespace paddle
REGISTER_OPERATOR(parallel_do, paddle::operators::ParallelDoOp,
paddle::operators::ParallelDoOpProtoMaker,
paddle::operators::ParallelDoGradOpDescMaker);
REGISTER_OPERATOR(parallel_do_grad, paddle::operators::ParallelDoGradOp,
paddle::operators::ParallelDoGradOpShapeInference,
paddle::operators::ParallelDoGradOpVarTypeInference);
......@@ -249,69 +249,6 @@ def serialize_op_decs(op_desc):
return proto.__str__()
def _callback_lookup_(op):
"""
Only used in _append_backward_ops_
Build and returns a callback function for certain op. For example
parallel_do: AllReduce
:param op:
:return: callback function
"""
if op.type == 'parallel_do' and op.attr('use_nccl'):
all_vars = op.block.vars
param_names = set(op.input('parameters'))
param_names = [
name for name in param_names
if all_vars[name].stop_gradient is False
]
param_grad_names = [n + "@GRAD" for n in param_names]
class ParallelDoCallBack(object):
def __init__(self, param_grad_names, parallel_scopes_name):
self.has_inserted_nccl_init = False
self.param_grad_names = param_grad_names
self.parallel_scopes_name = parallel_scopes_name
def __call__(self, block, context):
if not self.has_inserted_nccl_init:
op_desc = _create_op_desc_(
"ncclInit",
{"parallel_scopes": self.parallel_scopes_name},
{"Communicator": ['nccl_com__do_not_change_']}, {})
block.program.global_block().desc.append_op().copy_from(
op_desc)
self.has_inserted_nccl_init = True
current_op_desc = context["__current_op_desc__"]
for o_param in current_op_desc.output_names():
for o_argu in current_op_desc.output(o_param):
if o_argu in self.param_grad_names:
allreduce_out_name = o_argu + "__nccl_all_reduce__"
op_desc = _create_op_desc_(
"ncclReduce",
{
"X": [o_argu],
"Communicator":
['nccl_com__do_not_change_']
},
{"Out": [allreduce_out_name]},
{"reduction": "ncclSum",
"root": 0}, )
block.desc.append_op().copy_from(op_desc)
op_desc = _create_op_desc_(
"assign", {"X": [allreduce_out_name]},
{"Out": [o_argu]}, {})
block.desc.append_op().copy_from(op_desc)
return ParallelDoCallBack(param_grad_names,
op.output("parallel_scopes"))
else:
return None
def _append_backward_ops_(block,
ops,
target_block,
......@@ -349,15 +286,6 @@ def _append_backward_ops_(block,
sub_block = program.block(op._block_attr_id("sub_block"))
grad_sub_block = program._create_block()
grad_sub_block._set_forward_block_idx(sub_block.idx)
cb = _callback_lookup_(op)
if cb is not None:
if callbacks is None:
new_callbacks = [cb]
else:
new_callbacks = callbacks + [_callback_lookup_(op)]
_append_backward_ops_(sub_block, sub_block.ops, grad_sub_block,
no_grad_dict, grad_to_var, new_callbacks)
else:
_append_backward_ops_(sub_block, sub_block.ops, grad_sub_block,
no_grad_dict, grad_to_var, callbacks)
......@@ -424,9 +352,6 @@ def _append_backward_vars_(block, start_op_idx, grad_to_var, grad_info_map):
# infer_shape and infer_type
op_desc.infer_var_type(block.desc)
op_desc.infer_shape(block.desc)
# ncclInit dones't need to set data_type
if op_desc.type() == 'ncclInit':
continue
for arg in op_desc.output_arg_names():
if arg in new_vars:
_infer_var_data_type_(arg, block)
......
......@@ -563,8 +563,8 @@ class Operator(object):
OP_WITHOUT_KERNEL_SET = {
'feed', 'fetch', 'save', 'load', 'recurrent', 'go',
'rnn_memory_helper_grad', 'conditional_block', 'while', 'send', 'recv',
'listen_and_serv', 'parallel_do', 'save_combine', 'load_combine',
'ncclInit', 'select', 'checkpoint_notify', 'gen_nccl_id'
'listen_and_serv', 'save_combine', 'load_combine', 'ncclInit', 'select',
'checkpoint_notify', 'gen_nccl_id'
}
def __init__(self,
......
......@@ -226,156 +226,6 @@ class BlockGuard(object):
return True
class ParallelDo(object):
"""
ParallelDo is used to represent multi-thread data parallel processing.
Its vanilla implementation can be shown as the following (:math:`|` means
single thread and :math:`||||` means multiple threads)
.. code-block:: text
In the forward pass
| Split input onto different devices
| Copy parameter onto different devices
|||| Compute forward pass in parallel
| Merge output from different devices
In the backward pass
| Split output@grad onto different devices
|||| Compute backward pass in parallel
| accumulate param@grad from different devices to the first device
| Merge input@grad from different devices
| Copy param@grad to the place of parallel_do_op
Examples:
.. code-block:: python
images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype=DTYPE)
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
# ParallelDo version & Single-thread version
if thread_num > 1:
places = fluid.layers.get_places(thread_num)
pd = fluid.layers.control_flow.ParallelDo(places)
with pd.do():
images = pd.read_input(images)
label = pd.read_input(label)
predict = cnn_model(images)
cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.mean(x=cost)
pd.write_output(avg_cost)
avg_cost = pd()
avg_cost = fluid.layers.mean(avg_cost)
else:
predict = cnn_model(images)
cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.mean(x=cost)
.. warning::
It will be soon deprecated, please use ParallelExecutor instead.
"""
def __init__(self, places, use_nccl=False, name=None):
warnings.warn(
"API ParallelDo is deprecated since 0.15.0. Please use ParallelExecutor instead.",
Warning)
self.helper = LayerHelper("parallel_do", name=name)
self.inputs = []
self.places = places
self.outputs = []
self.status = StaticRNN.BEFORE_RNN_BLOCK
self.use_nccl = use_nccl
def do(self):
return BlockGuardWithCompletion(self)
def parent_block(self):
prog = self.helper.main_program
parent_idx = prog.current_block().parent_idx
assert parent_idx >= 0
parent_block = prog.block(parent_idx)
return parent_block
def __call__(self, *args, **kwargs):
if self.status != StaticRNN.AFTER_RNN_BLOCK:
raise ValueError("RNN output can only be retrieved after rnn block")
if len(self.outputs) == 0:
raise ValueError("RNN has no output")
elif len(self.outputs) == 1:
return self.outputs[0]
else:
return self.outputs
def read_input(self, var):
self.inputs.append(var)
return var
def write_output(self, var):
self.outputs.append(var)
def get_parameters(self):
main_program = self.helper.main_program
current_block = main_program.current_block()
parent_block = self.parent_block()
local_inputs = set()
params = list()
for var in self.inputs:
local_inputs.add(var.name)
for op in current_block.ops:
for iname in op.input_names:
for in_var_name in op.input(iname):
if in_var_name not in local_inputs:
params.append(in_var_name)
for oname in op.output_names:
for out_var_name in op.output(oname):
local_inputs.add(out_var_name)
params = list(set(params))
return [parent_block.var(name) for name in params]
def _complete_op(self):
main_program = self.helper.main_program
current_block = main_program.current_block()
parent_block = self.parent_block()
step_scope = parent_block.create_var(
type=core.VarDesc.VarType.STEP_SCOPES)
self.outputs = [
parent_block.create_var(
name=o.name,
shape=o.shape,
dtype=o.dtype,
lod_level=o.lod_level,
persistable=o.persistable,
stop_gradient=o.stop_gradient) for o in self.outputs
]
inputs = [parent_block.var(i.name) for i in self.inputs]
outputs = [parent_block.var(o.name) for o in self.outputs]
parent_block.append_op(
type='parallel_do',
inputs={
'inputs': inputs,
'parameters': self.get_parameters(),
'places': self.places
},
outputs={'outputs': outputs,
'parallel_scopes': [step_scope]},
attrs={'sub_block': current_block,
'use_nccl': self.use_nccl})
class BlockGuardWithCompletion(BlockGuard):
"""
BlockGuardWithCompletion class.
......@@ -384,9 +234,8 @@ class BlockGuardWithCompletion(BlockGuard):
"""
def __init__(self, rnn):
if not (isinstance(rnn, StaticRNN) or isinstance(rnn, ParallelDo)):
raise TypeError(
"BlockGuardWithCompletion takes a StaticRNN or ParallelDo")
if not isinstance(rnn, StaticRNN):
raise TypeError("BlockGuardWithCompletion takes a StaticRNN")
super(BlockGuardWithCompletion, self).__init__(rnn.helper.main_program)
self.rnn = rnn
......
......@@ -15,7 +15,6 @@
from __future__ import print_function
from paddle.fluid.layers.device import get_places
from paddle.fluid.layers.control_flow import ParallelDo
import unittest
import paddle.fluid as fluid
import paddle
......@@ -147,22 +146,7 @@ def train(word_dict,
cost, acc_out, prediction = net_method(
data, label, input_dim=dict_dim, class_dim=class_dim)
else:
places = get_places()
pd = ParallelDo(places)
with pd.do():
cost, acc, _ = net_method(
pd.read_input(data),
pd.read_input(label),
input_dim=dict_dim,
class_dim=class_dim)
pd.write_output(cost)
pd.write_output(acc)
cost, acc = pd()
cost = fluid.layers.mean(cost)
acc_out = fluid.layers.mean(acc)
prediction = None
assert save_dirname is None
raise NotImplementedError()
adagrad = fluid.optimizer.Adagrad(learning_rate=0.002)
adagrad.minimize(cost)
......
......@@ -25,7 +25,6 @@ import numpy
import paddle
import paddle.fluid as fluid
from paddle.fluid.layers.device import get_places
from paddle.fluid.layers.control_flow import ParallelDo
BATCH_SIZE = 64
......@@ -82,19 +81,7 @@ def train(nn_type,
net_conf = conv_net
if parallel:
places = get_places()
pd = ParallelDo(places)
with pd.do():
img_ = pd.read_input(img)
label_ = pd.read_input(label)
prediction, avg_loss, acc = net_conf(img_, label_)
for o in [avg_loss, acc]:
pd.write_output(o)
avg_loss, acc = pd()
# get mean loss and acc through every devices.
avg_loss = fluid.layers.mean(avg_loss)
acc = fluid.layers.mean(acc)
raise NotImplementedError()
else:
prediction, avg_loss, acc = net_conf(img, label)
......@@ -273,7 +260,7 @@ def inject_all_tests():
for use_cuda in (False, True):
if use_cuda and not core.is_compiled_with_cuda():
continue
for parallel in (False, True):
for parallel in (False, ):
for nn_type in ('mlp', 'conv'):
inject_test_method(use_cuda, parallel, nn_type, True)
......
......@@ -17,7 +17,6 @@ from __future__ import print_function
import paddle
import paddle.fluid as fluid
from paddle.fluid.layers.device import get_places
from paddle.fluid.layers.control_flow import ParallelDo
import unittest
import os
import numpy as np
......@@ -84,18 +83,7 @@ def train(use_cuda, is_sparse, is_parallel, save_dirname, is_local=True):
avg_cost, predict_word = __network__(
[first_word, second_word, third_word, forth_word, next_word])
else:
places = get_places()
pd = ParallelDo(places)
with pd.do():
avg_cost, predict_word = __network__(
list(
map(pd.read_input, [
first_word, second_word, third_word, forth_word,
next_word
])))
pd.write_output(avg_cost)
avg_cost = fluid.layers.mean(pd())
raise NotImplementedError()
sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001)
sgd_optimizer.minimize(avg_cost)
......@@ -262,7 +250,7 @@ def inject_test_method(use_cuda, is_sparse, is_parallel):
for use_cuda in (False, True):
for is_sparse in (False, True):
for is_parallel in (False, True):
for is_parallel in (False, ):
inject_test_method(use_cuda, is_sparse, is_parallel)
if __name__ == '__main__':
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import math
import sys
import paddle
import paddle.fluid as fluid
from paddle.fluid.layers.device import get_places
from paddle.fluid.layers.control_flow import ParallelDo
# need to fix random seed and training data to compare the loss
# value accurately calculated by the default and the memory optimization
# version.
fluid.default_startup_program().random_seed = 111
x = fluid.layers.data(name='x', shape=[13], dtype='float32')
y = fluid.layers.data(name='y', shape=[1], dtype='float32')
device_type = 'CPU'
use_nccl = False
place = fluid.CPUPlace()
if fluid.core.is_compiled_with_cuda():
device_type = 'CUDA'
use_nccl = False
place = fluid.CUDAPlace(0)
places = get_places(device_count=0, device_type=device_type)
pd = ParallelDo(places, use_nccl=use_nccl)
with pd.do():
x_ = pd.read_input(x)
y_ = pd.read_input(y)
y_predict = fluid.layers.fc(input=x_, size=1, act=None)
cost = fluid.layers.square_error_cost(input=y_predict, label=y_)
avg_cost = fluid.layers.mean(x=cost)
pd.write_output(avg_cost)
cost = pd()
avg_cost = fluid.layers.mean(x=cost)
sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.01)
sgd_optimizer.minimize(avg_cost)
fluid.memory_optimize(fluid.default_main_program(), print_log=True)
# fluid.release_memory(fluid.default_main_program())
BATCH_SIZE = 200
# fix the order of training data
train_reader = paddle.batch(
paddle.dataset.uci_housing.train(), batch_size=BATCH_SIZE, drop_last=False)
# train_reader = paddle.batch(
# paddle.reader.shuffle(
# paddle.dataset.uci_housing.train(), buf_size=500),
# batch_size=BATCH_SIZE)
feeder = fluid.DataFeeder(place=place, feed_list=[x, y])
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
PASS_NUM = 100
for pass_id in range(PASS_NUM):
for data in train_reader():
avg_loss_value, = exe.run(fluid.default_main_program(),
feed=feeder.feed(data),
fetch_list=[avg_cost])
if avg_loss_value[0] < 10.0:
exit(0) # if avg cost less than 10.0, we think our code is good.
print(avg_loss_value[0])
if math.isnan(float(avg_loss_value)):
sys.exit("got NaN loss, training failed.")
exit(1)
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import unittest
import paddle.fluid as fluid
from paddle.fluid.layers.device import get_places
from paddle.fluid.layers.control_flow import ParallelDo
import paddle.fluid.profiler as profiler
import numpy
import six
class BaseParallelForTest(unittest.TestCase):
def run_test(self, callback, feed, fetch):
"""
Run the unittest for parallel.for
Args:
callback(callable): A callable function returns a generator. There
are two yields in the generator function. The first yield
returns the data layers, and the second yield returns the loss.
The modified data variables will be sent back during the first
yield.
feed(dict): The executor feeding dictionary.
fetch(list|basestr): The fetch name lists.
Returns:
None
Raises:
AssertionError when the computation of cpu, parallel.for in cpu,
gpu, parallel.for in gpu are different.
"""
cpu = fluid.CPUPlace()
result_cpu = self._run_test_impl_(
callback=callback,
feed=feed,
fetch=fetch,
place=cpu,
use_parallel=False)
result_cpu_parallel = self._run_test_impl_(
callback=callback,
feed=feed,
fetch=fetch,
place=cpu,
use_parallel=True)
if fluid.core.is_compiled_with_cuda():
gpu = fluid.CUDAPlace(0)
result_gpu = self._run_test_impl_(
callback=callback,
feed=feed,
fetch=fetch,
place=gpu,
use_parallel=False,
use_gpu=True)
result_gpu_parallel = self._run_test_impl_(
callback=callback,
feed=feed,
fetch=fetch,
place=gpu,
use_parallel=True,
use_gpu=True)
result_gpu_nccl = self._run_test_impl_(
callback=callback,
feed=feed,
fetch=fetch,
place=gpu,
use_parallel=True,
use_nccl=True,
use_gpu=True)
self._assert_same_(fetch, result_cpu, result_cpu_parallel,
result_gpu, result_gpu_parallel, result_gpu_nccl)
else:
self._assert_same_(fetch, result_cpu, result_cpu_parallel)
def _run_test_impl_(self,
callback,
feed,
fetch,
place,
use_parallel=False,
use_nccl=False,
use_gpu=False):
"""
Run a single test, returns the fetch values
Args:
place(Place): the computation place.
use_parallel(bool): Whether use parallel.for or not.
Returns:
Fetched numpy arrays.
"""
if isinstance(fetch, six.string_types):
fetch = [fetch]
main = fluid.Program()
startup = fluid.Program()
# Fix seed
main.random_seed = 10
startup.random_seed = 10
with fluid.program_guard(main, startup):
generator = callback()
# Automatically insert parallel do if use_parallel = True
if use_parallel:
thread_num = fluid.core.get_cuda_device_count(
) if use_gpu else 8
places = get_places(thread_num)
pd = ParallelDo(places, use_nccl=use_nccl)
data = next(generator)
if isinstance(data, fluid.framework.Variable):
data = [data]
with pd.do():
ins = list(map(pd.read_input, data))
if len(ins) == 1:
ins = ins[0]
loss = generator.send(ins) # patch input
pd.write_output(loss)
loss = pd()
else:
data = next(generator)
loss = generator.send(data)
self.assertIsNotNone(loss)
avg_loss = fluid.layers.mean(loss)
fluid.backward.append_backward(loss=avg_loss)
exe = fluid.Executor(place)
exe.run(startup)
if use_gpu:
profile_type = 'GPU'
else:
profile_type = 'CPU'
with profiler.profiler(profile_type, 'total', '/tmp/profiler'):
return exe.run(main, feed=feed, fetch_list=fetch)
def _assert_same_(self, fetch, *args):
"""
Assert the return values of `run_test` are same.
Args:
fetch: Fetch list. Used for print error message
*args: The fetch result lists of each situations.
Returns:
None
Raises:
AssertionError
"""
def _impl_(a, b, fetch_id, item_id):
item_str = [
'CPU', 'ParallelCPU', 'GPU', 'ParallelGPU', 'ParallelGPUNCCL'
]
flag = numpy.allclose(a, b, rtol=0.1, atol=1e-3)
self.assertTrue(flag,
"The {0} are different in {1}, {2} vs {3}".format(
fetch[fetch_id], item_str[item_id], a, b))
for i, items in enumerate(zip(*args)):
self.assertGreater(len(items), 0)
for j in range(1, len(items)):
_impl_(items[0], items[j], fetch_id=i, item_id=j)
class ParallelOpTest(BaseParallelForTest):
@staticmethod
def __network__():
x = fluid.layers.data(shape=[784], dtype='float32', name='img')
x = yield x
hidden = fluid.layers.fc(input=x, size=200, param_attr='fc1.w')
hidden = fluid.layers.batch_norm(input=hidden)
loss = fluid.layers.mean(hidden)
yield loss
def test_simple_fc(self):
self.run_test(
callback=self.__network__,
feed={
'img': numpy.random.random(size=(51, 784)).astype('float32')
},
fetch=['fc1.w@GRAD'])
def test_fc_with_tiny_data(self):
self.run_test(
callback=self.__network__,
feed={'img': numpy.random.random(size=(1, 784)).astype('float32')},
fetch=['fc1.w@GRAD'])
class ParallelOpTestMultipleInput(BaseParallelForTest):
@staticmethod
def __network__():
x = fluid.layers.data(
shape=[784], dtype='float32', name='img1', stop_gradient=False)
y = fluid.layers.data(
shape=[784], dtype='float32', name='img2', stop_gradient=False)
yield [x, y]
x = x + y
hidden1 = fluid.layers.fc(input=x, size=200, param_attr='fc1.w')
hidden2 = fluid.layers.fc(input=hidden1, size=200, param_attr='fc2.w')
hidden3 = fluid.layers.fc(input=hidden2, size=200, param_attr='fc3.w')
loss = fluid.layers.mean(hidden3)
yield loss
def test_simple_fc(self):
self.run_test(
callback=self.__network__,
feed={
'img1': numpy.random.random(size=(51, 784)).astype('float32'),
'img2': numpy.random.random(size=(51, 784)).astype('float32')
},
fetch=['fc1.w@GRAD', 'fc2.w@GRAD', 'fc3.w@GRAD'])
if __name__ == '__main__':
unittest.main()
......@@ -35,11 +35,10 @@ dtype_to_size = {
}
SUB_BLOCK_OPS = [
"while", "while_grad", "parallel_do", "parallel_do_grad",
"conditional_block", "conditional_block_grad"
"while", "while_grad", "conditional_block", "conditional_block_grad"
]
SUB_BLOCK_PAIR = [("while", "while_grad"), ("parallel_do", "parallel_do_grad"),
SUB_BLOCK_PAIR = [("while", "while_grad"),
("conditional_block", "conditional_block_grad")]
PRINT_LOG = False
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册