ctr_reader.cc 8.5 KB
Newer Older
Q
Qiao Longfei 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/operators/reader/ctr_reader.h"

Q
Qiao Longfei 已提交
17 18
#include <gzstream.h>

Q
Qiao Longfei 已提交
19 20 21 22 23 24 25 26 27 28
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <sstream>
#include <string>
#include <unordered_map>

#include <algorithm>
#include <random>

Q
Qiao Longfei 已提交
29 30
namespace paddle {
namespace operators {
Q
Qiao Longfei 已提交
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
namespace reader {

static inline void string_split(const std::string& s, const char delimiter,
                                std::vector<std::string>* output) {
  size_t start = 0;
  size_t end = s.find_first_of(delimiter);

  while (end <= std::string::npos) {
    output->emplace_back(s.substr(start, end - start));
    if (end == std::string::npos) {
      break;
    }
    start = end + 1;
    end = s.find_first_of(delimiter, start);
  }
}

static inline void parse_line(
Q
Qiao Longfei 已提交
49 50
    const std::string& line,
    const std::unordered_map<std::string, size_t>& slot_to_index,
Q
Qiao Longfei 已提交
51
    int64_t* label,
Q
Qiao Longfei 已提交
52
    std::unordered_map<std::string, std::vector<int64_t>>* slot_to_data) {
Q
Qiao Longfei 已提交
53 54 55
  std::vector<std::string> ret;
  string_split(line, ' ', &ret);
  *label = std::stoi(ret[2]) > 0;
Q
Qiao Longfei 已提交
56

Q
Qiao Longfei 已提交
57 58
  for (size_t i = 3; i < ret.size(); ++i) {
    const std::string& item = ret[i];
Q
Qiao Longfei 已提交
59 60 61
    std::vector<std::string> feasign_and_slot;
    string_split(item, ':', &feasign_and_slot);
    if (feasign_and_slot.size() == 2 &&
Q
Qiao Longfei 已提交
62
        slot_to_index.find(feasign_and_slot[1]) != slot_to_index.end()) {
Q
Qiao Longfei 已提交
63 64
      int64_t feasign = std::strtoll(feasign_and_slot[0].c_str(), NULL, 10);
      (*slot_to_data)[feasign_and_slot[1]].push_back(feasign);
Q
Qiao Longfei 已提交
65 66
    }
  }
Q
Qiao Longfei 已提交
67 68

  // NOTE:: if the slot has no value, then fill [0] as it's data.
Q
Qiao Longfei 已提交
69 70 71
  for (auto& item : slot_to_index) {
    if (slot_to_data->find(item.first) == slot_to_data->end()) {
      (*slot_to_data)[item.first].push_back(0);
Q
Qiao Longfei 已提交
72 73
    }
  }
Q
Qiao Longfei 已提交
74 75
}

Q
Qiao Longfei 已提交
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
// label slot1:fea_sign slot2:fea_sign slot1:fea_sign
static inline void parse_svm_line(const std::string& line) {}

// label,dense_fea,dense_fea,sparse_fea,sparse_fea
static inline void parse_csv_line(const std::string& line,
                                  const std::vector<std::string>& dense_slots,
                                  const std::vector<std::string>& sparse_slots,
                                  int64_t* label,
                                  std::vector<float>* dense_datas,
                                  std::vector<int64_t>* sparse_datas) {
  std::vector<std::string> ret;
  string_split(line, ',', &ret);
  *label = std::stoi(ret[2]) > 0;
}

Q
Qiao Longfei 已提交
91 92 93 94 95 96 97 98
class Reader {
 public:
  virtual ~Reader() {}
  virtual bool HasNext() = 0;
  virtual void NextLine(std::string* line) = 0;
};

class GzipReader : public Reader {
Q
Qiao Longfei 已提交
99
 public:
Q
Qiao Longfei 已提交
100 101
  explicit GzipReader(const std::string& file_name)
      : gzstream_(file_name.c_str()) {}
Q
Qiao Longfei 已提交
102

Q
Qiao Longfei 已提交
103
  ~GzipReader() {}
Q
Qiao Longfei 已提交
104

Q
Qiao Longfei 已提交
105
  bool HasNext() override { return gzstream_.peek() != EOF; }
Q
Qiao Longfei 已提交
106

Q
Qiao Longfei 已提交
107
  void NextLine(std::string* line) override { std::getline(gzstream_, *line); }
Q
Qiao Longfei 已提交
108 109

 private:
Q
Qiao Longfei 已提交
110
  igzstream gzstream_;
Q
Qiao Longfei 已提交
111 112
};

Q
Qiao Longfei 已提交
113
class PlainFileReader : public Reader {
Q
Qiao Longfei 已提交
114
 public:
Q
Qiao Longfei 已提交
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
  explicit PlainFileReader(const std::string& file_name)
      : myfile_(file_name.c_str()) {}

