提交 f879ef23 编写于 作者: Y Yang Yang

pass forward backward runtime

上级 f899150e
......@@ -314,6 +314,45 @@ void DeserializeFromStream(std::istream &is, LoDTensor *tensor) {
}
}
std::vector<LoDTensor> LoDTensor::SplitLoDTensor(
const std::vector<platform::Place> places) const {
check_memory_size();
// PADDLE_ENFORCE(lod().empty() || (lod().size() == 1 && lod()[0].empty())
// , "Disable parallel lod for now");
PADDLE_ENFORCE(lod().empty(), "Disable parallel lod for now");
PADDLE_ENFORCE(dims()[0] % places.size() == 0,
"Batch size should be divided by places size");
std::vector<LoDTensor> lods;
for (int place_idx = 0; place_idx < places.size(); ++place_idx) {
int begin = place_idx * dims()[0] / places.size();
int end = (place_idx + 1) * dims()[0] / places.size();
auto src = Slice(begin, end);
LoDTensor dst;
dst.Resize(src.dims());
auto &dst_place = places[place_idx];
auto dst_ptr = dst.mutable_data(dst_place, src.type());
// TODO(tonyyang-svail):
// change the following to framework::CopyFrom
auto src_place = src.place();
auto src_ptr = src.data<void>();
auto size = src.numel() * SizeOfType(src.type());
if (platform::is_cpu_place(src_place) &&
platform::is_cpu_place(dst_place)) {
memory::Copy(boost::get<platform::CPUPlace>(dst_place), dst_ptr,
boost::get<platform::CPUPlace>(src_place), src_ptr, size);
} else {
PADDLE_THROW("Not Implemented");
}
lods.emplace_back(dst);
}
return lods;
}
void LoDTensor::MergeLoDTensor(
const std::vector<const LoDTensor *> &lod_tensors, platform::Place place) {
PADDLE_ENFORCE(platform::is_cpu_place(place));
......
......@@ -144,6 +144,9 @@ class LoDTensor : public Tensor {
*/
void ShrinkInLevel(size_t level, size_t elem_begin, size_t elem_end);
std::vector<LoDTensor> SplitLoDTensor(
const std::vector<platform::Place> places) const;
void MergeLoDTensor(const std::vector<const LoDTensor*>& lod_tensors,
platform::Place place);
......
......@@ -34,6 +34,8 @@ class ElementwiseOp : public framework::OperatorWithKernel {
auto x_dim = ctx->GetInputDim("X");
auto y_dim = ctx->GetInputDim("Y");
LOG(INFO) << x_dim;
LOG(INFO) << y_dim;
PADDLE_ENFORCE_GE(x_dim.size(), y_dim.size(),
"Rank of first input must >= rank of second input.");
ctx->SetOutputDim("Out", x_dim);
......@@ -118,6 +120,9 @@ class ElementwiseOpGrad : public framework::OperatorWithKernel {
auto x_dims = ctx->GetInputDim("X");
auto y_dims = ctx->GetInputDim("Y");
auto out_dims = ctx->GetInputDim(framework::GradVarName("Out"));
LOG(INFO) << x_dims;
LOG(INFO) << y_dims;
LOG(INFO) << out_dims;
PADDLE_ENFORCE_GE(x_dims.size(), y_dims.size(),
"Rank of first input must >= rank of second input.");
......
......@@ -13,11 +13,9 @@
limitations under the License. */
#include <vector>
#include "chunk_eval_op.h"
#include "paddle/framework/executor.h"
#include "paddle/framework/op_registry.h"
#include "paddle/framework/operator.h"
#include "paddle/platform/place.h"
namespace paddle {
namespace operators {
......@@ -31,10 +29,31 @@ constexpr char kParallelScopes[] = "parallel_scopes";
constexpr char kParallelBlock[] = "sub_block";
using ParallelScopeVar = std::vector<framework::Scope *>;
// using ParallelScopeVar = std::vector<framework::Scope *>;
using LoDTensor = framework::LoDTensor;
using OperatorBase = framework::OperatorBase;
class ParallelDoOp : public OperatorBase {
void SplitTensorAndMoveTensorToScopes(
const framework::Scope &scope,
const std::vector<framework::Scope *> &sub_scopes,
const std::vector<platform::Place> &places,
const std::vector<std::string> &names) {
for (auto &argu : names) {
auto *var = scope.FindVar(argu);
const auto &tensor = var->Get<LoDTensor>();
auto lod_tensors = tensor.SplitLoDTensor(places);
for (auto &lod : lod_tensors) {
LOG(INFO) << lod.dims();
}
for (int i = 0; i < sub_scopes.size(); ++i) {
*sub_scopes[i]->Var(argu)->GetMutable<LoDTensor>() = lod_tensors[i];
}
}
}
class ParallelDoOp : public framework::OperatorBase {
public:
ParallelDoOp(const std::string &type,
const framework::VariableNameMap &inputs,
......@@ -52,11 +71,18 @@ class ParallelDoOp : public OperatorBase {
places.emplace_back(platform::CPUPlace());
places.emplace_back(platform::CPUPlace());
std::vector<framework::Scope *> sub_scopes;
auto &sub_scopes = *scope.FindVar(Output(kParallelScopes))
->GetMutable<std::vector<framework::Scope *>>();
// std::vector<framework::Scope *> sub_scopes;
for (int place_idx = 0; place_idx < places.size(); ++place_idx) {
VLOG(3) << "Run " << place_idx;
sub_scopes.push_back(&scope.NewScope());
}
SplitTensorAndMoveTensorToScopes(scope, sub_scopes, places,
Inputs(kInputs));
for (int place_idx = 0; place_idx < places.size(); ++place_idx) {
VLOG(3) << "Run " << place_idx;
auto &place = places[place_idx];
auto *cur_scope = sub_scopes[place_idx];
......@@ -66,26 +92,6 @@ class ParallelDoOp : public OperatorBase {
PADDLE_THROW("Not Implemented");
}
// feed input
for (auto &argu : Inputs(kInputs)) {
auto *var = scope.FindVar(argu);
const auto &tensor = var->Get<LoDTensor>();
if (!tensor.lod().empty()) {
PADDLE_THROW("Disable parallel lod for now");
} else {
PADDLE_ENFORCE(tensor.dims()[0] % places.size() == 0,
"Batch size should be divided by places size");
int begin = place_idx * tensor.dims()[0] / places.size();
int end = (place_idx + 1) * tensor.dims()[0] / places.size();
auto feed_tensor = tensor.Slice(begin, end);
feed_tensor.switch_place(place);
auto *cur_var = cur_scope->Var(argu);
auto *cur_tensor = cur_var->GetMutable<Tensor>();
*cur_tensor = feed_tensor;
}
}
// execute
auto executor = framework::Executor(place);
executor.Run(*program, cur_scope, block->ID(),
......@@ -132,7 +138,49 @@ class ParallelDoGradOp : public OperatorBase {
: OperatorBase(type, inputs, outputs, attrs) {}
void Run(const framework::Scope &scope,
const platform::DeviceContext &dev_ctx) const override {}
const platform::DeviceContext &dev_ctx) const override {
auto *block = Attr<framework::BlockDescBind *>(kParallelBlock);
auto *program = block->Program();
auto &sub_scopes = scope.FindVar(Input(kParallelScopes))
->Get<std::vector<framework::Scope *>>();
// TODO(tonyyang-svail): get places from input
std::vector<platform::Place> places;
places.emplace_back(platform::CPUPlace());
places.emplace_back(platform::CPUPlace());
// feed output@grad
SplitTensorAndMoveTensorToScopes(scope, sub_scopes, places,
Inputs(framework::GradVarName(kOutputs)));
for (auto &s : Inputs(framework::GradVarName(kOutputs))) {
LOG(INFO) << s;
LOG(INFO) << scope.FindVar(s)->Get<LoDTensor>().dims();
for (auto *sub_scope : sub_scopes) {
LOG(INFO) << sub_scope->FindVar(s)->Get<LoDTensor>().dims();
}
}
// exe run
for (int place_idx = 0; place_idx < places.size(); ++place_idx) {
VLOG(3) << "Run " << place_idx;
auto &place = places[place_idx];
auto *cur_scope = sub_scopes[place_idx];
// copy parameter
if (dev_ctx.GetPlace() != place) {
PADDLE_THROW("Not Implemented");
}
// execute
auto executor = framework::Executor(place);
executor.Run(*program, cur_scope, block->ID(),
false /*create_local_scope*/);
}
// merge grad
}
};
class ParallelDoGradOpDescMaker : public framework::SingleGradOpDescMaker {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册