ctr_reader.cc 13.1 KB
Newer Older
Q
Qiao Longfei 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/operators/reader/ctr_reader.h"

Q
Qiao Longfei 已提交
17 18
#include <gzstream.h>

Q
Qiao Longfei 已提交
19 20 21 22 23 24 25 26 27 28
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <sstream>
#include <string>
#include <unordered_map>

#include <algorithm>
#include <random>

Q
Qiao Longfei 已提交
29 30
namespace paddle {
namespace operators {
Q
Qiao Longfei 已提交
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
namespace reader {

// Splits `s` at every occurrence of `delimiter` and appends the pieces, in
// order, to `*output` (existing elements of `*output` are kept).
//
// Adjacent delimiters and a leading/trailing delimiter produce empty pieces,
// and an input without any delimiter (including the empty string) produces
// exactly one piece equal to the whole input.
static inline void string_split(const std::string& s, const char delimiter,
                                std::vector<std::string>* output) {
  size_t token_begin = 0;
  for (;;) {
    const size_t token_end = s.find(delimiter, token_begin);
    if (token_end == std::string::npos) {
      // Last piece: everything from the previous delimiter to the end.
      output->emplace_back(s.substr(token_begin));
      return;
    }
    output->emplace_back(s.substr(token_begin, token_end - token_begin));
    token_begin = token_end + 1;
  }
}

static inline void parse_line(
Q
Qiao Longfei 已提交
49 50
    const std::string& line,
    const std::unordered_map<std::string, size_t>& slot_to_index,
Q
Qiao Longfei 已提交
51
    int64_t* label,
Q
Qiao Longfei 已提交
52
    std::unordered_map<std::string, std::vector<int64_t>>* slot_to_data) {
Q
Qiao Longfei 已提交
53 54 55
  std::vector<std::string> ret;
  string_split(line, ' ', &ret);
  *label = std::stoi(ret[2]) > 0;
Q
Qiao Longfei 已提交
56

Q
Qiao Longfei 已提交
57 58
  for (size_t i = 3; i < ret.size(); ++i) {
    const std::string& item = ret[i];
Q
Qiao Longfei 已提交
59 60 61
    std::vector<std::string> feasign_and_slot;
    string_split(item, ':', &feasign_and_slot);
    if (feasign_and_slot.size() == 2 &&
Q
Qiao Longfei 已提交
62
        slot_to_index.find(feasign_and_slot[1]) != slot_to_index.end()) {
Q
Qiao Longfei 已提交
63 64
      int64_t feasign = std::strtoll(feasign_and_slot[0].c_str(), NULL, 10);
      (*slot_to_data)[feasign_and_slot[1]].push_back(feasign);
Q
Qiao Longfei 已提交
65 66
    }
  }
Q
Qiao Longfei 已提交
67 68

  // NOTE:: if the slot has no value, then fill [0] as it's data.
Q
Qiao Longfei 已提交
69 70 71
  for (auto& item : slot_to_index) {
    if (slot_to_data->find(item.first) == slot_to_data->end()) {
      (*slot_to_data)[item.first].push_back(0);
Q
Qiao Longfei 已提交
72 73
    }
  }
Q
Qiao Longfei 已提交
74 75
}

Q
Qiao Longfei 已提交
76 77 78
// Placeholder for a parser of lines shaped like:
//   label slot1:fea_sign slot2:fea_sign slot1:fea_sign
// The body is empty: `line` is accepted and ignored, and nothing is produced.
// NOTE(review): no caller is visible in this part of the file — confirm
// whether this stub is still needed.
static inline void parse_svm_line(const std::string& line) {}

Q
Qiao Longfei 已提交
79 80 81 82 83 84 85 86
// Abstract line-oriented input source.
//
// The implementations in this file answer HasNext() with whether more input
// can still be read and fill `*line` in NextLine() with the next line.
class Reader {
 public:
  virtual ~Reader() = default;

  // Reports whether another line is available.
  virtual bool HasNext() = 0;

  // Stores the next line into `*line`.
  virtual void NextLine(std::string* line) = 0;
};

class GzipReader : public Reader {
Q
Qiao Longfei 已提交
87
 public:
Q
Qiao Longfei 已提交
88 89
  explicit GzipReader(const std::string& file_name)
      : gzstream_(file_name.c_str()) {}
Q
Qiao Longfei 已提交
90

Q
Qiao Longfei 已提交
91
  ~GzipReader() {}
Q
Qiao Longfei 已提交
92

Q
Qiao Longfei 已提交
93
  bool HasNext() override { return gzstream_.peek() != EOF; }
Q
Qiao Longfei 已提交
94

Q
Qiao Longfei 已提交
95
  void NextLine(std::string* line) override { std::getline(gzstream_, *line); }
Q
Qiao Longfei 已提交
96 97

