//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.

#include "rocksdb/sst_file_writer.h"

#include <vector>
#include "db/dbformat.h"
#include "rocksdb/table.h"
#include "table/block_based_table_builder.h"
#include "util/file_reader_writer.h"
#include "util/string_util.h"

namespace rocksdb {

// Key of the user-collected table property under which files produced by
// SstFileWriter record their external-file format version.
const std::string ExternalSstFilePropertyNames::kVersion(
    "rocksdb.external_sst_file.version");

// PropertiesCollector used to add properties specific to tables
// generated by SstFileWriter.
//
// It ignores the individual key/value pairs and, when the table is finished,
// contributes a single user property, ExternalSstFilePropertyNames::kVersion,
// whose value is the version number supplied at construction.
class SstFileWriter::SstFileWriterPropertiesCollector
    : public IntTblPropCollector {
 public:
  // `version` is the external SST file format version to stamp on the table.
  explicit SstFileWriterPropertiesCollector(int32_t version)
      : version_(version) {}

  // Called for each entry added to the table. Always returns OK.
  virtual Status InternalAdd(const Slice& /*key*/, const Slice& /*value*/,
                             uint64_t /*file_size*/) override {
    // Intentionally left blank. Have no interest in collecting stats for
    // individual key/value pairs.
    return Status::OK();
  }

  // Stores the version under kVersion as a fixed-width 32-bit encoding.
  virtual Status Finish(UserCollectedProperties* properties) override {
    std::string version_val;
    // Make the signed -> unsigned conversion explicit instead of relying on
    // an implicit conversion at the call site.
    // NOTE(review): assumes PutFixed32 takes a uint32_t -- confirm.
    PutFixed32(&version_val, static_cast<uint32_t>(version_));
    properties->insert({ExternalSstFilePropertyNames::kVersion, version_val});
    return Status::OK();
  }

  virtual const char* Name() const override {
    return "SstFileWriterPropertiesCollector";
  }

  // Same property as Finish(), but with the version rendered through
  // ToString() rather than the fixed-width encoding.
  virtual UserCollectedProperties GetReadableProperties() const override {
    return {{ExternalSstFilePropertyNames::kVersion, ToString(version_)}};
  }

 private:
  // Version number written into the table's properties.
  int32_t version_;
};

// Factory that produces an SstFileWriterPropertiesCollector carrying the
// version number given to the factory's constructor.
class SstFileWriter::SstFileWriterPropertiesCollectorFactory
    : public IntTblPropCollectorFactory {
 public:
  explicit SstFileWriterPropertiesCollectorFactory(int32_t version)
      : version_(version) {}

  // Returns a newly heap-allocated collector; `column_family_id` is not
  // consulted.
  IntTblPropCollector* CreateIntTblPropCollector(
      uint32_t column_family_id) override {
    IntTblPropCollector* collector =
        new SstFileWriterPropertiesCollector(version_);
    return collector;
  }

  // Reports the same name string as the collector this factory creates.
  const char* Name() const override {
    return "SstFileWriterPropertiesCollector";
  }

 private:
  // Version forwarded to every collector created by this factory.
  int32_t version_;
};

// Private state of an SstFileWriter: the option snapshots taken at
// construction plus the objects and bookkeeping for the file being written.
struct SstFileWriter::Rep {
  Rep(const EnvOptions& env_opts, const Options& opts,
      const Comparator* user_cmp)
      : env_options(env_opts),
        ioptions(opts),
        mutable_cf_options(opts, ioptions),
        internal_comparator(user_cmp) {}

