variable_response.cc 8.9 KB
Newer Older
1
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
2 3 4 5 6 7 8 9 10 11 12 13 14
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15
#include "paddle/fluid/operators/distributed/variable_response.h"
Y
Yi Wang 已提交
16
#include <vector>
17
#include "paddle/fluid/operators/distributed/sendrecvop_utils.h"
C
chengmo 已提交
18
#include "paddle/fluid/platform/profiler.h"
19

Q
Qiao Longfei 已提交
20 21 22
// Defines the string flag `rpc_server_profile_path` with default value
// "./profile_ps"; its help text describes it as the profile log file path.
// NOTE(review): the flag is not read anywhere in the visible part of this
// file -- presumably it is consumed by the RPC server code; confirm.
DEFINE_string(rpc_server_profile_path, "./profile_ps",
              "the profile log file path");

23 24
namespace paddle {
namespace operators {
25
namespace distributed {
26

27 28 29 30
bool VariableResponse::ReadRaw(::google::protobuf::io::CodedInputStream* input,
                               const platform::DeviceContext& dev_ctx,
                               platform::Place place, void* dest,
                               int64_t size) {
C
chengmo 已提交
31 32
  platform::RecordEvent record_event("VariableResponse::ReadRaw",
                                     platform::EventRole::kInnerOp);
33 34
  const void* data = NULL;
  int size_to_write = 0;
35
  int64_t length = size;
Y
yi.wu 已提交
36
  int total_written = 0;
37 38 39 40 41 42 43 44

  if (platform::is_gpu_place(place)) {
#ifdef PADDLE_WITH_CUDA
    auto& gpu_dev_ctx =
        static_cast<const platform::CUDADeviceContext&>(dev_ctx);
    platform::CPUPlace cpu;

    char* p = reinterpret_cast<char*>(dest);
Y
yi.wu 已提交
45
    while (total_written < length) {
46 47 48
      if (!input->GetDirectBufferPointer(&data, &size_to_write)) {
        return false;
      }
Y
yi.wu 已提交
49 50 51 52 53 54
      // NOTE: if raw buffer is large and have two neighbor fields of raw
      // buffers GetDirectBufferPointer can get all of them, use length to
      // truncate it.
      if (total_written + size_to_write > length) {
        size_to_write = length - total_written;
      }
G
gongweibao 已提交
55
      // This log is useful to see how long a internal block size is of rpc.
M
minqiyang 已提交
56
      VLOG(7) << "copy " << size_to_write << " data to CUDAPlace";
57
      memory::Copy(BOOST_GET_CONST(platform::CUDAPlace, place),
58 59 60
                   reinterpret_cast<void*>(p), cpu, data, size_to_write,
                   gpu_dev_ctx.stream());
      p += size_to_write;
Y
yi.wu 已提交
61
      total_written += size_to_write;
62 63 64 65 66 67

      input->Skip(size_to_write);
    }
    gpu_dev_ctx.Wait();
#else
    PADDLE_THROW("Unexpected branch");
C
Chengmo 已提交
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
#endif
    return true;
  } else if (platform::is_xpu_place(place)) {
#ifdef PADDLE_WITH_XPU
    auto& xpu_dev_ctx = static_cast<const platform::XPUDeviceContext&>(dev_ctx);
    platform::CPUPlace cpu;
    char* p = reinterpret_cast<char*>(dest);
    while (total_written < length) {
      if (!input->GetDirectBufferPointer(&data, &size_to_write)) {
        return false;
      }

      if (total_written + size_to_write > length) {
        size_to_write = length - total_written;
      }

      memory::Copy(BOOST_GET_CONST(platform::XPUPlace, place),
                   reinterpret_cast<void*>(p), cpu, data, size_to_write);
      p += size_to_write;
      total_written += size_to_write;
      input->Skip(size_to_write);
    }
    xpu_dev_ctx.Wait();
#else
    PADDLE_ENFORCE_NOT_NULL(
        nullptr,
        platform::errors::Unimplemented(
            "Not supported XPU, please compile with option WITH_XPU=ON."));
96 97 98 99 100
#endif
    return true;
  }

  char* p = reinterpret_cast<char*>(dest);
Y
yi.wu 已提交
101
  while (total_written < length) {
102 103 104
    if (!input->GetDirectBufferPointer(&data, &size_to_write)) {
      return false;
    }
Y
yi.wu 已提交
105 106 107 108 109
    // NOTE: if raw buffer is large and have two neighbor fields of raw buffers
    // GetDirectBufferPointer can get all of them, use length to truncate it.
    if (total_written + size_to_write > length) {
      size_to_write = length - total_written;
    }
110 111
    // TODO(gongwb): can we avoid copy?
    platform::CPUPlace cpu;
G
gongweibao 已提交
112
    // This log is useful to see how long a internal block size is of rpc.
M
minqiyang 已提交
113
    VLOG(7) << "copy " << size_to_write << " data to CPUPlace";
114 115 116
    memory::Copy(cpu, reinterpret_cast<void*>(p), cpu, data, size_to_write);

    p += size_to_write;
Y
yi.wu 已提交
117
    total_written += size_to_write;
118 119 120 121 122 123 124 125 126

    input->Skip(size_to_write);
  }

  return true;
}

// Fills the LoDTensor held by the received variable with `length` raw bytes
// read from `input`.
//
// The tensor is resized to `dims`, its LoD is rebuilt from the levels stored
// in `meta_`, and its storage is allocated on the place of `ctx` with the
// data type recorded in `meta_`. Returns false when GetVar() yields nothing
// or when ReadRaw cannot read the payload; enforces that the tensor's
// memory is at least `length` bytes.
bool VariableResponse::CopyLodTensorData(
    ::google::protobuf::io::CodedInputStream* input,
    const platform::DeviceContext& ctx, const framework::DDim& dims,
    int length) {
  platform::RecordEvent record_event("VariableResponse::CopyLodTensorData",
                                     platform::EventRole::kInnerOp);

  auto received_var = GetVar();
  if (!received_var) {
    LOG(ERROR) << "recved var should not on current server: "
               << meta_.varname();
    return false;
  }

  // NOTE(review): GetVar() is queried a second time here instead of reusing
  // the pointer checked above; kept as-is because GetVar() is not visible in
  // this file.
  auto* tensor = GetVar()->GetMutable<framework::LoDTensor>();
  tensor->Resize(dims);

  // Rebuild the level-of-detail table, one offset vector per level.
  framework::LoD lod_info;
  for (int level = 0; level < meta_.lod_level(); ++level) {
    framework::Vector<size_t> offsets;
    for (const auto offset : meta_.lod(level).lod_data()) {
      offsets.push_back(offset);
    }
    lod_info.push_back(offsets);
  }
  tensor->set_lod(lod_info);

  void* tensor_data =
      tensor->mutable_data(ctx.GetPlace(), ToVarType(meta_.data_type()));

  VLOG(6) << "Tensor.memory_size = " << tensor->memory_size()
          << ", Buffer Size = " << length << ", dims:" << dims
          << ", numel:" << tensor->numel();
  PADDLE_ENFORCE_GE(tensor->memory_size(), static_cast<unsigned int>(length));

  return ReadRaw(input, ctx, tensor->place(), tensor_data, length);
}

// Converts the repeated int64 `dims` field of a message into a
// framework::DDim.
//
// The extents are kept as 64-bit values on their way into make_ddim; going
// through std::vector<int> would silently truncate any extent that does not
// fit in an int.
inline framework::DDim GetDims(
    const ::google::protobuf::RepeatedField<::google::protobuf::int64>& dims) {
  const std::vector<int64_t> vecdims(dims.begin(), dims.end());
  return framework::make_ddim(vecdims);
}

// Fills the value tensor of the received SelectedRows variable with `length`
// raw bytes read from `input`.
//
// The SelectedRows height is taken from `meta_`, the value tensor is resized
// to `dims` and allocated on the place of `ctx` with the data type recorded
// in `meta_`. Returns false when GetVar() yields nothing or when ReadRaw
// cannot read the payload; enforces that `length` is exactly the byte size of
// the resized tensor.
bool VariableResponse::CopySelectRowsTensorData(
    ::google::protobuf::io::CodedInputStream* input,
    const platform::DeviceContext& ctx, const framework::DDim& dims,
    int length) {
  platform::RecordEvent record_event(
      "VariableResponse::CopySelectRowsTensorData",
      platform::EventRole::kInnerOp);

  // Same guard as CopyLodTensorData: do not dereference a missing variable.
  auto server_var = GetVar();
  if (!server_var) {
    LOG(ERROR) << "recved var should not on current server: "
               << meta_.varname();
    return false;
  }

  auto* slr = server_var->GetMutable<framework::SelectedRows>();
  slr->set_height(meta_.slr_height());
  auto* tensor = slr->mutable_value();
  tensor->Resize(dims);

  const auto data_type =
      paddle::operators::distributed::ToVarType(meta_.data_type());
  // Compare byte counts rather than element counts: `length / element_size`
  // rounds down, so a payload with a few trailing bytes would pass an
  // element-count check and ReadRaw would then write past the tensor buffer.
  const size_t expected_bytes =
      static_cast<size_t>(tensor->numel()) * framework::SizeOfType(data_type);
  PADDLE_ENFORCE_EQ(expected_bytes, static_cast<size_t>(length));

  void* tensor_data = tensor->mutable_data(ctx.GetPlace(), data_type);

  return ReadRaw(input, ctx, tensor->place(), tensor_data, length);
}

// Fills the row-index vector of the received SelectedRows variable with
// `length` raw bytes (int64 row ids) read from `input`.
//
// Returns false when GetVar() yields nothing, when `length` is negative or
// not a whole number of int64 values, or when ReadRaw cannot read the
// payload.
bool VariableResponse::CopySelectRowsData(
    ::google::protobuf::io::CodedInputStream* input,
    const platform::DeviceContext& ctx, int length) {
  platform::RecordEvent record_event("VariableResponse::CopySelectRowsData",
                                     platform::EventRole::kInnerOp);

  // Same guard as CopyLodTensorData: do not dereference a missing variable.
  auto server_var = GetVar();
  if (!server_var) {
    LOG(ERROR) << "recved var should not on current server: "
               << meta_.varname();
    return false;
  }

  // The rows buffer is sized in whole int64 elements while ReadRaw writes
  // `length` bytes, so a length that is not a multiple of sizeof(int64_t)
  // would overrun the buffer; a negative length would wrap to a huge element
  // count in the unsigned division below.
  if (length < 0 || static_cast<size_t>(length) % sizeof(int64_t) != 0) {
    LOG(ERROR) << "invalid rows buffer size " << length
               << " for var: " << meta_.varname();
    return false;
  }

  auto* slr = server_var->GetMutable<framework::SelectedRows>();
  auto* rows = slr->mutable_rows();
  rows->clear();
  rows->resize(length / sizeof(int64_t));  // int64
  int64_t* rows_data = rows->data();

  // copy rows CPU data, GPU data will be copied lazily.
  platform::CPUPlace cpu;
  return ReadRaw(input, ctx, cpu, rows_data, length);
}

213 214 215 216 217 218 219 220
bool VariableResponse::ProcSerializedField(
    int tag, ::google::protobuf::io::CodedInputStream* input,
    int64_t num_bytes) {
  PADDLE_ENFORCE((meta_.type() == sendrecv::SELECTED_ROWS ||
                  meta_.type() == sendrecv::LOD_TENSOR ||
                  meta_.type() == sendrecv::NCCL_ID) &&
                     meta_.varname() != "",
                 "meta info should be got first!");
221

222 223 224 225 226 227 228
  if (meta_.type() == sendrecv::NCCL_ID) {
#ifdef PADDLE_WITH_CUDA
    auto* var = scope_->FindVar(meta_.varname());
    if (var != nullptr) {
      ncclUniqueId* id = var->GetMutable<ncclUniqueId>();
      if (!ReadRaw(input, *dev_ctx_, platform::CPUPlace(), id->internal,
                   num_bytes)) {
229 230 231
        return false;
      }
    }
232
    return true;
T
typhoonzero 已提交
233
#else
234 235
    PADDLE_THROW("Not compiled with CUDA!");
    return false;
T
typhoonzero 已提交
236
#endif
237
  }
Y
Yancey1989 已提交
238

M
minqiyang 已提交
239 240
  VLOG(7) << "ProcSerializedField:" << meta_.varname()
          << ", type:" << meta_.type() << std::endl;
241 242 243 244 245 246
  framework::DDim dims = GetDims(meta_.dims());
  if (meta_.type() == sendrecv::LOD_TENSOR) {
    PADDLE_ENFORCE(meta_.lod_size() >= 0, "lod info should be got first!");
    if (!CopyLodTensorData(input, *dev_ctx_, dims, num_bytes)) {
      return false;
    }
G
gongweibao 已提交
247

248 249
    return true;
  }
Y
Yancey1989 已提交
250

251 252 253
  if (meta_.type() == sendrecv::SELECTED_ROWS) {
    if (!CopySelectRowsTensorData(input, *dev_ctx_, dims, num_bytes)) {
      return false;
254
    }
255
    return true;
256 257
  }

G
gongweibao 已提交
258 259 260
  PADDLE_ENFORCE("not supported var types:", meta_.varname(), meta_.type());

  return false;
261 262
}

263
};  // namespace distributed
264 265
};  // namespace operators
};  // namespace paddle