 private:
Q
Qiao Longfei 已提交
98
  igzstream gzstream_;
Q
Qiao Longfei 已提交
99 100
};

Q
Qiao Longfei 已提交
101
class PlainFileReader : public Reader {
Q
Qiao Longfei 已提交
102
 public:
Q
Qiao Longfei 已提交
103
  explicit PlainFileReader(const std::string& file_name)
Q
Qiao Longfei 已提交
104
      : stream_(file_name.c_str()) {}
Q
Qiao Longfei 已提交
105 106 107

  ~PlainFileReader() {}

Q
Qiao Longfei 已提交
108
  bool HasNext() override { return stream_.peek() != EOF; }
Q
Qiao Longfei 已提交
109

Q
Qiao Longfei 已提交
110
  void NextLine(std::string* line) override { std::getline(stream_, *line); }
Q
Qiao Longfei 已提交
111 112

 private:
Q
Qiao Longfei 已提交
113
  std::ifstream stream_;
Q
Qiao Longfei 已提交
114 115 116 117 118 119
};

// Presents a list of files as one continuous Reader.
//
// One SingleFileReader is constructed per entry of `file_list`, all of them
// in the constructor. Lines are served from the reader at
// `current_reader_index_`; HasNext() advances that index past readers that
// report no more input.
template <typename SingleFileReader>
class MultiFileReader : public Reader {
 public:
  explicit MultiFileReader(const std::vector<std::string>& file_list) {
    for (const auto& path : file_list) {
      readers_.emplace_back(std::make_shared<SingleFileReader>(path));
    }
  }

  // Skips exhausted readers; returns true as soon as one still has input and
  // false once every reader has been passed.
  bool HasNext() override {
    while (current_reader_index_ < readers_.size()) {
      if (readers_[current_reader_index_]->HasNext()) {
        return true;
      }
      ++current_reader_index_;
    }
    return false;
  }

  // Forwards to the current reader. The index is not range-checked here, so
  // this is only valid while the current index is within `readers_`.
  void NextLine(std::string* line) override {
    auto& current = readers_[current_reader_index_];
    current->NextLine(line);
  }