  // Writer over the destination file; set by Open().
  std::unique_ptr<WritableFileWriter> file_writer;
  // Table builder for the file in progress; null when no file is open.
  std::unique_ptr<TableBuilder> builder;
  EnvOptions env_options;
  // Must stay declared before mutable_cf_options: the constructor
  // initializes mutable_cf_options from ioptions.
  ImmutableCFOptions ioptions;
  MutableCFOptions mutable_cf_options;
  InternalKeyComparator internal_comparator;
  // Running description (path, size, key range, entry count, ...) of the
  // file being written.
  ExternalSstFileInfo file_info;
  // Passed to the table builder options in Open(); default-constructed
  // (empty) here.
  std::string column_family_name;
};

// Captures the supplied options and comparator in a freshly allocated Rep.
// No file is created here; that happens in Open().
SstFileWriter::SstFileWriter(const EnvOptions& env_options,
                             const Options& options,
                             const Comparator* user_comparator)
    : rep_(new Rep(env_options, options, user_comparator)) {
}
95 96 97 98 99 100 101 102 103 104 105 106

// Releases the writer's private state.
//
// A builder can still be live here when the caller opened a file but never
// called Finish(), or when Finish() returned early because no entries had
// been added. In that case the builder is abandoned before it is destroyed.
// NOTE(review): this relies on TableBuilder requiring Finish() or Abandon()
// before destruction -- confirm against the TableBuilder interface.
SstFileWriter::~SstFileWriter() {
  if (rep_ != nullptr && rep_->builder) {
    rep_->builder->Abandon();
  }
  delete rep_;
}

// Starts writing a new SST file at `file_path`.
//
// Obtains a writable file from the configured Env, picks the compression
// type, creates the table builder that later Add() calls feed, and resets
// the per-file bookkeeping. If the Env cannot provide the file, its status is
// returned unchanged and nothing else is touched; otherwise returns OK.
Status SstFileWriter::Open(const std::string& file_path) {
  Rep* const rep = rep_;
  const ImmutableCFOptions& cf_opts = rep->ioptions;

  std::unique_ptr<WritableFile> writable_file;
  Status status = cf_opts.env->NewWritableFile(file_path, &writable_file,
                                               rep->env_options);
  if (!status.ok()) {
    return status;
  }

  // Compression preference, highest first: the bottommost-level setting
  // (unless disabled), then the last per-level entry, then the general
  // column family setting.
  CompressionType compression = rep->mutable_cf_options.compression;
  if (cf_opts.bottommost_compression != kDisableCompressionOption) {
    compression = cf_opts.bottommost_compression;
  } else if (!cf_opts.compression_per_level.empty()) {
    compression = *(cf_opts.compression_per_level.rbegin());
  }

  // Only one internal collector is installed: the one that stamps the
  // external-file version property.
  std::vector<std::unique_ptr<IntTblPropCollectorFactory>> collector_factories;
  collector_factories.emplace_back(
      new SstFileWriterPropertiesCollectorFactory(1 /* version */));

  TableBuilderOptions builder_options(
      cf_opts, rep->internal_comparator, &collector_factories, compression,
      cf_opts.compression_opts, nullptr /* compression_dict */,
      false /* skip_filters */, rep->column_family_name);

  rep->file_writer.reset(
      new WritableFileWriter(std::move(writable_file), rep->env_options));
  rep->builder.reset(cf_opts.table_factory->NewTableBuilder(
      builder_options,
      TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
      rep->file_writer.get()));

  // Fresh bookkeeping for the new file.
  ExternalSstFileInfo& info = rep->file_info;
  info.file_path = file_path;
  info.file_size = 0;
  info.num_entries = 0;
  info.sequence_number = 0;
  info.version = 1;
  return status;
}

// Appends one key/value pair to the file opened with Open().
//
// Keys must arrive in strictly increasing order according to the user
// comparator. Each entry is stored as a Put with sequence number 0.
//
// Returns:
//   InvalidArgument("File is not opened")          if no file is open.
//   InvalidArgument("Keys must be added in order") if `user_key` is not
//       greater than the previously added key.
//   Otherwise the table builder's current status, so an error recorded by
//   the builder is reported here instead of being hidden behind OK.
Status SstFileWriter::Add(const Slice& user_key, const Slice& value) {
  Rep* r = rep_;
  if (!r->builder) {
    return Status::InvalidArgument("File is not opened");
  }

  if (r->file_info.num_entries == 0) {
    r->file_info.smallest_key = user_key.ToString();
  } else if (r->internal_comparator.user_comparator()->Compare(
                 user_key, r->file_info.largest_key) <= 0) {
    // Make sure that keys are added in order
    return Status::InvalidArgument("Keys must be added in order");
  }

  InternalKey ikey(user_key, 0 /* Sequence Number */,
                   ValueType::kTypeValue /* Put */);
  r->builder->Add(ikey.Encode(), value);

  // Update file info only after the entry has been handed to the builder,
  // so the recorded size accounts for the entry just added.
  r->file_info.num_entries++;
  r->file_info.largest_key = user_key.ToString();
  r->file_info.file_size = r->builder->FileSize();

  return r->builder->status();
}

// Completes the file started by Open().
//
// Finishes the table builder, then syncs the file (unless data sync is
// disabled in the options) and closes it. On success, and if `file_info` is
// non-null, the final file size is recorded and the accumulated file
// description is copied into `*file_info`. On any failure the partially
// written file is deleted; the result of that delete is not checked.
//
// Returns InvalidArgument if no file is open or no entries were added (both
// leave the writer's state untouched); otherwise the status of the
// finish/sync/close sequence. The builder is released in either of the
// latter cases.
Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) {
  Rep* const rep = rep_;
  if (!rep->builder) {
    return Status::InvalidArgument("File is not opened");
  }
  if (rep->file_info.num_entries == 0) {
    return Status::InvalidArgument("Cannot create sst file with no entries");
  }

  Status status = rep->builder->Finish();
  if (!status.ok()) {
    rep->builder->Abandon();
  } else {
    if (!rep->ioptions.disable_data_sync) {
      status = rep->file_writer->Sync(rep->ioptions.use_fsync);
    }
    if (status.ok()) {
      status = rep->file_writer->Close();
    }
  }

  if (status.ok()) {
    if (file_info != nullptr) {
      rep->file_info.file_size = rep->builder->FileSize();
      *file_info = rep->file_info;
    }
  } else {
    // Do not leave a partially written file behind.
    rep->ioptions.env->DeleteFile(rep->file_info.file_path);
  }

  rep->builder.reset();
  return status;
}
}  // namespace rocksdb