sendrecvop_utils.cc 4.0 KB
Newer Older
1
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
14
#include <memory>
Y
Yi Wang 已提交
15

16
#include "paddle/fluid/operators/distributed/sendrecvop_utils.h"
W
wanghuancoder 已提交
17 18 19 20 21 22 23 24 25 26 27

// Forward declarations only: these names are introduced as incomplete class
// types here; their definitions are not part of this block.

// paddle::framework::Variable
namespace paddle {
namespace framework {
class Variable;
}  // namespace framework
}  // namespace paddle

// paddle::memory::allocation::Allocation
namespace paddle {
namespace memory {
namespace allocation {
class Allocation;
}  // namespace allocation
}  // namespace memory
}  // namespace paddle
G
gongweibao 已提交
28

29
// Boolean flag `rpc_disable_reuse_port`, default false. Per its help text it
// controls whether SO_REUSEPORT is disabled. The flag is defined here but not
// read anywhere in the visible part of this file.
// NOTE(review): DEFINE_bool is presumably the gflags macro - confirm.
DEFINE_bool(rpc_disable_reuse_port, false, "Disable SO_REUSEPORT or not.");
30 31
// 32-bit integer flag `rpc_retry_bind_port`, default 3. Per its help text it
// governs retrying to bind an address that is already in use. The flag is
// defined here but not read anywhere in the visible part of this file.
// NOTE(review): presumably the value is a retry count - confirm at the reader.
DEFINE_int32(rpc_retry_bind_port, 3,
             "Retry to bind the address if address is already used.");
32

