提交 0ffd33d3 编写于 作者: Q qiaolongfei

VariableResponse support deserialize var into local scope

上级 0b8630b9
...@@ -91,7 +91,7 @@ std::vector<std::string> Scope::LocalVarNames() const { ...@@ -91,7 +91,7 @@ std::vector<std::string> Scope::LocalVarNames() const {
return known_vars; return known_vars;
} }
void Scope::DeleteScope(Scope* scope) { void Scope::DeleteScope(Scope* scope) const {
std::unique_lock<std::mutex> lock(mutex_); std::unique_lock<std::mutex> lock(mutex_);
auto it = std::find(this->kids_.begin(), this->kids_.end(), scope); auto it = std::find(this->kids_.begin(), this->kids_.end(), scope);
PADDLE_ENFORCE(it != this->kids_.end(), "Cannot find %p as kid scope", scope); PADDLE_ENFORCE(it != this->kids_.end(), "Cannot find %p as kid scope", scope);
......
...@@ -63,7 +63,7 @@ class Scope { ...@@ -63,7 +63,7 @@ class Scope {
/// Find the scope or an ancestor scope that contains the given variable. /// Find the scope or an ancestor scope that contains the given variable.
const Scope* FindScope(const Variable* var) const; const Scope* FindScope(const Variable* var) const;
void DeleteScope(Scope* scope); void DeleteScope(Scope* scope) const;
/// Drop all kids scopes belonged to this scope. /// Drop all kids scopes belonged to this scope.
void DropKids(); void DropKids();
......
...@@ -60,7 +60,7 @@ class RequestSend final : public RequestBase { ...@@ -60,7 +60,7 @@ class RequestSend final : public RequestBase {
framework::Scope* scope, ReceivedQueue* queue, framework::Scope* scope, ReceivedQueue* queue,
const platform::DeviceContext* dev_ctx) const platform::DeviceContext* dev_ctx)
: RequestBase(service, cq, dev_ctx), queue_(queue), responder_(&ctx_) { : RequestBase(service, cq, dev_ctx), queue_(queue), responder_(&ctx_) {
request_.reset(new VariableResponse(scope, dev_ctx_)); request_.reset(new VariableResponse(false, scope, dev_ctx_));
int method_id = static_cast<int>(detail::GrpcMethod::kSendVariable); int method_id = static_cast<int>(detail::GrpcMethod::kSendVariable);
service_->RequestAsyncUnary(method_id, &ctx_, request_.get(), &responder_, service_->RequestAsyncUnary(method_id, &ctx_, request_.get(), &responder_,
cq_, cq_, this); cq_, cq_, this);
...@@ -146,7 +146,7 @@ class RequestPrefetch final : public RequestBase { ...@@ -146,7 +146,7 @@ class RequestPrefetch final : public RequestBase {
executor_(executor), executor_(executor),
program_(program), program_(program),
prefetch_ctx_(prefetch_ctx) { prefetch_ctx_(prefetch_ctx) {
request_.reset(new VariableResponse(scope, dev_ctx_)); request_.reset(new VariableResponse(false, scope, dev_ctx_));
int method_id = static_cast<int>(detail::GrpcMethod::kPrefetchVariable); int method_id = static_cast<int>(detail::GrpcMethod::kPrefetchVariable);
service_->RequestAsyncUnary(method_id, &ctx_, request_.get(), &responder_, service_->RequestAsyncUnary(method_id, &ctx_, request_.get(), &responder_,
cq_, cq_, this); cq_, cq_, this);
......
...@@ -186,7 +186,7 @@ void DeserializeFromByteBuffer(const ::grpc::ByteBuffer& msg, ...@@ -186,7 +186,7 @@ void DeserializeFromByteBuffer(const ::grpc::ByteBuffer& msg,
const platform::DeviceContext& ctx, const platform::DeviceContext& ctx,
const framework::Scope* scope, const framework::Scope* scope,
framework::Variable** var) { framework::Variable** var) {
operators::detail::VariableResponse resp(scope, &ctx); operators::detail::VariableResponse resp(false, scope, &ctx);
PADDLE_ENFORCE(resp.Parse(msg) == 0, "parse bytebuffer to tensor error!"); PADDLE_ENFORCE(resp.Parse(msg) == 0, "parse bytebuffer to tensor error!");
*var = resp.GetVar(); *var = resp.GetVar();
} }
......
...@@ -84,7 +84,7 @@ void RunSerdeTestSelectedRows(platform::Place place) { ...@@ -84,7 +84,7 @@ void RunSerdeTestSelectedRows(platform::Place place) {
// operators::detail::DeserializeFromByteBuffer(msg, ctx, &var2); // operators::detail::DeserializeFromByteBuffer(msg, ctx, &var2);
framework::Scope scope; framework::Scope scope;
scope.Var("myvar"); scope.Var("myvar");
operators::detail::VariableResponse resp(&scope, &ctx); operators::detail::VariableResponse resp(false, &scope, &ctx);
EXPECT_EQ(resp.Parse(msg), 0); EXPECT_EQ(resp.Parse(msg), 0);
framework::Variable* var2 = resp.GetVar(); framework::Variable* var2 = resp.GetVar();
...@@ -171,7 +171,7 @@ void RunTestLodTensor(platform::Place place, int from_type = 0) { ...@@ -171,7 +171,7 @@ void RunTestLodTensor(platform::Place place, int from_type = 0) {
// deserialize zero-copy // deserialize zero-copy
framework::Scope scope; framework::Scope scope;
scope.Var("myvar"); scope.Var("myvar");
operators::detail::VariableResponse resp(&scope, &ctx); operators::detail::VariableResponse resp(false, &scope, &ctx);
if (from_type == 0) { if (from_type == 0) {
EXPECT_EQ(resp.Parse(msg), 0); EXPECT_EQ(resp.Parse(msg), 0);
} else { } else {
......
...@@ -114,8 +114,7 @@ bool VariableResponse::CopyLodTensorData( ...@@ -114,8 +114,7 @@ bool VariableResponse::CopyLodTensorData(
::google::protobuf::io::CodedInputStream* input, ::google::protobuf::io::CodedInputStream* input,
const platform::DeviceContext& ctx, const framework::DDim& dims, const platform::DeviceContext& ctx, const framework::DDim& dims,
int length) { int length) {
auto var = scope_->FindVar(meta_.varname()); auto* tensor = InitVar()->GetMutable<framework::LoDTensor>();
auto* tensor = var->GetMutable<framework::LoDTensor>();
tensor->Resize(dims); tensor->Resize(dims);
framework::LoD lod; framework::LoD lod;
...@@ -151,8 +150,7 @@ bool VariableResponse::CopySelectRowsTensorData( ...@@ -151,8 +150,7 @@ bool VariableResponse::CopySelectRowsTensorData(
::google::protobuf::io::CodedInputStream* input, ::google::protobuf::io::CodedInputStream* input,
const platform::DeviceContext& ctx, const framework::DDim& dims, const platform::DeviceContext& ctx, const framework::DDim& dims,
int length) { int length) {
auto var = scope_->FindVar(meta_.varname()); auto* slr = InitVar()->GetMutable<framework::SelectedRows>();
auto* slr = var->GetMutable<framework::SelectedRows>();
slr->set_height(meta_.slr_height()); slr->set_height(meta_.slr_height());
auto* tensor = slr->mutable_value(); auto* tensor = slr->mutable_value();
tensor->Resize(dims); tensor->Resize(dims);
...@@ -174,8 +172,7 @@ bool VariableResponse::CopySelectRowsTensorData( ...@@ -174,8 +172,7 @@ bool VariableResponse::CopySelectRowsTensorData(
bool VariableResponse::CopySelectRowsData( bool VariableResponse::CopySelectRowsData(
::google::protobuf::io::CodedInputStream* input, ::google::protobuf::io::CodedInputStream* input,
const platform::DeviceContext& ctx, int length) { const platform::DeviceContext& ctx, int length) {
auto var = scope_->FindVar(meta_.varname()); auto* slr = InitVar()->GetMutable<framework::SelectedRows>();
auto* slr = var->GetMutable<framework::SelectedRows>();
slr->mutable_rows()->resize(length / slr->mutable_rows()->resize(length /
framework::SizeOfType(typeid(int64_t))); // int64 framework::SizeOfType(typeid(int64_t))); // int64
int64_t* rows_data = slr->mutable_rows()->data(); int64_t* rows_data = slr->mutable_rows()->data();
......
...@@ -36,11 +36,13 @@ namespace detail { ...@@ -36,11 +36,13 @@ namespace detail {
class VariableResponse { class VariableResponse {
public: public:
VariableResponse(const framework::Scope* scope, VariableResponse(bool use_local_scope, const framework::Scope* scope,
const platform::DeviceContext* dev_ctx) const platform::DeviceContext* dev_ctx)
: scope_(scope), dev_ctx_(dev_ctx) {} : use_local_scope_(use_local_scope), scope_(scope), dev_ctx_(dev_ctx) {
local_scope_ = &scope->NewScope();
}
virtual ~VariableResponse() {} virtual ~VariableResponse() { scope_->DeleteScope(local_scope_); }
// return: // return:
// 0:ok. // 0:ok.
...@@ -54,11 +56,23 @@ class VariableResponse { ...@@ -54,11 +56,23 @@ class VariableResponse {
// other: number of error field. // other: number of error field.
int Parse(const ::grpc::ByteBuffer& byte_buffer); int Parse(const ::grpc::ByteBuffer& byte_buffer);
const framework::Scope& GetLocalScope() const { return *local_scope_; }
inline std::string Varname() { return meta_.varname(); } inline std::string Varname() { return meta_.varname(); }
inline std::string OutVarname() { return meta_.out_varname(); } inline std::string OutVarname() { return meta_.out_varname(); }
// should call parse first. // should call parse first.
framework::Variable* GetVar() { return scope_->FindVar(meta_.varname()); } framework::Variable* GetVar() {
return local_scope_->FindVar(meta_.varname());
}
framework::Variable* InitVar() {
if (use_local_scope_) {
return local_scope_->Var(meta_.varname());
} else {
return scope_->FindVar(meta_.varname());
}
}
private: private:
bool CopySelectRowsTensorData(::google::protobuf::io::CodedInputStream* input, bool CopySelectRowsTensorData(::google::protobuf::io::CodedInputStream* input,
...@@ -73,7 +87,9 @@ class VariableResponse { ...@@ -73,7 +87,9 @@ class VariableResponse {
const framework::DDim& dims, int length); const framework::DDim& dims, int length);
private: private:
bool use_local_scope_ = false;
const framework::Scope* scope_; const framework::Scope* scope_;
framework::Scope* local_scope_ = nullptr;
const platform::DeviceContext* dev_ctx_; const platform::DeviceContext* dev_ctx_;
// only Skeleton // only Skeleton
sendrecv::VariableMessage meta_; sendrecv::VariableMessage meta_;
......
...@@ -33,7 +33,7 @@ class SplitByrefOpKernel : public framework::OpKernel<T> { ...@@ -33,7 +33,7 @@ class SplitByrefOpKernel : public framework::OpKernel<T> {
// NOTE: no need to call mutable_data here to allocate memory. // NOTE: no need to call mutable_data here to allocate memory.
auto* out = outs[i]; auto* out = outs[i];
VLOG(3) << "spliting by ref: " << row_offset << " " << out->dims()[0]; VLOG(3) << "spliting by ref: " << row_offset << " " << out->dims()[0];
*out = std::move(in->Slice(row_offset, row_offset + out->dims()[0])); *out = in->Slice(row_offset, row_offset + out->dims()[0]);
row_offset += out->dims()[0]; row_offset += out->dims()[0];
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册