data_feed.cc 35.0 KB
Newer Older
W
Wang Guibao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

D
dongdaxiang 已提交
15 16 17 18 19
#if defined _WIN32 || defined __APPLE__
#else
#define _LINUX
#endif

20
#include "paddle/fluid/framework/data_feed.h"
D
dongdaxiang 已提交
21
#ifdef _LINUX
D
dongdaxiang 已提交
22
#include <stdio_ext.h>
H
hutuxian 已提交
23 24 25
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
D
dongdaxiang 已提交
26
#endif
27
#include <utility>
28
#include "gflags/gflags.h"
W
Wang Guibao 已提交
29 30 31
#include "google/protobuf/io/zero_copy_stream_impl.h"
#include "google/protobuf/message.h"
#include "google/protobuf/text_format.h"
32 33
#include "io/fs.h"
#include "io/shell.h"
W
Wang Guibao 已提交
34 35
#include "paddle/fluid/framework/feed_fetch_method.h"
#include "paddle/fluid/framework/feed_fetch_type.h"
36
#include "paddle/fluid/platform/timer.h"
W
Wang Guibao 已提交
37 38 39 40 41 42 43 44

namespace paddle {
namespace framework {

// Binds `var` to every used slot whose name equals `name`. A null `var`
// clears the binding for the matching slot(s) instead.
void DataFeed::AddFeedVar(Variable* var, const std::string& name) {
  CheckInit();
  const size_t slot_count = use_slots_.size();
  for (size_t slot = 0; slot < slot_count; ++slot) {
    if (use_slots_[slot] != name) {
      continue;
    }
    feed_vec_[slot] =
        (var != nullptr) ? var->GetMutable<LoDTensor>() : nullptr;
  }
}

bool DataFeed::SetFileList(const std::vector<std::string>& files) {
55
  std::unique_lock<std::mutex> lock(*mutex_for_pick_file_);
W
Wang Guibao 已提交
56
  CheckInit();
57 58
  // Do not set finish_set_filelist_ flag,
  // since a user may set file many times after init reader
W
Wang Guibao 已提交
59 60 61 62 63 64 65 66 67 68 69 70
  filelist_.assign(files.begin(), files.end());

  finish_set_filelist_ = true;
  return true;
}

// Sets the default number of instances per batch.
// Enforces that `batch_size` is strictly positive.
void DataFeed::SetBatchSize(int batch_size) {
  PADDLE_ENFORCE(batch_size > 0, "Illegal batch size: %d.", batch_size);
  default_batch_size_ = batch_size;
}

bool DataFeed::PickOneFile(std::string* filename) {
71 72 73 74 75 76
  PADDLE_ENFORCE(mutex_for_pick_file_ != nullptr,
                 "should call SetFileListMutex before PickOneFile");
  PADDLE_ENFORCE(file_idx_ != nullptr,
                 "should call SetFileListIndex before PickOneFile");
  std::unique_lock<std::mutex> lock(*mutex_for_pick_file_);
  if (*file_idx_ == filelist_.size()) {
77
    VLOG(3) << "DataFeed::PickOneFile no more file to pick";
W
Wang Guibao 已提交
78 79
    return false;
  }
80 81
  VLOG(3) << "file_idx_=" << *file_idx_;
  *filename = filelist_[(*file_idx_)++];
W
Wang Guibao 已提交
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
  return true;
}

// Enforces that finish_init_ is set, i.e. that initialization completed.
void DataFeed::CheckInit() {
  PADDLE_ENFORCE(finish_init_, "Initialization did not succeed.");
}

// Enforces that finish_set_filelist_ is set, i.e. that a file list was set.
void DataFeed::CheckSetFileList() {
  PADDLE_ENFORCE(finish_set_filelist_, "Set filelist did not succeed.");
}

// Enforces that finish_start_ is set, i.e. that the feed has been started.
void DataFeed::CheckStart() {
  PADDLE_ENFORCE(finish_start_, "Datafeed has not started running yet.");
}

H
hutuxian 已提交
97 98 99 100 101 102 103
void DataFeed::AssignFeedVar(const Scope& scope) {
  CheckInit();
  for (size_t i = 0; i < use_slots_.size(); ++i) {
    feed_vec_[i] = scope.FindVar(use_slots_[i])->GetMutable<LoDTensor>();
  }
}

W
Wang Guibao 已提交
104 105 106 107
// Records the requested queue size (must be positive) and creates a fresh
// channel for the reader thread to fill.
// NOTE(review): queue_size_ is stored but not passed to MakeChannel here;
// confirm whether the channel is meant to be bounded by it.
template <typename T>
void PrivateQueueDataFeed<T>::SetQueueSize(int queue_size) {
  PADDLE_ENFORCE(queue_size > 0, "Illegal queue size: %d.", queue_size);
  this->queue_size_ = queue_size;
  this->queue_ = paddle::framework::MakeChannel<T>();
}

// Launches the background reader (ReadThread) as a detached thread and
// marks the feed as started. Requires the file list to be set. Always
// returns true.
template <typename T>
bool PrivateQueueDataFeed<T>::Start() {
  CheckSetFileList();

  std::thread reader(&PrivateQueueDataFeed::ReadThread, this);
  read_thread_ = std::move(reader);
  read_thread_.detach();

  finish_start_ = true;
  return true;
}

// Reader loop: picks files one at a time, opens each through the pipe
// command, parses instances and puts them on the queue. Closes the queue
// once no file is left. Does nothing on non-Linux builds.
template <typename T>
void PrivateQueueDataFeed<T>::ReadThread() {
#ifdef _LINUX
  std::string filename;
  while (PickOneFile(&filename)) {
    int err_no = 0;
    fp_ = fs_open_read(filename, &err_no, pipe_command_);
    // Fail loudly on a file that could not be opened, as the other readers
    // in this file do, rather than dereferencing a null handle below.
    CHECK(fp_ != nullptr);
    __fsetlocking(&*fp_, FSETLOCKING_BYCALLER);
    T instance;
    while (ParseOneInstanceFromPipe(&instance)) {
      queue_->Put(instance);
    }
  }
  queue_->Close();
#endif
}

// Assembles the next batch from the queue and feeds it to the bound
// tensors. Returns the number of instances in the batch (0 when the queue
// yielded nothing). Always returns 0 on non-Linux builds.
template <typename T>
int PrivateQueueDataFeed<T>::Next() {
#ifdef _LINUX
  CheckStart();
  T batch;
  int fetched = 0;
  for (; fetched < default_batch_size_; ++fetched) {
    T one;
    if (!queue_->Get(one)) {
      break;  // queue could not supply another instance
    }
    AddInstanceToInsVec(&batch, one, fetched);
  }
  batch_size_ = fetched;
  if (batch_size_ == 0) {
    return batch_size_;
  }
  PutToFeedVec(batch);
  return batch_size_;
#else
  return 0;
#endif
}

161
// explicit instantiation
W
Wang Guibao 已提交
162 163
// Explicit instantiation: emits the PrivateQueueDataFeed member definitions
// from this file for std::vector<MultiSlotType>.
template class PrivateQueueDataFeed<std::vector<MultiSlotType>>;

164 165
// Puts the feed in a neutral state: no file handle, index, mutex or
// channels attached, thread 0 of 1, instance-id parsing off.
template <typename T>
InMemoryDataFeed<T>::InMemoryDataFeed() {
  // Shared file-picking state and the current file handle.
  file_idx_ = nullptr;
  mutex_for_pick_file_ = nullptr;
  fp_ = nullptr;

  // Threading and parsing options.
  thread_id_ = 0;
  thread_num_ = 1;
  parse_ins_id_ = false;

  // Channels are attached later through the Set*Channel setters.
  input_channel_ = nullptr;
  output_channel_ = nullptr;
  consume_channel_ = nullptr;
}

// Marks the feed as started. On Linux, when the output channel is empty and
// the input channel is not, first moves the input channel's data into the
// output channel. Always returns true.
template <typename T>
bool InMemoryDataFeed<T>::Start() {
#ifdef _LINUX
  this->CheckSetFileList();
  // Verify the channels before use, as Next() does, instead of
  // dereferencing a channel that was never attached.
  CHECK(output_channel_ != nullptr);
  if (output_channel_->Size() == 0) {
    CHECK(input_channel_ != nullptr);
    if (input_channel_->Size() != 0) {
      std::vector<T> data;
      input_channel_->Read(data);
      output_channel_->Write(std::move(data));
    }
  }
#endif
  this->finish_start_ = true;
  return true;
}

// Pulls up to default_batch_size_ instances from the output channel, feeds
// them to the bound tensors and forwards each one to the consume channel.
// Returns the batch size (0 when the output channel is empty). Always
// returns 0 on non-Linux builds.
template <typename T>
int InMemoryDataFeed<T>::Next() {
#ifdef _LINUX
  this->CheckStart();
  CHECK(output_channel_ != nullptr);
  CHECK(consume_channel_ != nullptr);
  VLOG(3) << "output_channel_ size=" << output_channel_->Size()
          << ", consume_channel_ size=" << consume_channel_->Size()
          << ", thread_id=" << thread_id_;

  std::vector<T> batch;
  batch.reserve(this->default_batch_size_);
  T instance;
  int taken = 0;
  // Stop at a full batch or as soon as the output channel reports empty.
  while (taken < this->default_batch_size_ && output_channel_->Size() != 0) {
    output_channel_->Get(instance);
    batch.push_back(instance);
    ++taken;
    consume_channel_->Put(std::move(instance));
  }

  this->batch_size_ = taken;
  VLOG(3) << "batch_size_=" << this->batch_size_
          << ", thread_id=" << thread_id_;
  if (this->batch_size_ == 0) {
    VLOG(3) << "finish reading, output_channel_ size="
            << output_channel_->Size()
            << ", consume_channel_ size=" << consume_channel_->Size()
            << ", thread_id=" << thread_id_;
  } else {
    PutToFeedVec(batch);
  }
  return this->batch_size_;
#else
  return 0;
#endif
}

230
template <typename T>
J
jiaqi 已提交
231 232 233 234 235 236 237
void InMemoryDataFeed<T>::SetInputChannel(void* channel) {
  input_channel_ = static_cast<paddle::framework::ChannelObject<T>*>(channel);
}

// Attaches the output channel; `channel` is cast to ChannelObject<T>*.
template <typename T>
void InMemoryDataFeed<T>::SetOutputChannel(void* channel) {
  using ChannelPtr = paddle::framework::ChannelObject<T>*;
  output_channel_ = static_cast<ChannelPtr>(channel);
}

template <typename T>
J
jiaqi 已提交
241 242
void InMemoryDataFeed<T>::SetConsumeChannel(void* channel) {
  consume_channel_ = static_cast<paddle::framework::ChannelObject<T>*>(channel);
243 244 245 246 247 248 249 250 251 252 253 254
}

// Stores the id of the thread that drives this feed.
template <typename T>
void InMemoryDataFeed<T>::SetThreadId(int thread_id) {
  this->thread_id_ = thread_id;
}

// Stores the total number of feed threads.
template <typename T>
void InMemoryDataFeed<T>::SetThreadNum(int thread_num) {
  this->thread_num_ = thread_num;
}

255 256 257 258 259
// Turns parsing of a leading instance id on or off.
template <typename T>
void InMemoryDataFeed<T>::SetParseInsId(bool parse_ins_id) {
  this->parse_ins_id_ = parse_ins_id;
}

260 261
// Reads every file this thread can pick, parses its instances and writes
// them to the input channel, logging the time spent per file. Does nothing
// on non-Linux builds.
template <typename T>
void InMemoryDataFeed<T>::LoadIntoMemory() {
#ifdef _LINUX
  VLOG(3) << "LoadIntoMemory() begin, thread_id=" << thread_id_;
  std::string picked;
  while (this->PickOneFile(&picked)) {
    VLOG(3) << "PickOneFile, filename=" << picked
            << ", thread_id=" << thread_id_;

    int open_errno = 0;
    this->fp_ = fs_open_read(picked, &open_errno, this->pipe_command_);
    CHECK(this->fp_ != nullptr);
    __fsetlocking(this->fp_.get(), FSETLOCKING_BYCALLER);

    paddle::framework::ChannelWriter<T> writer(input_channel_);
    T parsed;
    platform::Timer timer;
    timer.Start();
    while (ParseOneInstanceFromPipe(&parsed)) {
      writer << std::move(parsed);
      // Reset the moved-from object before it is parsed into again.
      parsed = T();
    }
    writer.Flush();
    timer.Pause();

    VLOG(3) << "LoadIntoMemory() read all lines, file=" << picked
            << ", cost time=" << timer.ElapsedSec()
            << " seconds, thread_id=" << thread_id_;
  }
  VLOG(3) << "LoadIntoMemory() end, thread_id=" << thread_id_;
#endif
}

290
// explicit instantiation
J
jiaqi 已提交
291
// Explicit instantiation: emits the InMemoryDataFeed member definitions
// from this file for Record.
template class InMemoryDataFeed<Record>;
292

W
Wang Guibao 已提交
293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308
// Configures the feed from `data_feed_desc`: batch and queue size, the name
// and type of every slot, and for each used slot its dense flag and shape.
// Enforces that a multi-slot description is present.
//
// All per-used-slot containers are cleared first so that calling Init again
// does not leave stale entries behind.
void MultiSlotDataFeed::Init(
    const paddle::framework::DataFeedDesc& data_feed_desc) {
  finish_init_ = false;
  finish_set_filelist_ = false;
  finish_start_ = false;

  PADDLE_ENFORCE(data_feed_desc.has_multi_slot_desc(),
                 "Multi_slot_desc has not been set.");
  // Bind by reference: the description outlives this call, no copy needed.
  const paddle::framework::MultiSlotDesc& multi_slot_desc =
      data_feed_desc.multi_slot_desc();
  SetBatchSize(data_feed_desc.batch_size());
  SetQueueSize(data_feed_desc.batch_size());
  size_t all_slot_num = multi_slot_desc.slots_size();
  all_slots_.resize(all_slot_num);
  all_slots_type_.resize(all_slot_num);
  use_slots_index_.resize(all_slot_num);
  total_dims_without_inductive_.resize(all_slot_num);
  inductive_shape_index_.resize(all_slot_num);
  use_slots_.clear();
  use_slots_is_dense_.clear();
  // Shapes are appended once per used slot, so they must be reset together
  // with use_slots_ to stay aligned with it.
  use_slots_shape_.clear();
  for (size_t i = 0; i < all_slot_num; ++i) {
    const auto& slot = multi_slot_desc.slots(i);
    all_slots_[i] = slot.name();
    all_slots_type_[i] = slot.type();
    // Position among the used slots, or -1 when the slot is not used.
    use_slots_index_[i] =
        slot.is_used() ? static_cast<int>(use_slots_.size()) : -1;
    total_dims_without_inductive_[i] = 1;
    inductive_shape_index_[i] = -1;
    if (!slot.is_used()) {
      continue;
    }
    use_slots_.push_back(all_slots_[i]);
    use_slots_is_dense_.push_back(slot.is_dense());
    std::vector<int> local_shape;
    local_shape.reserve(slot.shape_size());
    for (int j = 0; j < slot.shape_size(); ++j) {
      const int dim = slot.shape(j);
      local_shape.push_back(dim);
      if (!slot.is_dense()) {
        continue;
      }
      if (dim > 0) {
        total_dims_without_inductive_[i] *= dim;
      }
      if (dim == -1) {
        // -1 marks the dimension that is filled in per batch.
        inductive_shape_index_[i] = j;
      }
    }
    use_slots_shape_.push_back(local_shape);
  }
  feed_vec_.resize(use_slots_.size());
  pipe_command_ = data_feed_desc.pipe_command();
  finish_init_ = true;
}

D
dongdaxiang 已提交
345
// Reader loop: picks files one at a time, opens each through the pipe
// command, parses instances and puts them on the queue, logging the count
// per file. Closes the queue at the end. Does nothing on non-Linux builds.
void MultiSlotDataFeed::ReadThread() {
#ifdef _LINUX
  std::string picked;
  while (PickOneFile(&picked)) {
    int open_errno = 0;
    fp_ = fs_open_read(picked, &open_errno, pipe_command_);
    CHECK(fp_ != nullptr);
    __fsetlocking(fp_.get(), FSETLOCKING_BYCALLER);

    std::vector<MultiSlotType> parsed;
    int parsed_count = 0;
    while (ParseOneInstanceFromPipe(&parsed)) {
      ++parsed_count;
      queue_->Put(parsed);
    }
    VLOG(3) << "filename: " << picked << " inst num: " << parsed_count;
  }
  queue_->Close();
#endif
}

W
Wang Guibao 已提交
365
// Validates that every line of `filename` matches the configured slot
// layout: for each slot, a positive id count followed by that many values
// of the slot's type ("float" or "uint64"), with only whitespace after the
// last slot.
//
// Returns false (after logging the reason) if the file cannot be opened or
// a line is malformed; returns true otherwise. Always returns true on
// non-Linux builds, where no validation is performed.
bool MultiSlotDataFeed::CheckFile(const char* filename) {
#ifdef _LINUX
  CheckInit();  // get info of slots
  std::ifstream fin(filename);
  if (!fin.good()) {
    VLOG(1) << "error: open file<" << filename << "> fail";
    return false;
  }
  std::string line;
  int instance_cout = 0;
  std::string all_slots_alias = "";
  for (const auto& alias : all_slots_) {
    all_slots_alias += alias + " ";
  }
  std::string use_slots_alias = "";
  for (const auto& alias : use_slots_) {
    use_slots_alias += alias + " ";
  }
  VLOG(3) << "total slots num: " << all_slots_.size();
  VLOG(3) << "total slots alias: " << all_slots_alias;
  VLOG(3) << "used slots num: " << use_slots_.size();
  VLOG(3) << "used slots alias: " << use_slots_alias;

  // Logs which line of which file failed validation.
  auto report_line = [&]() {
    VLOG(0) << "please check line<" << instance_cout << "> in file<"
            << filename << ">";
  };

  while (getline(fin, line)) {
    ++instance_cout;
    const char* str = line.c_str();
    char* endptr = const_cast<char*>(str);
    int len = line.length();
    for (size_t i = 0; i < all_slots_.size(); ++i) {
      // strto* only ever sets errno, so it must be cleared before each call
      // or a stale ERANGE would reject valid data.
      errno = 0;
      auto num = strtol(endptr, &endptr, 10);
      if (num < 0) {
        VLOG(0) << "error: the number of ids is a negative number: " << num;
        report_line();
        return false;
      } else if (num == 0) {
        VLOG(0)
            << "error: the number of ids can not be zero, you need "
               "padding it in data generator; or if there is something wrong"
               " with the data, please check if the data contains unresolvable "
               "characters.";
        report_line();
        return false;
      } else if (errno == ERANGE || num > INT_MAX) {
        VLOG(0) << "error: the number of ids greater than INT_MAX";
        report_line();
        return false;
      }

      const bool is_float = (all_slots_type_[i] == "float");
      if (!is_float && all_slots_type_[i] != "uint64") {
        VLOG(0) << "error: this type<" << all_slots_type_[i]
                << "> is not supported";
        return false;
      }
      // num is known to fit in an int here; j does not shadow the slot
      // index i.
      for (int j = 0; j < num; ++j) {
        errno = 0;
        if (is_float) {
          strtof(endptr, &endptr);
        } else {
          strtoull(endptr, &endptr, 10);
        }
        if (errno == ERANGE) {
          VLOG(0) << "error: the value is out of the range of "
                     "representable values for "
                  << (is_float ? "float" : "uint64_t");
          report_line();
          return false;
        }
        // Reaching the end of the line before the last value means the
        // declared id count is larger than the data present.
        if (j + 1 != num && endptr - str == len) {
          VLOG(0) << "error: there is a wrong with the number of ids.";
          report_line();
          return false;
        }
      }
    }
    // It may be added '\t' character to the end of the output of reduce
    // task when processes data by Hadoop(when the output of the reduce
    // task of Hadoop has only one field, it will add a '\t' at the end
    // of the line by default, and you can use this option to avoid it:
    // `-D mapred.textoutputformat.ignoreseparator=true`), which does
    // not affect the correctness of the data. Therefore, it should be
    // judged that the data is not normal when the end of each line of
    // data contains characters which are not spaces.
    while (endptr - str != len) {
      // Cast first: passing a negative char to isspace is undefined.
      if (!isspace(static_cast<unsigned char>(*(endptr++)))) {
        VLOG(0)
            << "error: there is some extra characters at the end of the line.";
        report_line();
        return false;
      }
    }
  }
  VLOG(3) << "instances cout: " << instance_cout;
  VLOG(3) << "The file format is correct";
#endif
  return true;
}

D
dongdaxiang 已提交
478 479
// Reads one line from the currently open pipe/file and parses it into
// `instance`, one MultiSlotType per used slot. Each slot on the line is an
// id count followed by that many values; values of unused slots are
// skipped.
//
// Returns false when no further line can be read, true after a line has
// been parsed. Enforces that every id count is non-zero. Always returns
// true on non-Linux builds, where nothing is parsed.
bool MultiSlotDataFeed::ParseOneInstanceFromPipe(
    std::vector<MultiSlotType>* instance) {
#ifdef _LINUX
  thread_local string::LineFileReader reader;

  if (!reader.getline(&*(fp_.get()))) {
    return false;
  }

  int use_slots_num = use_slots_.size();
  instance->resize(use_slots_num);

  const char* str = reader.get();
  char* endptr = const_cast<char*>(str);
  int pos = 0;
  for (size_t i = 0; i < use_slots_index_.size(); ++i) {
    int idx = use_slots_index_[i];
    int num = strtol(&str[pos], &endptr, 10);
    PADDLE_ENFORCE(
        num,
        "The number of ids can not be zero, you need padding "
        "it in data generator; or if there is something wrong with "
        "the data, please check if the data contains unresolvable "
        "characters.\nplease check this error line: %s",
        str);
    if (idx != -1) {
      (*instance)[idx].Init(all_slots_type_[i]);
      if ((*instance)[idx].GetType()[0] == 'f') {  // float
        for (int j = 0; j < num; ++j) {
          float feasign = strtof(endptr, &endptr);
          (*instance)[idx].AddValue(feasign);
        }
      } else if ((*instance)[idx].GetType()[0] == 'u') {  // uint64
        for (int j = 0; j < num; ++j) {
          uint64_t feasign =
              static_cast<uint64_t>(strtoull(endptr, &endptr, 10));
          (*instance)[idx].AddValue(feasign);
        }
      }
    } else {
      // Unused slot: step over its `num` values token by token. endptr is
      // already past the id count. The scan stops at the terminator, so a
      // short line cannot run the cursor out of the buffer.
      for (int j = 0; j < num; ++j) {
        while (isspace(static_cast<unsigned char>(*endptr))) {
          ++endptr;
        }
        while (*endptr != '\0' &&
               !isspace(static_cast<unsigned char>(*endptr))) {
          ++endptr;
        }
      }
    }
    pos = static_cast<int>(endptr - str);
  }
  return true;
#else
  return true;
#endif
}

W
Wang Guibao 已提交
534
// Reads one line from file_ and parses it into `instance`, one
// MultiSlotType per used slot. Each slot on the line is an id count
// followed by that many values; values of unused slots are skipped.
//
// Returns true after a line has been parsed and false when no further line
// can be read. Enforces that every id count is non-zero. Always returns
// false on non-Linux builds.
bool MultiSlotDataFeed::ParseOneInstance(std::vector<MultiSlotType>* instance) {
#ifdef _LINUX
  std::string line;
  if (!getline(file_, line)) {
    return false;
  }
  int use_slots_num = use_slots_.size();
  instance->resize(use_slots_num);
  // parse line
  const char* str = line.c_str();
  char* endptr = const_cast<char*>(str);
  int pos = 0;
  for (size_t i = 0; i < use_slots_index_.size(); ++i) {
    int idx = use_slots_index_[i];
    int num = strtol(&str[pos], &endptr, 10);
    PADDLE_ENFORCE(
        num,
        "The number of ids can not be zero, you need padding "
        "it in data generator; or if there is something wrong with "
        "the data, please check if the data contains unresolvable "
        "characters.\nplease check this error line: %s",
        str);

    if (idx != -1) {
      (*instance)[idx].Init(all_slots_type_[i]);
      if ((*instance)[idx].GetType()[0] == 'f') {  // float
        for (int j = 0; j < num; ++j) {
          float feasign = strtof(endptr, &endptr);
          (*instance)[idx].AddValue(feasign);
        }
      } else if ((*instance)[idx].GetType()[0] == 'u') {  // uint64
        for (int j = 0; j < num; ++j) {
          uint64_t feasign =
              static_cast<uint64_t>(strtoull(endptr, &endptr, 10));
          (*instance)[idx].AddValue(feasign);
        }
      }
      pos = endptr - str;
    } else {
      // Unused slot: hop over the id count and its `num` values by jumping
      // from one space separator to the next.
      for (int j = 0; j <= num; ++j) {
        pos = line.find_first_of(' ', pos + 1);
      }
    }
  }
  // A line was read and parsed, so report success; the previous code fell
  // through to the trailing `return false`.
  return true;
#else
  return false;
#endif
}

// Appends `instance` to the batch accumulator `ins_vec`. When `index` is 0
// the accumulator is first sized to one entry per slot, each initialized
// with that slot's type and a fresh offset table. Does nothing on non-Linux
// builds.
void MultiSlotDataFeed::AddInstanceToInsVec(
    std::vector<MultiSlotType>* ins_vec,
    const std::vector<MultiSlotType>& instance, int index) {
#ifdef _LINUX
  const size_t slot_count = instance.size();
  const bool first_in_batch = (index == 0);
  if (first_in_batch) {
    ins_vec->resize(slot_count);
    for (size_t slot = 0; slot < slot_count; ++slot) {
      auto& accumulator = (*ins_vec)[slot];
      accumulator.Init(instance[slot].GetType());
      accumulator.InitOffset();
    }
  }

  for (size_t slot = 0; slot < slot_count; ++slot) {
    (*ins_vec)[slot].AddIns(instance[slot]);
  }
#endif
}

// Copies the accumulated batch into the bound feed tensors. For each used
// slot with a bound tensor: allocates a {total, 1} CPU tensor of the slot's
// type, copies the values, sets the LoD from the offset table, and reshapes
// dense slots to their configured shape. Does nothing on non-Linux builds.
void MultiSlotDataFeed::PutToFeedVec(
    const std::vector<MultiSlotType>& ins_vec) {
#ifdef _LINUX
  const size_t used_slot_count = use_slots_.size();
  for (size_t slot = 0; slot < used_slot_count; ++slot) {
    auto* tensor = feed_vec_[slot];
    if (tensor == nullptr) {
      continue;  // no variable bound to this slot
    }
    const auto& slot_batch = ins_vec[slot];
    const auto& type = slot_batch.GetType();
    const auto& offset = slot_batch.GetOffset();
    int total_instance = static_cast<int>(offset.back());

    switch (type[0]) {
      case 'f': {  // float
        const auto& feasign = slot_batch.GetFloatData();
        float* tensor_ptr = tensor->mutable_data<float>(
            {total_instance, 1}, platform::CPUPlace());
        memcpy(tensor_ptr, &feasign[0], total_instance * sizeof(float));
        break;
      }
      case 'u': {  // uint64
        // no uint64_t type in paddlepaddle, so the data is stored as int64_t
        const auto& feasign = slot_batch.GetUint64Data();
        int64_t* tensor_ptr = tensor->mutable_data<int64_t>(
            {total_instance, 1}, platform::CPUPlace());
        memcpy(tensor_ptr, &feasign[0], total_instance * sizeof(int64_t));
        break;
      }
      default:
        break;
    }

    LoD data_lod{offset};
    tensor->set_lod(data_lod);
    if (!use_slots_is_dense_[slot]) {
      continue;
    }
    const auto inductive_index = inductive_shape_index_[slot];
    if (inductive_index != -1) {
      // Fill in the -1 dimension from the amount of data in this batch.
      use_slots_shape_[slot][inductive_index] =
          total_instance / total_dims_without_inductive_[slot];
    }
    tensor->Resize(framework::make_ddim(use_slots_shape_[slot]));
  }
#endif
}

637 638 639 640 641 642 643 644 645 646 647 648 649 650 651
// Configures the feed from `data_feed_desc`: batch size, the name and type
// of every slot, and for each used slot its dense flag and shape. Enforces
// that a multi-slot description is present.
//
// All per-used-slot containers are cleared first so that calling Init again
// does not leave stale entries behind.
void MultiSlotInMemoryDataFeed::Init(
    const paddle::framework::DataFeedDesc& data_feed_desc) {
  finish_init_ = false;
  finish_set_filelist_ = false;
  finish_start_ = false;

  PADDLE_ENFORCE(data_feed_desc.has_multi_slot_desc(),
                 "Multi_slot_desc has not been set.");
  // Bind by reference: the description outlives this call, no copy needed.
  const paddle::framework::MultiSlotDesc& multi_slot_desc =
      data_feed_desc.multi_slot_desc();
  SetBatchSize(data_feed_desc.batch_size());
  size_t all_slot_num = multi_slot_desc.slots_size();
  all_slots_.resize(all_slot_num);
  all_slots_type_.resize(all_slot_num);
  use_slots_index_.resize(all_slot_num);
  total_dims_without_inductive_.resize(all_slot_num);
  inductive_shape_index_.resize(all_slot_num);
  use_slots_.clear();
  use_slots_is_dense_.clear();
  // Shapes are appended once per used slot, so they must be reset together
  // with use_slots_ to stay aligned with it.
  use_slots_shape_.clear();
  for (size_t i = 0; i < all_slot_num; ++i) {
    const auto& slot = multi_slot_desc.slots(i);
    all_slots_[i] = slot.name();
    all_slots_type_[i] = slot.type();
    // Position among the used slots, or -1 when the slot is not used.
    use_slots_index_[i] =
        slot.is_used() ? static_cast<int>(use_slots_.size()) : -1;
    total_dims_without_inductive_[i] = 1;
    inductive_shape_index_[i] = -1;
    if (!slot.is_used()) {
      continue;
    }
    use_slots_.push_back(all_slots_[i]);
    use_slots_is_dense_.push_back(slot.is_dense());
    std::vector<int> local_shape;
    local_shape.reserve(slot.shape_size());
    for (int j = 0; j < slot.shape_size(); ++j) {
      const int dim = slot.shape(j);
      local_shape.push_back(dim);
      if (!slot.is_dense()) {
        continue;
      }
      if (dim > 0) {
        total_dims_without_inductive_[i] *= dim;
      }
      if (dim == -1) {
        // -1 marks the dimension that is filled in per batch.
        inductive_shape_index_[i] = j;
      }
    }
    use_slots_shape_.push_back(local_shape);
  }
  feed_vec_.resize(use_slots_.size());
  pipe_command_ = data_feed_desc.pipe_command();
  finish_init_ = true;
}

J
jiaqi 已提交
688
// Reads one line from the currently open pipe/file and parses it into
// `instance`. When parse_ins_id_ is set the line starts with "1 <ins_id>".
// Each slot that follows is an id count and that many values; values of
// unused slots are skipped. Zero-valued features are dropped unless the
// slot is dense.
//
// Returns false when no further line can be read, true after a line has
// been parsed. Enforces that every id count is non-zero. Always returns
// false on non-Linux builds.
bool MultiSlotInMemoryDataFeed::ParseOneInstanceFromPipe(Record* instance) {
#ifdef _LINUX
  thread_local string::LineFileReader reader;

  if (!reader.getline(&*(fp_.get()))) {
    return false;
  }

  const char* str = reader.get();
  char* endptr = const_cast<char*>(str);
  int pos = 0;
  if (parse_ins_id_) {
    int num = strtol(&str[pos], &endptr, 10);
    CHECK(num == 1);  // NOLINT
    // The id is the space-delimited token after the count. Every step is
    // bounded by the terminator so a truncated line cannot overrun the
    // buffer.
    const char* id_begin = endptr;
    if (*id_begin != '\0') {
      ++id_begin;  // skip the separator after the count
    }
    const char* id_end = id_begin;
    while (*id_end != '\0' && *id_end != ' ') {
      ++id_end;
    }
    instance->ins_id_ = std::string(id_begin, id_end - id_begin);
    if (*id_end != '\0') {
      ++id_end;  // skip the separator after the id
    }
    pos = static_cast<int>(id_end - str);
    VLOG(3) << "ins_id " << instance->ins_id_;
  }
  for (size_t i = 0; i < use_slots_index_.size(); ++i) {
    int idx = use_slots_index_[i];
    int num = strtol(&str[pos], &endptr, 10);
    PADDLE_ENFORCE(
        num,
        "The number of ids can not be zero, you need padding "
        "it in data generator; or if there is something wrong with "
        "the data, please check if the data contains unresolvable "
        "characters.\nplease check this error line: %s",
        str);
    if (idx != -1) {
      // use_slots_is_dense_ has one entry per *used* slot, so it is indexed
      // by idx, not by the all-slot index i.
      const bool is_dense = use_slots_is_dense_[idx];
      if (all_slots_type_[i][0] == 'f') {  // float
        for (int j = 0; j < num; ++j) {
          float feasign = strtof(endptr, &endptr);
          // if float feasign is equal to zero, ignore it
          // except when slot is dense
          if (fabs(feasign) < 1e-6 && !is_dense) {
            continue;
          }
          FeatureKey f;
          f.float_feasign_ = feasign;
          instance->float_feasigns_.push_back(FeatureItem(f, idx));
        }
      } else if (all_slots_type_[i][0] == 'u') {  // uint64
        for (int j = 0; j < num; ++j) {
          uint64_t feasign =
              static_cast<uint64_t>(strtoull(endptr, &endptr, 10));
          // if uint64 feasign is equal to zero, ignore it
          // except when slot is dense
          if (feasign == 0 && !is_dense) {
            continue;
          }
          FeatureKey f;
          f.uint64_feasign_ = feasign;
          instance->uint64_feasigns_.push_back(FeatureItem(f, idx));
        }
      }
    } else {
      // Unused slot: step over its `num` values token by token. endptr is
      // already past the id count. The scan stops at the terminator, so a
      // short line cannot run the cursor out of the buffer.
      for (int j = 0; j < num; ++j) {
        while (isspace(static_cast<unsigned char>(*endptr))) {
          ++endptr;
        }
        while (*endptr != '\0' &&
               !isspace(static_cast<unsigned char>(*endptr))) {
          ++endptr;
        }
      }
    }
    pos = static_cast<int>(endptr - str);
  }
  instance->float_feasigns_.shrink_to_fit();
  instance->uint64_feasigns_.shrink_to_fit();
  return true;
#else
  return false;
#endif
}

J
jiaqi 已提交
767
// Reads one line from file_ and parses it into `instance`. Each slot on the
// line is an id count followed by that many values; values of unused slots
// are skipped and zero-valued features are dropped.
//
// Returns true after a line has been parsed and false when no further line
// can be read. Enforces that every id count is non-zero. Always returns
// false on non-Linux builds.
bool MultiSlotInMemoryDataFeed::ParseOneInstance(Record* instance) {
#ifdef _LINUX
  std::string line;
  if (!getline(file_, line)) {
    return false;
  }
  VLOG(3) << line;

  const char* str = line.c_str();
  char* cursor = const_cast<char*>(str);
  int pos = 0;
  const size_t slot_count = use_slots_index_.size();
  for (size_t i = 0; i < slot_count; ++i) {
    int idx = use_slots_index_[i];
    int num = strtol(&str[pos], &cursor, 10);
    PADDLE_ENFORCE(
        num,
        "The number of ids can not be zero, you need padding "
        "it in data generator; or if there is something wrong with "
        "the data, please check if the data contains unresolvable "
        "characters.\nplease check this error line: %s",
        str);

    if (idx == -1) {
      // Unused slot: hop over the id count and its values by jumping from
      // one space separator to the next.
      for (int j = 0; j <= num; ++j) {
        pos = line.find_first_of(' ', pos + 1);
      }
      continue;
    }

    const char kind = all_slots_type_[i][0];
    if (kind == 'f') {  // float
      for (int j = 0; j < num; ++j) {
        float feasign = strtof(cursor, &cursor);
        if (!(fabs(feasign) < 1e-6)) {
          FeatureKey key;
          key.float_feasign_ = feasign;
          instance->float_feasigns_.push_back(FeatureItem(key, idx));
        }
      }
    } else if (kind == 'u') {  // uint64
      for (int j = 0; j < num; ++j) {
        uint64_t feasign = (uint64_t)strtoull(cursor, &cursor, 10);
        if (feasign != 0) {
          FeatureKey key;
          key.uint64_feasign_ = feasign;
          instance->uint64_feasigns_.push_back(FeatureItem(key, idx));
        }
      }
    }
    pos = cursor - str;
  }
  instance->float_feasigns_.shrink_to_fit();
  instance->uint64_feasigns_.shrink_to_fit();
  return true;
#else
  return false;
#endif
}

J
jiaqi 已提交
826 827
// Packs a batch of parsed records into the per-slot feed tensors.
//
// For every used slot the values of all records in `ins_vec` are concatenated
// into one flat buffer; a record that contributes nothing to a slot gets a
// single default value (0 / 0.0) so that every record owns at least one entry.
// The running buffer sizes form the level-0 LoD of the slot. Slots whose feed
// variable is nullptr are skipped. On non-Linux builds this is a no-op.
//
// ins_vec: records of the current batch; each feature item carries the
//          used-slot index it belongs to (item.slot()).
void MultiSlotInMemoryDataFeed::PutToFeedVec(
    const std::vector<Record>& ins_vec) {
#ifdef _LINUX
  const size_t used_slot_num = use_slots_.size();

  // all_slots_type_ is indexed by the position of a slot among *all* declared
  // slots, whereas everything below is indexed by the position among the
  // *used* slots. Translate once through use_slots_index_ so that the type
  // lookup stays correct when some declared slots are not used.
  std::vector<char> slot_type(used_slot_num, '\0');
  for (size_t i = 0; i < use_slots_index_.size(); ++i) {
    const int used_idx = use_slots_index_[i];
    if (used_idx < 0 || static_cast<size_t>(used_idx) >= used_slot_num) {
      continue;
    }
    slot_type[used_idx] = all_slots_type_[i][0];
  }

  std::vector<std::vector<float>> batch_float_feasigns(used_slot_num,
                                                       std::vector<float>());
  std::vector<std::vector<uint64_t>> batch_uint64_feasigns(
      used_slot_num, std::vector<uint64_t>());
  // Every offset list starts with 0; one entry is appended per record.
  std::vector<std::vector<size_t>> offset(used_slot_num,
                                          std::vector<size_t>{0});
  // visit[j] is true while the current record has at least one value in slot j.
  std::vector<bool> visit(used_slot_num, false);

  for (size_t i = 0; i < ins_vec.size(); ++i) {
    const auto& r = ins_vec[i];
    for (const auto& item : r.float_feasigns_) {
      batch_float_feasigns[item.slot()].push_back(item.sign().float_feasign_);
      visit[item.slot()] = true;
    }
    for (const auto& item : r.uint64_feasigns_) {
      batch_uint64_feasigns[item.slot()].push_back(item.sign().uint64_feasign_);
      visit[item.slot()] = true;
    }
    for (size_t j = 0; j < used_slot_num; ++j) {
      const char type = slot_type[j];
      if (visit[j]) {
        visit[j] = false;  // reset for the next record
      } else {
        // fill slot value with default value 0
        if (type == 'f') {  // float
          batch_float_feasigns[j].push_back(0.0);
        } else if (type == 'u') {  // uint64
          batch_uint64_feasigns[j].push_back(0);
        }
      }
      // get offset of this ins in this slot
      if (type == 'f') {  // float
        offset[j].push_back(batch_float_feasigns[j].size());
      } else if (type == 'u') {  // uint64
        offset[j].push_back(batch_uint64_feasigns[j].size());
      }
    }
  }

  for (size_t i = 0; i < used_slot_num; ++i) {
    if (feed_vec_[i] == nullptr) {
      continue;
    }
    const int total_instance = static_cast<int>(offset[i].back());
    const char type = slot_type[i];
    if (type == 'f') {  // float
      float* tensor_ptr = feed_vec_[i]->mutable_data<float>(
          {total_instance, 1}, platform::CPUPlace());
      // An empty batch leaves the staging vector without storage; do not hand
      // a possibly-null source pointer to memcpy.
      if (total_instance > 0) {
        memcpy(tensor_ptr, batch_float_feasigns[i].data(),
               total_instance * sizeof(float));
      }
    } else if (type == 'u') {  // uint64
      // no uint64_t type in paddlepaddle, the bits are stored as int64_t
      int64_t* tensor_ptr = feed_vec_[i]->mutable_data<int64_t>(
          {total_instance, 1}, platform::CPUPlace());
      if (total_instance > 0) {
        memcpy(tensor_ptr, batch_uint64_feasigns[i].data(),
               total_instance * sizeof(int64_t));
      }
    }
    auto& slot_offset = offset[i];
    LoD data_lod{slot_offset};
    feed_vec_[i]->set_lod(data_lod);
    if (use_slots_is_dense_[i]) {
      // The single inferred (-1) dimension, if any, is derived from the number
      // of values actually present in this batch.
      if (inductive_shape_index_[i] != -1) {
        use_slots_shape_[i][inductive_shape_index_[i]] =
            total_instance / total_dims_without_inductive_[i];
      }
      feed_vec_[i]->Resize(framework::make_ddim(use_slots_shape_[i]));
    }
  }
#endif
}

H
hutuxian 已提交
899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
// Copies the mini-batch currently held in ins_vec_ into the feed tensors.
//
// For every used slot the flat value buffer is copied into a
// {total_instance, 1} CPU tensor and the slot's offsets become its LoD. Dense
// slots are additionally resized to their declared shape, after checking that
// the product of the shape's dimensions equals the number of values. Slots
// whose feed variable is nullptr are skipped, matching
// MultiSlotInMemoryDataFeed::PutToFeedVec.
template <typename T>
void PrivateInstantDataFeed<T>::PutToFeedVec() {
  for (size_t i = 0; i < use_slots_.size(); ++i) {
    // AddFeedVar stores nullptr for a slot without a variable; there is
    // nothing to fill in that case.
    if (feed_vec_[i] == nullptr) {
      continue;
    }
    const auto& type = ins_vec_[i].GetType();
    const auto& offset = ins_vec_[i].GetOffset();
    int total_instance = static_cast<int>(offset.back());

    if (type[0] == 'f') {  // float
      const auto& feasign = ins_vec_[i].GetFloatData();
      float* tensor_ptr = feed_vec_[i]->mutable_data<float>(
          {total_instance, 1}, platform::CPUPlace());
      // Indexing element 0 of an empty buffer is invalid, so only copy when
      // there is data.
      if (total_instance > 0) {
        memcpy(tensor_ptr, &feasign[0], total_instance * sizeof(float));
      }
    } else if (type[0] == 'u') {  // uint64
      // no uint64_t type in paddlepaddle, the bits are stored as int64_t
      const auto& feasign = ins_vec_[i].GetUint64Data();
      int64_t* tensor_ptr = feed_vec_[i]->mutable_data<int64_t>(
          {total_instance, 1}, platform::CPUPlace());
      if (total_instance > 0) {
        memcpy(tensor_ptr, &feasign[0], total_instance * sizeof(int64_t));
      }
    }

    LoD data_lod{offset};
    feed_vec_[i]->set_lod(data_lod);
    if (use_slots_is_dense_[i]) {
      int64_t total_dims = 1;
      for (const auto e : use_slots_shape_[i]) {
        total_dims *= e;
      }
      PADDLE_ENFORCE(
          total_dims == total_instance,
          "The actual data size of slot[%s] doesn't match its declaration",
          use_slots_[i].c_str());
      feed_vec_[i]->Resize(framework::make_ddim(use_slots_shape_[i]));
    }
  }
}

// Produces the next mini-batch and publishes it to the feed tensors.
//
// The current source is tried first. When it yields no batch, it is released
// through Postprocess(), the next file is picked and opened through
// Preprocess(), and a batch is parsed from that file (failure to parse one is
// enforced as an error).
//
// Returns the batch size reported by the first used slot, or -1 when no file
// could be picked or Preprocess() reported failure.
template <typename T>
int PrivateInstantDataFeed<T>::Next() {
  if (!ParseOneMiniBatch()) {
    // Nothing left in the current source: release it and move on.
    Postprocess();

    std::string next_file;
    if (!PickOneFile(&next_file) || !Preprocess(next_file)) {
      return -1;
    }

    PADDLE_ENFORCE(true == ParseOneMiniBatch(),
                   "Fail to parse mini-batch data");
  }

  PutToFeedVec();
  return ins_vec_[0].GetBatchSize();
}

// Configures the feed from a DataFeedDesc.
//
// Reads the multi-slot description and records, for every declared slot, its
// name, type and its index among the used slots (-1 when unused). For each
// used slot it also records the name, the dense flag and the declared shape;
// for dense used slots the positions of -1 ("inductive") dimensions are
// remembered in multi_inductive_shape_index_. All per-slot containers are
// rebuilt from scratch, so calling Init again does not accumulate entries
// from a previous configuration.
//
// data_feed_desc: must carry a multi_slot_desc (enforced).
template <typename T>
void PrivateInstantDataFeed<T>::Init(const DataFeedDesc& data_feed_desc) {
  finish_init_ = false;
  finish_set_filelist_ = false;
  finish_start_ = false;

  PADDLE_ENFORCE(data_feed_desc.has_multi_slot_desc(),
                 "Multi_slot_desc has not been set.");
  // Bind by reference: the description is only read here.
  const paddle::framework::MultiSlotDesc& multi_slot_desc =
      data_feed_desc.multi_slot_desc();
  SetBatchSize(data_feed_desc.batch_size());

  const size_t all_slot_num = multi_slot_desc.slots_size();
  all_slots_.resize(all_slot_num);
  all_slots_type_.resize(all_slot_num);
  use_slots_index_.resize(all_slot_num);
  multi_inductive_shape_index_.resize(all_slot_num);
  // resize() keeps existing elements, so drop whatever an earlier Init left.
  for (auto& inductive_dims : multi_inductive_shape_index_) {
    inductive_dims.clear();
  }
  use_slots_.clear();
  use_slots_is_dense_.clear();
  use_slots_shape_.clear();

  for (size_t i = 0; i < all_slot_num; ++i) {
    const auto& slot = multi_slot_desc.slots(i);
    all_slots_[i] = slot.name();
    all_slots_type_[i] = slot.type();
    use_slots_index_[i] =
        slot.is_used() ? static_cast<int>(use_slots_.size()) : -1;
    if (!slot.is_used()) {
      continue;
    }

    use_slots_.push_back(all_slots_[i]);
    use_slots_is_dense_.push_back(slot.is_dense());

    const int shape_size = slot.shape_size();
    std::vector<int> local_shape;
    local_shape.reserve(shape_size);
    for (int j = 0; j < shape_size; ++j) {
      // Only dense slots track which dimensions are left to be inferred.
      if (slot.is_dense() && slot.shape(j) == -1) {
        multi_inductive_shape_index_[i].push_back(j);
      }
      local_shape.push_back(slot.shape(j));
    }
    use_slots_shape_.push_back(local_shape);
  }
  feed_vec_.resize(use_slots_.size());
  ins_vec_.resize(use_slots_.size());

  finish_init_ = true;
}

// Explicit instantiation: the member templates above are defined in this
// translation unit, so the specialization for std::vector<MultiSlotType> is
// instantiated here.
template class PrivateInstantDataFeed<std::vector<MultiSlotType>>;

// Opens `filename` read-only and maps its whole content into memory.
//
// On success fd_ holds the descriptor, buffer_ points at the mapping, end_ is
// the file size in bytes and offset_ is reset to 0. Any failure (open, fstat
// or mmap) is raised through PADDLE_ENFORCE; before raising, the descriptor is
// closed and buffer_ is left as nullptr so that the object never holds
// MAP_FAILED or a leaked descriptor.
//
// Returns true (failures do not return).
bool MultiSlotFileInstantDataFeed::Preprocess(const std::string& filename) {
  fd_ = open(filename.c_str(), O_RDONLY);
  PADDLE_ENFORCE(fd_ != -1, "Fail to open file: %s", filename.c_str());

  struct stat sb;
  if (fstat(fd_, &sb) != 0) {
    const int err = errno;  // close() below may overwrite errno
    close(fd_);
    fd_ = -1;
    PADDLE_ENFORCE(false, "Fail to stat file: %s (%s)", filename.c_str(),
                   strerror(err));
  }
  end_ = static_cast<size_t>(sb.st_size);

  void* mapped = mmap(nullptr, end_, PROT_READ, MAP_PRIVATE, fd_, 0);
  if (mapped == MAP_FAILED) {
    const int err = errno;  // close() below may overwrite errno
    close(fd_);
    fd_ = -1;
    end_ = 0;
    buffer_ = nullptr;
    // Pass the system message as an argument, never as the format string.
    PADDLE_ENFORCE(false, "%s", strerror(err));
  }
  buffer_ = static_cast<char*>(mapped);

  offset_ = 0;
  return true;
}

// Releases the resources of the current file: unmaps buffer_ when it is set,
// then closes fd_ when it is open. end_ and offset_ are reset only together
// with a descriptor that was actually open. Always returns true.
bool MultiSlotFileInstantDataFeed::Postprocess() {
  const bool has_mapping = (buffer_ != nullptr);
  if (has_mapping) {
    munmap(buffer_, end_);
    buffer_ = nullptr;
  }

  const bool has_open_file = (fd_ != -1);
  if (!has_open_file) {
    return true;
  }

  close(fd_);
  fd_ = -1;
  end_ = 0;
  offset_ = 0;
  return true;
}

// Parses up to default_batch_size_ instances from the mapped file into
// ins_vec_, starting at offset_.
//
// As read by this function, an instance is one entry per declared slot, in
// slot order. Each entry starts with a uint16 element count (must be
// non-zero). For a used slot the count includes one uint64 per inductive
// dimension, stored first, followed by the values (float for type 'f', uint64
// for type 'u'). The inductive dimensions are consumed for every instance but
// only applied to the slot's shape for the first instance of a batch. Entries
// of unused slots are skipped.
//
// Every read is checked against end_, so a truncated or malformed file raises
// an error instead of reading past the mapping.
//
// Returns false when the file is already exhausted, true when a batch (which
// may be shorter than default_batch_size_ at end of file) was parsed.
bool MultiSlotFileInstantDataFeed::ParseOneMiniBatch() {
  if (offset_ == end_) {
    return false;
  }

  batch_size_ = 0;
  while (batch_size_ < default_batch_size_ && offset_ < end_) {
    for (size_t i = 0; i < use_slots_index_.size(); ++i) {
      int idx = use_slots_index_[i];
      char type = all_slots_type_[i][0];

      PADDLE_ENFORCE(sizeof(uint16_t) <= end_ - offset_,
                     "The data file is truncated: incomplete slot header");
      // memcpy instead of a cast: the header is not guaranteed to be aligned.
      uint16_t num = 0;
      memcpy(&num, buffer_ + offset_, sizeof(num));
      PADDLE_ENFORCE(
          num,
          "The number of ids can not be zero, you need padding "
          "it in data generator; or if there is something wrong with "
          "the data, please check if the data contains unresolvable "
          "characters.");
      offset_ += sizeof(uint16_t);

      // Size in bytes of one value of this slot; unknown types carry no
      // payload as far as this parser is concerned.
      size_t value_size = 0;
      if (type == 'f') {
        value_size = sizeof(float);
      } else if (type == 'u') {
        value_size = sizeof(uint64_t);
      }

      if (idx != -1) {
        int inductive_size = multi_inductive_shape_index_[i].size();
        // Without this check the unsigned subtraction below would wrap.
        PADDLE_ENFORCE(num >= inductive_size,
                       "The data file is malformed: element count is smaller "
                       "than the number of inductive dimensions");
        PADDLE_ENFORCE(
            sizeof(uint64_t) * inductive_size <= end_ - offset_,
            "The data file is truncated: incomplete inductive shape");
        if (UNLIKELY(batch_size_ == 0)) {
          ins_vec_[idx].Init(all_slots_type_[i], default_batch_size_ * num);
          ins_vec_[idx].InitOffset(default_batch_size_);
          for (int inductive_id = 0; inductive_id < inductive_size;
               ++inductive_id) {
            uint64_t dim = 0;
            memcpy(&dim, buffer_ + offset_ + sizeof(uint64_t) * inductive_id,
                   sizeof(dim));
            // use_slots_shape_ holds one entry per *used* slot, so it is
            // indexed by idx; multi_inductive_shape_index_ is per declared
            // slot and stays indexed by i.
            use_slots_shape_[idx][multi_inductive_shape_index_[i]
                                                              [inductive_id]] =
                static_cast<int>(dim);
          }
        }
        num -= inductive_size;
        offset_ += sizeof(uint64_t) * inductive_size;

        PADDLE_ENFORCE(value_size * num <= end_ - offset_,
                       "The data file is truncated: incomplete slot values");
        if (type == 'f') {
          ins_vec_[idx].AppendValues(
              reinterpret_cast<float*>(buffer_ + offset_), num);
        } else if (type == 'u') {
          ins_vec_[idx].AppendValues(
              reinterpret_cast<uint64_t*>(buffer_ + offset_), num);
        }
        offset_ += value_size * num;
      } else {
        PADDLE_ENFORCE(value_size * num <= end_ - offset_,
                       "The data file is truncated: incomplete slot values");
        offset_ += value_size * num;
      }
    }
    ++batch_size_;
    // OPTIMIZE: It is better to insert check codes between instances for format
    // checking
  }

  PADDLE_ENFORCE(batch_size_ == default_batch_size_ || offset_ == end_,
                 "offset_ != end_");
  return true;
}
#endif

W
Wang Guibao 已提交
1099 1100
}  // namespace framework
}  // namespace paddle