chunk.cc 4.4 KB
Newer Older
D
dongzhihong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
//   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/recordio/chunk.h"

Y
Yu Yang 已提交
17
#include <memory>
D
dongzhihong 已提交
18
#include <sstream>
Y
Yu Yang 已提交
19 20 21
#include "paddle/fluid/platform/enforce.h"
#include "snappystream.hpp"
#include "zlib.h"
D
dongzhihong 已提交
22 23 24

namespace paddle {
namespace recordio {
Y
Yu Yang 已提交
25
// Size in bytes of the stack buffer used when streaming data chunk by chunk.
constexpr size_t kMaxBufSize = 1024;

/**
 * Read a stream piece by piece through a fixed-size buffer.
 *
 * After the loop the stream's state flags are cleared, so an end-of-file hit
 * while reading does not leave the stream unusable for the caller.
 *
 * @param in input stream
 * @param limit read at most `limit` bytes from input stream. 0 means no limit
 * @param callback A function object with (const char* buf, size_t size) -> void
 * as its type. It is invoked once per non-empty piece that was read.
 */
template <typename Callback>
static void ReadStreamByBuf(std::istream& in, size_t limit, Callback callback) {
  char buf[kMaxBufSize];
  size_t counter = 0;  // bytes consumed so far; only tracked when limit != 0

  // Continue while there is input left AND the limit (if any) is not reached.
  while (!in.eof() && (limit == 0 || counter < limit)) {
    const size_t actual_max =
        limit != 0 ? std::min(limit - counter, kMaxBufSize) : kMaxBufSize;
    in.read(buf, actual_max);
    const std::streamsize actual_size = in.gcount();
    if (actual_size == 0) {
      // Nothing could be read (exhausted or failed stream): stop instead of
      // spinning forever on a stream that never reports eof.
      break;
    }
    callback(buf, actual_size);
    if (limit != 0) {
      counter += actual_size;
    }
  }
  in.clear();  // unset eof state
}

Y
Yu Yang 已提交
57 58 59
/**
 * Copy stream in to another stream
 */
Y
Yu Yang 已提交
60 61
static void PipeStream(std::istream& in, std::ostream& os) {
  ReadStreamByBuf(
Y
Yu Yang 已提交
62
      in, 0, [&os](const char* buf, size_t len) { os.write(buf, len); });
Y
Yu Yang 已提交
63
}
Y
Yu Yang 已提交
64 65 66 67

/**
 * Calculate CRC32 from an input stream.
 */
Y
Yu Yang 已提交
68 69
static uint32_t Crc32Stream(std::istream& in, size_t limit = 0) {
  uint32_t crc = static_cast<uint32_t>(crc32(0, nullptr, 0));
Y
Yu Yang 已提交
70
  ReadStreamByBuf(in, limit, [&crc](const char* buf, size_t len) {
Y
Yu Yang 已提交
71 72
    crc = static_cast<uint32_t>(crc32(
        crc, reinterpret_cast<const Bytef*>(buf), static_cast<uInt>(len)));
Y
Yu Yang 已提交
73 74 75 76 77
  });
  return crc;
}

bool Chunk::Write(std::ostream& os, Compressor ct) const {
D
dongzhihong 已提交
78 79
  // NOTE(dzhwinter): don't check records.numBytes instead, because
  // empty records are allowed.
Y
Yu Yang 已提交
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
  if (records_.empty()) {
    return false;
  }
  std::stringstream sout;
  std::unique_ptr<std::ostream> compressed_stream;
  switch (ct) {
    case Compressor::kNoCompress:
      break;
    case Compressor::kSnappy:
      compressed_stream.reset(new snappy::oSnappyStream(sout));
      break;
    default:
      PADDLE_THROW("Not implemented");
  }

  std::ostream& buf_stream = compressed_stream ? *compressed_stream : sout;
D
dongzhihong 已提交
96 97

  for (auto& record : records_) {
Y
Yu Yang 已提交
98 99 100
    size_t sz = record.size();
    buf_stream.write(reinterpret_cast<const char*>(&sz), sizeof(uint32_t))
        .write(record.data(), record.size());
D
dongzhihong 已提交
101 102
  }

Y
Yu Yang 已提交
103 104 105 106
  if (compressed_stream) {
    compressed_stream.reset();
  }

Y
Yu Yang 已提交
107 108 109
  sout.seekg(0, std::ios::end);
  uint32_t len = static_cast<uint32_t>(sout.tellg());
  sout.seekg(0, std::ios::beg);
Y
Yu Yang 已提交
110 111 112
  uint32_t crc = Crc32Stream(sout);
  Header hdr(static_cast<uint32_t>(records_.size()), crc, ct, len);
  hdr.Write(os);
Y
Yu Yang 已提交
113 114
  sout.seekg(0, std::ios::beg);
  sout.clear();
Y
Yu Yang 已提交
115
  PipeStream(sout, os);
D
dongzhihong 已提交
116 117 118
  return true;
}

Y
Yu Yang 已提交
119
bool Chunk::Parse(std::istream& sin) {
D
dongzhihong 已提交
120
  Header hdr;
Y
Yu Yang 已提交
121 122 123 124
  bool ok = hdr.Parse(sin);
  if (!ok) {
    return ok;
  }
Y
Yu Yang 已提交
125
  auto beg_pos = sin.tellg();
Y
Yu Yang 已提交
126
  uint32_t crc = Crc32Stream(sin, hdr.CompressSize());
Y
Yu Yang 已提交
127 128
  PADDLE_ENFORCE_EQ(hdr.Checksum(), crc);
  Clear();
Y
Yu Yang 已提交
129
  sin.seekg(beg_pos, sin.beg);
Y
Yu Yang 已提交
130 131
  std::unique_ptr<std::istream> compressed_stream;
  switch (hdr.CompressType()) {
D
dongzhihong 已提交
132 133 134
    case Compressor::kNoCompress:
      break;
    case Compressor::kSnappy:
Y
Yu Yang 已提交
135
      compressed_stream.reset(new snappy::iSnappyStream(sin));
D
dongzhihong 已提交
136
      break;
Y
Yu Yang 已提交
137 138
    default:
      PADDLE_THROW("Not implemented");
D
dongzhihong 已提交
139 140
  }

Y
Yu Yang 已提交
141 142 143 144 145 146 147 148
  std::istream& stream = compressed_stream ? *compressed_stream : sin;

  for (uint32_t i = 0; i < hdr.NumRecords(); ++i) {
    uint32_t rec_len;
    stream.read(reinterpret_cast<char*>(&rec_len), sizeof(uint32_t));
    std::string buf;
    buf.resize(rec_len);
    stream.read(&buf[0], rec_len);
Y
Yu Yang 已提交
149
    PADDLE_ENFORCE_EQ(rec_len, stream.gcount());
Y
Yu Yang 已提交
150
    Add(buf);
D
dongzhihong 已提交
151
  }
Y
Yu Yang 已提交
152
  return true;
D
dongzhihong 已提交
153 154 155 156
}

}  // namespace recordio
}  // namespace paddle