Commit 42a15a43 authored by qiaolongfei

Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into add-async-listen-and-serv-op

@@ -128,7 +128,7 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
     //
     // NOTE: DelayedOps have a lower priority. It will be scheduled after all
     // ready_ops have been performed.
-    if (ready_ops.empty() && allow_op_delay_) {
+    if (ready_ops.empty() && allow_op_delay_ && running_ops_ == 0) {
       run_all_ops(delayed_ops);
     } else {
       run_all_ops(ready_ops);
......
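The hunk above tightens the condition under which delayed ops are dispatched: besides the ready queue being empty and op delay being enabled, no op may still be in flight, presumably so a delayed op never overlaps with running work. A minimal stand-alone sketch of that scheduling rule (illustrative names, not the executor's actual types):

#include <cstddef>
#include <set>

// Illustrative stand-in; the real executor works on sets of OpHandleBase*.
struct OpHandle {};

// Delayed ops have a lower priority: with the updated condition they are
// dispatched only when nothing is ready, delaying is allowed, and nothing
// is still running.
const std::set<OpHandle*>& PickOpsToRun(const std::set<OpHandle*>& ready_ops,
                                        const std::set<OpHandle*>& delayed_ops,
                                        bool allow_op_delay,
                                        size_t running_ops) {
  if (ready_ops.empty() && allow_op_delay && running_ops == 0) {
    return delayed_ops;
  }
  return ready_ops;
}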
@@ -68,9 +68,9 @@ class RequestSend final : public RequestBase {
         queue_(queue),
         responder_(&ctx_) {
     if (sync_mode_) {
-      request_.reset(new VariableResponse(false, scope, dev_ctx_));
+      request_.reset(new VariableResponse(scope, dev_ctx_, false));
     } else {
-      request_.reset(new VariableResponse(true, scope, dev_ctx_));
+      request_.reset(new VariableResponse(scope, dev_ctx_, true));
     }
     int method_id = static_cast<int>(detail::GrpcMethod::kSendVariable);
     service_->RequestAsyncUnary(method_id, &ctx_, request_.get(), &responder_,
@@ -158,9 +158,9 @@ class RequestPrefetch final : public RequestBase {
         program_(program),
         prefetch_ctx_(prefetch_ctx) {
     if (sync_mode_) {
-      request_.reset(new VariableResponse(false, scope, dev_ctx_));
+      request_.reset(new VariableResponse(scope, dev_ctx_, false));
     } else {
-      request_.reset(new VariableResponse(true, scope, dev_ctx_));
+      request_.reset(new VariableResponse(scope, dev_ctx_, true));
     }
     int method_id = static_cast<int>(detail::GrpcMethod::kPrefetchVariable);
     service_->RequestAsyncUnary(method_id, &ctx_, request_.get(), &responder_,
......
@@ -186,7 +186,7 @@ void DeserializeFromByteBuffer(const ::grpc::ByteBuffer& msg,
                                const platform::DeviceContext& ctx,
                                const framework::Scope* scope,
                                framework::Variable** var) {
-  operators::detail::VariableResponse resp(false, scope, &ctx);
+  operators::detail::VariableResponse resp(scope, &ctx);
   PADDLE_ENFORCE(resp.Parse(msg) == 0, "parse bytebuffer to tensor error!");
   *var = resp.GetVar();
 }
......
@@ -51,7 +51,7 @@ void RunSerdeTestSelectedRows(platform::Place place) {
   ::grpc::ByteBuffer msg;
   operators::detail::SerializeToByteBuffer("myvar", &var, ctx, &msg);
-  EXPECT_GT(msg.Length(), 0);
+  EXPECT_GT(msg.Length(), static_cast<size_t>(0));
   // deserialize
   std::vector<::grpc::Slice> slices;
@@ -84,7 +84,7 @@ void RunSerdeTestSelectedRows(platform::Place place) {
   // operators::detail::DeserializeFromByteBuffer(msg, ctx, &var2);
   framework::Scope scope;
   scope.Var("myvar");
-  operators::detail::VariableResponse resp(false, &scope, &ctx);
+  operators::detail::VariableResponse resp(&scope, &ctx);
   EXPECT_EQ(resp.Parse(msg), 0);
   framework::Variable* var2 = resp.GetVar();
@@ -129,7 +129,7 @@ void RunTestLodTensor(platform::Place place, int from_type = 0) {
   ::grpc::ByteBuffer msg;
   operators::detail::SerializeToByteBuffer("myvar", &var, ctx, &msg);
-  EXPECT_GT(msg.Length(), 0);
+  EXPECT_GT(msg.Length(), static_cast<size_t>(0));
   // deserialize
   std::vector<::grpc::Slice> slices;
@@ -171,7 +171,7 @@ void RunTestLodTensor(platform::Place place, int from_type = 0) {
   // deserialize zero-copy
   framework::Scope scope;
   scope.Var("myvar");
-  operators::detail::VariableResponse resp(false, &scope, &ctx);
+  operators::detail::VariableResponse resp(&scope, &ctx);
   if (from_type == 0) {
     EXPECT_EQ(resp.Parse(msg), 0);
   } else {
......
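The two EXPECT_GT edits in this test file are a signed/unsigned fix: grpc::ByteBuffer::Length() returns a size_t, and gtest forwards both arguments of EXPECT_GT into a comparison helper, so the literal 0 arrives there as an int and the size_t-vs-int comparison can trigger -Wsign-compare (fatal when warnings are errors). Casting the expected value to size_t keeps both operands unsigned. A self-contained illustration that mimics, but does not copy, gtest's helper (CmpHelperGT and MessageLength below are stand-ins):

#include <cstddef>

// Mimics how EXPECT_GT(a, b) compares through a template helper: by the time
// the values are compared, the literal 0 is just an int parameter, so the
// compiler can no longer prove the signed/unsigned comparison is safe.
template <typename T1, typename T2>
bool CmpHelperGT(const T1& val1, const T2& val2) {
  return val1 > val2;  // size_t > int here warns under -Wsign-compare
}

size_t MessageLength() { return 128; }  // stand-in for msg.Length()

int main() {
  // bool ok = CmpHelperGT(MessageLength(), 0);                    // may warn
  bool ok = CmpHelperGT(MessageLength(), static_cast<size_t>(0));  // clean
  return ok ? 0 : 1;
}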
@@ -114,7 +114,7 @@ bool VariableResponse::CopyLodTensorData(
     ::google::protobuf::io::CodedInputStream* input,
     const platform::DeviceContext& ctx, const framework::DDim& dims,
     int length) {
-  auto* tensor = InitVar()->GetMutable<framework::LoDTensor>();
+  auto* tensor = GetVar()->GetMutable<framework::LoDTensor>();
   tensor->Resize(dims);
   framework::LoD lod;
@@ -150,7 +150,7 @@ bool VariableResponse::CopySelectRowsTensorData(
     ::google::protobuf::io::CodedInputStream* input,
     const platform::DeviceContext& ctx, const framework::DDim& dims,
     int length) {
-  auto* slr = InitVar()->GetMutable<framework::SelectedRows>();
+  auto* slr = GetVar()->GetMutable<framework::SelectedRows>();
   slr->set_height(meta_.slr_height());
   auto* tensor = slr->mutable_value();
   tensor->Resize(dims);
@@ -172,7 +172,7 @@ bool VariableResponse::CopySelectRowsTensorData(
 bool VariableResponse::CopySelectRowsData(
     ::google::protobuf::io::CodedInputStream* input,
     const platform::DeviceContext& ctx, int length) {
-  auto* slr = InitVar()->GetMutable<framework::SelectedRows>();
+  auto* slr = GetVar()->GetMutable<framework::SelectedRows>();
   slr->mutable_rows()->resize(length /
                               framework::SizeOfType(typeid(int64_t)));  // int64
   int64_t* rows_data = slr->mutable_rows()->data();
......
@@ -36,13 +36,18 @@ namespace detail {
 class VariableResponse {
  public:
-  VariableResponse(bool use_local_scope, const framework::Scope* scope,
-                   const platform::DeviceContext* dev_ctx)
-      : use_local_scope_(use_local_scope), scope_(scope), dev_ctx_(dev_ctx) {
+  VariableResponse(const framework::Scope* scope,
+                   const platform::DeviceContext* dev_ctx,
+                   bool create_scope = false)
+      : scope_(scope), dev_ctx_(dev_ctx), create_scope_(create_scope) {
+    if (create_scope) {
       local_scope_ = &scope->NewScope();
+    }
   }

-  virtual ~VariableResponse() { scope_->DeleteScope(local_scope_); }
+  virtual ~VariableResponse() {
+    if (create_scope_) scope_->DeleteScope(local_scope_);
+  }

   // return:
   // 0:ok.
@@ -63,17 +68,10 @@ class VariableResponse {
   // should call parse first.
   framework::Variable* GetVar() {
-    return local_scope_->FindVar(meta_.varname());
-  }
-  framework::Variable* InitVar() {
-    if (use_local_scope_) {
-      bool has_var = (scope_->FindVar(meta_.varname()) != nullptr);
-      PADDLE_ENFORCE(has_var);
+    if (create_scope_) {
       return local_scope_->Var(meta_.varname());
-    } else {
-      return scope_->FindVar(meta_.varname());
     }
+    return scope_->FindVar(meta_.varname());
   }

  private:
@@ -89,10 +87,10 @@ class VariableResponse {
                            const framework::DDim& dims, int length);

  private:
-  bool use_local_scope_ = false;
   const framework::Scope* scope_;
-  framework::Scope* local_scope_ = nullptr;
   const platform::DeviceContext* dev_ctx_;
+  bool create_scope_ = false;
+  framework::Scope* local_scope_ = nullptr;

   // only Skeleton
   sendrecv::VariableMessage meta_;
 };
......
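Taken together, the header changes replace the leading use_local_scope flag and the separate InitVar() helper with a single trailing create_scope switch: when it is true the response owns a child scope (created in the constructor, deleted in the destructor) and GetVar() creates the variable there; when it is false GetVar() simply looks the variable up in the caller's scope. The sketch below illustrates that ownership pattern with simplified stand-in Scope, Variable, and Response types; it is not Paddle code, and the sync/async mapping in main() mirrors how the server code above picks the flag.

#include <cassert>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

// Simplified stand-ins for framework::Variable and framework::Scope.
struct Variable {};

class Scope {
 public:
  Scope& NewScope() const {  // creates a child scope owned by this scope
    kids_.emplace_back(new Scope());
    return *kids_.back();
  }
  void DeleteScope(Scope* kid) const {  // releases a child made by NewScope
    for (auto& k : kids_) {
      if (k.get() == kid) {
        k.reset();
        break;
      }
    }
  }
  Variable* Var(const std::string& name) { return &vars_[name]; }
  Variable* FindVar(const std::string& name) const {
    auto it = vars_.find(name);
    return it == vars_.end() ? nullptr : const_cast<Variable*>(&it->second);
  }

 private:
  std::unordered_map<std::string, Variable> vars_;
  mutable std::vector<std::unique_ptr<Scope>> kids_;
};

// Mirrors the ownership rules of the reworked VariableResponse.
class Response {
 public:
  explicit Response(const Scope* scope, bool create_scope = false)
      : scope_(scope), create_scope_(create_scope) {
    if (create_scope) local_scope_ = &scope->NewScope();
  }
  ~Response() {
    // Only delete the child scope if this object created it.
    if (create_scope_) scope_->DeleteScope(local_scope_);
  }
  Variable* GetVar(const std::string& name) {
    if (create_scope_) return local_scope_->Var(name);  // create in own scope
    return scope_->FindVar(name);  // reuse the variable in the shared scope
  }

 private:
  const Scope* scope_;
  bool create_scope_ = false;
  Scope* local_scope_ = nullptr;
};

int main() {
  Scope scope;
  scope.Var("myvar");

  Response sync_resp(&scope, /*create_scope=*/false);  // sync mode
  assert(sync_resp.GetVar("myvar") == scope.FindVar("myvar"));

  {
    Response async_resp(&scope, /*create_scope=*/true);  // async mode
    assert(async_resp.GetVar("myvar") != scope.FindVar("myvar"));
  }  // async_resp's private child scope is deleted here
  return 0;
}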