sendrecvop_utils.cc 4.2 KB
Newer Older
1
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
14
#include <memory>
Y
Yi Wang 已提交
15

16
#include "paddle/fluid/operators/distributed/sendrecvop_utils.h"
W
wanghuancoder 已提交
17 18 19 20 21 22 23 24 25 26 27

// Forward declarations of project types named in this file:
// framework::Variable (taken by pointer in the payload builders below) and
// memory::allocation::Allocation.
// NOTE(review): the code below calls members of Variable and spells the
// allocation type as memory::Allocation, so the complete definitions and the
// alias presumably come from the included header - confirm.
namespace paddle {
namespace framework {
class Variable;
}  // namespace framework
namespace memory {
namespace allocation {
class Allocation;
}  // namespace allocation
}  // namespace memory
}  // namespace paddle
G
gongweibao 已提交
28

29
// Boolean flag `rpc_disable_reuse_port`, default false. Its help text says it
// controls whether SO_REUSEPORT is disabled. Nothing in the visible part of
// this file reads it.
// NOTE(review): presumably consumed by the RPC server setup elsewhere - confirm.
DEFINE_bool(rpc_disable_reuse_port, false, "Disable SO_REUSEPORT or not.");
30 31
// 32-bit integer flag `rpc_retry_bind_port`, default 3. Per its help text it
// governs retrying a bind when the address is already in use. Nothing in the
// visible part of this file reads it.
// NOTE(review): presumably a retry count used by the RPC server - confirm.
DEFINE_int32(rpc_retry_bind_port, 3,
             "Retry to bind the address if address is already used.");
32

