//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.

#include "rocksdb/sst_file_writer.h"

#include <vector>
#include "db/dbformat.h"
#include "rocksdb/table.h"
#include "table/block_based_table_builder.h"
#include "util/file_reader_writer.h"
#include "util/string_util.h"

namespace rocksdb {

// Key of the user-collected table property under which tables produced by
// SstFileWriter record their external-file format version.
const std::string ExternalSstFilePropertyNames::kVersion =
    "rocksdb.external_sst_file.version";

// PropertiesCollector used to add properties specific to tables
// generated by SstFileWriter.
//
// The collector ignores the individual key/value pairs and, when the table is
// finished, records the version number it was constructed with under
// ExternalSstFilePropertyNames::kVersion.
class SstFileWriter::SstFileWriterPropertiesCollector
    : public IntTblPropCollector {
 public:
  // `version` is the external SST file version stamped into every table this
  // collector is attached to.
  explicit SstFileWriterPropertiesCollector(int32_t version)
      : version_(version) {}

  // Per-entry hook. Intentionally a no-op: there is no interest in collecting
  // stats for individual key/value pairs, so the arguments are unused.
  Status InternalAdd(const Slice& /*key*/, const Slice& /*value*/,
                     uint64_t /*file_size*/) override {
    return Status::OK();
  }

  // Stores the version, encoded as a fixed-width 32-bit value, into
  // `properties` under ExternalSstFilePropertyNames::kVersion.
  Status Finish(UserCollectedProperties* properties) override {
    std::string version_val;
    // PutFixed32 takes an unsigned 32-bit value; convert explicitly instead of
    // relying on an implicit signed-to-unsigned conversion.
    PutFixed32(&version_val, static_cast<uint32_t>(version_));
    properties->insert({ExternalSstFilePropertyNames::kVersion, version_val});
    return Status::OK();
  }

  const char* Name() const override {
    return "SstFileWriterPropertiesCollector";
  }

  // Human-readable form of the collected property: the version rendered as a
  // decimal string rather than the fixed-width encoding written by Finish().
  UserCollectedProperties GetReadableProperties() const override {
    return {{ExternalSstFilePropertyNames::kVersion, ToString(version_)}};
  }

 private:
  int32_t version_;
};

// Factory producing SstFileWriterPropertiesCollector instances that all carry
// the version number this factory was constructed with.
class SstFileWriter::SstFileWriterPropertiesCollectorFactory
    : public IntTblPropCollectorFactory {
 public:
  explicit SstFileWriterPropertiesCollectorFactory(int32_t version)
      : version_(version) {}

  // Returns a newly allocated collector. The column family id plays no part
  // in how the collector is built.
  IntTblPropCollector* CreateIntTblPropCollector(
      uint32_t /*column_family_id*/) override {
    return new SstFileWriterPropertiesCollector(version_);
  }

  // NOTE(review): this is the same string the collector itself reports from
  // its Name(); it is kept as-is so the reported name does not change.
  const char* Name() const override {
    return "SstFileWriterPropertiesCollector";
  }

 private:
  int32_t version_;
};

// Private implementation state of SstFileWriter.
struct SstFileWriter::Rep {
  // Copies the environment options and derives the immutable and mutable
  // column-family options from `opts`; `user_cmp` is wrapped in an
  // InternalKeyComparator.
  Rep(const EnvOptions& env_opts, const Options& opts,
      const Comparator* user_cmp)
      : env_options(env_opts),
        ioptions(opts),
        mutable_cf_options(opts, ioptions),
        internal_comparator(user_cmp) {}

