sendrecvop_utils.cc 4.0 KB
Newer Older
1
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
G
gongweibao 已提交
2 3 4 5 6 7 8 9 10 11 12 13 14

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

T
typhoonzero 已提交
15
#ifdef PADDLE_WITH_CUDA
T
fix ci  
typhoonzero 已提交
16
#include <nccl.h>
T
typhoonzero 已提交
17
#endif
18
#include <sys/time.h>
Y
Yi Wang 已提交
19 20
#include <thread>  // NOLINT

21
#include "paddle/fluid/framework/data_type.h"
22
#include "paddle/fluid/operators/distributed/sendrecvop_utils.h"
23
#include "paddle/fluid/operators/distributed/variable_response.h"
G
gongweibao 已提交
24 25 26

namespace paddle {
namespace operators {
27
namespace distributed {
G
gongweibao 已提交
28

T
typhoonzero 已提交
29 30
using VarMsg = sendrecv::VariableMessage;

31 32 33 34 35
void* GetVarPayLoad(const std::string varname, int64_t size) {
  platform::CUDAPinnedPlace cuda_pinned;
  return memory::Alloc(cuda_pinned, size);
}

T
typhoonzero 已提交
36 37 38 39
// Describes the LoDTensor held by `var` in `request` and exposes its raw
// bytes through `payload` / `payload_size`.
//
// Written to `request`: the data type, every dimension, and (only when the
// tensor has a non-empty LoD) the LoD level plus each LoD level's offsets.
//
// Payload:
//   * If `ctx` is on a GPU place (and the build defines PADDLE_WITH_CUDA), the
//     tensor bytes are copied on the context's stream into a buffer obtained
//     from GetVarPayLoad, the context is waited on, and `*payload` points at
//     that buffer.
//   * Otherwise `*payload` aliases the tensor's own storage; no copy is made.
//   `*payload_size` is always numel * sizeof(element type), in bytes.
void GetTensorPayload(framework::Variable* var,
                      const platform::DeviceContext& ctx, VarMsg* request,
                      void** payload, size_t* payload_size) {
  // Bind by reference: only metadata and the data pointer are read here, so
  // copying the LoDTensor (and its LoD vectors) on every send is wasted work.
  const auto& tensor = var->Get<framework::LoDTensor>();
  // FIXME(wuyi): data types in send_recv.proto is copied from
  // framework.proto
  request->set_data_type(
      static_cast<VarMsg::Type>(framework::ToDataType(tensor.type())));
  for (const auto& dim : framework::vectorize(tensor.dims())) {
    request->add_dims(dim);
  }
  const auto& lod = tensor.lod();
  if (lod.size() > 0) {
    request->set_lod_level(lod.size());
    for (const auto& level : lod) {
      VarMsg::LodData* lod_inner = request->add_lod();
      for (const auto& offset : level) {
        lod_inner->add_lod_data(offset);
      }
    }
  }

  // Size of the tensor contents in bytes; shared by both branches below.
  const size_t byte_size =
      tensor.numel() * framework::SizeOfType(tensor.type());

  if (platform::is_gpu_place(ctx.GetPlace())) {
#ifdef PADDLE_WITH_CUDA
    PADDLE_ENFORCE(platform::is_gpu_place(tensor.place()));
    auto& gpu_dev_ctx = static_cast<const platform::CUDADeviceContext&>(ctx);
    *payload = GetVarPayLoad(request->varname(), byte_size);

    platform::CUDAPinnedPlace cuda_pinned;
    memory::Copy(cuda_pinned, *payload,
                 boost::get<platform::CUDAPlace>(tensor.place()),
                 tensor.data<void>(), byte_size, gpu_dev_ctx.stream());

    // The copy was issued on the context's stream; wait so the staging buffer
    // is fully populated before it is handed back.
    ctx.Wait();
#endif
  } else {
    // The out-parameter is a mutable void* by interface; this function itself
    // never writes through it, hence the const_cast on the read-only view.
    *payload = const_cast<void*>(tensor.data<void>());
  }
  *payload_size = byte_size;
}

// Describes the SelectedRows held by `var` in `request` and exposes the raw
// bytes of its value tensor through `payload` / `payload_size`.
//
// Written to `request`: the value tensor's data type, LoD level 0, the
// SelectedRows height, and every dimension of the value tensor. The rows
// index itself is not touched by this function.
//
// Payload:
//   * If `ctx` is on a GPU place (and the build defines PADDLE_WITH_CUDA), the
//     value tensor bytes are copied on the context's stream into a buffer
//     obtained from GetVarPayLoad, the context is waited on, and `*payload`
//     points at that buffer.
//   * Otherwise `*payload` aliases the value tensor's own storage.
//   `*payload_size` is always numel * sizeof(element type), in bytes.
void GetSelectedRowsPayload(framework::Variable* var,
                            const platform::DeviceContext& ctx, VarMsg* request,
                            void** payload, size_t* payload_size) {
  auto* slr = var->GetMutable<framework::SelectedRows>();
  // Fetch the value tensor once and use it for both metadata and payload.
  auto* tensor = slr->mutable_value();

  request->set_data_type(
      static_cast<VarMsg::Type>(framework::ToDataType(tensor->type())));
  request->set_lod_level(0);
  request->set_slr_height(slr->height());

  for (const auto& dim : framework::vectorize(tensor->dims())) {
    request->add_dims(dim);
  }

  // Size of the value tensor contents in bytes; shared by both branches.
  const size_t byte_size =
      tensor->numel() * framework::SizeOfType(tensor->type());

  if (platform::is_gpu_place(ctx.GetPlace())) {
#ifdef PADDLE_WITH_CUDA
    // Same precondition GetTensorPayload checks: fail with an enforce error
    // rather than an opaque boost::get failure if the tensor is not on GPU.
    PADDLE_ENFORCE(platform::is_gpu_place(tensor->place()));
    auto& gpu_dev_ctx = static_cast<const platform::CUDADeviceContext&>(ctx);
    *payload = GetVarPayLoad(request->varname(), byte_size);

    platform::CUDAPinnedPlace cuda_pinned;
    memory::Copy(cuda_pinned, *payload,
                 boost::get<platform::CUDAPlace>(tensor->place()),
                 reinterpret_cast<const void*>(tensor->data<void>()), byte_size,
                 gpu_dev_ctx.stream());
    // The copy was issued on the context's stream; wait so the staging buffer
    // is fully populated before it is handed back.
    ctx.Wait();
#endif
  } else {
    *payload = tensor->data<void>();
  }
  *payload_size = byte_size;
}

112
}  // namespace distributed
G
gongweibao 已提交
113
}  // namespace operators
Y
Yancey 已提交
114
}  // namespace paddle