//   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/operators/distributed/parameter_prefetch.h"
#include <memory>
#include <set>
#include <unordered_map>
#include <unordered_set>
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/operators/distributed/distributed.h"

namespace paddle {
namespace framework {
class ExecutionContext;
class Scope;
}  // namespace framework
}  // namespace paddle

namespace paddle {
namespace operators {
namespace distributed {

class RPCClient;

using LoDTensor = framework::LoDTensor;
using SelectedRows = framework::SelectedRows;
using DDim = framework::DDim;

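// Deduplicates and sorts in_ids, splits them across the lookup tables (one
// table per parameter server), and writes each table's id list into the
// corresponding send variable (in_varnames) inside `scope`. For
// non-distributed tables the local row offset (id / pservers) goes into
// splited_ids while the original id is kept in origin_ids.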
static void SplitIdsIntoMultipleVarsBySection(
    const std::vector<int64_t> &in_ids,
    const std::vector<std::string> &in_varnames, const int tables,
    const int pservers, const bool is_distributed, framework::Scope *scope,
    std::vector<std::vector<int64_t>> *splited_ids,
    std::vector<std::vector<int64_t>> *origin_ids) {
  PADDLE_ENFORCE_EQ(
      in_varnames.size(), tables,
      platform::errors::OutOfRange(
          "send varnames size: %d not equal table number: %d, internal error",
          in_varnames.size(), tables));

  PADDLE_ENFORCE_LE(
      tables, pservers,
      platform::errors::OutOfRange("table number %d not equal or less than "
                                   "pserver number: %d, internal error",
                                   tables, pservers));

  auto place = platform::CPUPlace();

  std::set<int64_t> st(in_ids.begin(), in_ids.end());
  std::vector<int64_t> all_ids;
  all_ids.assign(st.begin(), st.end());

  splited_ids->resize(tables);
  origin_ids->resize(tables);

  if (is_distributed) {
    for (auto &id : all_ids) {
      auto pserver_id = id % pservers;
      (*splited_ids)[pserver_id].push_back(id);
      (*origin_ids)[pserver_id].push_back(id);
    }
  } else {
    for (auto &id : all_ids) {
      auto pserver_id = id % pservers;
      (*origin_ids)[pserver_id].push_back(id);
      id = id / pservers;
      (*splited_ids)[pserver_id].push_back(id);
    }
  }

  for (size_t i = 0; i < in_varnames.size(); ++i) {
    auto *id_tensor =
        scope->Var(in_varnames[i])->GetMutable<framework::LoDTensor>();

    auto &ids = (*splited_ids)[i];
    if (!ids.empty()) {
      auto *id_tensor_data = id_tensor->mutable_data<int64_t>(
          framework::make_ddim({static_cast<int64_t>(ids.size()), 1}), place);
      memcpy(id_tensor_data, ids.data(), sizeof(int64_t) * ids.size());
    }
  }
}

typedef std::vector<std::pair<std::string, std::string>> TableAndEndpoints;

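// Sends the deduplicated ids to each table's endpoint via asynchronous
// prefetch RPCs and gathers the returned embedding rows into
// recved_vec_map, keyed by the original id.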
void prefetch_core(
    const std::vector<int64_t> &ids, const TableAndEndpoints &tables,
    const framework::ExecutionContext &context, const framework::Scope &scope,
    const bool is_distributed,
    std::unordered_map<int64_t, std::vector<float>> *recved_vec_map) {
  distributed::RPCClient *rpc_client =
      distributed::RPCClient::GetInstance<RPCCLIENT_T>(
          context.Attr<int>("trainer_id"));

  int pservers = context.Attr<int>("pserver_num");

  platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
  auto &actual_ctx = *pool.Get(platform::CPUPlace());

  std::unique_ptr<framework::Scope> local_scope = scope.NewTmpScope();

  std::vector<std::string> in_var_names;
  std::vector<std::string> out_var_names;
  for (size_t i = 0; i < tables.size(); ++i) {
    in_var_names.push_back("prefetch_send@" + tables[i].second);
    out_var_names.push_back("prefetch_recv@" + tables[i].second);
  }

  std::vector<std::vector<int64_t>> split_ids;
  std::vector<std::vector<int64_t>> origin_ids;
  SplitIdsIntoMultipleVarsBySection(ids, in_var_names, tables.size(), pservers,
                                    is_distributed, local_scope.get(),
                                    &split_ids, &origin_ids);

  // create output var in local scope
  for (auto &name : out_var_names) {
    local_scope->Var(name)->GetMutable<framework::LoDTensor>();
  }

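  // Issue one asynchronous prefetch RPC per table, then wait for all of
  // them to complete.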
  std::vector<distributed::VarHandlePtr> rets;
  for (size_t i = 0; i < in_var_names.size(); i++) {
    if (NeedSend(*local_scope.get(), in_var_names[i])) {
      VLOG(3) << "sending " << in_var_names[i] << " to " << tables[i].second
              << " to get " << out_var_names[i] << " back";
      rets.push_back(rpc_client->AsyncPrefetchVar(
          tables[i].second, actual_ctx, *local_scope.get(), in_var_names[i],
          out_var_names[i], tables[i].first));
    } else {
      VLOG(3) << "don't send no-initialied variable: " << out_var_names[i];
    }
  }
  for (size_t i = 0; i < rets.size(); i++) {
    PADDLE_ENFORCE_NE(rets[i]->Wait(), 0U, platform::errors::ExecutionTimeout(
                                               "internal error in RPCClient"));
  }

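  // Copy the received rows out of the recv variables into recved_vec_map,
  // keyed by the original ids of each section.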
  for (size_t o_idx = 0; o_idx < out_var_names.size(); ++o_idx) {
    auto &ids_in_this_section = origin_ids[o_idx];

    if (!ids_in_this_section.empty()) {
      auto &prefetch_out_var =
          local_scope->Var(out_var_names[o_idx])->Get<framework::LoDTensor>();
      const auto *out_var_data = prefetch_out_var.data<float>();
      auto &dims = prefetch_out_var.dims();

      PADDLE_ENFORCE_EQ(dims.size(), 2,
                        platform::errors::InvalidArgument(
                            "The size of Tensor dims must be 2."));
      PADDLE_ENFORCE_EQ(ids_in_this_section.size(), dims[0],
                        platform::errors::InvalidArgument(
                            "The size of ids in this section must equal to "
                            "dims[0]: %s, but got %s",
                            dims[0], ids_in_this_section.size()));

      auto row_numel = dims[1];

      for (int64_t i = 0; i < dims[0]; ++i) {
        auto origin_id = ids_in_this_section[i];
        std::vector<float> vecs(row_numel);

        std::copy_n(out_var_data + i * row_numel, row_numel, vecs.begin());
        (*recved_vec_map)[origin_id] = vecs;
      }
    } else {
      VLOG(3) << "ids in this section is empty";
    }
  }
}

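// Convenience wrapper around prefetchs() for a single id/output variable
// pair.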
void prefetch(const std::string &id_name, const std::string &out_name,
              const std::string &persistable_var_name,
              const bool is_distributed,
              const std::vector<std::string> &table_names,
              const std::vector<std::string> &endpoints,
              const framework::ExecutionContext &context,
              const framework::Scope &scope) {
  prefetchs({id_name}, {out_name}, persistable_var_name, is_distributed,
            table_names, endpoints, context, scope);
}

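// Collects the ids of all id variables, prefetches the matching embedding
// rows from the parameter servers, and scatters them into the output
// variables; rows whose id equals padding_idx are filled with zeros.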
void prefetchs(const std::vector<std::string> &id_var_names,
               const std::vector<std::string> &out_var_names,
               const std::string &persistable_var_name,
               const bool is_distributed,
               const std::vector<std::string> &table_names,
               const std::vector<std::string> &endpoints,
               const framework::ExecutionContext &context,
               const framework::Scope &scope) {
  auto vec_dim_1 = 0;
  auto vec_dim_0 = 0;
  framework::Variable *var = scope.FindVar(persistable_var_name);

  if (var->IsType<SelectedRows>()) {
    vec_dim_1 = var->Get<framework::SelectedRows>().value().dims()[1];
  } else {
    vec_dim_0 = var->Get<framework::LoDTensor>().dims()[0];
    vec_dim_1 = var->Get<framework::LoDTensor>().dims()[1];
  }

  PADDLE_ENFORCE_GT(vec_dim_1, 0,
                    platform::errors::InvalidArgument(
                        "lookup table var's dim must gather than 0"));

  const auto place =
      scope.FindVar(id_var_names[0])->Get<framework::LoDTensor>().place();

  std::vector<std::vector<int64_t>> ids_group;
  std::vector<int64_t> ids_union;
  std::vector<framework::LoD> ids_lods;
  TableAndEndpoints tables;

  for (auto &id_name : id_var_names) {
    auto &id_tensor = scope.FindVar(id_name)->Get<framework::LoDTensor>();
    std::vector<int64_t> ids;
    TensorToVector(id_tensor, context.device_context(), &ids);
    ids_union.insert(ids_union.end(), ids.begin(), ids.end());
    ids_group.push_back(ids);
    ids_lods.push_back(id_tensor.lod());
  }

  std::unordered_set<int64_t> s(ids_union.begin(), ids_union.end());
  ids_union.assign(s.begin(), s.end());

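  // Sanity check: ids must be non-negative, and for a non-distributed table
  // they must also lie inside the table height vec_dim_0.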
  for (auto &i : ids_union) {
    PADDLE_ENFORCE_GE(
        i, 0, platform::errors::OutOfRange(
                  "each element in embedding should be larger or equal 0"));
    if (!is_distributed) {
      PADDLE_ENFORCE_LT(
          i, vec_dim_0,
          platform::errors::OutOfRange(
              "embedding id must in [0, %d) when is_distributed False",
              vec_dim_0));
    }
  }

  for (size_t i = 0; i < table_names.size(); i++) {
    tables.push_back(std::make_pair(table_names[i], endpoints[i]));
  }
  std::unordered_map<int64_t, std::vector<float>> recved_vec_map;
  prefetch_core(ids_union, tables, context, scope, is_distributed,
                &recved_vec_map);

  auto padding_idx = distributed::kNoPadding;

  if (context.HasAttr("padding_idx")) {
    padding_idx = context.Attr<int64_t>("padding_idx");
  }

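  // Assemble each output tensor from the prefetched rows, either directly on
  // CPU or through a staging buffer for GPU outputs.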
  for (size_t i = 0; i < out_var_names.size(); i++) {
    std::vector<int64_t> ids = ids_group[i];
    auto ids_size = ids.size();
    auto *out_t =
        scope.FindVar(out_var_names[i])->GetMutable<framework::LoDTensor>();
    out_t->set_lod(ids_lods[i]);
    out_t->Resize(
        framework::make_ddim({static_cast<int64_t>(ids_size), vec_dim_1}));
    auto *out_d = out_t->mutable_data<float>(place);

    if (platform::is_cpu_place(out_t->place())) {
      for (auto idx = 0; idx < static_cast<int>(ids_size); idx++) {
        const auto &id = ids[idx];
        if (padding_idx != distributed::kNoPadding && id == padding_idx) {
          memset(out_d + idx * vec_dim_1, 0, sizeof(float) * vec_dim_1);
        } else {
          std::copy_n(recved_vec_map[id].begin(), vec_dim_1,
                      out_d + idx * vec_dim_1);
        }
      }
    } else {
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
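      // Stage the rows in a host buffer, then copy the whole buffer to the
      // GPU tensor with a single memory::Copy on the CUDA stream.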
      std::vector<float> ids_value_vec(ids_size * vec_dim_1);
      for (auto idx = 0; idx < static_cast<int>(ids_size); idx++) {
        const auto &id = ids[idx];
        if (padding_idx != distributed::kNoPadding && id == padding_idx) {
          memset(&ids_value_vec[idx * vec_dim_1], 0, sizeof(float) * vec_dim_1);
        } else {
          memcpy(&ids_value_vec[idx * vec_dim_1], &recved_vec_map[id][0],
                 sizeof(float) * vec_dim_1);
        }
      }
      auto &gpu_place = BOOST_GET_CONST(platform::CUDAPlace, out_t->place());
      auto &cpu_place = BOOST_GET_CONST(
          platform::CPUPlace, paddle::platform::CPUDeviceContext().GetPlace());
      auto stream = context.cuda_device_context().stream();
      memory::Copy(gpu_place, out_d, cpu_place, &ids_value_vec[0],
                   sizeof(float) * ids_size * vec_dim_1, stream);
#else
      PADDLE_THROW(platform::errors::PermissionDenied(
          "Paddle is not compiled with GPU!"));
#endif
    }
  }
}

}  // namespace distributed
}  // namespace operators
}  // namespace paddle