G
gongweibao 已提交
33 34
namespace paddle {
namespace operators {
35
namespace distributed {
G
gongweibao 已提交
36

T
typhoonzero 已提交
37 38
using VarMsg = sendrecv::VariableMessage;

Y
Yu Yang 已提交
39
// Builds the TensorPayload used to transmit `tensor`.
//
// If `ctx` is not bound to a GPU place, the payload wraps the tensor
// directly. Otherwise (CUDA/HIP builds only) the tensor must itself live on a
// GPU place; its contents are copied into a newly allocated CUDA-pinned
// buffer, using the stream of `ctx`, and the payload wraps that buffer.
// In a build without CUDA/HIP a GPU context is rejected with an Unavailable
// error.
static TensorPayload GetCommunicationAllocationFromTensor(
    const platform::DeviceContext& ctx, const framework::Tensor& tensor) {
  // Non-GPU context: no staging copy, hand the tensor over as it is.
  if (!is_gpu_place(ctx.GetPlace())) {
    return TensorPayload(tensor);
  }

#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
  PADDLE_ENFORCE_EQ(
      is_gpu_place(tensor.place()), true,
      platform::errors::PreconditionNotMet("Please run in gpu place."));

  const auto& cuda_ctx =
      reinterpret_cast<const platform::CUDADeviceContext&>(ctx);
  platform::CUDAPinnedPlace pinned_place;
  auto num_bytes = tensor.numel() * framework::SizeOfType(tensor.type());
  auto staging = memory::AllocShared(pinned_place, num_bytes);

  memory::Copy(pinned_place, staging->ptr(),
               BOOST_GET_CONST(platform::CUDAPlace, tensor.place()),
               tensor.data<void>(), num_bytes, cuda_ctx.stream());
  // The copy was issued with the context's stream; wait on the context
  // before exposing the staging buffer (presumably so the copy has finished).
  ctx.Wait();
  return TensorPayload(staging);
#else
  PADDLE_THROW(
      platform::errors::Unavailable("This situation should not be happened"));
#endif
}
Y
Yu Yang 已提交
65 66 67
// Describes the LoDTensor held by `var` in `request` and returns the payload
// that carries its data.
//
// Written to `request`: the data type, every dimension of the tensor, and -
// only when the tensor has a non-empty LoD - the LoD level count plus one
// LodData entry per level. The payload itself is produced by
// GetCommunicationAllocationFromTensor(ctx, tensor).
//
// `var` must hold a framework::LoDTensor; `var` and `request` are
// dereferenced without a null check.
TensorPayload GetTensorPayload(framework::Variable* var,
                               const platform::DeviceContext& ctx,
                               VarMsg* request) {
  // Bind by reference: plain `auto` would copy the whole tensor object just
  // to read it. The returned payload keeps the storage alive through the
  // tensor's holder, so it does not depend on a local copy.
  const auto& tensor = var->Get<framework::LoDTensor>();
  // FIXME(wuyi): data types in send_recv.proto is copied from
  // framework.proto
  request->set_data_type(static_cast<VarMsg::Type>(tensor.type()));
  for (const auto& dim : framework::vectorize(tensor.dims())) {
    request->add_dims(dim);
  }
  // Read the LoD in place instead of duplicating every level.
  const framework::LoD& lod = tensor.lod();
  if (lod.size() > 0) {
    request->set_lod_level(lod.size());
    for (const auto& level : lod) {
      VarMsg::LodData* lod_inner = request->add_lod();
      for (const auto& entry : level) {
        lod_inner->add_lod_data(entry);
      }
    }
  }
  return GetCommunicationAllocationFromTensor(ctx, tensor);
}

Y
Yu Yang 已提交
88 89 90
// Describes the SelectedRows held by `var` in `request` and returns the
// payload that carries its value tensor.
//
// Written to `request`: the value tensor's data type, a LoD level of 0, the
// SelectedRows height, and every dimension of the value tensor. The payload
// is produced by GetCommunicationAllocationFromTensor on the mutable value
// tensor. `var` and `request` are dereferenced without a null check.
TensorPayload GetSelectedRowsPayload(framework::Variable* var,
                                     const platform::DeviceContext& ctx,
                                     VarMsg* request) {
  auto* selected_rows = var->GetMutable<framework::SelectedRows>();

  request->set_data_type(
      static_cast<VarMsg::Type>(selected_rows->value().type()));
  request->set_lod_level(0);
  request->set_slr_height(selected_rows->height());

  const auto value_dims =
      framework::vectorize(selected_rows->value().dims());
  for (const auto& extent : value_dims) {
    request->add_dims(extent);
  }

  return GetCommunicationAllocationFromTensor(
      ctx, *selected_rows->mutable_value());
}

Y
Yu Yang 已提交
104 105 106 107 108 109 110 111 112 113 114
// Wraps an entire allocation: shares ownership of `allocation`, starts at
// offset 0, and records allocation->size() as the payload size.
// `allocation` is dereferenced here, so it must not be null.
TensorPayload::TensorPayload(std::shared_ptr<memory::Allocation> allocation)
    : allocation_(allocation), offset_(0), memory_size_(allocation->size()) {}
// Wraps the storage behind `tensor`: keeps tensor.Holder(), starts at
// tensor.offset(), and records numel() * SizeOfType(type()) as the payload
// size (presumably a byte count - confirm against SizeOfType).
TensorPayload::TensorPayload(const framework::Tensor& tensor)
    : allocation_(tensor.Holder()),
      offset_(tensor.offset()),
      memory_size_(tensor.numel() * framework::SizeOfType(tensor.type())) {}
// Returns the start of the payload: the held allocation's pointer advanced
// by offset_. The arithmetic is done on the integer value of the address.
void* TensorPayload::ptr() const {
  const auto base_address = reinterpret_cast<uintptr_t>(allocation_->ptr());
  return reinterpret_cast<void*>(base_address + offset_);
}
// Returns the payload size recorded at construction time.
size_t TensorPayload::memory_size() const { return memory_size_; }
115
}  // namespace distributed
G
gongweibao 已提交
116
}  // namespace operators
Y
Yancey 已提交
117
}  // namespace paddle