//   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <algorithm>
#include <memory>
#include <set>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include "paddle/fluid/operators/distributed/parameter_prefetch.h"

#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/framework/tensor.h"

#include "paddle/fluid/operators/distributed/distributed.h"
#include "paddle/fluid/operators/distributed/rpc_client.h"
#include "paddle/fluid/operators/distributed/variable_response.h"
#include "paddle/fluid/operators/distributed_ops/send_recv_util.h"

namespace paddle {
namespace operators {
namespace distributed {

using LoDTensor = framework::LoDTensor;
using SelectedRows = framework::SelectedRows;
using DDim = framework::DDim;

static std::vector<std::vector<int64_t>> SplitIds(
    const std::vector<int64_t>& ids_vector,
    const std::vector<int64_t>& height_section) {
  std::set<int64_t> all_ids;
  for (auto id : ids_vector) {
    all_ids.insert(id);
  }

  auto abs_sections = ToAbsoluteSection(height_section);
  std::vector<std::vector<int64_t>> splited_ids;
  splited_ids.resize(height_section.size() + 1);
  for (auto& id : all_ids) {
    auto section_index = GetSectionIndex(id, abs_sections);
    splited_ids[section_index].push_back(id - abs_sections[section_index]);
  }
  return splited_ids;
}

static void SplitIdsIntoMultipleVarsBySection(
    const std::vector<std::string>& in_var_names,
    const std::vector<int64_t>& height_section,
    const std::vector<std::vector<int64_t>>& splited_ids,
    framework::Scope* scope) {
  PADDLE_ENFORCE_EQ(in_var_names.size(), height_section.size(),
                    "in_var_names and height_section must have the same size");

  auto place = platform::CPUPlace();

  for (size_t i = 0; i < in_var_names.size(); ++i) {
    auto* id_tensor =
        scope->Var(in_var_names[i])->GetMutable<framework::LoDTensor>();
    auto& ids = splited_ids[i];
    if (!ids.empty()) {
      auto* id_tensor_data = id_tensor->mutable_data<int64_t>(
          framework::make_ddim({static_cast<int64_t>(ids.size()), 1}), place);
      memcpy(id_tensor_data, ids.data(), sizeof(int64_t) * ids.size());
    }
  }
}

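// (table_name, endpoint) pairs, one per parameter server section holding a
// slice of the lookup table.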
typedef std::vector<std::pair<std::string, std::string>> TableAndEndpoints;

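// Fetch the embedding rows for `ids` from every remote table section and
// collect them into *recved_vec_map, keyed by the original (global) id.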
void prefetch_core(
    const std::vector<int64_t>& ids, const TableAndEndpoints& tables,
    const std::vector<int64_t>& height_sections,
    const framework::ExecutionContext& context, const framework::Scope& scope,
    std::unordered_map<int64_t, std::vector<float>>* recved_vec_map) {
  platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
  auto& actual_ctx = *pool.Get(context.GetPlace());

  std::unique_ptr<framework::Scope> local_scope = scope.NewTmpScope();

  std::vector<std::string> in_var_names;
  std::vector<std::string> out_var_names;
  for (size_t i = 0; i < tables.size(); ++i) {
    in_var_names.push_back("prefetch_send@" + tables[i].second);
    out_var_names.push_back("prefetch_recv@" + tables[i].second);
  }

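  // split the ids by section and write each bucket into its own send var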
  auto splited_ids = SplitIds(ids, height_sections);
  SplitIdsIntoMultipleVarsBySection(in_var_names, height_sections, splited_ids,
                                    local_scope.get());

  // create output var in local scope
  for (auto& name : out_var_names) {
    local_scope->Var(name)->GetMutable<framework::LoDTensor>();
  }

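  // the RPC client is a process-wide singleton bound to this trainer id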
  distributed::RPCClient* rpc_client =
      distributed::RPCClient::GetInstance<RPCCLIENT_T>(
          context.Attr<int>("trainer_id"));

  std::vector<distributed::VarHandlePtr> rets;
  for (size_t i = 0; i < in_var_names.size(); i++) {
    if (NeedSend(*local_scope, in_var_names[i])) {
      VLOG(3) << "sending " << in_var_names[i] << " to " << tables[i].second
              << " to get " << out_var_names[i] << " back";
      rets.push_back(rpc_client->AsyncPrefetchVar(
          tables[i].second, actual_ctx, *local_scope, in_var_names[i],
          out_var_names[i], tables[i].first));
    } else {
      VLOG(3) << "don't send uninitialized variable: " << in_var_names[i];
    }
  }

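  // block until every outstanding prefetch RPC has completed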
  for (size_t i = 0; i < rets.size(); i++) {
    PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient");
  }

  PADDLE_ENFORCE_EQ(out_var_names.size(), height_sections.size(),
                    "out_var_names and height_sections must have the same size");

  auto abs_sections = ToAbsoluteSection(height_sections);
  for (size_t section_idx = 0; section_idx < out_var_names.size();
       ++section_idx) {
    auto& ids_in_this_section = splited_ids[section_idx];
    if (!ids_in_this_section.empty()) {
      auto& prefetch_out_var = local_scope->Var(out_var_names[section_idx])
                                   ->Get<framework::LoDTensor>();
      const auto* out_var_data = prefetch_out_var.data<float>();
      auto& dims = prefetch_out_var.dims();

      PADDLE_ENFORCE_EQ(dims.size(), 2, "the prefetched tensor must be 2-D");
      PADDLE_ENFORCE_EQ(ids_in_this_section.size(),
                        static_cast<size_t>(dims[0]),
                        "the number of fetched rows must match the number of "
                        "ids in this section");

      auto row_numel = dims[1];

      for (int64_t i = 0; i < dims[0]; ++i) {
        auto id = ids_in_this_section[i];
        auto origin_id = id + abs_sections[section_idx];
        std::vector<float> vecs(row_numel);
        std::copy_n(out_var_data + i * row_numel, row_numel, vecs.begin());
        (*recved_vec_map)[origin_id] = vecs;
      }
    } else {
      VLOG(3) << "no ids to prefetch in this section";
    }
  }
}

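// Convenience wrapper: prefetch a single id var into a single out var.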
void prefetch(const std::string& id_name, const std::string& out_name,
              const std::string& persistable_var_name, const bool backfill,
              const std::vector<std::string>& table_names,
              const std::vector<std::string>& endpoints,
              const std::vector<int64_t>& height_sections,
              const framework::ExecutionContext& context,
              const framework::Scope& scope) {
  prefetchs({id_name}, {out_name}, persistable_var_name, backfill, table_names,
            endpoints, height_sections, context, scope);
}

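// Prefetch embedding rows for several id vars at once: gather the ids of all
// inputs, fetch each unique row from the remote table sections exactly once,
// then scatter the rows into the output vars (rows for padding_idx are
// zero-filled instead of copied).
//
// A hypothetical call site, e.g. a distributed lookup-table op's CPU kernel
// (variable and attribute names here are illustrative only):
//
//   distributed::prefetchs(
//       {"Ids"}, {"Out"}, "W", /*backfill=*/false,
//       ctx.Attr<std::vector<std::string>>("table_names"),
//       ctx.Attr<std::vector<std::string>>("endpoints"),
//       ctx.Attr<std::vector<int64_t>>("height_sections"), ctx, scope);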
void prefetchs(const std::vector<std::string>& id_var_names,
               const std::vector<std::string>& out_var_names,
               const std::string& persistable_var_name, const bool backfill,
               const std::vector<std::string>& table_names,
               const std::vector<std::string>& endpoints,
               const std::vector<int64_t>& height_sections,
               const framework::ExecutionContext& context,
               const framework::Scope& scope) {
  PADDLE_ENFORCE_GT(id_var_names.size(), 0,
                    "id_var_names must not be empty");
  PADDLE_ENFORCE_EQ(id_var_names.size(), out_var_names.size(),
                    "each id var must have a matching out var");
  PADDLE_ENFORCE_EQ(table_names.size(), endpoints.size(),
                    "each table must have a matching endpoint");
  PADDLE_ENFORCE_EQ(table_names.size(), height_sections.size(),
                    "each table must have a matching height section");

  int64_t vec_dim_1 = 0;
  framework::Variable* var = scope.FindVar(persistable_var_name);

  PADDLE_ENFORCE_EQ(var->IsType<framework::LoDTensor>(), true,
                    platform::errors::InvalidArgument(
                        "prefetch only supports LoDTensor"));

  vec_dim_1 = var->Get<framework::LoDTensor>().dims()[1];

  PADDLE_ENFORCE_GT(vec_dim_1, 0,
                    platform::errors::InvalidArgument(
                        "the lookup table var's second dim must be greater "
                        "than 0"));

  const auto place =
      scope.FindVar(id_var_names[0])->Get<framework::LoDTensor>().place();

  if (!platform::is_cpu_place(place)) {
    PADDLE_THROW("multi prefetch only support CPU currently");
  }

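  // collect the ids of every input var: per-var copies (and LoDs) for
  // writing the outputs later, plus a union of all ids for one batched fetch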
  std::vector<std::vector<int64_t>> ids_group;
  std::vector<int64_t> ids_union;
  std::vector<framework::LoD> ids_lods;
  TableAndEndpoints tables;

  for (auto& id_name : id_var_names) {
    auto* id_tensor =
        scope.FindVar(id_name)->GetMutable<framework::LoDTensor>();
    auto id_dims = id_tensor->dims();
    id_tensor->Resize(framework::make_ddim(
        {static_cast<int64_t>(id_dims[0] * id_dims[1]), 1}));
    auto* id_data = id_tensor->data<int64_t>();
    std::vector<int64_t> ids;

    for (int64_t i = 0; i < id_tensor->numel(); ++i) {
      ids.push_back(id_data[i]);
      ids_union.push_back(id_data[i]);
    }
    ids_group.push_back(ids);
    ids_lods.push_back(id_tensor->lod());
  }

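  // deduplicate the union so each row is fetched from the servers only once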
  std::unordered_set<int64_t> s(ids_union.begin(), ids_union.end());
  ids_union.assign(s.begin(), s.end());

  for (size_t i = 0; i < table_names.size(); i++) {
    tables.push_back(std::make_pair(table_names[i], endpoints[i]));
  }

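  // fetch all unique rows in one pass; ids equal to padding_idx are still
  // fetched, but their output rows are zero-filled below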
  std::unordered_map<int64_t, std::vector<float>> recved_vec_map;
  prefetch_core(ids_union, tables, height_sections, context, scope,
                &recved_vec_map);

  auto padding_idx = distributed::kNoPadding;

  if (context.HasAttr("padding_idx")) {
    padding_idx = context.Attr<int64_t>("padding_idx");
  }

  // copy vectors to out vars
  for (size_t i = 0; i < out_var_names.size(); i++) {
    auto& ids = ids_group[i];
    auto* out_t =
        scope.FindVar(out_var_names[i])->GetMutable<framework::LoDTensor>();
    out_t->Resize(
        framework::make_ddim({static_cast<int64_t>(ids.size()), vec_dim_1}));
    out_t->set_lod(ids_lods[i]);

    auto* out_d = out_t->mutable_data<float>(place);

    for (size_t idx = 0; idx < ids.size(); idx++) {
      const auto& id = ids[idx];

      if (padding_idx != distributed::kNoPadding && id == padding_idx) {
        memset(out_d + idx * vec_dim_1, 0, sizeof(float) * vec_dim_1);
      } else {
        std::copy_n(recved_vec_map[id].begin(), vec_dim_1,
                    out_d + idx * vec_dim_1);
      }
    }
  }
}

}  // namespace distributed
}  // namespace operators
}  // namespace paddle