chunk.cc 3.9 KB
Newer Older
D
dongzhihong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
//   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/recordio/chunk.h"

Y
Yu Yang 已提交
17
#include <memory>
D
dongzhihong 已提交
18
#include <sstream>
Y
Yu Yang 已提交
19 20 21
#include "paddle/fluid/platform/enforce.h"
#include "snappystream.hpp"
#include "zlib.h"
D
dongzhihong 已提交
22 23 24

namespace paddle {
namespace recordio {
Y
Yu Yang 已提交
25
// Upper bound, in bytes, on what a single read() call pulls from a stream.
constexpr size_t kMaxBufSize = 1024;

// Consumes `in` piece by piece and hands every piece to `callback`.
//
//   in        stream to read from, starting at its current read position.
//   limit     maximum number of bytes to consume; 0 means "read until the
//             stream has nothing more to give".
//   callback  callable invoked as callback(const char* data, size_t len) once
//             per non-empty piece, in stream order; len never exceeds
//             kMaxBufSize.
//
// Before returning, every state bit of `in` (eof included) is cleared so the
// caller can seek and read the stream again.
template <typename Callback>
static void ReadStreamByBuf(std::istream& in, size_t limit, Callback callback) {
  char buf[kMaxBufSize];
  size_t consumed = 0;
  // Keep going only while the stream still has data AND the limit (if any) is
  // not yet reached.
  while (!in.eof() && (limit == 0 || consumed < limit)) {
    // Never ask for more than the buffer holds nor for more than the limit
    // still allows.
    size_t want = kMaxBufSize;
    if (limit != 0 && limit - consumed < want) {
      want = limit - consumed;
    }
    in.read(buf, static_cast<std::streamsize>(want));
    const std::streamsize got = in.gcount();
    if (got <= 0) {
      // Nothing was extracted (exhausted or failed stream): stop.
      break;
    }
    callback(buf, static_cast<size_t>(got));
    consumed += static_cast<size_t>(got);
  }
  in.clear();  // unset eof state
}

Y
Yu Yang 已提交
49 50
// Copies whatever is left in `in` (from its current read position) to `os`.
// The transfer goes through ReadStreamByBuf with no byte limit, so the state
// bits of `in` are cleared once the copy is done.
static void PipeStream(std::istream& in, std::ostream& os) {
  auto forward_to_output = [&os](const char* data, size_t size) {
    os.write(data, size);
  };
  ReadStreamByBuf(in, 0, forward_to_output);
}
Y
Yu Yang 已提交
53 54
static uint32_t Crc32Stream(std::istream& in, size_t limit = 0) {
  uint32_t crc = static_cast<uint32_t>(crc32(0, nullptr, 0));
Y
Yu Yang 已提交
55
  ReadStreamByBuf(in, limit, [&crc](const char* buf, size_t len) {
Y
Yu Yang 已提交
56 57
    crc = static_cast<uint32_t>(crc32(
        crc, reinterpret_cast<const Bytef*>(buf), static_cast<uInt>(len)));
Y
Yu Yang 已提交
58 59 60 61 62
  });
  return crc;
}

bool Chunk::Write(std::ostream& os, Compressor ct) const {
D
dongzhihong 已提交
63 64
  // NOTE(dzhwinter): don't check records.numBytes instead, because
  // empty records are allowed.
Y
Yu Yang 已提交
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
  if (records_.empty()) {
    return false;
  }
  std::stringstream sout;
  std::unique_ptr<std::ostream> compressed_stream;
  switch (ct) {
    case Compressor::kNoCompress:
      break;
    case Compressor::kSnappy:
      compressed_stream.reset(new snappy::oSnappyStream(sout));
      break;
    default:
      PADDLE_THROW("Not implemented");
  }

  std::ostream& buf_stream = compressed_stream ? *compressed_stream : sout;
D
dongzhihong 已提交
81 82

  for (auto& record : records_) {
Y
Yu Yang 已提交
83 84 85
    size_t sz = record.size();
    buf_stream.write(reinterpret_cast<const char*>(&sz), sizeof(uint32_t))
        .write(record.data(), record.size());
D
dongzhihong 已提交
86 87
  }

Y
Yu Yang 已提交
88 89 90 91
  if (compressed_stream) {
    compressed_stream.reset();
  }

Y
Yu Yang 已提交
92
  uint32_t len = static_cast<uint32_t>(sout.str().size());
Y
Yu Yang 已提交
93 94 95
  uint32_t crc = Crc32Stream(sout);
  Header hdr(static_cast<uint32_t>(records_.size()), crc, ct, len);
  hdr.Write(os);
Y
Yu Yang 已提交
96 97
  sout.seekg(0, std::ios::beg);
  sout.clear();
Y
Yu Yang 已提交
98
  PipeStream(sout, os);
D
dongzhihong 已提交
99 100 101
  return true;
}

Y
Yu Yang 已提交
102
bool Chunk::Parse(std::istream& sin) {
D
dongzhihong 已提交
103
  Header hdr;
Y
Yu Yang 已提交
104 105 106 107
  bool ok = hdr.Parse(sin);
  if (!ok) {
    return ok;
  }
Y
Yu Yang 已提交
108
  auto beg_pos = sin.tellg();
Y
Yu Yang 已提交
109
  uint32_t crc = Crc32Stream(sin, hdr.CompressSize());
Y
Yu Yang 已提交
110 111
  PADDLE_ENFORCE_EQ(hdr.Checksum(), crc);
  Clear();
Y
Yu Yang 已提交
112
  sin.seekg(beg_pos, sin.beg);
Y
Yu Yang 已提交
113 114
  std::unique_ptr<std::istream> compressed_stream;
  switch (hdr.CompressType()) {
D
dongzhihong 已提交
115 116 117
    case Compressor::kNoCompress:
      break;
    case Compressor::kSnappy:
Y
Yu Yang 已提交
118
      compressed_stream.reset(new snappy::iSnappyStream(sin));
D
dongzhihong 已提交
119
      break;
Y
Yu Yang 已提交
120 121
    default:
      PADDLE_THROW("Not implemented");
D
dongzhihong 已提交
122 123
  }

Y
Yu Yang 已提交
124 125 126 127 128 129 130 131 132
  std::istream& stream = compressed_stream ? *compressed_stream : sin;

  for (uint32_t i = 0; i < hdr.NumRecords(); ++i) {
    uint32_t rec_len;
    stream.read(reinterpret_cast<char*>(&rec_len), sizeof(uint32_t));
    std::string buf;
    buf.resize(rec_len);
    stream.read(&buf[0], rec_len);
    Add(buf);
D
dongzhihong 已提交
133
  }
Y
Yu Yang 已提交
134
  return true;
D
dongzhihong 已提交
135 136 137 138
}

}  // namespace recordio
}  // namespace paddle