// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/operators/distributed/variable_response.h"
#include <vector>

#include "gflags/gflags.h"
#include "paddle/fluid/operators/distributed/sendrecvop_utils.h"

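// Output path of the RPC server's profile log file.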
DEFINE_string(rpc_server_profile_path, "./profile_ps",
              "the profile log file path");

namespace paddle {
namespace operators {
namespace distributed {

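// Reads `size` bytes of raw tensor data from the coded input stream and copies
// them into `dest` on `place`, chunk by chunk. Returns false if the stream does
// not provide enough data.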
bool VariableResponse::ReadRaw(::google::protobuf::io::CodedInputStream* input,
                               const platform::DeviceContext& dev_ctx,
                               platform::Place place, void* dest,
                               int64_t size) {
  const void* data = NULL;
  int size_to_write = 0;
  int64_t length = size;
  int total_written = 0;

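  // GPU path: copy each chunk from the protobuf buffer to device memory
  // asynchronously on the device stream, then wait for the stream to finish.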
  if (platform::is_gpu_place(place)) {
#ifdef PADDLE_WITH_CUDA
    auto& gpu_dev_ctx =
        static_cast<const platform::CUDADeviceContext&>(dev_ctx);
    platform::CPUPlace cpu;

    char* p = reinterpret_cast<char*>(dest);
    while (total_written < length) {
      if (!input->GetDirectBufferPointer(&data, &size_to_write)) {
        return false;
      }
      // NOTE: if the raw buffer is large and two neighboring fields are both
      // raw buffers, GetDirectBufferPointer may return data from both of
      // them, so use length to truncate it.
      if (total_written + size_to_write > length) {
        size_to_write = length - total_written;
      }
      // This log is useful for seeing how large an internal RPC block is.
      VLOG(7) << "copy " << size_to_write << " data to CUDAPlace";
      memory::Copy(BOOST_GET_CONST(platform::CUDAPlace, place),
                   reinterpret_cast<void*>(p), cpu, data, size_to_write,
                   gpu_dev_ctx.stream());
      p += size_to_write;
      total_written += size_to_write;

      input->Skip(size_to_write);
    }
    gpu_dev_ctx.Wait();
#else
    PADDLE_THROW(platform::errors::PreconditionNotMet(
        "Unexpected branch, please compile with PADDLE_WITH_CUDA"));
#endif
    return true;
  } else if (platform::is_xpu_place(place)) {
#ifdef PADDLE_WITH_XPU
    auto& xpu_dev_ctx = static_cast<const platform::XPUDeviceContext&>(dev_ctx);
    platform::CPUPlace cpu;
    char* p = reinterpret_cast<char*>(dest);
    while (total_written < length) {
      if (!input->GetDirectBufferPointer(&data, &size_to_write)) {
        return false;
      }

      if (total_written + size_to_write > length) {
        size_to_write = length - total_written;
      }

      memory::Copy(BOOST_GET_CONST(platform::XPUPlace, place),
                   reinterpret_cast<void*>(p), cpu, data, size_to_write);
      p += size_to_write;
      total_written += size_to_write;
      input->Skip(size_to_write);
    }
    xpu_dev_ctx.Wait();
#else
    PADDLE_ENFORCE_NOT_NULL(
        nullptr,
        platform::errors::Unimplemented(
            "Not supported XPU, please compile with option WITH_XPU=ON."));
#endif
    return true;
  }

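  // CPU path: copy each chunk from the protobuf buffer directly into dest.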
  char* p = reinterpret_cast<char*>(dest);
  while (total_written < length) {
    if (!input->GetDirectBufferPointer(&data, &size_to_write)) {
      return false;
    }
    // NOTE: if the raw buffer is large and two neighboring fields are both
    // raw buffers, GetDirectBufferPointer may return data from both of them,
    // so use length to truncate it.
    if (total_written + size_to_write > length) {
      size_to_write = length - total_written;
    }
    // TODO(gongwb): can we avoid copy?
    platform::CPUPlace cpu;
    // This log is useful for seeing how large an internal RPC block is.
    VLOG(7) << "copy " << size_to_write << " data to CPUPlace";
    memory::Copy(cpu, reinterpret_cast<void*>(p), cpu, data, size_to_write);

    p += size_to_write;
    total_written += size_to_write;

    input->Skip(size_to_write);
  }

  return true;
}

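// Deserializes a LoDTensor payload: resizes the tensor to `dims`, restores the
// LoD information from the request metadata, allocates memory on the device,
// and reads `length` bytes of raw data into it.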
bool VariableResponse::CopyLodTensorData(
    ::google::protobuf::io::CodedInputStream* input,
    const platform::DeviceContext& ctx, const framework::DDim& dims,
    int length) {
  auto server_var = GetVar();
  if (!server_var) {
    LOG(ERROR) << "received var is not found on the current server: "
               << meta_.varname();
    return false;
  }
  auto* tensor = GetVar()->GetMutable<framework::LoDTensor>();
  tensor->Resize(dims);
  framework::LoD lod;
  for (int i = 0; i < meta_.lod_level(); ++i) {
    framework::Vector<size_t> v;
    for (int j = 0; j < meta_.lod(i).lod_data_size(); ++j) {
      v.push_back(meta_.lod(i).lod_data(j));
    }
    lod.push_back(v);
  }
  tensor->set_lod(lod);

  void* tensor_data =
      tensor->mutable_data(ctx.GetPlace(), ToVarType(meta_.data_type()));

  VLOG(6) << "Tensor.memory_size = " << tensor->memory_size()
          << ", Buffer Size = " << length << ", dims:" << dims
          << ", numel:" << tensor->numel();
  PADDLE_ENFORCE_GE(
      tensor->memory_size(), static_cast<unsigned int>(length),
      platform::errors::InvalidArgument(
          "The memory size of tensor: %s should be greater than or equal to "
          "length: %s",
          tensor->memory_size(), length));
  return ReadRaw(input, ctx, tensor->place(), tensor_data, length);
}

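// Converts the repeated int64 `dims` field of the protobuf request into a DDim.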
inline framework::DDim GetDims(
    const ::google::protobuf::RepeatedField<::google::protobuf::int64>& dims) {
  std::vector<int> vecdims;
  for (auto& d : dims) {
    vecdims.push_back(d);
  }
  return framework::make_ddim(vecdims);
}

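// Deserializes the value tensor of a SelectedRows variable: sets the height,
// resizes the value tensor to `dims`, checks the payload size, and reads
// `length` bytes of raw data into it.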
bool VariableResponse::CopySelectRowsTensorData(
    ::google::protobuf::io::CodedInputStream* input,
    const platform::DeviceContext& ctx, const framework::DDim& dims,
    int length) {
  auto* slr = GetVar()->GetMutable<framework::SelectedRows>();
  slr->set_height(meta_.slr_height());
  auto* tensor = slr->mutable_value();
  tensor->Resize(dims);
  PADDLE_ENFORCE_EQ(
      static_cast<size_t>(tensor->numel()),
      length / framework::SizeOfType(paddle::operators::distributed::ToVarType(
                   meta_.data_type())),
      platform::errors::InvalidArgument(
          "length: %s should be equal to the memory size of tensor: %s",
          length,
          tensor->numel() *
              framework::SizeOfType(paddle::operators::distributed::ToVarType(
                  meta_.data_type()))));
  void* tensor_data = tensor->mutable_data(
      ctx.GetPlace(),
      paddle::operators::distributed::ToVarType(meta_.data_type()));

  if (!ReadRaw(input, ctx, tensor->place(), tensor_data, length)) {
    return false;
  }

  return true;
}

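// Deserializes the row indices of a SelectedRows variable; `length` is the
// byte size of the serialized int64 row index array. The indices are always
// read into CPU memory.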
bool VariableResponse::CopySelectRowsData(
    ::google::protobuf::io::CodedInputStream* input,
    const platform::DeviceContext& ctx, int length) {
  auto* slr = GetVar()->GetMutable<framework::SelectedRows>();
  slr->mutable_rows()->clear();
  slr->mutable_rows()->resize(length / sizeof(int64_t));  // int64
  int64_t* rows_data = slr->mutable_rows()->data();

  // copy rows CPU data, GPU data will be copied lazily.
  platform::CPUPlace cpu;
  if (!ReadRaw(input, ctx, cpu, rows_data, length)) {
    return false;
  }

  return true;
}

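// Dispatches the serialized payload to the matching deserializer (NCCL id,
// LoDTensor, or SelectedRows) based on the variable type in the request
// metadata.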
bool VariableResponse::ProcSerializedField(
    int tag, ::google::protobuf::io::CodedInputStream* input,
    int64_t num_bytes) {
  PADDLE_ENFORCE(
      (meta_.type() == sendrecv::SELECTED_ROWS ||
       meta_.type() == sendrecv::LOD_TENSOR ||
       meta_.type() == sendrecv::NCCL_ID) &&
          meta_.varname() != "",
      platform::errors::PreconditionNotMet(
          "meta info should be obtained first!"));

  if (meta_.type() == sendrecv::NCCL_ID) {
#ifdef PADDLE_WITH_CUDA
    auto* var = scope_->FindVar(meta_.varname());
    if (var != nullptr) {
      ncclUniqueId* id = var->GetMutable<ncclUniqueId>();
      if (!ReadRaw(input, *dev_ctx_, platform::CPUPlace(), id->internal,
                   num_bytes)) {
        return false;
      }
    }
    return true;
#else
    PADDLE_THROW(
        platform::errors::PreconditionNotMet("Please compile with CUDA!"));
    return false;
#endif
  }

  VLOG(7) << "ProcSerializedField:" << meta_.varname()
          << ", type:" << meta_.type() << std::endl;
  framework::DDim dims = GetDims(meta_.dims());
  if (meta_.type() == sendrecv::LOD_TENSOR) {
    PADDLE_ENFORCE_GE(
        meta_.lod_size(), 0,
        platform::errors::PreconditionNotMet(
            "lod info should be obtained first!"));
    if (!CopyLodTensorData(input, *dev_ctx_, dims, num_bytes)) {
      return false;
    }

    return true;
  }

  if (meta_.type() == sendrecv::SELECTED_ROWS) {
    if (!CopySelectRowsTensorData(input, *dev_ctx_, dims, num_bytes)) {
      return false;
    }
    return true;
  }

  PADDLE_THROW(platform::errors::InvalidArgument(
      "The type: %s of var: %s is not supported", meta_.type(),
      meta_.varname()));

  return false;
}

}  // namespace distributed
}  // namespace operators
}  // namespace paddle