/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#ifdef PADDLE_WITH_CUDA
#include <nccl.h>
#endif
#include <sys/time.h>
#include <thread>  // NOLINT

#include "paddle/fluid/framework/data_type.h"
#include "paddle/fluid/operators/distributed/sendrecvop_utils.h"
#include "paddle/fluid/operators/distributed/variable_response.h"

namespace paddle {
namespace operators {
namespace distributed {

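// Shorthand for the wire-format message that carries a variable's metadata
// and payload in send/recv RPCs.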
using VarMsg = sendrecv::VariableMessage;

#ifdef PADDLE_WITH_CUDA
// Allocate page-locked (CUDA pinned) host memory to stage the payload, so
// device-to-host copies can run asynchronously on a CUDA stream.
// |varname| is currently unused.
void* GetVarPayLoad(const std::string& varname, int64_t size) {
  platform::CUDAPinnedPlace cuda_pinned;
  return memory::Alloc(cuda_pinned, size);
}
#endif

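// Serialize the LoDTensor held by |var| into |request|: record its data
// type, dims, and LoD, then expose the raw tensor bytes through |payload|
// and |payload_size|. On GPU places the data is first staged into pinned
// host memory; on CPU places |payload| points directly at the tensor data.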
void GetTensorPayload(framework::Variable* var,
                      const platform::DeviceContext& ctx, VarMsg* request,
                      void** payload, size_t* payload_size) {
  auto tensor = var->Get<framework::LoDTensor>();
  // FIXME(wuyi): data types in send_recv.proto are copied from
  // framework.proto
  request->set_data_type(
      static_cast<VarMsg::Type>(framework::ToDataType(tensor.type())));
  for (auto& dim : framework::vectorize(tensor.dims())) {
    request->add_dims(dim);
  }
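  // Serialize LoD (level-of-detail) info: one repeated LodData entry per
  // LoD level, each holding that level's offsets.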
  const framework::LoD lod = tensor.lod();
  if (lod.size() > 0) {
    request->set_lod_level(lod.size());
    for (auto& each : lod) {
      VarMsg::LodData* lod_inner = request->add_lod();
      for (auto& d : each) {
        lod_inner->add_lod_data(d);
      }
    }
  }
  if (platform::is_gpu_place(ctx.GetPlace())) {
#ifdef PADDLE_WITH_CUDA
    PADDLE_ENFORCE(platform::is_gpu_place(tensor.place()));
    auto& gpu_dev_ctx = static_cast<const platform::CUDADeviceContext&>(ctx);
    auto copy_size = tensor.numel() * framework::SizeOfType(tensor.type());
    *payload = GetVarPayLoad(request->varname(), copy_size);

    platform::CUDAPinnedPlace cuda_pinned;
    memory::Copy(cuda_pinned, *payload,
                 boost::get<platform::CUDAPlace>(tensor.place()),
                 reinterpret_cast<const void*>(tensor.data<void>()), copy_size,
                 gpu_dev_ctx.stream());

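    // Block until the asynchronous copy into pinned memory finishes, so the
    // payload is complete before the caller serializes it.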
    ctx.Wait();
#endif
  } else {
    *payload = tensor.data<void>();
  }
  *payload_size = tensor.numel() * framework::SizeOfType(tensor.type());
}

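// Like GetTensorPayload, but for SelectedRows: additionally records the
// SelectedRows height and serializes the underlying value tensor.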
void GetSelectedRowsPayload(framework::Variable* var,
                            const platform::DeviceContext& ctx, VarMsg* request,
                            void** payload, size_t* payload_size) {
  auto* slr = var->GetMutable<framework::SelectedRows>();
  request->set_data_type(
      static_cast<VarMsg::Type>(framework::ToDataType(slr->value().type())));
  request->set_lod_level(0);
  request->set_slr_height(slr->height());

  for (auto& dim : framework::vectorize(slr->value().dims())) {
    request->add_dims(dim);
  }

  auto* tensor = slr->mutable_value();
  if (platform::is_gpu_place(ctx.GetPlace())) {
#ifdef PADDLE_WITH_CUDA
    auto& gpu_dev_ctx = static_cast<const platform::CUDADeviceContext&>(ctx);
    auto copy_size = tensor->numel() * framework::SizeOfType(tensor->type());
    *payload = GetVarPayLoad(request->varname(), copy_size);

    platform::CUDAPinnedPlace cuda_pinned;
    memory::Copy(cuda_pinned, *payload,
                 boost::get<platform::CUDAPlace>(tensor->place()),
                 reinterpret_cast<const void*>(tensor->data<void>()), copy_size,
                 gpu_dev_ctx.stream());
    ctx.Wait();
#endif
  } else {
    *payload = slr->mutable_value()->data<void>();
  }
  *payload_size = tensor->numel() * framework::SizeOfType(tensor->type());
}
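
// Usage sketch (hypothetical caller, for illustration only): a serializer
// is expected to dispatch on the variable type before building the message.
//
//   VarMsg request;
//   void* payload = nullptr;
//   size_t payload_size = 0;
//   if (var->IsType<framework::LoDTensor>()) {
//     GetTensorPayload(var, ctx, &request, &payload, &payload_size);
//   } else if (var->IsType<framework::SelectedRows>()) {
//     GetSelectedRowsPayload(var, ctx, &request, &payload, &payload_size);
//   }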

}  // namespace distributed
}  // namespace operators
}  // namespace paddle