Commit 9cb8f503 authored by Y Yu Yang

Complete fetch op

Parent 254d7ff4
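
A minimal usage sketch of the fetch API this commit completes (a sketch only, not part of the diff: `places`, `params`, `startup_program`, `main_program`, and `loss_var_name` are placeholders, and the constructor arguments mirror what the pybind binding below passes through). After Run(), the per-device results for each fetched variable are merged on CPU and stored in the global scope under `fetched_var_name` as a LoDTensorArray:

    // Sketch under the assumptions above; not part of this commit.
    ParallelExecutor exe(places, params, startup_program, main_program,
                         loss_var_name, scope);
    exe.Run({loss_var_name}, "fetched_var");
    // One merged LoDTensor per fetched name, in fetch order.
    const auto &fetched =
        scope->FindVar("fetched_var")->Get<framework::LoDTensorArray>();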
......@@ -87,7 +87,7 @@ cc_library(feed_fetch_method SRCS feed_fetch_method.cc DEPS lod_tensor scope glo
cc_library(executor SRCS executor.cc DEPS op_registry device_context scope
framework_proto backward glog lod_rank_table feed_fetch_method)
cc_library(parallel_executor SRCS parallel_executor.cc DEPS op_registry device_context scope
framework_proto backward glog lod_rank_table feed_fetch_method executor simple_threadpool)
framework_proto backward glog lod_rank_table feed_fetch_method executor simple_threadpool concat)
cc_library(prune SRCS prune.cc DEPS framework_proto)
cc_test(prune_test SRCS prune_test.cc DEPS op_info prune recurrent_op device_context)
......
......@@ -16,7 +16,9 @@ limitations under the License. */
#include "ThreadPool.h"
#include "executor.h"
#include "lod_tensor.h"
#include "lod_tensor_array.h"
#include "op_registry.h"
#include "paddle/fluid/operators/math/concat.h"
namespace paddle {
namespace framework {
......@@ -34,7 +36,7 @@ struct VarHandleBase {
virtual std::string DebugString() const = 0;
OpHandle *generated_op_;
std::vector<OpHandle *> pending_ops_;
std::unordered_set<OpHandle *> pending_ops_;
};
struct VarHandle : public VarHandleBase {
......@@ -93,7 +95,6 @@ struct ComputationOpHandle : public OpHandle {
void Run() override {
// Wait other op if necessary
LOG(INFO) << "Run " << this << " " << DebugString();
auto *cur_ctx = dev_ctx_[place_];
for (auto *in : inputs_) {
if (in->generated_op_ && in->generated_op_->dev_ctx_[place_] != cur_ctx) {
......@@ -102,7 +103,6 @@ struct ComputationOpHandle : public OpHandle {
}
op_->Run(*scope_, place_);
LOG(INFO) << "Done " << this;
}
void Wait(platform::DeviceContext *waited_dev) override {
......@@ -122,8 +122,6 @@ struct ScaleLossGradOpHandle : public OpHandle {
place_(place) {}
void Run() override {
LOG(INFO) << "Run Scale Loss Grad";
std::string var_name = static_cast<VarHandle *>(this->outputs_[0])->name_;
float *tmp = scope_->FindVar(var_name)
......@@ -146,6 +144,64 @@ struct ScaleLossGradOpHandle : public OpHandle {
}
};
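// FetchedData holds one merged LoDTensor per fetched variable; a single
// instance is shared (via shared_ptr) by all fetch ops of one Run() call.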
struct FetchedData {
public:
std::vector<framework::LoDTensor> tensors_;
explicit FetchedData(size_t num_fetched) { tensors_.resize(num_fetched); }
};
struct FetchOpHandle : public OpHandle {
std::shared_ptr<FetchedData> data_;
size_t offset_;
std::vector<Scope *> *local_scopes_;
std::vector<LoDTensor> tensors_;
~FetchOpHandle() {
for (auto *input_var : inputs_) {
input_var->pending_ops_.erase(this);
}
for (auto &pair : dev_ctx_) {
pair.second->Wait();
}
// Lazily merge tensors; deferring the merge to destruction makes the code faster.
MergeTensors();
}
void Run() override {
tensors_.resize(inputs_.size());
auto *var = static_cast<VarHandle *>(inputs_[0]);
auto &var_name = var->name_;
platform::CPUPlace cpu;
auto &scopes = *local_scopes_;
for (size_t i = 0; i < scopes.size(); ++i) {
auto &scope = scopes[i];
auto &t = scope->FindVar(var_name)->Get<framework::LoDTensor>();
if (platform::is_gpu_place(var->place_)) {
TensorCopy(t, cpu, *dev_ctx_[t.place()], &tensors_[i]);
} else {
tensors_[i].ShareDataWith(t);
tensors_[i].set_lod(t.lod());
}
}
}
void Wait(platform::DeviceContext *waited_dev) override {
PADDLE_THROW("Nobody should wait FetchOp. Unexpceted Error");
}
private:
void MergeTensors() const {
std::vector<const LoDTensor *> tensors_ptr;
for (auto &t : tensors_) {
tensors_ptr.emplace_back(&t);
}
data_->tensors_[offset_].MergeLoDTensor(tensors_ptr, platform::CPUPlace());
}
};
class ParallelExecutorPrivate {
public:
explicit ParallelExecutorPrivate(size_t num_threads = 12)
......@@ -154,6 +210,7 @@ class ParallelExecutorPrivate {
std::vector<platform::Place> places_;
std::vector<Scope *> local_scopes_;
Scope *global_scope_;
#ifdef PADDLE_WITH_CUDA
struct NCCLContext {
......@@ -297,7 +354,7 @@ ParallelExecutor::ParallelExecutor(
const std::string &loss_var_name, Scope *scope)
: member_(new ParallelExecutorPrivate()) {
member_->places_ = places;
member_->global_scope_ = scope;
// Step 1. RunStartupProgram and Bcast the params to devs.
Executor exe(places[0]);
exe.Run(startup_program, scope, 0);
......@@ -308,9 +365,9 @@ ParallelExecutor::ParallelExecutor(
member_->main_place_ = places[0];
// Bcast Parameters to all GPUs
BuildNCCLCommunicator();
if (platform::is_gpu_place(member_->main_place_) &&
member_->local_scopes_.size() != 1) { // Is CUDA
BuildNCCLCommunicator();
BCastParamsToGPUs(startup_program);
}
// Startup Program has been run. All local scopes has correct parameters.
......@@ -365,7 +422,7 @@ void ParallelExecutor::ConstructDependencyGraph(
for (auto &each_var_name : var_names) {
VarHandle *var = GetVarHandle(each_var_name, p);
op_handle->inputs_.emplace_back(var);
var->pending_ops_.emplace_back(op_handle);
var->pending_ops_.emplace(op_handle);
}
var_names = op->OutputArgumentNames();
......@@ -390,7 +447,6 @@ void ParallelExecutor::ConstructDependencyGraph(
GenerateVar(op_handle, loss_var_name + "@GRAD", p);
change_forward = true;
LOG(INFO) << "Scale Loss " << op_handle->DebugString();
}
}
}
......@@ -416,7 +472,7 @@ void ParallelExecutor::ConstructDependencyGraph(
}
auto *prev_grad = &vars[vars.size() - 1];
op_handle->inputs_.emplace_back(prev_grad);
prev_grad->pending_ops_.emplace_back(op_handle);
prev_grad->pending_ops_.emplace(op_handle);
auto &var = vars[vars.size()];
var.place_ = p;
var.generated_op_ = op_handle;
......@@ -463,10 +519,6 @@ void ParallelExecutor::PolishGraphToSupportDataHarzaeds() const {
continue;
}
LOG(INFO) << "Link " << it_new->second.DebugString() << " From "
<< it_old->second.version_ << " To "
<< it_new->second.version_;
for (auto *read_op : read_ops) {
// Manually add a dependency var from read_op to write_op;
if (read_op == write_op) {
......@@ -479,7 +531,7 @@ void ParallelExecutor::PolishGraphToSupportDataHarzaeds() const {
dep_var->generated_op_ = read_op;
read_op->outputs_.emplace_back(dep_var);
dep_var->pending_ops_.emplace_back(write_op);
dep_var->pending_ops_.emplace(write_op);
write_op->inputs_.emplace_back(dep_var);
member_->dep_vars_.emplace(dep_var);
}
......@@ -572,8 +624,9 @@ void ParallelExecutor::BuildNCCLCommunicator() const {
#endif
}
std::vector<LoDTensor> ParallelExecutor::Run(
const std::vector<std::string> &fetch_tensors) {
void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors,
const std::string &fetched_var_name) {
auto fetched_data = std::make_shared<FetchedData>(fetch_tensors.size());
// Version --> VarHandle
member_->exception_.reset();
std::unordered_map<VarHandleBase *, bool> pending_vars;
......@@ -602,6 +655,38 @@ std::vector<LoDTensor> ParallelExecutor::Run(
}
}
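// For each fetched name, collect the newest version of its VarHandle on
// every device.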
std::unordered_map<std::string, std::vector<VarHandleBase *>> fetched_vars;
for (auto &fetch_var_name : fetch_tensors) {
for (auto &pair : member_->vars_) {
auto it = pair.second.find(fetch_var_name);
if (it != pair.second.end()) {
fetched_vars[fetch_var_name].push_back(&it->second.rbegin()->second);
}
}
}
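// Create one FetchOpHandle per fetched variable; it depends on that
// variable's handle on every device, so it runs only after all devices
// have produced their partial result.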
std::vector<FetchOpHandle> fetch_ops;
for (size_t i = 0; i < fetch_tensors.size(); ++i) {
auto &var_name = fetch_tensors[i];
auto &vars = fetched_vars[var_name];
fetch_ops.emplace_back();
FetchOpHandle *op = &fetch_ops.back();
op->data_ = fetched_data;
op->offset_ = i;
op->local_scopes_ = &member_->local_scopes_;
for (auto &p : member_->places_) {
op->dev_ctx_[p] = this->member_->GetNCCLCtx(p).ctx_.get();
}
for (auto *var : vars) {
var->pending_ops_.emplace(op);
op->inputs_.emplace_back(var);
}
pending_ops.insert({op, op->inputs_.size()});
}
for (auto *op : to_run) {
RunOp(pending_vars, op);
}
......@@ -642,7 +727,9 @@ std::vector<LoDTensor> ParallelExecutor::Run(
RunOp(pending_vars, op);
}
}
return std::vector<LoDTensor>();
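// Destroying the fetch ops waits on their device contexts and performs the
// lazy merge, so every tensor is complete before it is stored below.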
fetch_ops.clear();
*member_->global_scope_->Var(fetched_var_name)->GetMutable<LoDTensorArray>() =
fetched_data->tensors_;
}
void ParallelExecutor::RunOp(
......
......@@ -40,7 +40,8 @@ class ParallelExecutor {
const ProgramDesc& main_program,
const std::string& loss_var_name, Scope* scope);
std::vector<LoDTensor> Run(const std::vector<std::string>& fetch_tensors);
void Run(const std::vector<std::string>& fetch_tensors,
const std::string& fetched_var_name = "fetched_var");
private:
ParallelExecutorPrivate* member_;
......
......@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include "paddle/fluid/framework/data_type.h"
#include "paddle/fluid/framework/tensor.h"
namespace paddle {
......
......@@ -508,7 +508,7 @@ All parameter, weight, gradient are variables in Paddle.
new (&self) ParallelExecutor(places, params, startup_program,
main_program, loss_var_name, scope);
})
.def("run", [](ParallelExecutor &self) { self.Run({}); });
.def("run", &ParallelExecutor::Run);
BindRecordIOWriter(m);
return m.ptr();
......
......@@ -16,6 +16,7 @@ import unittest
import paddle.fluid as fluid
import paddle.v2 as paddle
import paddle.v2.dataset.mnist as mnist
import numpy
class ParallelExecutor(unittest.TestCase):
......@@ -66,4 +67,16 @@ class ParallelExecutor(unittest.TestCase):
act_places,
set([p.name for p in main.global_block().iter_parameters()]),
startup.desc, main.desc, loss.name, fluid.global_scope())
exe.run()
exe.run([loss.name], 'fetched_var')
first_loss = numpy.array(fluid.global_scope().find_var('fetched_var')
.get_lod_tensor_array()[0])
for i in xrange(10):
exe.run([], 'fetched_var')
exe.run([loss.name], 'fetched_var')
last_loss = numpy.array(fluid.global_scope().find_var('fetched_var')
.get_lod_tensor_array()[0])
print first_loss, last_loss
self.assertGreater(first_loss[0], last_loss[0])