ctr_reader.cc 5.6 KB
Newer Older
Q
Qiao Longfei 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/operators/reader/ctr_reader.h"

Q
Qiao Longfei 已提交
17 18
#include <gzstream.h>

Q
Qiao Longfei 已提交
19 20 21 22 23 24 25 26 27 28
#include <algorithm>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <iostream>
#include <memory>
#include <random>
#include <sstream>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <vector>

Q
Qiao Longfei 已提交
29 30
namespace paddle {
namespace operators {
Q
Qiao Longfei 已提交
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
namespace reader {

// Splits `s` at every occurrence of `delimiter` and appends the pieces, in
// order, to `*output`.
//
// Empty pieces are preserved: a string containing k delimiters always
// contributes exactly k + 1 entries, and an empty string contributes a single
// empty entry. Anything already stored in `*output` is left in place.
static inline void string_split(const std::string& s, const char delimiter,
                                std::vector<std::string>* output) {
  size_t token_begin = 0;
  for (;;) {
    const size_t token_end = s.find(delimiter, token_begin);
    if (token_end == std::string::npos) {
      // No further delimiter: the remainder of the string is the last piece.
      output->push_back(s.substr(token_begin));
      return;
    }
    output->push_back(s.substr(token_begin, token_end - token_begin));
    token_begin = token_end + 1;
  }
}

// Parses one space-separated sample line into a binary label and the feature
// ids of each slot.
//
// Line layout, as consumed here:
//   field 0, field 1 : ignored
//   field 2          : integer; `*label` is set to 1 if it is > 0, else 0
//   field 3..        : items of the form `feasign:slot`; the part before the
//                      colon is parsed as a base-10 int64 and appended to
//                      `(*slots_to_data)[slot]`. Items that do not split into
//                      exactly two parts on ':' are skipped.
//
// After parsing, every name in `slots` that received no value is given the
// single placeholder value 0, so each requested slot is always present.
//
// Throws std::invalid_argument when the line has fewer than three fields
// (previously this indexed past the end of the field vector) or, via
// std::stoi, when field 2 is not an integer. std::stoi may also throw
// std::out_of_range.
static inline void parse_line(
    const std::string& line, const std::vector<std::string>& slots,
    int64_t* label,
    std::unordered_map<std::string, std::vector<int64_t>>* slots_to_data) {
  std::vector<std::string> fields;
  string_split(line, ' ', &fields);
  if (fields.size() < 3) {
    throw std::invalid_argument(
        "parse_line: expected at least 3 space-separated fields");
  }
  *label = std::stoi(fields[2]) > 0;

  for (size_t i = 3; i < fields.size(); ++i) {
    std::vector<std::string> slot_and_feasign;
    string_split(fields[i], ':', &slot_and_feasign);
    if (slot_and_feasign.size() == 2) {
      const std::string& slot = slot_and_feasign[1];
      const int64_t feasign =
          std::strtoll(slot_and_feasign[0].c_str(), nullptr, 10);
      (*slots_to_data)[slot].push_back(feasign);
    }
  }

  // NOTE: if a slot has no value, fill [0] as its data.
  for (const auto& slot : slots) {
    if (slots_to_data->find(slot) == slots_to_data->end()) {
      (*slots_to_data)[slot].push_back(0);
    }
  }
}

Q
Qiao Longfei 已提交
75 76 77 78 79 80 81 82
// Abstract source of text lines.
//
// The visible caller in this file checks HasNext() before every NextLine()
// call; implementations here rely on that ordering.
class Reader {
 public:
  virtual ~Reader() = default;

  // Returns true while another line is available.
  virtual bool HasNext() = 0;

  // Writes the next line into `*line`.
  virtual void NextLine(std::string* line) = 0;
};

class GzipReader : public Reader {
Q
Qiao Longfei 已提交
83
 public:
Q
Qiao Longfei 已提交
84 85
  explicit GzipReader(const std::string& file_name)
      : gzstream_(file_name.c_str()) {}
Q
Qiao Longfei 已提交
86

Q
Qiao Longfei 已提交
87
  ~GzipReader() {}
Q
Qiao Longfei 已提交
88

Q
Qiao Longfei 已提交
89
  bool HasNext() override { return gzstream_.peek() != EOF; }
Q
Qiao Longfei 已提交
90

Q
Qiao Longfei 已提交
91
  void NextLine(std::string* line) override { std::getline(gzstream_, *line); }
Q
Qiao Longfei 已提交
92 93

