sendrecvop_utils.cc 8.3 KB
Newer Older
1
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
G
gongweibao 已提交
2 3 4 5 6 7 8 9 10 11 12 13 14

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

Y
Yi Wang 已提交
15
#include "paddle/fluid/operators/detail/sendrecvop_utils.h"
Y
Yi Wang 已提交
16

T
typhoonzero 已提交
17
#ifdef PADDLE_WITH_CUDA
T
fix ci  
typhoonzero 已提交
18
#include <nccl.h>
T
typhoonzero 已提交
19
#endif
20
#include <sys/time.h>
Y
Yi Wang 已提交
21 22
#include <thread>  // NOLINT

23 24 25 26 27
#include "google/protobuf/io/coded_stream.h"
#include "google/protobuf/io/zero_copy_stream.h"
#include "paddle/fluid/framework/data_type.h"
#include "paddle/fluid/operators/detail/bytebuffer_stream.h"
#include "paddle/fluid/operators/detail/proto_encoder_helper.h"
28
#include "paddle/fluid/operators/detail/variable_response.h"
29
#include "paddle/fluid/platform/profiler.h"
G
gongweibao 已提交
30 31 32 33 34

namespace paddle {
namespace operators {
namespace detail {

T
typhoonzero 已提交
35 36 37 38 39 40
// Shorthand for the protobuf message type used throughout this file to
// describe a serialized variable (name, type, data type, dims, LoD, ...).
using VarMsg = sendrecv::VariableMessage;

// Fills the metadata of `request` (data type, dims and, when present, LoD)
// from the LoDTensor held by `var`, and reports the tensor's raw bytes through
// `*payload` / `*payload_size`.
//
// If `ctx` is a GPU context, the tensor must live on a GPU place: its bytes are
// copied into a freshly allocated CPU buffer (memory::Alloc on CPUPlace), so the
// caller owns `*payload` and must release it with memory::Free. Otherwise
// `*payload` aliases the tensor's own storage and must not be freed.
void GetTensorPayload(framework::Variable* var,
                      const platform::DeviceContext& ctx, VarMsg* request,
                      void** payload, size_t* payload_size) {
  // Bind by reference: copying the LoDTensor would also deep-copy its LoD.
  const auto& tensor = var->Get<framework::LoDTensor>();
  // FIXME(wuyi): data types in send_recv.proto is copied from
  // framework.proto
  request->set_data_type(
      static_cast<VarMsg::Type>(framework::ToDataType(tensor.type())));
  for (auto& dim : framework::vectorize(tensor.dims())) {
    request->add_dims(dim);
  }
  // Reference, not copy: the LoD can be large and is only read here.
  const framework::LoD& lod = tensor.lod();
  if (lod.size() > 0) {
    request->set_lod_level(lod.size());
    for (const auto& each : lod) {
      VarMsg::LodData* lod_inner = request->add_lod();
      for (const auto& d : each) {
        lod_inner->add_lod_data(d);
      }
    }
  }

  // Number of payload bytes; used both for the staging copy and the result.
  const size_t data_size =
      tensor.numel() * framework::SizeOfType(tensor.type());
  if (platform::is_gpu_place(ctx.GetPlace())) {
#ifdef PADDLE_WITH_CUDA
    PADDLE_ENFORCE(platform::is_gpu_place(tensor.place()));
    platform::CPUPlace cpu;
    auto& gpu_dev_ctx = static_cast<const platform::CUDADeviceContext&>(ctx);
    *payload = memory::Alloc(cpu, data_size);

    memory::Copy(cpu, *payload, boost::get<platform::CUDAPlace>(tensor.place()),
                 tensor.data<void>(), data_size, gpu_dev_ctx.stream());
    // The copy is asynchronous on the device stream; wait so the CPU buffer
    // is complete before the caller uses it.
    ctx.Wait();
#endif
  } else {
    // The out-parameter is a mutable void*; the original code also exposed a
    // mutable pointer to this same storage.
    *payload = const_cast<void*>(tensor.data<void>());
  }
  *payload_size = data_size;
}

// Fills the metadata of `request` (data type, lod_level = 0, SelectedRows
// height and the dims of the value tensor) from the SelectedRows held by
// `var`, and reports the value tensor's raw bytes through `*payload` /
// `*payload_size`. Row indices are not part of this payload.
//
// If `ctx` is a GPU context, the value tensor must live on a GPU place: its
// bytes are copied into a freshly allocated CPU buffer (memory::Alloc on
// CPUPlace), so the caller owns `*payload` and must release it with
// memory::Free. Otherwise `*payload` aliases the tensor's own storage.
void GetSelectedRowsPayload(framework::Variable* var,
                            const platform::DeviceContext& ctx, VarMsg* request,
                            void** payload, size_t* payload_size) {
  auto* slr = var->GetMutable<framework::SelectedRows>();
  // All value-related fields below are read from this one tensor.
  auto* tensor = slr->mutable_value();

  request->set_data_type(
      static_cast<VarMsg::Type>(framework::ToDataType(tensor->type())));
  request->set_lod_level(0);
  request->set_slr_height(slr->height());

  for (auto& dim : framework::vectorize(tensor->dims())) {
    request->add_dims(dim);
  }

  // Number of payload bytes; used both for the staging copy and the result.
  const size_t data_size =
      tensor->numel() * framework::SizeOfType(tensor->type());
  if (platform::is_gpu_place(ctx.GetPlace())) {
#ifdef PADDLE_WITH_CUDA
    // Same precondition as GetTensorPayload: fail with a clear enforce error
    // instead of a boost::bad_get from the CUDAPlace extraction below.
    PADDLE_ENFORCE(platform::is_gpu_place(tensor->place()));
    platform::CPUPlace cpu;
    auto& gpu_dev_ctx = static_cast<const platform::CUDADeviceContext&>(ctx);
    *payload = memory::Alloc(cpu, data_size);
    memory::Copy(cpu, *payload,
                 boost::get<platform::CUDAPlace>(tensor->place()),
                 reinterpret_cast<const void*>(tensor->data<void>()), data_size,
                 gpu_dev_ctx.stream());
    // Wait for the device-stream copy to finish before handing out the buffer.
    ctx.Wait();
#endif
  } else {
    *payload = tensor->data<void>();
  }
  *payload_size = data_size;
}

109 110
void SerializeToByteBuffer(const std::string& name, framework::Variable* var,
                           const platform::DeviceContext& ctx,
Y
Yancey1989 已提交
111 112
                           ::grpc::ByteBuffer* msg,
                           const std::string& out_name) {
T
typhoonzero 已提交
113 114
  // Default DestroyCallback does nothing, When using GPU
  // the CPU buffer need to be freed.
115
  DestroyCallback destroy_callback = [](void* backing) {};
T
typhoonzero 已提交
116
  VarMsg request;
Y
Yancey 已提交
117
  void* payload = nullptr;
118
  size_t payload_size;
T
typhoonzero 已提交
119 120

  request.set_varname(name);
121 122 123 124
  // Note: normally the profiler is enabled in 1 trainer, hence only
  // 1 trainer returns true for ShouldSendProfileState(). It tells PS
  // servers the trainer's profiling state so that PS can follow the
  // trainer.
T
typhoonzero 已提交
125 126 127
  request.set_profile(platform::IsProfileEnabled());
  if (!out_name.empty()) {
    request.set_out_varname(out_name);
128
  }
129
  if (var->IsType<framework::LoDTensor>()) {
T
typhoonzero 已提交
130 131
    request.set_type(::sendrecv::LOD_TENSOR);
    GetTensorPayload(var, ctx, &request, &payload, &payload_size);
132
  } else if (var->IsType<framework::SelectedRows>()) {
T
typhoonzero 已提交
133 134
    request.set_type(::sendrecv::SELECTED_ROWS);
    GetSelectedRowsPayload(var, ctx, &request, &payload, &payload_size);
T
typhoonzero 已提交
135
#ifdef PADDLE_WITH_CUDA
T
typhoonzero 已提交
136
  } else if (var->IsType<ncclUniqueId>()) {
137
    request.set_type(::sendrecv::NCCL_ID);
T
typhoonzero 已提交
138
#endif
T
typhoonzero 已提交
139 140 141
  } else {
    PADDLE_THROW("Serialize does not support type: %s",
                 typeid(var->Type()).name());
142 143
  }

T
typhoonzero 已提交
144 145 146 147
  if (platform::is_gpu_place(ctx.GetPlace())) {
    // GPU data is copied to CPU buffer when sending,
    // free the buffer when possible.
    destroy_callback = [](void* backing) {
T
typhoonzero 已提交
148
      platform::CPUPlace cpu;
T
typhoonzero 已提交
149 150
      memory::Free(cpu, backing);
    };
Y
Yancey1989 已提交
151
  }
152

T
typhoonzero 已提交
153 154 155 156 157 158
  std::string header;
  request.AppendToString(&header);
  auto buffer = std::unique_ptr<char[]>(new char[1024]);
  void* buf = buffer.get();
  ProtoEncodeHelper e(static_cast<char*>(buf), 1024);
  e.WriteRawBytes(std::string(header.data(), header.size()));
159 160
// NCCLID is copied directly to the message, return bytebuffer
// with only one slice if serializing NCCLID.
T
typhoonzero 已提交
161
#ifdef PADDLE_WITH_CUDA
162
  if (var->IsType<ncclUniqueId>()) {
T
typhoonzero 已提交
163 164
    e.WriteVarlengthBeginning(VarMsg::kSerializedFieldNumber,
                              NCCL_UNIQUE_ID_BYTES);
T
typhoonzero 已提交
165 166
    ncclUniqueId& uid = var->Get<ncclUniqueId>();
    e.WriteRawBytes(std::string(uid.internal, NCCL_UNIQUE_ID_BYTES));
167

T
typhoonzero 已提交
168 169 170 171 172 173 174
    // for serialize NCCL_ID
    ::grpc::Slice slices(e.size());
    memcpy(const_cast<uint8_t*>(slices.begin()), e.data(), e.size());
    ::grpc::ByteBuffer tmp(&slices, 1);
    msg->Swap(&tmp);
    return;
  }
T
typhoonzero 已提交
175
#endif
176

T
typhoonzero 已提交
177
  e.WriteVarlengthBeginning(VarMsg::kSerializedFieldNumber, payload_size);
178 179 180 181 182 183 184 185 186 187
  // steal reference of tensor data
  ::grpc::Slice slices[4];  // metadata, tensor, rows meta, rows
  int num_slices = 2;       // only SelectedRows have rows buffer
  slices[0] = ::grpc::Slice(e.size());
  memcpy(const_cast<uint8_t*>(slices[0].begin()), e.data(), e.size());
  slices[1] = ::grpc::Slice(
      grpc_slice_new_with_user_data(payload, payload_size, destroy_callback,
                                    static_cast<char*>(payload)),
      ::grpc::Slice::STEAL_REF);

T
typhoonzero 已提交
188
  if (var->IsType<framework::SelectedRows>()) {
189
    auto* slr = var->GetMutable<framework::SelectedRows>();
Y
Yi Wang 已提交
190
    ProtoEncodeHelper e2(static_cast<char*>(buf), 128);
191
    size_t rows_memory_size =
T
typhoonzero 已提交
192
        slr->rows().size() * framework::SizeOfType(typeid(int64_t));
193 194 195 196 197 198 199 200
    e2.WriteVarlengthBeginning(VarMsg::kRowsFieldNumber, rows_memory_size);
    slices[2] = ::grpc::Slice(e2.size());
    memcpy(const_cast<uint8_t*>(slices[2].begin()), e2.data(), e2.size());

    slices[3] = ::grpc::Slice(
        grpc_slice_new_with_user_data(
            const_cast<void*>(
                reinterpret_cast<const void*>(slr->rows().data())),
T
typhoonzero 已提交
201
            rows_memory_size, [](void* backing) {},
202 203 204 205 206 207 208 209 210 211 212 213
            const_cast<char*>(
                reinterpret_cast<const char*>(slr->rows().data()))),
        ::grpc::Slice::STEAL_REF);
    num_slices = 4;
  }

  ::grpc::ByteBuffer tmp(&slices[0], num_slices);
  msg->Swap(&tmp);
}

// Parses the gRPC ByteBuffer `msg` with a VariableResponse bound to `scope`
// and `ctx`, then stores the variable it reports via GetVar() into `*var`.
// Throws (PADDLE_ENFORCE) when Parse() returns a non-zero status.
void DeserializeFromByteBuffer(const ::grpc::ByteBuffer& msg,
                               const platform::DeviceContext& ctx,
                               const framework::Scope* scope,
                               framework::Variable** var) {
  operators::detail::VariableResponse response(scope, &ctx);
  // Parse() stays inside the enforce so any exception it raises is wrapped
  // exactly as before.
  PADDLE_ENFORCE(0 == response.Parse(msg),
                 "parse bytebuffer to tensor error!");
  framework::Variable* parsed_var = response.GetVar();
  *var = parsed_var;
}

G
gongweibao 已提交
221 222
}  // namespace detail
}  // namespace operators
Y
Yancey 已提交
223
}  // namespace paddle