recv_impl.cc 2.5 KB
Newer Older
武毅 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License. */

#include "send_recv_impl.h"

namespace paddle {
namespace operators {
namespace detail {

Status SendRecvServerImpl::SendVariable(ServerContext *context,
                                        const VariableMessage *in_var,
T
typhoonzero 已提交
23
                                        VoidMessage *out_var) {
T
typhoonzero 已提交
24
  // TODO(typhoonzero): support different variable types.
武毅 已提交
25
  std::istringstream iss(in_var->serialized());
T
typhoonzero 已提交
26
  framework::LoDTensor t;
武毅 已提交
27
  framework::DeserializeFromStream(iss, &t);
T
typhoonzero 已提交
28 29 30 31
  TensorWithName tensor_with_name =
      std::make_pair(in_var->varname(), std::move(t));

  var_recv_queue_.Push(std::move(tensor_with_name));
T
typhoonzero 已提交
32 33 34 35
  return Status::OK;
}

Status SendRecvServerImpl::GetVariable(ServerContext *context,
T
typhoonzero 已提交
36
                                       const VariableMessage *in_var,
T
typhoonzero 已提交
37
                                       VariableMessage *out_var) {
T
typhoonzero 已提交
38 39 40
  std::string get_var_name = in_var->varname();
  auto *var = scope_->FindVar(get_var_name);
  auto tensor = var->Get<framework::LoDTensor>();
武毅 已提交
41
  std::ostringstream oss;
T
typhoonzero 已提交
42
  framework::SerializeToStream(oss, tensor, platform::CPUDeviceContext());
T
typhoonzero 已提交
43

武毅 已提交
44
  std::string *varname = out_var->mutable_varname();
T
typhoonzero 已提交
45
  *varname = get_var_name;
武毅 已提交
46 47 48 49 50
  std::string *serialized = out_var->mutable_serialized();
  *serialized = oss.str();
  return Status::OK;
}

T
typhoonzero 已提交
51 52 53
Status SendRecvServerImpl::Wait(ServerContext *context,
                                const VoidMessage *in_var,
                                VoidMessage *out_var) {
T
typhoonzero 已提交
54 55 56 57
  {
    std::unique_lock<std::mutex> lock(this->mutex_);
    condition_.wait(lock, [=] { return this->done_ == true; });
  }
T
typhoonzero 已提交
58 59 60 61
  return Status::OK;
}

void SendRecvServerImpl::Start() {
T
typhoonzero 已提交
62
  std::lock_guard<std::mutex> lock(this->mutex_);
T
typhoonzero 已提交
63 64 65 66
  done_ = false;
}

void SendRecvServerImpl::Done() {
T
typhoonzero 已提交
67 68 69 70
  {
    std::lock_guard<std::mutex> lock(this->mutex_);
    done_ = true;
  }
T
typhoonzero 已提交
71 72 73
  condition_.notify_all();
}

武毅 已提交
74 75 76
}  // namespace detail
}  // namespace operators
}  // namespace paddle