 private:
Q
Qiao Longfei 已提交
94
  igzstream gzstream_;
Q
Qiao Longfei 已提交
95 96
};

Q
Qiao Longfei 已提交
97
class MultiGzipReader : public Reader {
Q
Qiao Longfei 已提交
98 99 100 101 102 103 104
 public:
  explicit MultiGzipReader(const std::vector<std::string>& file_list) {
    for (auto& file : file_list) {
      readers_.emplace_back(std::make_shared<GzipReader>(file));
    }
  }

Q
Qiao Longfei 已提交
105
  bool HasNext() override {
Q
Qiao Longfei 已提交
106 107 108 109 110 111 112 113 114 115
    if (current_reader_index_ >= readers_.size()) {
      return false;
    }
    if (!readers_[current_reader_index_]->HasNext()) {
      current_reader_index_++;
      return HasNext();
    }
    return true;
  }

Q
Qiao Longfei 已提交
116
  void NextLine(std::string* line) override {
Q
Qiao Longfei 已提交
117
    readers_[current_reader_index_]->NextLine(line);
Q
Qiao Longfei 已提交
118 119 120 121 122 123 124
  }

 private:
  std::vector<std::shared_ptr<GzipReader>> readers_;
  size_t current_reader_index_ = 0;
};

Q
Qiao Longfei 已提交
125 126
void ReadThread(const std::vector<std::string>& file_list,
                const std::vector<std::string>& slots, int batch_size,
Q
Qiao Longfei 已提交
127
                int thread_id, std::vector<ReaderThreadStatus>* thread_status,
Q
Qiao Longfei 已提交
128
                std::shared_ptr<LoDTensorBlockingQueue> queue) {
Q
Qiao Longfei 已提交
129 130
  (*thread_status)[thread_id] = Running;

Q
Qiao Longfei 已提交
131
  std::string line;
Q
Qiao Longfei 已提交
132 133 134

  std::vector<std::unordered_map<std::string, std::vector<int64_t>>> batch_data;
  std::vector<int64_t> batch_label;
Q
Qiao Longfei 已提交
135 136

  MultiGzipReader reader(file_list);
Q
Qiao Longfei 已提交
137 138 139 140 141 142 143 144 145 146 147 148 149 150

  while (reader.HasNext()) {
    // read all files
    for (int i = 0; i < batch_size; ++i) {
      if (reader.HasNext()) {
        reader.NextLine(&line);
        std::unordered_map<std::string, std::vector<int64_t>> slots_to_data;
        int64_t label;
        parse_line(line, slots, &label, &slots_to_data);
        batch_data.push_back(slots_to_data);
        batch_label.push_back(label);
      } else {
        break;
      }
Q
Qiao Longfei 已提交
151
    }
Q
Qiao Longfei 已提交
152

Q
Qiao Longfei 已提交
153 154 155 156
    std::vector<framework::LoDTensor> lod_datas;

    // first insert tensor for each slots
    for (auto& slot : slots) {
Q
Qiao Longfei 已提交
157 158 159
      std::vector<size_t> lod_data{0};
      std::vector<int64_t> batch_feasign;

Q
Qiao Longfei 已提交
160 161 162 163 164 165
      for (size_t i = 0; i < batch_data.size(); ++i) {
        auto& feasign = batch_data[i][slot];

        lod_data.push_back(lod_data.back() + feasign.size());
        batch_feasign.insert(feasign.end(), feasign.begin(), feasign.end());
      }
Q
Qiao Longfei 已提交
166 167 168 169 170 171 172 173 174 175

      framework::LoDTensor lod_tensor;
      framework::LoD lod{lod_data};
      lod_tensor.set_lod(lod);
      int64_t* tensor_data = lod_tensor.mutable_data<int64_t>(
          framework::make_ddim({1, static_cast<int64_t>(batch_feasign.size())}),
          platform::CPUPlace());
      memcpy(tensor_data, batch_feasign.data(), batch_feasign.size());
      lod_datas.push_back(lod_tensor);
    }
Q
Qiao Longfei 已提交
176 177 178 179 180 181 182 183 184 185

    // insert label tensor
    framework::LoDTensor label_tensor;
    int64_t* label_tensor_data = label_tensor.mutable_data<int64_t>(
        framework::make_ddim({1, static_cast<int64_t>(batch_label.size())}),
        platform::CPUPlace());
    memcpy(label_tensor_data, batch_label.data(), batch_label.size());
    lod_datas.push_back(label_tensor);

    queue->Push(lod_datas);
Q
Qiao Longfei 已提交
186
  }
Q
Qiao Longfei 已提交
187 188

  (*thread_status)[thread_id] = Stopped;
Q
Qiao Longfei 已提交
189 190 191
}

}  // namespace reader
Q
Qiao Longfei 已提交
192 193
}  // namespace operators
}  // namespace paddle