“2514d70ea77bd770068adc12f99b7eb7f1fcdaf8”上不存在“paddle/fluid/memory/allocation/legacy_allocator.cc”
ctr_reader.cc 8.8 KB
Newer Older
Q
Qiao Longfei 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/operators/reader/ctr_reader.h"

Q
Qiao Longfei 已提交
17 18
#include <gzstream.h>

Q
Qiao Longfei 已提交
19 20 21 22 23 24 25 26 27 28
#include <algorithm>
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <memory>
#include <random>
#include <sstream>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

Q
Qiao Longfei 已提交
29 30
namespace paddle {
namespace operators {
Q
Qiao Longfei 已提交
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
namespace reader {

// Splits `s` at every occurrence of `delimiter` and appends the pieces to
// `*output` in order. Empty pieces are kept: "a,,b" yields {"a", "", "b"},
// and an empty input yields a single empty string. Existing contents of
// `*output` are preserved (new pieces are appended after them).
static inline void string_split(const std::string& s, const char delimiter,
                                std::vector<std::string>* output) {
  std::string::size_type begin = 0;
  for (;;) {
    const std::string::size_type pos = s.find_first_of(delimiter, begin);
    if (pos == std::string::npos) {
      // Last piece: everything after the final delimiter (possibly empty).
      output->emplace_back(s.substr(begin));
      return;
    }
    output->emplace_back(s.substr(begin, pos - begin));
    begin = pos + 1;
  }
}

static inline void parse_line(
Q
Qiao Longfei 已提交
49 50
    const std::string& line,
    const std::unordered_map<std::string, size_t>& slot_to_index,
Q
Qiao Longfei 已提交
51
    int64_t* label,
Q
Qiao Longfei 已提交
52
    std::unordered_map<std::string, std::vector<int64_t>>* slot_to_data) {
Q
Qiao Longfei 已提交
53 54 55
  std::vector<std::string> ret;
  string_split(line, ' ', &ret);
  *label = std::stoi(ret[2]) > 0;
Q
Qiao Longfei 已提交
56

Q
Qiao Longfei 已提交
57 58
  for (size_t i = 3; i < ret.size(); ++i) {
    const std::string& item = ret[i];
Q
Qiao Longfei 已提交
59 60 61
    std::vector<std::string> feasign_and_slot;
    string_split(item, ':', &feasign_and_slot);
    if (feasign_and_slot.size() == 2 &&
Q
Qiao Longfei 已提交
62
        slot_to_index.find(feasign_and_slot[1]) != slot_to_index.end()) {
Q
Qiao Longfei 已提交
63 64
      int64_t feasign = std::strtoll(feasign_and_slot[0].c_str(), NULL, 10);
      (*slot_to_data)[feasign_and_slot[1]].push_back(feasign);
Q
Qiao Longfei 已提交
65 66
    }
  }
Q
Qiao Longfei 已提交
67 68

  // NOTE:: if the slot has no value, then fill [0] as it's data.
Q
Qiao Longfei 已提交
69 70 71
  for (auto& item : slot_to_index) {
    if (slot_to_data->find(item.first) == slot_to_data->end()) {
      (*slot_to_data)[item.first].push_back(0);
Q
Qiao Longfei 已提交
72 73
    }
  }
Q
Qiao Longfei 已提交
74 75
}

Q
Qiao Longfei 已提交
76 77 78 79 80
// Placeholder for a parser of lines shaped like:
//   label slot1:fea_sign slot2:fea_sign slot1:fea_sign
// Not implemented: the body is empty and the argument is ignored.
static inline void parse_svm_line(const std::string& /*line*/) {}

// label,dense_fea,dense_fea,sparse_fea,sparse_fea
static inline void parse_csv_line(const std::string& line,
Q
Qiao Longfei 已提交
81
                                  const DataDesc& data_desc, int64_t* label,
Q
Qiao Longfei 已提交
82 83 84 85
                                  std::vector<float>* dense_datas,
                                  std::vector<int64_t>* sparse_datas) {
  std::vector<std::string> ret;
  string_split(line, ',', &ret);
Q
Qiao Longfei 已提交
86 87 88 89 90 91 92
  *label = std::stol(ret[2]) > 0;
  for (auto& idx : data_desc.dense_slot_index_) {
    dense_datas->push_back(std::stof(ret[idx]));
  }
  for (auto& idx : data_desc.sparse_slot_index_) {
    sparse_datas->push_back(std::stol(ret[idx]));
  }
Q
Qiao Longfei 已提交
93 94
}

Q
Qiao Longfei 已提交
95 96 97 98 99 100 101 102
// Minimal line-oriented input interface. Callers are expected to check
// HasNext() before each call to NextLine().
class Reader {
 public:
  virtual ~Reader() = default;