  // Writer wrapping the output file; set by Open().
  std::unique_ptr<WritableFileWriter> file_writer;
  // Table builder for the file being written; null while no file is open.
  std::unique_ptr<TableBuilder> builder;
  EnvOptions env_options;
  // Must stay declared before mutable_cf_options: the constructor passes
  // ioptions to mutable_cf_options' initializer, and members are initialized
  // in declaration order.
  ImmutableCFOptions ioptions;
  MutableCFOptions mutable_cf_options;
  InternalKeyComparator internal_comparator;
  // Bookkeeping about the file being written (path, size, entry count, key
  // range, version), handed back to the caller by Finish().
  ExternalSstFileInfo file_info;
  std::string column_family_name;
};

// Allocates the writer's private state from the supplied options and
// comparator. No file is created here; that happens in Open().
SstFileWriter::SstFileWriter(const EnvOptions& env_options,
                             const Options& options,
                             const Comparator* user_comparator)
    : rep_(new Rep(env_options, options, user_comparator)) {
}

// Frees the private state (Rep) that the constructor allocated with `new`.
SstFileWriter::~SstFileWriter() { delete rep_; }

// Creates `file_path` through the configured Env and sets up a table builder
// that writes into it, then resets the per-file bookkeeping.
//
// Returns the error from the Env if the file cannot be created, in which case
// the writer's state is left untouched; otherwise returns OK.
Status SstFileWriter::Open(const std::string& file_path) {
  Rep* const rep = rep_;

  std::unique_ptr<WritableFile> sst_file;
  Status status = rep->ioptions.env->NewWritableFile(file_path, &sst_file,
                                                     rep->env_options);
  if (!status.ok()) {
    return status;
  }

  // When per-level compression is configured, use the setting of the last
  // level; otherwise fall back to the column family's compression type.
  const auto& per_level = rep->ioptions.compression_per_level;
  const CompressionType compression_type =
      per_level.empty() ? rep->mutable_cf_options.compression
                        : *per_level.rbegin();

  // Attach the collector that stamps the external-file version into the
  // table's properties.
  std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
      collector_factories;
  collector_factories.emplace_back(
      new SstFileWriterPropertiesCollectorFactory(1 /* version */));

  TableBuilderOptions table_builder_options(
      rep->ioptions, rep->internal_comparator, &collector_factories,
      compression_type, rep->ioptions.compression_opts,
      nullptr /* compression_dict */, false /* skip_filters */,
      rep->column_family_name);

  rep->file_writer.reset(
      new WritableFileWriter(std::move(sst_file), rep->env_options));
  rep->builder.reset(rep->ioptions.table_factory->NewTableBuilder(
      table_builder_options,
      TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
      rep->file_writer.get()));

  // Start the bookkeeping for the new file from scratch.
  ExternalSstFileInfo& info = rep->file_info;
  info.file_path = file_path;
  info.file_size = 0;
  info.num_entries = 0;
  info.sequence_number = 0;
  info.version = 1;
  return status;
}

// Appends the entry (`user_key`, `value`) to the file being written.
//
// Keys must be added in strictly increasing order according to the user
// comparator. Each entry is stored as a Put with sequence number 0.
//
// Returns:
//   - InvalidArgument if no file is open (Open() has not succeeded, or
//     Finish() has already been called),
//   - InvalidArgument if `user_key` does not sort after the previously added
//     key,
//   - the table builder's error status if the builder is in a failed state
//     after the entry was handed to it,
//   - OK otherwise.
Status SstFileWriter::Add(const Slice& user_key, const Slice& value) {
  Rep* r = rep_;
  if (!r->builder) {
    return Status::InvalidArgument("File is not opened");
  }

  const bool is_first_entry = (r->file_info.num_entries == 0);
  if (!is_first_entry &&
      r->internal_comparator.user_comparator()->Compare(
          user_key, r->file_info.largest_key) <= 0) {
    // Make sure that keys are added in order
    return Status::InvalidArgument("Keys must be added in order");
  }

  InternalKey ikey(user_key, 0 /* Sequence Number */,
                   ValueType::kTypeValue /* Put */);
  r->builder->Add(ikey.Encode(), value);

  // TableBuilder::Add() has no return value; failures are reported through
  // status(). Do not count an entry the builder did not accept.
  Status s = r->builder->status();
  if (!s.ok()) {
    return s;
  }

  // Update the file info only after the entry has been handed to the builder,
  // so that file_size reflects this entry instead of lagging one behind.
  if (is_first_entry) {
    r->file_info.smallest_key = user_key.ToString();
  }
  r->file_info.num_entries++;
  r->file_info.largest_key = user_key.ToString();
  r->file_info.file_size = r->builder->FileSize();

  return Status::OK();
}

// Completes the table, syncs (unless data sync is disabled) and closes the
// file, and optionally reports the resulting file's details.
//
// `file_info`, when non-null, receives a copy of the file's bookkeeping on
// success; it is left untouched on failure.
//
// Returns InvalidArgument if no file is open or if no entries were added (in
// both cases nothing else is done). Otherwise returns the first error from
// finishing, syncing or closing; on such an error the file is deleted. The
// builder is released whether or not those steps succeeded.
Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) {
  Rep* const rep = rep_;
  if (!rep->builder) {
    return Status::InvalidArgument("File is not opened");
  }
  if (rep->file_info.num_entries == 0) {
    return Status::InvalidArgument("Cannot create sst file with no entries");
  }

  Status status = rep->builder->Finish();
  if (!status.ok()) {
    rep->builder->Abandon();
  } else {
    if (!rep->ioptions.disable_data_sync) {
      status = rep->file_writer->Sync(rep->ioptions.use_fsync);
    }
    if (status.ok()) {
      status = rep->file_writer->Close();
    }
  }

  if (status.ok()) {
    if (file_info != nullptr) {
      // Report the final size now that the table is complete.
      rep->file_info.file_size = rep->builder->FileSize();
      *file_info = rep->file_info;
    }
  } else {
    // Remove the unusable file; the result of the deletion is not checked.
    rep->ioptions.env->DeleteFile(rep->file_info.file_path);
  }

  rep->builder.reset();
  return status;
}
}  // namespace rocksdb