 private:
  std::vector<std::shared_ptr<SingleFileReader>> readers_;
  size_t current_reader_index_ = 0;
};

Q
Qiao Longfei 已提交
145 146
void MonitorThread(std::vector<ReaderThreadStatus>* thread_status,
                   std::shared_ptr<LoDTensorBlockingQueue> queue) {
Q
Qiao Longfei 已提交
147
  VLOG(3) << "monitor thread in";
Q
Qiao Longfei 已提交
148 149
  bool reader_thread_is_running = true;
  while (reader_thread_is_running) {
Q
Qiao Longfei 已提交
150
    VLOG(3) << "reader_thread_is_running";
Q
Qiao Longfei 已提交
151 152 153
    reader_thread_is_running = false;
    for (size_t i = 0; i < (*thread_status).size(); ++i) {
      if ((*thread_status)[i] == Running) {
Q
Qiao Longfei 已提交
154
        VLOG(3) << "reader is running!";
Q
Qiao Longfei 已提交
155 156 157 158 159
        reader_thread_is_running = true;
      }
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  }
Q
Qiao Longfei 已提交
160 161
  VLOG(3) << "all reader thread is stopped, close the queue";
  queue->Close();
Q
Qiao Longfei 已提交
162
  VLOG(3) << "monitor thread exited";
Q
Qiao Longfei 已提交
163 164
}

Q
Qiao Longfei 已提交
165 166
void ReadSvmData(const DataDesc& data_desc, std::shared_ptr<Reader> reader,
                 std::shared_ptr<LoDTensorBlockingQueue> queue) {
Q
Qiao Longfei 已提交
167
  std::unordered_map<std::string, size_t> slot_to_index;
Q
Qiao Longfei 已提交
168 169
  for (size_t i = 0; i < data_desc.sparse_slot_ids_.size(); ++i) {
    slot_to_index[data_desc.sparse_slot_ids_[i]] = i;
Q
Qiao Longfei 已提交
170
  }
Q
Qiao Longfei 已提交
171

Q
Qiao Longfei 已提交
172
  std::string line;
Q
Qiao Longfei 已提交
173 174 175

  std::vector<std::unordered_map<std::string, std::vector<int64_t>>> batch_data;
  std::vector<int64_t> batch_label;
Q
Qiao Longfei 已提交
176

Q
Qiao Longfei 已提交
177
  while (reader->HasNext()) {
Q
Qiao Longfei 已提交
178
    batch_data.clear();
Q
Qiao Longfei 已提交
179
    batch_data.reserve(data_desc.batch_size_);
Q
Qiao Longfei 已提交
180

Q
Qiao Longfei 已提交
181
    batch_label.clear();
Q
Qiao Longfei 已提交
182
    batch_label.reserve(data_desc.batch_size_);
Q
Qiao Longfei 已提交
183 184

    // read batch_size data
Q
Qiao Longfei 已提交
185
    for (int i = 0; i < data_desc.batch_size_; ++i) {
Q
Qiao Longfei 已提交
186 187
      if (reader->HasNext()) {
        reader->NextLine(&line);
Q
Qiao Longfei 已提交
188
        std::unordered_map<std::string, std::vector<int64_t>> slot_to_data;
Q
Qiao Longfei 已提交
189
        int64_t label;
Q
Qiao Longfei 已提交
190 191
        parse_line(line, slot_to_index, &label, &slot_to_data);
        batch_data.push_back(slot_to_data);
Q
Qiao Longfei 已提交
192 193 194 195
        batch_label.push_back(label);
      } else {
        break;
      }
Q
Qiao Longfei 已提交
196
    }
Q
Qiao Longfei 已提交
197

Q
Qiao Longfei 已提交
198 199
    std::vector<framework::LoDTensor> lod_datas;

Q
Qiao Longfei 已提交
200
    // first insert tensor for each sparse_slots
Q
Qiao Longfei 已提交
201
    for (auto& slot : data_desc.sparse_slot_ids_) {
Q
Qiao Longfei 已提交
202 203 204
      std::vector<size_t> lod_data{0};
      std::vector<int64_t> batch_feasign;

Q
Qiao Longfei 已提交
205 206 207
      for (size_t i = 0; i < batch_data.size(); ++i) {
        auto& feasign = batch_data[i][slot];
        lod_data.push_back(lod_data.back() + feasign.size());
Q
Qiao Longfei 已提交
208 209
        batch_feasign.insert(batch_feasign.end(), feasign.begin(),
                             feasign.end());
Q
Qiao Longfei 已提交
210
      }
Q
Qiao Longfei 已提交
211 212 213 214 215 216 217

      framework::LoDTensor lod_tensor;
      framework::LoD lod{lod_data};
      lod_tensor.set_lod(lod);
      int64_t* tensor_data = lod_tensor.mutable_data<int64_t>(
          framework::make_ddim({1, static_cast<int64_t>(batch_feasign.size())}),
          platform::CPUPlace());
Q
Qiao Longfei 已提交
218 219
      memcpy(tensor_data, batch_feasign.data(),
             batch_feasign.size() * sizeof(int64_t));
Q
Qiao Longfei 已提交
220 221
      lod_datas.push_back(lod_tensor);
    }
Q
Qiao Longfei 已提交
222 223 224

    // insert label tensor
    framework::LoDTensor label_tensor;
Q
Qiao Longfei 已提交
225
    auto* label_tensor_data = label_tensor.mutable_data<int64_t>(
Q
Qiao Longfei 已提交
226 227
        framework::make_ddim({1, static_cast<int64_t>(batch_label.size())}),
        platform::CPUPlace());
Q
Qiao Longfei 已提交
228 229
    memcpy(label_tensor_data, batch_label.data(),
           batch_label.size() * sizeof(int64_t));
Q
Qiao Longfei 已提交
230 231
    lod_datas.push_back(label_tensor);

Q
Qiao Longfei 已提交
232
    queue->Push(lod_datas);
Q
Qiao Longfei 已提交
233
    VLOG(4) << "push one data, queue_size=" << queue->Size();
Q
Qiao Longfei 已提交
234
  }
Q
Qiao Longfei 已提交
235 236
}

Q
Qiao Longfei 已提交
237 238 239 240 241 242 243 244 245 246 247 248 249
// Parses one line of the form
//   label dense_fea,dense_fea sparse_fea,sparse_fea
// i.e. space-separated columns whose values are comma-separated.
//
// `*label` is column 0 parsed as a 64-bit integer. `*dense_datas` is resized
// to one vector per entry of `data_desc.dense_slot_index_`; entry i receives
// the floats of the column whose index is `dense_slot_index_[i]`.
// `*sparse_datas` is filled the same way from `data_desc.sparse_slot_index_`
// with 64-bit integers. Values are appended to the per-slot vectors.
//
// Throws std::out_of_range when a configured column index is not present in
// the line, and whatever std::stoll / std::stof throw on an unparsable value.
static inline void parse_csv_line(
    const std::string& line, const DataDesc& data_desc, int64_t* label,
    std::vector<std::vector<float>>* dense_datas,
    std::vector<std::vector<int64_t>>* sparse_datas) {
  std::vector<std::string> ret;
  string_split(line, ' ', &ret);
  // stoll, not stol: the destination is int64_t and `long` may be 32 bits.
  *label = std::stoll(ret.at(0));

  dense_datas->resize(data_desc.dense_slot_index_.size());
  for (size_t i = 0; i < data_desc.dense_slot_index_.size(); ++i) {
    const int slot_idx = data_desc.dense_slot_index_[i];
    std::vector<std::string> data_in_slot_str;
    // at() turns a column index beyond the end of the line into an exception
    // instead of an out-of-bounds read.
    string_split(ret.at(slot_idx), ',', &data_in_slot_str);
    for (const auto& data_str : data_in_slot_str) {
      (*dense_datas)[i].push_back(std::stof(data_str));
    }
  }

  sparse_datas->resize(data_desc.sparse_slot_index_.size());
  for (size_t i = 0; i < data_desc.sparse_slot_index_.size(); ++i) {
    const int slot_idx = data_desc.sparse_slot_index_[i];
    std::vector<std::string> data_in_slot_str;
    string_split(ret.at(slot_idx), ',', &data_in_slot_str);
    for (const auto& data_str : data_in_slot_str) {
      (*sparse_datas)[i].push_back(std::stoll(data_str));
    }
  }
}

// Reads `reader` to exhaustion in batches and pushes each batch to `queue`.
//
// A batch holds up to `data_desc.batch_size_` lines, each parsed with
// parse_csv_line(). For every batch one vector of LoDTensors is pushed, in
// the order label, dense slots, sparse slots:
//   * label: int64, shaped {number of lines, 1};
//   * one float tensor per entry of `data_desc.dense_slot_index_`, shaped
//     {number of lines, width}, where width is the value count of that slot
//     on the first line of the batch;
//   * one int64 tensor per entry of `data_desc.sparse_slot_index_`, shaped
//     {total ids in the batch, 1}, with a one-level LoD holding the
//     cumulative id count per line.
//
// Every line of a batch must provide `width` values for a dense slot; a
// mismatch fails a PADDLE_ENFORCE_EQ check.
void ReadCsvData(const DataDesc& data_desc, std::shared_ptr<Reader> reader,
                 std::shared_ptr<LoDTensorBlockingQueue> queue) {
  std::string line;
  while (reader->HasNext()) {
    std::vector<int64_t> batch_label;
    batch_label.reserve(data_desc.batch_size_);

    std::vector<std::vector<std::vector<float>>> batch_dense_data;
    batch_dense_data.reserve(data_desc.batch_size_);

    std::vector<std::vector<std::vector<int64_t>>> batch_sparse_data;
    batch_sparse_data.reserve(data_desc.batch_size_);

    // read batch_size data
    for (int i = 0; i < data_desc.batch_size_; ++i) {
      if (reader->HasNext()) {
        reader->NextLine(&line);
        int64_t label;
        std::vector<std::vector<float>> dense_datas;
        std::vector<std::vector<int64_t>> sparse_datas;
        parse_csv_line(line, data_desc, &label, &dense_datas, &sparse_datas);
        batch_label.push_back(label);
        if (!batch_dense_data.empty()) {
          PADDLE_ENFORCE_EQ(batch_dense_data[0].size(), dense_datas.size(),
                            "dense data should have the same shape");
        }
        batch_dense_data.push_back(dense_datas);
        batch_sparse_data.push_back(sparse_datas);
      } else {
        break;
      }
    }

    // the order of output data is label, dense_datas, sparse_datas
    std::vector<framework::LoDTensor> lod_datas;

    // insert label tensor
    framework::LoDTensor label_tensor;
    auto* label_tensor_data = label_tensor.mutable_data<int64_t>(
        framework::make_ddim({static_cast<int64_t>(batch_label.size()), 1}),
        platform::CPUPlace());
    memcpy(label_tensor_data, batch_label.data(),
           batch_label.size() * sizeof(int64_t));
    lod_datas.push_back(label_tensor);

    // insert tensor for each dense_slots
    for (size_t i = 0; i < data_desc.dense_slot_index_.size(); ++i) {
      framework::LoDTensor lod_tensor;
      // The first line of the batch defines the row width for this slot.
      size_t width = batch_dense_data[0][i].size();
      auto* tensor_data = lod_tensor.mutable_data<float>(
          framework::make_ddim(
              {static_cast<int64_t>(batch_dense_data.size()),  // batch_size
               static_cast<int64_t>(width)}),
          platform::CPUPlace());

      for (size_t j = 0; j < batch_dense_data.size(); ++j) {
        auto& dense_data_row = batch_dense_data[j][i];
        // The copy below reads exactly `width` floats from this row, so a
        // shorter row would be read past its end; reject any mismatch.
        PADDLE_ENFORCE_EQ(dense_data_row.size(), width,
                          "dense data should have the same shape");
        memcpy(tensor_data + j * width, dense_data_row.data(),
               width * sizeof(float));
      }

      lod_datas.push_back(lod_tensor);
    }

    // insert tensor for each sparse_slots
    for (size_t i = 0; i < data_desc.sparse_slot_index_.size(); ++i) {
      std::vector<size_t> lod_data{0};
      std::vector<int64_t> batch_feasign;

      for (size_t row_idx = 0; row_idx < batch_sparse_data.size(); ++row_idx) {
        auto& sparse_ids = batch_sparse_data[row_idx][i];
        lod_data.push_back(lod_data.back() + sparse_ids.size());
        batch_feasign.insert(batch_feasign.end(), sparse_ids.begin(),
                             sparse_ids.end());
      }

      framework::LoDTensor lod_tensor;
      framework::LoD lod{lod_data};
      lod_tensor.set_lod(lod);
      int64_t* tensor_data = lod_tensor.mutable_data<int64_t>(
          framework::make_ddim({static_cast<int64_t>(batch_feasign.size()), 1}),
          platform::CPUPlace());
      memcpy(tensor_data, batch_feasign.data(),
             batch_feasign.size() * sizeof(int64_t));
      lod_datas.push_back(lod_tensor);
    }

    queue->Push(lod_datas);
    VLOG(4) << "push one data, queue_size=" << queue->Size();
  }
}

Q
Qiao Longfei 已提交
363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388
void ReadThread(const std::vector<std::string>& file_list,
                const DataDesc& data_desc, int thread_id,
                std::vector<ReaderThreadStatus>* thread_status,
                std::shared_ptr<LoDTensorBlockingQueue> queue) {
  VLOG(3) << "[" << thread_id << "]"
          << " reader thread start! thread_id = " << thread_id;
  for (auto& file : file_list) {
    VLOG(3) << "[" << thread_id << "]"
            << " file " << file;
  }
  (*thread_status)[thread_id] = Running;
  VLOG(3) << "set status to running";

  std::shared_ptr<Reader> reader;
  if (data_desc.file_type_ == "gzip") {
    reader.reset(new MultiFileReader<GzipReader>(file_list));
  } else if (data_desc.file_type_ == "plain") {
    reader.reset(new MultiFileReader<PlainFileReader>(file_list));
  } else {
    PADDLE_THROW("do not support file format %s", data_desc.file_type_);
  }

  VLOG(3) << "reader inited";

  if (data_desc.file_format_ == "svm") {
    ReadSvmData(data_desc, reader, queue);
Q
Qiao Longfei 已提交
389 390
  } else if (data_desc.file_format_ == "csv") {
    ReadCsvData(data_desc, reader, queue);
Q
Qiao Longfei 已提交
391
  }
Q
Qiao Longfei 已提交
392 393

  (*thread_status)[thread_id] = Stopped;
Q
Qiao Longfei 已提交
394
  VLOG(3) << "set status to stopped, thread " << thread_id << " exited";
Q
Qiao Longfei 已提交
395 396 397
}

}  // namespace reader
Q
Qiao Longfei 已提交
398 399
}  // namespace operators
}  // namespace paddle