  // Returns true while at least one more line can be read.
  virtual bool HasNext() = 0;

  // Reads the next line into *line.
  virtual void NextLine(std::string* line) = 0;
};

class GzipReader : public Reader {
Q
Qiao Longfei 已提交
103
 public:
Q
Qiao Longfei 已提交
104 105
  explicit GzipReader(const std::string& file_name)
      : gzstream_(file_name.c_str()) {}
Q
Qiao Longfei 已提交
106

Q
Qiao Longfei 已提交
107
  ~GzipReader() {}
Q
Qiao Longfei 已提交
108

Q
Qiao Longfei 已提交
109
  bool HasNext() override { return gzstream_.peek() != EOF; }
Q
Qiao Longfei 已提交
110

Q
Qiao Longfei 已提交
111
  void NextLine(std::string* line) override { std::getline(gzstream_, *line); }
Q
Qiao Longfei 已提交
112 113

 private:
Q
Qiao Longfei 已提交
114
  igzstream gzstream_;
Q
Qiao Longfei 已提交
115 116
};

Q
Qiao Longfei 已提交
117
class PlainFileReader : public Reader {
Q
Qiao Longfei 已提交
118
 public:
Q
Qiao Longfei 已提交
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
  explicit PlainFileReader(const std::string& file_name)
      : myfile_(file_name.c_str()) {}

  ~PlainFileReader() {}

  bool HasNext() override { return myfile_.peek() != EOF; }

  void NextLine(std::string* line) override { std::getline(myfile_, *line); }

 private:
  std::ifstream myfile_;
};

// Presents several files as one continuous line stream, reading them in
// file_list order with SingleFileReader.
//
// Files are opened lazily, one at a time, and each reader is released as
// soon as it reports no more lines. The previous version opened every file
// in the constructor and kept them all open for the whole read, which costs
// one descriptor (and, for gzip, one decompression buffer) per file and can
// exhaust the process file-descriptor limit on long file lists.
template <typename SingleFileReader>
class MultiFileReader : public Reader {
 public:
  explicit MultiFileReader(const std::vector<std::string>& file_list)
      : file_list_(file_list) {}

  // Returns true if some remaining file still has a line, advancing past
  // files whose reader reports no more lines. Iterative rather than
  // recursive, so a long run of empty files cannot grow the stack.
  bool HasNext() override {
    while (current_file_index_ < file_list_.size()) {
      if (!current_reader_) {
        current_reader_.reset(
            new SingleFileReader(file_list_[current_file_index_]));
      }
      if (current_reader_->HasNext()) {
        return true;
      }
      // Close the finished file before moving on.
      current_reader_.reset();
      ++current_file_index_;
    }
    return false;
  }

  // Reads the next line into *line. Throws std::out_of_range when every
  // file is exhausted (previously this indexed past the end of the reader
  // list, which was undefined behaviour).
  void NextLine(std::string* line) override {
    if (!HasNext()) {
      throw std::out_of_range("MultiFileReader::NextLine: no more lines");
    }
    current_reader_->NextLine(line);
  }

