//   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/operators/distributed/parameter_prefetch.h"

#include <algorithm>
#include <iterator>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>

#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/framework/tensor.h"

#include "paddle/fluid/operators/distributed/distributed.h"
#include "paddle/fluid/operators/distributed/rpc_client.h"
#include "paddle/fluid/operators/distributed/variable_response.h"
#include "paddle/fluid/operators/distributed_ops/send_recv_util.h"

namespace paddle {
namespace operators {
namespace distributed {

36
using LoDTensor = framework::LoDTensor;
Q
Qiao Longfei 已提交
37 38 39 40
using LoDTensor = framework::LoDTensor;
using SelectedRows = framework::SelectedRows;
using DDim = framework::DDim;

// Returns the index of the section that owns `id`.
// `abs_sections` holds the absolute first row id of each section, sorted
// ascending with abs_sections[0] == 0 (as produced by ToAbsoluteSection);
// ids at or beyond the last boundary fall into the last section.
// Assumes abs_sections is non-empty — TODO confirm all callers guarantee it.
static size_t GetSectionIndex(int64_t id,
                              const std::vector<int64_t>& abs_sections) {
  // First boundary strictly greater than id; the id belongs to the section
  // just before it. Binary search replaces the previous linear scan and
  // returns the same index for every input, including the "past the last
  // boundary" case (upper_bound == end() -> size() - 1).
  auto it = std::upper_bound(abs_sections.begin() + 1, abs_sections.end(), id);
  return static_cast<size_t>(std::distance(abs_sections.begin(), it)) - 1;
}

// Converts per-section heights into absolute row offsets, e.g.
// {4, 4, 4} -> {0, 4, 8}: abs_sections[i] is the absolute id of the first
// row owned by section i.
static std::vector<int64_t> ToAbsoluteSection(
    const std::vector<int>& height_sections) {
  std::vector<int64_t> abs_sections;
  if (height_sections.empty()) {
    // Guard: the previous version unconditionally wrote abs_sections[0],
    // which is out of bounds when the input is empty.
    return abs_sections;
  }
  abs_sections.resize(height_sections.size());
  abs_sections[0] = 0;
  // Exclusive prefix sum of the heights.
  for (size_t i = 1; i < height_sections.size(); ++i) {
    abs_sections[i] = height_sections[i - 1] + abs_sections[i - 1];
  }
  return abs_sections;
}

static std::vector<std::vector<int64_t>> SplitIds(
Q
Qiao Longfei 已提交
63 64
    const std::vector<int64_t>& ids_vector,
    const std::vector<int>& height_section, framework::Scope* scope) {
Q
Qiao Longfei 已提交
65
  std::set<int64_t> all_ids;
Q
Qiao Longfei 已提交
66 67
  for (auto id : ids_vector) {
    all_ids.insert(id);
Q
Qiao Longfei 已提交
68
  }
Q
Qiao Longfei 已提交
69

Q
Qiao Longfei 已提交
70 71 72 73 74 75 76 77 78 79
  auto abs_sections = ToAbsoluteSection(height_section);
  std::vector<std::vector<int64_t>> splited_ids;
  splited_ids.resize(height_section.size() + 1);
  for (auto& id : all_ids) {
    auto section_index = GetSectionIndex(id, abs_sections);
    splited_ids[section_index].push_back(id - abs_sections[section_index]);
  }
  return splited_ids;
}

static void SplitIdsIntoMultipleVarsBySection(
Q
Qiao Longfei 已提交
81
    const std::vector<std::string>& in_var_names,
Q
Qiao Longfei 已提交
82
    const std::vector<int>& height_section,
Q
Qiao Longfei 已提交
83 84
    const std::vector<std::vector<int64_t>>& splited_ids,
    framework::Scope* scope) {
Q
Qiao Longfei 已提交
85
  PADDLE_ENFORCE_EQ(in_var_names.size(), height_section.size(), "");
Q
Qiao Longfei 已提交
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100

  auto place = platform::CPUPlace();

  for (size_t i = 0; i < in_var_names.size(); ++i) {
    auto* id_tensor =
        scope->Var(in_var_names[i])->GetMutable<framework::LoDTensor>();
    auto& ids = splited_ids[i];
    if (!ids.empty()) {
      auto* id_tensor_data = id_tensor->mutable_data<int64_t>(
          framework::make_ddim({static_cast<int64_t>(ids.size()), 1}), place);
      memcpy(id_tensor_data, ids.data(), sizeof(int64_t) * ids.size());
    }
  }
}

// Scatters the per-section prefetch results back into the single output
// tensor `out_name`, restoring the original (possibly duplicated) order of
// `ids_vector`. Each fetched row i of section `s` corresponds to absolute
// id splited_ids[s][i] + abs_sections[s]; that row is copied into every
// output position where the id appeared in the lookup. `context` is unused
// here; `actual_ctx` supplies the CUDA stream for GPU outputs.
static void MergeMultipleVarsIntoOneBySection(
    const std::string& id_name, const std::vector<int64_t>& ids_vector,
    const std::string& out_name, const std::vector<std::string>& out_var_names,
    const std::vector<int>& height_section,
    const std::vector<std::vector<int64_t>>& splited_ids,
    const framework::ExecutionContext& context, framework::Scope* scope,
    platform::DeviceContext* actual_ctx) {
  // One fetched variable per section is required.
  PADDLE_ENFORCE_EQ(out_var_names.size(), height_section.size(), "");

  auto cpu_place = platform::CPUPlace();

  auto abs_sections = ToAbsoluteSection(height_section);
  // Map each absolute id to every position it occupied in the original
  // lookup, so duplicated ids all receive the fetched row.
  std::unordered_map<int64_t, std::vector<size_t>> id_to_offset;
  for (size_t i = 0; i < ids_vector.size(); ++i) {
    id_to_offset[ids_vector[i]].push_back(i);
  }

  // The id tensor determines the output's placement (CPU vs GPU).
  auto& id_tensor = scope->FindVar(id_name)->Get<framework::LoDTensor>();
  auto* out_tensor =
      scope->FindVar(out_name)->GetMutable<framework::LoDTensor>();

  PADDLE_ENFORCE_GT(
      out_tensor->numel(), 0,
      "When calling this method, the LoDTensor's numel must larger than zero. "
      "Please check LoDTensor::Resize has been called first.");

  // Output rows are allocated on the same place as the id tensor.
  auto* out_tensor_data = out_tensor->mutable_data<float>(id_tensor.place());

  bool is_on_cpu_place = true;
  if (!platform::is_cpu_place(id_tensor.place())) {
    is_on_cpu_place = false;
  }

  for (size_t section_idx = 0; section_idx < out_var_names.size();
       ++section_idx) {
    auto& ids_in_this_section = splited_ids[section_idx];
    if (!ids_in_this_section.empty()) {
      // Fetched rows for this section, one row per (section-local) id.
      auto& prefetch_out_var =
          scope->Var(out_var_names[section_idx])->Get<framework::LoDTensor>();
      const auto* out_var_data = prefetch_out_var.data<float>();
      auto& dims = prefetch_out_var.dims();

      // Expect a [num_ids, row_width] matrix matching the requested ids.
      PADDLE_ENFORCE_EQ(dims.size(), 2, "");
      PADDLE_ENFORCE_EQ(ids_in_this_section.size(), dims[0]);

      auto row_numel = dims[1];

      for (int64_t i = 0; i < dims[0]; ++i) {
        auto id = ids_in_this_section[i];
        // Convert the section-local offset back to the absolute id.
        auto origin_id = id + abs_sections[section_idx];
        auto& offsets = id_to_offset[origin_id];
        for (auto& offset : offsets) {
          // should support GPU tensor
          if (is_on_cpu_place) {
            // Host-to-host row copy.
            memory::Copy(cpu_place, out_tensor_data + offset * row_numel,
                         cpu_place, out_var_data + i * row_numel,
                         sizeof(float) * row_numel);
          } else {
#ifndef PADDLE_WITH_CUDA
            PADDLE_THROW("paddle is not compiled with CUDA!");
#else
            // Host-to-device copy on the caller's CUDA stream
            // (asynchronous with respect to the host).
            auto stream =
                static_cast<platform::CUDADeviceContext*>(actual_ctx)->stream();
            memory::Copy(boost::get<platform::CUDAPlace>(id_tensor.place()),
                         out_tensor_data + offset * row_numel, cpu_place,
                         out_var_data + i * row_numel,
                         sizeof(float) * row_numel, stream);
#endif
          }
        }
      }
    } else {
      VLOG(3) << "ids in this section is empty";
    }
  }
}

void prefetch(const std::string& id_name, const std::string& out_name,
Q
Qiao Longfei 已提交
179
              const std::vector<std::string>& table_names,
Q
Qiao Longfei 已提交
180
              const std::vector<std::string>& epmap,
Q
Qiao Longfei 已提交
181
              const std::vector<int>& height_sections,
T
tangwei12 已提交
182 183
              const framework::ExecutionContext& context,
              const framework::Scope& scope) {
T
tangwei12 已提交
184
  auto& local_scope = scope.NewScope();
Q
Qiao Longfei 已提交
185 186

  platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
Q
Qiao Longfei 已提交
187
  auto& cpu_ctx = *pool.Get(platform::CPUPlace());
188
  auto& actual_ctx = *pool.Get(context.GetPlace());
Q
Qiao Longfei 已提交
189 190 191 192 193 194 195 196 197 198 199 200

  distributed::RPCClient* rpc_client =
      distributed::RPCClient::GetInstance<RPCCLIENT_T>(
          context.Attr<int>("trainer_id"));

  std::vector<std::string> in_var_names;
  std::vector<std::string> out_var_names;
  for (size_t i = 0; i < epmap.size(); ++i) {
    in_var_names.push_back(id_name + "@" + epmap[i]);
    out_var_names.push_back(out_name + "@" + epmap[i]);
  }

T
tangwei12 已提交
201
  auto& id_tensor = scope.FindVar(id_name)->Get<framework::LoDTensor>();
Q
Qiao Longfei 已提交
202 203 204
  std::vector<int64_t> ids_vector;
  if (platform::is_cpu_place(id_tensor.place())) {
    auto* id_data = id_tensor.data<int64_t>();
205
    for (int64_t i = 0; i < id_tensor.numel(); ++i) {
Q
Qiao Longfei 已提交
206 207 208 209 210 211 212
      ids_vector.push_back(id_data[i]);
    }
  } else {
#ifndef PADDLE_WITH_CUDA
    PADDLE_THROW("paddle is not compiled with CUDA!");
#else
    auto cpu_place = platform::CPUPlace();
213
    framework::LoDTensor cpu_tensor;
Q
Qiao Longfei 已提交
214 215
    auto* cpu_tensor_data =
        cpu_tensor.mutable_data<int64_t>(id_tensor.dims(), cpu_place);
216 217
    auto stream =
        static_cast<platform::CUDADeviceContext*>(&actual_ctx)->stream();
Q
Qiao Longfei 已提交
218 219
    memory::Copy(cpu_place, cpu_tensor_data,
                 boost::get<platform::CUDAPlace>(id_tensor.place()),
220 221
                 id_tensor.data<int64_t>(), sizeof(int64_t) * id_tensor.numel(),
                 stream);
S
sneaxiy 已提交
222
    for (int64_t i = 0; i < cpu_tensor.numel(); ++i) {
Q
Qiao Longfei 已提交
223 224 225 226 227 228 229 230
      ids_vector.push_back(cpu_tensor_data[i]);
    }
#endif
  }

  auto splited_ids = SplitIds(ids_vector, height_sections, &local_scope);
  SplitIdsIntoMultipleVarsBySection(in_var_names, height_sections, splited_ids,
                                    &local_scope);
Q
Qiao Longfei 已提交
231 232 233 234 235 236 237 238 239

  // create output var in local scope
  for (auto& name : out_var_names) {
    local_scope.Var(name)->GetMutable<framework::LoDTensor>();
  }

  std::vector<distributed::VarHandlePtr> rets;
  for (size_t i = 0; i < in_var_names.size(); i++) {
    if (NeedSend(local_scope, in_var_names[i])) {
240
      VLOG(3) << "sending " << in_var_names[i] << " to " << epmap[i]
Q
Qiao Longfei 已提交
241
              << " to get " << out_var_names[i] << " back";
Q
Qiao Longfei 已提交
242
      rets.push_back(rpc_client->AsyncPrefetchVar(
Q
Qiao Longfei 已提交
243
          epmap[i], cpu_ctx, local_scope, in_var_names[i], out_var_names[i],
Q
Qiao Longfei 已提交
244
          table_names[i]));
Q
Qiao Longfei 已提交
245
    } else {
246
      VLOG(3) << "don't send no-initialied variable: " << out_var_names[i];
Q
Qiao Longfei 已提交
247 248
    }
  }
Q
Qiao Longfei 已提交
249

Q
Qiao Longfei 已提交
250 251 252 253
  for (size_t i = 0; i < rets.size(); i++) {
    PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient");
  }

Q
Qiao Longfei 已提交
254 255
  MergeMultipleVarsIntoOneBySection(id_name, ids_vector, out_name,
                                    out_var_names, height_sections, splited_ids,
256
                                    context, &local_scope, &actual_ctx);
T
tangwei12 已提交
257
  scope.DeleteScope(&local_scope);
Q
Qiao Longfei 已提交
258 259 260 261 262
}

};  // namespace distributed
};  // namespace operators
};  // namespace paddle