Commit b8a17987 authored by Yu Yang, committed by Yang Yang (Tony)

Feature/parallel for bug fix (#7474)

* Fix ParallelDo not support empty input gradient

* Polish ParallelDo and fix several bugs

* Fix CI

* Fix CI
Parent c5067a6a
@@ -291,23 +291,32 @@ std::vector<LoDTensor> LoDTensor::SplitLoDTensor(
     const std::vector<platform::Place> places) const {
   check_memory_size();
   PADDLE_ENFORCE(lod().empty(), "Disable parallel lod for now");
-  PADDLE_ENFORCE(dims()[0] % places.size() == 0,
-                 "Batch size should be divided by places size");
+  size_t result_size = std::min(static_cast<size_t>(dims()[0]), places.size());
+  size_t remainder = dims()[0] % places.size();

-  std::vector<LoDTensor> lods;
-  for (size_t place_idx = 0; place_idx < places.size(); ++place_idx) {
-    int begin = place_idx * dims()[0] / places.size();
-    int end = (place_idx + 1) * dims()[0] / places.size();
+  std::vector<LoDTensor> results;
+  results.reserve(result_size);
+
+  int step_width = static_cast<int>(dims()[0] / result_size);
+  for (size_t i = 0; i < result_size; ++i) {
+    int begin = static_cast<int>(i * step_width);
+    int end = static_cast<int>((i + 1) * step_width);
+    if (i + 1 == places.size()) {  // last
+      end += remainder;
+    }

     auto src = Slice(begin, end);
-    auto &dst_place = places[place_idx];
+    auto &dst_place = places[i];
     LoDTensor dst;
     if (!(dst_place == place())) {
       framework::Copy(src, dst_place, &dst);
-      lods.emplace_back(dst);
     } else {  // It is no need to copy if src_place and dst_place are same.
       dst.ShareDataWith(src);
     }
+    results.emplace_back(dst);
   }

-  return lods;
+  return results;
 }
// TODO(tonyyang-svail): make this function support LoD
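The rewritten slicing rule is easiest to see with concrete numbers: it never produces more slices than there are rows, and the last place absorbs the remainder when the batch does not divide evenly. Below is a minimal standalone sketch of that arithmetic; the driver loop and the sample sizes are illustrative, not part of the commit.

```cpp
// Sketch of the new SplitLoDTensor slicing arithmetic (sizes are examples).
#include <algorithm>
#include <cstddef>
#include <cstdio>

int main() {
  const std::size_t cases[][2] = {{51, 4}, {5, 4}, {1, 4}};  // {batch, places}
  for (const auto &c : cases) {
    const std::size_t dims0 = c[0], num_places = c[1];
    // Never more slices than rows:
    const std::size_t result_size = std::min(dims0, num_places);
    const std::size_t remainder = dims0 % num_places;
    const int step_width = static_cast<int>(dims0 / result_size);
    std::printf("batch=%zu places=%zu:", dims0, num_places);
    for (std::size_t i = 0; i < result_size; ++i) {
      int begin = static_cast<int>(i) * step_width;
      int end = static_cast<int>(i + 1) * step_width;
      if (i + 1 == num_places) end += static_cast<int>(remainder);  // last slice
      std::printf(" [%d,%d)", begin, end);
    }
    std::printf("\n");
  }
  return 0;
}
```

A batch of 51 over 4 places yields [0,12) [12,24) [24,36) [36,51); a batch of 1 yields the single slice [0,1), which is exactly the case the new tiny-data test at the bottom of this commit exercises.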
@@ -318,12 +327,17 @@ void LoDTensor::MergeLoDTensor(
   framework::DDim new_dim = lod_tensors[0]->dims();
   std::type_index new_type = lod_tensors[0]->type();
   auto new_layout = lod_tensors[0]->layout();
+  int64_t new_height = 0;
   for (auto *lod : lod_tensors) {
-    PADDLE_ENFORCE(new_dim == lod->dims());
-    PADDLE_ENFORCE(new_type == lod->type());
-    PADDLE_ENFORCE(new_layout == lod->layout());
+    new_height += lod->dims()[0];
+    for (int i = 1; i < new_dim.size(); ++i) {
+      PADDLE_ENFORCE_EQ(new_dim[i], lod->dims()[i]);
+    }
+
+    PADDLE_ENFORCE_EQ(new_type, lod->type());
+    PADDLE_ENFORCE_EQ(new_layout, lod->layout());
   }
-  new_dim[0] *= lod_tensors.size();
+  new_dim[0] = new_height;
   Resize(new_dim);
   set_layout(new_layout);
......
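Because slices can now differ in height, the merge side sums heights instead of multiplying by the tensor count, and only checks that the trailing dimensions agree. A small sanity-check sketch of that rule, with illustrative sizes matching the uneven split of 51 rows:

```cpp
// Sketch: merged height is the sum of per-slice heights.
#include <cassert>
#include <cstdint>
#include <vector>

int main() {
  // Dims of the four slices an uneven split of 51 rows produces.
  std::vector<std::vector<int64_t>> dims = {
      {12, 784}, {12, 784}, {12, 784}, {15, 784}};
  int64_t new_height = 0;
  for (const auto &d : dims) {
    assert(d[1] == dims[0][1]);  // trailing dimensions must still agree
    new_height += d[0];          // heights are summed, not multiplied
  }
  assert(new_height == 51);  // 12 + 12 + 12 + 15
  return 0;
}
```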
@@ -30,16 +30,13 @@ static constexpr char kParallelScopes[] = "parallel_scopes";
 static constexpr char kParallelBlock[] = "sub_block";

-// using ParallelScopeVar = std::vector<framework::Scope *>;
 using LoDTensor = framework::LoDTensor;
-using OperatorBase = framework::OperatorBase;

-void SplitTensorAndMoveTensorToScopes(
-    const framework::Scope &scope,
-    const std::vector<framework::Scope *> &sub_scopes,
+static void SplitTensorAndMoveTensorToScopes(
+    const framework::Scope &scope, std::vector<framework::Scope *> *sub_scopes,
     const std::vector<platform::Place> &places,
     const std::vector<std::string> &names) {
-  PADDLE_ENFORCE_EQ(sub_scopes.size(), places.size());
+  size_t num_sub_scopes = 0;
   for (auto &argu : names) {
     auto *var = scope.FindVar(argu);
     const auto &tensor = var->Get<LoDTensor>();
@@ -48,9 +45,21 @@ void SplitTensorAndMoveTensorToScopes(
     for (auto &lod : lod_tensors) {
       VLOG(3) << lod.dims();
     }
+    if (num_sub_scopes == 0) {
+      num_sub_scopes = lod_tensors.size();
+    } else {
+      PADDLE_ENFORCE_EQ(num_sub_scopes, lod_tensors.size());
+    }
+    PADDLE_ENFORCE_NE(num_sub_scopes, 0);
+    if (sub_scopes->size() == 0) {
+      sub_scopes->reserve(num_sub_scopes);
+      for (size_t i = 0; i < num_sub_scopes; ++i) {
+        sub_scopes->emplace_back(&scope.NewScope());
+      }
+    }

-    for (size_t i = 0; i < sub_scopes.size(); ++i) {
-      *sub_scopes[i]->Var(argu)->GetMutable<LoDTensor>() = lod_tensors[i];
+    for (size_t i = 0; i < lod_tensors.size(); ++i) {
+      *(*sub_scopes)[i]->Var(argu)->GetMutable<LoDTensor>() = lod_tensors[i];
     }
   }
 }
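Note that sub-scopes are now created lazily inside the split, sized by the number of slices SplitLoDTensor actually produced rather than by places.size(); that is why the function takes a pointer instead of a const reference. A toy sketch of the sizing rule, using a hypothetical Scope stand-in:

```cpp
// Sketch of lazy sub-scope sizing (Scope here is a stand-in, not the real one).
#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <vector>

struct Scope {};  // hypothetical stand-in for framework::Scope

int main() {
  const std::size_t batch = 1, num_places = 4;
  const std::size_t num_slices = std::min(batch, num_places);  // what the split yields
  std::vector<Scope> storage(num_slices);
  std::vector<Scope *> sub_scopes;
  if (sub_scopes.empty()) {  // create scopes only on the first split
    for (std::size_t i = 0; i < num_slices; ++i) sub_scopes.push_back(&storage[i]);
  }
  // Downstream loops iterate sub_scopes.size() (here 1), not places.size() (4).
  std::printf("sub-scopes created: %zu\n", sub_scopes.size());
  return 0;
}
```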
@@ -70,7 +79,7 @@ class ParallelDoOp : public framework::OperatorBase {
                const framework::VariableNameMap &inputs,
                const framework::VariableNameMap &outputs,
                const framework::AttributeMap &attrs)
-      : OperatorBase(type, inputs, outputs, attrs) {}
+      : framework::OperatorBase(type, inputs, outputs, attrs) {}

   void Run(const framework::Scope &scope,
            const platform::Place &place) const override {
@@ -85,19 +94,17 @@ class ParallelDoOp : public framework::OperatorBase {
     auto &sub_scopes = *scope.FindVar(Output(kParallelScopes))
                             ->GetMutable<std::vector<framework::Scope *>>();
-    for (size_t place_idx = 0; place_idx < places.size(); ++place_idx) {
-      sub_scopes.push_back(&scope.NewScope());
-    }

     // split input
-    SplitTensorAndMoveTensorToScopes(scope, sub_scopes, places,
+    SplitTensorAndMoveTensorToScopes(scope, &sub_scopes, places,
                                      Inputs(kInputs));

     // copy parameter
     for (auto &param : Inputs(kParameters)) {
       PADDLE_ENFORCE(scope.FindVar(param)->IsType<LoDTensor>(),
                      "Only support parameter type as LoDTensor");
       auto &src = scope.FindVar(param)->Get<LoDTensor>();
-      for (size_t i = 0; i < places.size(); ++i) {
+      for (size_t i = 0; i < sub_scopes.size(); ++i) {
         auto &place = places[i];
         auto *sub_scope = sub_scopes[i];
         auto *dst = sub_scope->Var(param)->GetMutable<LoDTensor>();
@@ -108,9 +115,7 @@ class ParallelDoOp : public framework::OperatorBase {
     std::vector<std::future<void>> workers;
     workers.reserve(places.size());
-    for (size_t place_idx = 0; place_idx < places.size(); ++place_idx) {
-      VLOG(3) << "Run " << place_idx;
-
+    for (size_t place_idx = 0; place_idx < sub_scopes.size(); ++place_idx) {
       auto &place = places[place_idx];
       auto *cur_scope = sub_scopes[place_idx];
@@ -157,21 +162,16 @@ ParallelDo Operator.
   }
 };

-class ParallelDoGradOp : public OperatorBase {
+class ParallelDoGradOp : public framework::OperatorBase {
  public:
   ParallelDoGradOp(const std::string &type,
                    const framework::VariableNameMap &inputs,
                    const framework::VariableNameMap &outputs,
                    const framework::AttributeMap &attrs)
-      : OperatorBase(type, inputs, outputs, attrs) {}
+      : framework::OperatorBase(type, inputs, outputs, attrs) {}

   void Run(const framework::Scope &scope,
            const platform::Place &place) const override {
-    // // get device context from pool
-    // platform::DeviceContextPool &pool =
-    //     platform::DeviceContextPool::Instance();
-    // auto &dev_ctx = *pool.Get(place);
-
     auto *block = Attr<framework::BlockDesc *>(kParallelBlock);
     auto *program = block->Program();
@@ -181,26 +181,16 @@ class ParallelDoGradOp : public OperatorBase {
     auto &places = scope.FindVar(Input(kPlaces))->Get<platform::PlaceList>();

     // feed output@grad
-    SplitTensorAndMoveTensorToScopes(scope, sub_scopes, places,
-                                     Inputs(framework::GradVarName(kOutputs)));
+    SplitTensorAndMoveTensorToScopes(
+        scope, const_cast<std::vector<framework::Scope *> *>(&sub_scopes),
+        places, Inputs(framework::GradVarName(kOutputs)));
     WaitOnPlaces(places);

     // for debugging
     for (auto &s : Inputs(framework::GradVarName(kOutputs))) {
       VLOG(3) << s;
-      VLOG(3) << scope.FindVar(s)->Get<LoDTensor>();
+      for (auto *sub_scope : sub_scopes) {
+        VLOG(3) << sub_scope->FindVar(s)->Get<LoDTensor>();
+      }
     }

     // exe run
     std::vector<std::future<void>> workers;
-    for (size_t place_idx = 0; place_idx < places.size(); ++place_idx) {
-      VLOG(3) << "Run " << place_idx;
-
-      auto &place = places[place_idx];
-      auto *cur_scope = sub_scopes[place_idx];
+    for (size_t i = 0; i < sub_scopes.size(); ++i) {
+      auto &place = places[i];
+      auto *cur_scope = sub_scopes[i];

       // execute
       workers.emplace_back(framework::Async([program, cur_scope, place, block] {
@@ -216,33 +206,38 @@ class ParallelDoGradOp : public OperatorBase {
     // merge grad
     for (auto &s : Outputs(framework::GradVarName(kParameters))) {
       VLOG(3) << "merge grad " << s;
-      auto &t = sub_scopes[0]->FindVar(s)->Get<LoDTensor>();
-      VLOG(3) << t;

-      std::string s_buf = s + "@BUF";
-      auto *t_buf = sub_scopes[0]->Var(s_buf)->GetMutable<LoDTensor>();
+      auto &result = sub_scopes[0]->FindVar(s)->Get<LoDTensor>();
+      std::string tmp_name;
+      auto *tmp = sub_scopes[0]->Var(&tmp_name)->GetMutable<LoDTensor>();

-      for (size_t place_idx = 1; place_idx < places.size(); ++place_idx) {
-        auto &tt = sub_scopes[place_idx]->FindVar(s)->Get<LoDTensor>();
-        VLOG(3) << place_idx;
-        VLOG(3) << tt;
-        framework::Copy(tt, places[0], t_buf);
+      for (size_t i = 1; i < sub_scopes.size(); ++i) {
+        auto &tensor_to_merge = sub_scopes[i]->FindVar(s)->Get<LoDTensor>();
+        if (!(places[i] == places[0])) {
+          framework::Copy(tensor_to_merge, places[0], tmp);
+        } else {
+          tmp->ShareDataWith(tensor_to_merge);
+        }

         auto sum_op = framework::OpRegistry::CreateOp(
-            "sum", {{"X", {s, s_buf}}}, {{"Out", {s}}},
+            "sum", {{"X", {s, tmp_name}}}, {{"Out", {s}}},
             framework::AttributeMap{});
         sum_op->Run(*sub_scopes[0], places[0]);
         WaitOnPlaces(places);
       }

-      VLOG(3) << t;
-      framework::Copy(t, place, scope.FindVar(s)->GetMutable<LoDTensor>());
+      VLOG(3) << result;
+      framework::Copy(result, place, scope.FindVar(s)->GetMutable<LoDTensor>());
     }
   }
 };

+std::ostream &operator<<(std::ostream &sout,
+                         const std::vector<std::string> &strs) {
+  std::copy(strs.begin(), strs.end(),
+            std::ostream_iterator<std::string>(sout, ","));
+  return sout;
+}
+
 class ParallelDoGradOpDescMaker : public framework::SingleGradOpDescMaker {
  public:
   using framework::SingleGradOpDescMaker::SingleGradOpDescMaker;
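The merge loop above is a pairwise reduction: the gradient already sitting in sub_scopes[0] is repeatedly overwritten with sum(result, tmp), where tmp either aliases the next sub-scope's gradient (same place) or holds a copy of it (different place). The same reduction over plain scalars, as an illustrative sketch:

```cpp
// Sketch of the pairwise gradient reduction (scalar stand-ins for tensors).
#include <cstdio>

int main() {
  // One "gradient" per sub-scope; grads[0] doubles as the accumulator,
  // just as sum_op writes its result back into sub_scopes[0]'s variable.
  double grads[] = {0.1, 0.2, 0.3, 0.4};
  for (int i = 1; i < 4; ++i) {
    grads[0] = grads[0] + grads[i];  // "sum": Out = X[0] + X[1]
  }
  std::printf("merged grad: %g\n", grads[0]);  // 1
  return 0;
}
```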
@@ -283,18 +278,30 @@ class ParallelDoGradOpShapeInference : public framework::InferShapeBase {
   void operator()(framework::InferShapeContext *ctx) const override {
     std::vector<std::string> input{kParameters, kInputs};
     std::vector<std::string> output{kOutputs};

-    for (auto &s : input) {
-      PADDLE_ENFORCE(ctx->HasInputs(s));
-      PADDLE_ENFORCE(ctx->HasOutputs(framework::GradVarName(s)),
-                     "Cannot find the gradient variable %s",
-                     framework::GradVarName(s));
-    }
+    PADDLE_ENFORCE(ctx->HasInputs(kParameters));
+    PADDLE_ENFORCE(ctx->HasOutputs(framework::GradVarName(kParameters)));
+    PADDLE_ENFORCE(ctx->HasInput(kInputs));
+
     for (auto &s : output) {
       PADDLE_ENFORCE(ctx->HasInputs(s));
     }
-    for (auto &s : input) {
-      ctx->SetOutputsDim(framework::GradVarName(s), ctx->GetInputsDim(s));
-    }
+
+    ctx->SetOutputsDim(framework::GradVarName(kParameters),
+                       ctx->GetInputsDim(kParameters));
+
+    auto i_dims = ctx->GetInputsDim(kInputs);
+    auto ig_names = ctx->Outputs(framework::GradVarName(kInputs));
+
+    for (size_t i = 0; i < ig_names.size(); ++i) {
+      auto &ig_name = ig_names[i];
+      if (ig_name == framework::kEmptyVarName) {
+        continue;
+      }
+
+      ctx->SetDims({ig_name}, {i_dims[i]});
+    }
-    if (ctx->HasInputs(kParameters)) {
-      PADDLE_ENFORCE(ctx->HasOutputs(framework::GradVarName(kParameters)));
-      ctx->SetOutputsDim(framework::GradVarName(kParameters),
......
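This hunk carries the "empty input gradient" part of the fix: an input whose gradient was pruned away shows up under framework::kEmptyVarName, and shape inference now skips it instead of asserting. A toy sketch of the skip; the marker string below is an assumption about the framework constant:

```cpp
// Sketch: skip pruned input gradients during shape inference.
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <string>
#include <vector>

int main() {
  const std::string kEmptyVarName = "@EMPTY@";  // assumed value of the marker
  const std::vector<std::string> ig_names = {"x0@GRAD", kEmptyVarName, "x2@GRAD"};
  const std::vector<int64_t> i_dims = {51, 51, 51};
  for (std::size_t i = 0; i < ig_names.size(); ++i) {
    if (ig_names[i] == kEmptyVarName) continue;  // no gradient wanted: nothing to set
    std::printf("set %s height to %lld\n", ig_names[i].c_str(),
                static_cast<long long>(i_dims[i]));
  }
  return 0;
}
```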
@@ -15,9 +15,15 @@ limitations under the License. */
 #pragma once

 #include <sstream>
 #include <string>
+#include <typeindex>

 namespace paddle {
 namespace string {
+
+inline std::ostream& operator<<(std::ostream& s, const std::type_index& t) {
+  s << t.name();
+  return s;
+}
+
 template <typename T>
 inline std::string to_string(T v) {
   std::ostringstream sout;
@@ -25,6 +31,11 @@ inline std::string to_string(T v) {
   return sout.str();
 }

+template <>
+inline std::string to_string(std::type_index t) {
+  return t.name();
+}
+
 // Faster std::string/const char* type
 template <>
 inline std::string to_string(std::string v) {
......
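Both additions fall back to std::type_index::name(), whose result is implementation-defined (GCC and Clang return Itanium-mangled names, MSVC returns readable ones). A self-contained sketch of the same behavior, without the Paddle header:

```cpp
// Sketch mirroring the new operator<< / to_string specialization.
#include <iostream>
#include <typeindex>

int main() {
  std::type_index t(typeid(float));
  std::cout << t.name() << std::endl;  // e.g. "f" with GCC/Clang, "float" with MSVC
  return 0;
}
```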
@@ -151,24 +151,28 @@ class BaseParallelForTest(unittest.TestCase):
 class ParallelOpTest(BaseParallelForTest):
-    def test_simple_fc(self):
-        def __network__():
+    @staticmethod
+    def __network__():
         x = fluid.layers.data(shape=[784], dtype='float32', name='img')
         # FIXME: This is a bug of parallel.do
         x.stop_gradient = False
         x = yield x
         hidden = fluid.layers.fc(input=x, size=200, param_attr='fc1.w')
         loss = fluid.layers.mean(x=hidden)
         yield loss

+    def test_simple_fc(self):
         self.run_test(
-            callback=__network__,
+            callback=ParallelOpTest.__network__,
             feed={
-                'img':
-                numpy.random.random(size=(128 * 3, 784)).astype('float32')
+                'img': numpy.random.random(size=(51, 784)).astype('float32')
             },
             fetch='fc1.w@GRAD')

+    def test_fc_with_tiny_data(self):
+        self.run_test(
+            callback=ParallelOpTest.__network__,
+            feed={'img': numpy.random.random(size=(1, 784)).astype('float32')},
+            fetch='fc1.w@GRAD')
+

 if __name__ == '__main__':
     unittest.main()