 private:
  std::vector<std::string> file_list_;
  std::unique_ptr<SingleFileReader> current_reader_;
  size_t current_file_index_ = 0;
};

Q
Qiao Longfei 已提交
161 162
void MonitorThread(std::vector<ReaderThreadStatus>* thread_status,
                   std::shared_ptr<LoDTensorBlockingQueue> queue) {
Q
Qiao Longfei 已提交
163
  VLOG(3) << "monitor thread in";
Q
Qiao Longfei 已提交
164 165
  bool reader_thread_is_running = true;
  while (reader_thread_is_running) {
Q
Qiao Longfei 已提交
166
    VLOG(3) << "reader_thread_is_running";
Q
Qiao Longfei 已提交
167 168 169
    reader_thread_is_running = false;
    for (size_t i = 0; i < (*thread_status).size(); ++i) {
      if ((*thread_status)[i] == Running) {
Q
Qiao Longfei 已提交
170
        VLOG(3) << "reader is running!";
Q
Qiao Longfei 已提交
171 172 173 174 175
        reader_thread_is_running = true;
      }
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  }
Q
Qiao Longfei 已提交
176
  VLOG(3) << "all reader thread is stopped, push empty data into queue";
Q
Qiao Longfei 已提交
177
  queue->Push({});
Q
Qiao Longfei 已提交
178
  VLOG(3) << "monitor thread exited";
Q
Qiao Longfei 已提交
179 180
}

Q
Qiao Longfei 已提交
181 182
void ReadSvmData(const DataDesc& data_desc, std::shared_ptr<Reader> reader,
                 std::shared_ptr<LoDTensorBlockingQueue> queue) {
Q
Qiao Longfei 已提交
183
  std::unordered_map<std::string, size_t> slot_to_index;
Q
Qiao Longfei 已提交
184 185
  for (size_t i = 0; i < data_desc.sparse_slot_ids_.size(); ++i) {
    slot_to_index[data_desc.sparse_slot_ids_[i]] = i;
Q
Qiao Longfei 已提交
186
  }
Q
Qiao Longfei 已提交
187

Q
Qiao Longfei 已提交
188
  std::string line;
Q
Qiao Longfei 已提交
189 190 191

  std::vector<std::unordered_map<std::string, std::vector<int64_t>>> batch_data;
  std::vector<int64_t> batch_label;
Q
Qiao Longfei 已提交
192

Q
Qiao Longfei 已提交
193
  while (reader->HasNext()) {
Q
Qiao Longfei 已提交
194
    batch_data.clear();
Q
Qiao Longfei 已提交
195
    batch_data.reserve(data_desc.batch_size_);
Q
Qiao Longfei 已提交
196

Q
Qiao Longfei 已提交
197
    batch_label.clear();
Q
Qiao Longfei 已提交
198
    batch_label.reserve(data_desc.batch_size_);
Q
Qiao Longfei 已提交
199 200

    // read batch_size data
Q
Qiao Longfei 已提交
201
    for (int i = 0; i < data_desc.batch_size_; ++i) {
Q
Qiao Longfei 已提交
202 203
      if (reader->HasNext()) {
        reader->NextLine(&line);
Q
Qiao Longfei 已提交
204
        std::unordered_map<std::string, std::vector<int64_t>> slot_to_data;
Q
Qiao Longfei 已提交
205
        int64_t label;
Q
Qiao Longfei 已提交
206 207
        parse_line(line, slot_to_index, &label, &slot_to_data);
        batch_data.push_back(slot_to_data);
Q
Qiao Longfei 已提交
208 209 210 211
        batch_label.push_back(label);
      } else {
        break;
      }
Q
Qiao Longfei 已提交
212
    }
Q
Qiao Longfei 已提交
213

Q
Qiao Longfei 已提交
214 215
    std::vector<framework::LoDTensor> lod_datas;

Q
Qiao Longfei 已提交
216
    // first insert tensor for each sparse_slots
Q
Qiao Longfei 已提交
217
    for (auto& slot : data_desc.sparse_slot_ids_) {
Q
Qiao Longfei 已提交
218 219 220
      std::vector<size_t> lod_data{0};
      std::vector<int64_t> batch_feasign;

Q
Qiao Longfei 已提交
221 222 223
      for (size_t i = 0; i < batch_data.size(); ++i) {
        auto& feasign = batch_data[i][slot];
        lod_data.push_back(lod_data.back() + feasign.size());
Q
Qiao Longfei 已提交
224 225
        batch_feasign.insert(batch_feasign.end(), feasign.begin(),
                             feasign.end());
Q
Qiao Longfei 已提交
226
      }
Q
Qiao Longfei 已提交
227 228 229 230 231 232 233

      framework::LoDTensor lod_tensor;
      framework::LoD lod{lod_data};
      lod_tensor.set_lod(lod);
      int64_t* tensor_data = lod_tensor.mutable_data<int64_t>(
          framework::make_ddim({1, static_cast<int64_t>(batch_feasign.size())}),
          platform::CPUPlace());
Q
Qiao Longfei 已提交
234 235
      memcpy(tensor_data, batch_feasign.data(),
             batch_feasign.size() * sizeof(int64_t));
Q
Qiao Longfei 已提交
236 237
      lod_datas.push_back(lod_tensor);
    }
Q
Qiao Longfei 已提交
238 239 240

    // insert label tensor
    framework::LoDTensor label_tensor;
Q
Qiao Longfei 已提交
241
    auto* label_tensor_data = label_tensor.mutable_data<int64_t>(
Q
Qiao Longfei 已提交
242 243
        framework::make_ddim({1, static_cast<int64_t>(batch_label.size())}),
        platform::CPUPlace());
Q
Qiao Longfei 已提交
244 245
    memcpy(label_tensor_data, batch_label.data(),
           batch_label.size() * sizeof(int64_t));
Q
Qiao Longfei 已提交
246 247
    lod_datas.push_back(label_tensor);

Q
Qiao Longfei 已提交
248
    queue->Push(lod_datas);
Q
Qiao Longfei 已提交
249
    VLOG(4) << "push one data, queue_size=" << queue->Size();
Q
Qiao Longfei 已提交
250
  }
Q
Qiao Longfei 已提交
251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279
}

void ReadThread(const std::vector<std::string>& file_list,
                const DataDesc& data_desc, int thread_id,
                std::vector<ReaderThreadStatus>* thread_status,
                std::shared_ptr<LoDTensorBlockingQueue> queue) {
  VLOG(3) << "[" << thread_id << "]"
          << " reader thread start! thread_id = " << thread_id;
  for (auto& file : file_list) {
    VLOG(3) << "[" << thread_id << "]"
            << " file " << file;
  }
  (*thread_status)[thread_id] = Running;
  VLOG(3) << "set status to running";

  std::shared_ptr<Reader> reader;
  if (data_desc.file_type_ == "gzip") {
    reader.reset(new MultiFileReader<GzipReader>(file_list));
  } else if (data_desc.file_type_ == "plain") {
    reader.reset(new MultiFileReader<PlainFileReader>(file_list));
  } else {
    PADDLE_THROW("do not support file format %s", data_desc.file_type_);
  }

  VLOG(3) << "reader inited";

  if (data_desc.file_format_ == "svm") {
    ReadSvmData(data_desc, reader, queue);
  }
Q
Qiao Longfei 已提交
280 281

  (*thread_status)[thread_id] = Stopped;
Q
Qiao Longfei 已提交
282
  VLOG(3) << "set status to stopped, thread " << thread_id << " exited";
Q
Qiao Longfei 已提交
283 284 285
}

}  // namespace reader
Q
Qiao Longfei 已提交
286 287
}  // namespace operators
}  // namespace paddle