G
gongweibao 已提交
33 34
namespace paddle {
namespace operators {
35
namespace distributed {
G
gongweibao 已提交
36

T
typhoonzero 已提交
37 38
// Short alias for sendrecv::VariableMessage, the message type that the
// payload helpers in this file receive as `request` and fill in.
using VarMsg = sendrecv::VariableMessage;

Y
Yu Yang 已提交
39
// Builds the TensorPayload that describes the memory of `tensor`.
//
// - When ctx.GetPlace() is not a GPU place, the payload wraps `tensor`
//   directly; no data is copied by this function.
// - When ctx.GetPlace() is a GPU place and the build defines
//   PADDLE_WITH_CUDA, `tensor` must itself live on a GPU place (enforced).
//   Its numel() * SizeOfType(type()) worth of data is copied into a buffer
//   newly allocated at a CUDAPinnedPlace, using the context's stream; the
//   function then calls ctx.Wait() before wrapping that buffer.
// - When ctx.GetPlace() is a GPU place but PADDLE_WITH_CUDA is not defined,
//   the situation is reported through PADDLE_THROW.
//
// @param ctx     device context whose place selects the path taken.
// @param tensor  tensor whose contents the payload should expose.
// @return        payload wrapping either the tensor or the pinned copy.
static TensorPayload GetCommunicationAllocationFromTensor(
    const platform::DeviceContext& ctx, const framework::Tensor& tensor) {
  if (!is_gpu_place(ctx.GetPlace())) {
    return TensorPayload(tensor);
  }
#ifdef PADDLE_WITH_CUDA
  PADDLE_ENFORCE(is_gpu_place(tensor.place()));
  // Downcast within the DeviceContext hierarchy. static_cast is the named
  // cast for this; reinterpret_cast would not adjust the address if the base
  // subobject were ever placed at a non-zero offset.
  // NOTE(review): relies on CUDADeviceContext deriving publicly and
  // non-virtually from DeviceContext - confirm.
  const auto& gpu_dev_ctx =
      static_cast<const platform::CUDADeviceContext&>(ctx);
  const auto copy_size =
      tensor.numel() * framework::SizeOfType(tensor.type());
  platform::CUDAPinnedPlace cuda_pinned;
  auto result = memory::AllocShared(cuda_pinned, copy_size);

  memory::Copy(cuda_pinned, result->ptr(),
               BOOST_GET_CONST(platform::CUDAPlace, tensor.place()),
               tensor.data<void>(), copy_size, gpu_dev_ctx.stream());
  // The copy was issued on the context's stream; wait on the context before
  // the buffer is handed out.
  ctx.Wait();
  return TensorPayload(result);
#else
  PADDLE_THROW("This situation should not be happened");
#endif
}
Y
Yu Yang 已提交
62 63 64
// Describes the LoDTensor held by `var` in `request` and returns the payload
// for its data.
//
// Written into `request`: the data type (tensor.type() cast to VarMsg::Type),
// one `dims` entry per dimension, and - only when the tensor's LoD is
// non-empty - the LoD level (number of LoD levels) followed by one LodData
// entry per level containing that level's values.
//
// @param var      variable read through Get<framework::LoDTensor>().
// @param ctx      device context forwarded to
//                 GetCommunicationAllocationFromTensor.
// @param request  message to populate; dereferenced unconditionally.
// @return         payload produced by GetCommunicationAllocationFromTensor.
TensorPayload GetTensorPayload(framework::Variable* var,
                               const platform::DeviceContext& ctx,
                               VarMsg* request) {
  // Bind by const reference: the tensor and its LoD are only read here, so
  // copying them (as `auto tensor = ...` would) is unnecessary work.
  const auto& tensor = var->Get<framework::LoDTensor>();
  // FIXME(wuyi): data types in send_recv.proto is copied from
  // framework.proto
  request->set_data_type(static_cast<VarMsg::Type>(tensor.type()));
  for (const auto& dim : framework::vectorize(tensor.dims())) {
    request->add_dims(dim);
  }
  const framework::LoD& lod = tensor.lod();
  if (lod.size() > 0) {
    request->set_lod_level(lod.size());
    for (const auto& each : lod) {
      VarMsg::LodData* lod_inner = request->add_lod();
      for (const auto& d : each) {
        lod_inner->add_lod_data(d);
      }
    }
  }
  return GetCommunicationAllocationFromTensor(ctx, tensor);
}

Y
Yu Yang 已提交
85 86 87
// Describes the SelectedRows held by `var` in `request` and returns the
// payload for its value tensor.
//
// Written into `request`: the data type of the value tensor (cast to
// VarMsg::Type), a LoD level of 0, the SelectedRows height, and one `dims`
// entry per dimension of the value tensor.
//
// @param var      variable accessed through
//                 GetMutable<framework::SelectedRows>().
// @param ctx      device context forwarded to
//                 GetCommunicationAllocationFromTensor.
// @param request  message to populate; dereferenced unconditionally.
// @return         payload produced by GetCommunicationAllocationFromTensor
//                 for the value tensor.
TensorPayload GetSelectedRowsPayload(framework::Variable* var,
                                     const platform::DeviceContext& ctx,
                                     VarMsg* request) {
  auto* selected_rows = var->GetMutable<framework::SelectedRows>();

  const auto value_type =
      static_cast<VarMsg::Type>(selected_rows->value().type());
  request->set_data_type(value_type);
  request->set_lod_level(0);
  request->set_slr_height(selected_rows->height());

  const auto value_shape = framework::vectorize(selected_rows->value().dims());
  for (const auto& extent : value_shape) {
    request->add_dims(extent);
  }

  auto* value_tensor = selected_rows->mutable_value();
  return GetCommunicationAllocationFromTensor(ctx, *value_tensor);
}

Y
Yu Yang 已提交
101 102 103 104 105 106 107 108 109 110 111
// Wraps an entire allocation: the payload starts at offset 0 and its recorded
// size is whatever allocation->size() reports. `allocation` is dereferenced
// in the initializer list, so it must not be null.
TensorPayload::TensorPayload(std::shared_ptr<memory::Allocation> allocation)
    : allocation_(allocation), offset_(0), memory_size_(allocation->size()) {}
// Wraps an existing tensor: stores the tensor's Holder() and offset(), and
// records numel() * SizeOfType(type()) as the payload size.
// NOTE(review): presumably that product is a byte count - confirm the unit
// returned by framework::SizeOfType.
TensorPayload::TensorPayload(const framework::Tensor& tensor)
    : allocation_(tensor.Holder()),
      offset_(tensor.offset()),
      memory_size_(tensor.numel() * framework::SizeOfType(tensor.type())) {}
// Returns the start address of the payload: the address reported by
// allocation_->ptr(), advanced by offset_. The addition is done on the
// integer representation of the address. allocation_ is dereferenced
// unconditionally.
void* TensorPayload::ptr() const {
  const auto base_address = reinterpret_cast<uintptr_t>(allocation_->ptr());
  return reinterpret_cast<void*>(base_address + offset_);
}
// Returns the payload size recorded at construction (memory_size_).
size_t TensorPayload::memory_size() const { return memory_size_; }
112
}  // namespace distributed
G
gongweibao 已提交
113
}  // namespace operators
Y
Yancey 已提交
114
}  // namespace paddle