  ~PlainFileReader() {}

  bool HasNext() override { return myfile_.peek() != EOF; }

  void NextLine(std::string* line) override { std::getline(myfile_, *line); }

 private:
  std::ifstream myfile_;
};

template <typename SingleFileReader>
class MultiFileReader : public Reader {
 public:
  explicit MultiFileReader(const std::vector<std::string>& file_list) {
Q
Qiao Longfei 已提交
132
    for (auto& file : file_list) {
Q
Qiao Longfei 已提交
133
      readers_.emplace_back(std::make_shared<SingleFileReader>(file));
Q
Qiao Longfei 已提交
134 135 136
    }
  }

Q
Qiao Longfei 已提交
137
  bool HasNext() override {
Q
Qiao Longfei 已提交
138 139 140 141 142 143 144 145 146 147
    if (current_reader_index_ >= readers_.size()) {
      return false;
    }
    if (!readers_[current_reader_index_]->HasNext()) {
      current_reader_index_++;
      return HasNext();
    }
    return true;
  }

Q
Qiao Longfei 已提交
148
  void NextLine(std::string* line) override {
Q
Qiao Longfei 已提交
149
    readers_[current_reader_index_]->NextLine(line);
Q
Qiao Longfei 已提交
150 151 152
  }

 private:
Q
Qiao Longfei 已提交
153
  std::vector<std::shared_ptr<SingleFileReader>> readers_;
Q
Qiao Longfei 已提交
154 155 156
  size_t current_reader_index_ = 0;
};

Q
Qiao Longfei 已提交
157 158
void MonitorThread(std::vector<ReaderThreadStatus>* thread_status,
                   std::shared_ptr<LoDTensorBlockingQueue> queue) {
Q
Qiao Longfei 已提交
159
  VLOG(3) << "monitor thread in";
Q
Qiao Longfei 已提交
160 161
  bool reader_thread_is_running = true;
  while (reader_thread_is_running) {
Q
Qiao Longfei 已提交
162
    VLOG(3) << "reader_thread_is_running";
Q
Qiao Longfei 已提交
163 164 165
    reader_thread_is_running = false;
    for (size_t i = 0; i < (*thread_status).size(); ++i) {
      if ((*thread_status)[i] == Running) {
Q
Qiao Longfei 已提交
166
        VLOG(3) << "reader is running!";
Q
Qiao Longfei 已提交
167 168 169 170 171
        reader_thread_is_running = true;
      }
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  }
Q
Qiao Longfei 已提交
172
  VLOG(3) << "all reader thread is stopped, push empty data into queue";
Q
Qiao Longfei 已提交
173
  queue->Push({});
Q
Qiao Longfei 已提交
174
  VLOG(3) << "monitor thread exited";
Q
Qiao Longfei 已提交
175 176
}

Q
Qiao Longfei 已提交
177
void ReadThread(const std::vector<std::string>& file_list,
Q
Qiao Longfei 已提交
178 179
                const DataDesc& data_desc, int thread_id,
                std::vector<ReaderThreadStatus>* thread_status,
Q
Qiao Longfei 已提交
180
                std::shared_ptr<LoDTensorBlockingQueue> queue) {
Q
Qiao Longfei 已提交
181 182
  VLOG(3) << "[" << thread_id << "]"
          << " reader thread start! thread_id = " << thread_id;
Q
Qiao Longfei 已提交
183
  for (auto& file : file_list) {
Q
Qiao Longfei 已提交
184 185
    VLOG(3) << "[" << thread_id << "]"
            << " file " << file;
Q
Qiao Longfei 已提交
186
  }
Q
Qiao Longfei 已提交
187
  (*thread_status)[thread_id] = Running;
Q
Qiao Longfei 已提交
188
  VLOG(3) << "set status to running";
Q
Qiao Longfei 已提交
189 190

  std::unordered_map<std::string, size_t> slot_to_index;
Q
Qiao Longfei 已提交
191 192
  for (size_t i = 0; i < data_desc.sparse_slot_ids_.size(); ++i) {
    slot_to_index[data_desc.sparse_slot_ids_[i]] = i;
Q
Qiao Longfei 已提交
193
  }
Q
Qiao Longfei 已提交
194

Q
Qiao Longfei 已提交
195
  std::string line;
Q
Qiao Longfei 已提交
196 197 198

  std::vector<std::unordered_map<std::string, std::vector<int64_t>>> batch_data;
  std::vector<int64_t> batch_label;
Q
Qiao Longfei 已提交
199

Q
Qiao Longfei 已提交
200
  std::unique_ptr<Reader> reader;
Q
Qiao Longfei 已提交
201
  if (data_desc.file_type_ == "gzip") {
Q
Qiao Longfei 已提交
202
    reader.reset(new MultiFileReader<GzipReader>(file_list));
Q
Qiao Longfei 已提交
203
  } else if (data_desc.file_type_ == "plain") {
Q
Qiao Longfei 已提交
204 205
    reader.reset(new MultiFileReader<PlainFileReader>(file_list));
  } else {
Q
Qiao Longfei 已提交
206
    PADDLE_THROW("do not support file format %s", data_desc.file_type_);
Q
Qiao Longfei 已提交
207
  }
Q
Qiao Longfei 已提交
208

Q
Qiao Longfei 已提交
209
  VLOG(3) << "reader inited";
Q
Qiao Longfei 已提交
210

Q
Qiao Longfei 已提交
211
  while (reader->HasNext()) {
Q
Qiao Longfei 已提交
212
    batch_data.clear();
Q
Qiao Longfei 已提交
213
    batch_data.reserve(data_desc.batch_size_);
Q
Qiao Longfei 已提交
214

Q
Qiao Longfei 已提交
215
    batch_label.clear();
Q
Qiao Longfei 已提交
216
    batch_label.reserve(data_desc.batch_size_);
Q
Qiao Longfei 已提交
217 218

    // read batch_size data
Q
Qiao Longfei 已提交
219
    for (int i = 0; i < data_desc.batch_size_; ++i) {
Q
Qiao Longfei 已提交
220 221
      if (reader->HasNext()) {
        reader->NextLine(&line);
Q
Qiao Longfei 已提交
222
        std::unordered_map<std::string, std::vector<int64_t>> slot_to_data;
Q
Qiao Longfei 已提交
223
        int64_t label;
Q
Qiao Longfei 已提交
224 225
        parse_line(line, slot_to_index, &label, &slot_to_data);
        batch_data.push_back(slot_to_data);
Q
Qiao Longfei 已提交
226 227 228 229
        batch_label.push_back(label);
      } else {
        break;
      }
Q
Qiao Longfei 已提交
230
    }
Q
Qiao Longfei 已提交
231

Q
Qiao Longfei 已提交
232 233
    std::vector<framework::LoDTensor> lod_datas;

Q
Qiao Longfei 已提交
234
    // first insert tensor for each sparse_slots
Q
Qiao Longfei 已提交
235
    for (auto& slot : data_desc.sparse_slot_ids_) {
Q
Qiao Longfei 已提交
236 237 238
      std::vector<size_t> lod_data{0};
      std::vector<int64_t> batch_feasign;

Q
Qiao Longfei 已提交
239 240 241
      for (size_t i = 0; i < batch_data.size(); ++i) {
        auto& feasign = batch_data[i][slot];
        lod_data.push_back(lod_data.back() + feasign.size());
Q
Qiao Longfei 已提交
242 243
        batch_feasign.insert(batch_feasign.end(), feasign.begin(),
                             feasign.end());
Q
Qiao Longfei 已提交
244
      }
Q
Qiao Longfei 已提交
245 246 247 248 249 250 251

      framework::LoDTensor lod_tensor;
      framework::LoD lod{lod_data};
      lod_tensor.set_lod(lod);
      int64_t* tensor_data = lod_tensor.mutable_data<int64_t>(
          framework::make_ddim({1, static_cast<int64_t>(batch_feasign.size())}),
          platform::CPUPlace());
Q
Qiao Longfei 已提交
252 253
      memcpy(tensor_data, batch_feasign.data(),
             batch_feasign.size() * sizeof(int64_t));
Q
Qiao Longfei 已提交
254 255
      lod_datas.push_back(lod_tensor);
    }
Q
Qiao Longfei 已提交
256 257 258

    // insert label tensor
    framework::LoDTensor label_tensor;
Q
Qiao Longfei 已提交
259
    auto* label_tensor_data = label_tensor.mutable_data<int64_t>(
Q
Qiao Longfei 已提交
260 261
        framework::make_ddim({1, static_cast<int64_t>(batch_label.size())}),
        platform::CPUPlace());
Q
Qiao Longfei 已提交
262 263
    memcpy(label_tensor_data, batch_label.data(),
           batch_label.size() * sizeof(int64_t));
Q
Qiao Longfei 已提交
264 265
    lod_datas.push_back(label_tensor);

Q
Qiao Longfei 已提交
266
    queue->Push(lod_datas);
Q
Qiao Longfei 已提交
267
    VLOG(4) << "push one data, queue_size=" << queue->Size();
Q
Qiao Longfei 已提交
268
  }
Q
Qiao Longfei 已提交
269 270

  (*thread_status)[thread_id] = Stopped;
Q
Qiao Longfei 已提交
271
  VLOG(3) << "set status to stopped, thread " << thread_id << " exited";
Q
Qiao Longfei 已提交
272 273 274
}

}  // namespace reader
Q
Qiao Longfei 已提交
275 276
}  // namespace operators
}  // namespace paddle