chunk.cc 3.8 KB
Newer Older
D
dongzhihong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
//   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/recordio/chunk.h"

Y
Yu Yang 已提交
17
#include <memory>
D
dongzhihong 已提交
18
#include <sstream>
Y
Yu Yang 已提交
19 20 21
#include "paddle/fluid/platform/enforce.h"
#include "snappystream.hpp"
#include "zlib.h"
D
dongzhihong 已提交
22 23 24

namespace paddle {
namespace recordio {
Y
Yu Yang 已提交
25
// Size in bytes of the stack buffer used when streaming data to a callback.
constexpr size_t kMaxBufSize = 1024;

// Reads `in` in pieces of at most kMaxBufSize bytes and passes each piece to
// `callback(const char* data, size_t len)`.
//
//   limit > 0  : stop after `limit` bytes, or earlier if the stream ends.
//   limit <= 0 : keep reading until the stream ends.
//
// Uses read()/gcount() rather than readsome(): readsome() only returns what is
// already buffered, which is implementation-defined and may be nothing for
// file streams, so it can silently truncate the data.
//
// A read that runs into the end of the stream sets eofbit and failbit; both
// are cleared before returning so the caller can seekg() on the stream again.
// A stream that is already in a non-good state on entry is left untouched and
// the callback is never invoked.
template <typename Callback>
static void ReadStreamByBuf(std::istream& in, int limit, Callback callback) {
  if (!in.good()) {
    return;
  }

  char buf[kMaxBufSize];
  const bool bounded = limit > 0;
  size_t remaining = bounded ? static_cast<size_t>(limit) : 0;

  while (!bounded || remaining > 0) {
    const size_t want =
        bounded ? std::min(remaining, kMaxBufSize) : kMaxBufSize;
    in.read(buf, static_cast<std::streamsize>(want));
    const size_t got = static_cast<size_t>(in.gcount());
    if (got > 0) {
      callback(buf, got);
      if (bounded) {
        remaining -= got;
      }
    }
    if (got < want) {
      // Short read: end of stream (or an I/O error); nothing more to fetch.
      break;
    }
  }

  if (in.eof()) {
    // Drop only the flags raised by reading into EOF; keep badbit if set.
    in.clear(in.rdstate() & ~(std::ios::eofbit | std::ios::failbit));
  }
}

Y
Yu Yang 已提交
46 47 48 49 50 51 52 53 54 55 56 57 58
// Forwards every buffer that ReadStreamByBuf produces from `in` to `os`.
// ReadStreamByBuf is called with limit -1, i.e. without a byte limit.
static void PipeStream(std::istream& in, std::ostream& os) {
  auto forward_to_output = [&os](const char* data, size_t size) {
    os.write(data, size);
  };
  ReadStreamByBuf(in, -1, forward_to_output);
}
static uint32_t Crc32Stream(std::istream& in, int limit = -1) {
  auto crc = crc32(0, nullptr, 0);
  ReadStreamByBuf(in, limit, [&crc](const char* buf, size_t len) {
    crc = crc32(crc, reinterpret_cast<const Bytef*>(buf), len);
  });
  return crc;
}

bool Chunk::Write(std::ostream& os, Compressor ct) const {
D
dongzhihong 已提交
59 60
  // NOTE(dzhwinter): don't check records.numBytes instead, because
  // empty records are allowed.
Y
Yu Yang 已提交
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
  if (records_.empty()) {
    return false;
  }
  std::stringstream sout;
  std::unique_ptr<std::ostream> compressed_stream;
  switch (ct) {
    case Compressor::kNoCompress:
      break;
    case Compressor::kSnappy:
      compressed_stream.reset(new snappy::oSnappyStream(sout));
      break;
    default:
      PADDLE_THROW("Not implemented");
  }

  std::ostream& buf_stream = compressed_stream ? *compressed_stream : sout;
D
dongzhihong 已提交
77 78

  for (auto& record : records_) {
Y
Yu Yang 已提交
79 80 81
    size_t sz = record.size();
    buf_stream.write(reinterpret_cast<const char*>(&sz), sizeof(uint32_t))
        .write(record.data(), record.size());
D
dongzhihong 已提交
82 83
  }

Y
Yu Yang 已提交
84 85 86 87 88 89 90 91 92 93 94 95 96
  if (compressed_stream) {
    compressed_stream.reset();
  }

  auto end_pos = sout.tellg();
  sout.seekg(0, std::ios::beg);
  uint32_t len = static_cast<uint32_t>(end_pos - sout.tellg());
  uint32_t crc = Crc32Stream(sout);
  sout.seekg(0, std::ios::beg);

  Header hdr(static_cast<uint32_t>(records_.size()), crc, ct, len);
  hdr.Write(os);
  PipeStream(sout, os);
D
dongzhihong 已提交
97 98 99
  return true;
}

Y
Yu Yang 已提交
100
void Chunk::Parse(std::istream& sin) {
D
dongzhihong 已提交
101
  Header hdr;
Y
Yu Yang 已提交
102 103 104 105
  hdr.Parse(sin);
  auto beg_pos = sin.tellg();
  auto crc = Crc32Stream(sin, hdr.CompressSize());
  PADDLE_ENFORCE_EQ(hdr.Checksum(), crc);
D
dongzhihong 已提交
106

Y
Yu Yang 已提交
107 108 109 110 111
  Clear();

  sin.seekg(beg_pos, std::ios::beg);
  std::unique_ptr<std::istream> compressed_stream;
  switch (hdr.CompressType()) {
D
dongzhihong 已提交
112 113 114
    case Compressor::kNoCompress:
      break;
    case Compressor::kSnappy:
Y
Yu Yang 已提交
115
      compressed_stream.reset(new snappy::iSnappyStream(sin));
D
dongzhihong 已提交
116
      break;
Y
Yu Yang 已提交
117 118
    default:
      PADDLE_THROW("Not implemented");
D
dongzhihong 已提交
119 120
  }

Y
Yu Yang 已提交
121 122 123 124 125 126 127 128 129
  std::istream& stream = compressed_stream ? *compressed_stream : sin;

  for (uint32_t i = 0; i < hdr.NumRecords(); ++i) {
    uint32_t rec_len;
    stream.read(reinterpret_cast<char*>(&rec_len), sizeof(uint32_t));
    std::string buf;
    buf.resize(rec_len);
    stream.read(&buf[0], rec_len);
    Add(buf);
D
dongzhihong 已提交
130 131 132 133 134
  }
}

}  // namespace recordio
}  // namespace paddle