sst_file_writer.cc 8.8 KB
Newer Older
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
S
Siying Dong 已提交
2 3 4
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
5 6 7 8 9 10 11

#include "rocksdb/sst_file_writer.h"

#include <vector>
#include "db/dbformat.h"
#include "rocksdb/table.h"
#include "table/block_based_table_builder.h"
12
#include "table/sst_file_writer_collectors.h"
13
#include "util/file_reader_writer.h"
14
#include "util/sync_point.h"
15 16 17 18 19

namespace rocksdb {

// String key for the external SST file "version" property.
const std::string ExternalSstFilePropertyNames::kVersion(
    "rocksdb.external_sst_file.version");
// String key for the external SST file "global_seqno" property.
const std::string ExternalSstFilePropertyNames::kGlobalSeqno(
    "rocksdb.external_sst_file.global_seqno");
22 23 24

#ifndef ROCKSDB_LITE

25
// Number of newly written bytes (1MB) that must be exceeded before
// SstFileWriter::Rep::InvalidatePageCache() invalidates the page cache again.
const size_t kFadviseTrigger = static_cast<size_t>(1) << 20;
26 27

// Private implementation state of SstFileWriter: the options it was created
// with, the file/table builder that are set up by SstFileWriter::Open(), and
// the bookkeeping (ExternalSstFileInfo) describing what has been written.
struct SstFileWriter::Rep {
  Rep(const EnvOptions& _env_options, const Options& options,
      Env::IOPriority _io_priority, const Comparator* _user_comparator,
      ColumnFamilyHandle* _cfh, bool _invalidate_page_cache, bool _skip_filters)
      : env_options(_env_options),
        ioptions(options),
        mutable_cf_options(options),
        io_priority(_io_priority),
        internal_comparator(_user_comparator),
        cfh(_cfh),
        invalidate_page_cache(_invalidate_page_cache),
        last_fadvise_size(0),
        skip_filters(_skip_filters) {}

  // Writer for the output file; set by SstFileWriter::Open().
  std::unique_ptr<WritableFileWriter> file_writer;
  // Table builder; null until Open() succeeds and again after Finish()
  // has run to completion.
  std::unique_ptr<TableBuilder> builder;
  EnvOptions env_options;
  ImmutableCFOptions ioptions;
  MutableCFOptions mutable_cf_options;
  Env::IOPriority io_priority;
  InternalKeyComparator internal_comparator;
  // Path, size, entry count and key range of the file being written.
  ExternalSstFileInfo file_info;
  // Scratch internal key reused by every Add() call.
  InternalKey ikey;
  std::string column_family_name;
  // Optional column family handle supplied at construction; may be nullptr.
  ColumnFamilyHandle* cfh;
  // If true, InvalidatePageCache() asks the file writer to invalidate its
  // cached pages each time more than kFadviseTrigger bytes were written
  // since the previous invalidation.
  bool invalidate_page_cache;
  // The size of the file during the last time we called Fadvise to remove
  // cached pages from page cache.
  uint64_t last_fadvise_size;
  bool skip_filters;

  // Appends one entry of the given type to the table being built.
  //
  // Returns InvalidArgument if no file is open, if `user_key` is not strictly
  // greater (per the user comparator) than the previously added key, or if
  // `value_type` is not one of kTypeValue / kTypeMerge / kTypeDeletion.
  // Otherwise returns the table builder's status, so that a failure recorded
  // by the builder is reported to the caller instead of being masked by an
  // unconditional OK.
  Status Add(const Slice& user_key, const Slice& value,
             const ValueType value_type) {
    if (!builder) {
      return Status::InvalidArgument("File is not opened");
    }
    // Do not keep feeding a builder that has already recorded an error.
    if (!builder->status().ok()) {
      return builder->status();
    }

    if (file_info.num_entries == 0) {
      file_info.smallest_key.assign(user_key.data(), user_key.size());
    } else {
      if (internal_comparator.user_comparator()->Compare(
              user_key, file_info.largest_key) <= 0) {
        // Make sure that keys are added in order
        return Status::InvalidArgument("Keys must be added in order");
      }
    }

    // TODO(tec) : For external SST files we could omit the seqno and type.
    switch (value_type) {
      case ValueType::kTypeValue:     // Put
      case ValueType::kTypeMerge:     // Merge
      case ValueType::kTypeDeletion:  // Delete
        ikey.Set(user_key, 0 /* Sequence Number */, value_type);
        break;
      default:
        return Status::InvalidArgument("Value type is not supported");
    }
    builder->Add(ikey.Encode(), value);

    // update file info
    file_info.num_entries++;
    file_info.largest_key.assign(user_key.data(), user_key.size());
    file_info.file_size = builder->FileSize();

    InvalidatePageCache(false /* closing */);

    // Report any error the builder recorded while adding this entry.
    return builder->status();
  }

  // Invalidates the file's cached pages when enabled and either `closing` is
  // true or more than kFadviseTrigger bytes were written since the last
  // invalidation. Requires `builder` and `file_writer` to be set.
  void InvalidatePageCache(bool closing) {
    if (invalidate_page_cache == false) {
      // Fadvise disabled
      return;
    }
    uint64_t bytes_since_last_fadvise =
        builder->FileSize() - last_fadvise_size;
    if (bytes_since_last_fadvise > kFadviseTrigger || closing) {
      TEST_SYNC_POINT_CALLBACK("SstFileWriter::Rep::InvalidatePageCache",
                               &(bytes_since_last_fadvise));
      // Tell the OS that we don't need this file in page cache
      file_writer->InvalidateCache(0, 0);
      last_fadvise_size = builder->FileSize();
    }
  }
};

// Builds the internal Rep from the supplied options; no file is created
// until Open() is called.
SstFileWriter::SstFileWriter(const EnvOptions& env_options,
                             const Options& options,
                             const Comparator* user_comparator,
                             ColumnFamilyHandle* column_family,
                             bool invalidate_page_cache,
                             Env::IOPriority io_priority, bool skip_filters)
    : rep_(new Rep(env_options, options, io_priority, user_comparator,
                   column_family, invalidate_page_cache, skip_filters)) {
  // FileSize() reads this field, so give it a defined value before Open().
  ExternalSstFileInfo& info = rep_->file_info;
  info.file_size = 0;
}
132

I
Islam AbdelRahman 已提交
133 134 135 136 137 138 139
// Abandons a table builder that is still present at destruction time, i.e.
// one that Finish() never got to reset.
SstFileWriter::~SstFileWriter() {
  TableBuilder* const pending_builder = rep_->builder.get();
  if (pending_builder != nullptr) {
    pending_builder->Abandon();
  }
}
140 141

// Creates a writable file at `file_path`, wraps it in a WritableFileWriter and
// creates a table builder for it, then resets the writer's file info.
// Returns the error from Env::NewWritableFile() if the file cannot be
// created; otherwise returns OK.
Status SstFileWriter::Open(const std::string& file_path) {
  Rep* const rep = rep_.get();

  std::unique_ptr<WritableFile> sst_file;
  Status s = rep->ioptions.env->NewWritableFile(file_path, &sst_file,
                                                rep->env_options);
  if (!s.ok()) {
    return s;
  }
  sst_file->SetIOPriority(rep->io_priority);

  // Compression precedence: bottommost compression (when not disabled), then
  // the last entry of the per-level list, then the column family default.
  CompressionType compression_type = rep->mutable_cf_options.compression;
  if (rep->ioptions.bottommost_compression != kDisableCompressionOption) {
    compression_type = rep->ioptions.bottommost_compression;
  } else if (!rep->ioptions.compression_per_level.empty()) {
    compression_type = rep->ioptions.compression_per_level.back();
  }

  std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
      collector_factories;

  // SstFileWriter's own collector factory comes first.
  collector_factories.emplace_back(new SstFileWriterPropertiesCollectorFactory(
      2 /* version */, 0 /* global_seqno*/));

  // Followed by one wrapper per user-supplied collector factory.
  for (const auto& user_factory :
       rep->ioptions.table_properties_collector_factories) {
    collector_factories.emplace_back(
        new UserKeyTablePropertiesCollectorFactory(user_factory));
  }

  const int unknown_level = -1;
  uint32_t cf_id;
  if (rep->cfh == nullptr) {
    // No target column family was given.
    rep->column_family_name = "";
    cf_id = TablePropertiesCollectorFactory::Context::kUnknownColumnFamily;
  } else {
    // The user explicitly specified the column family this file will be
    // ingested into, so its id and name can be persisted in the file.
    cf_id = rep->cfh->GetID();
    rep->column_family_name = rep->cfh->GetName();
  }

  TableBuilderOptions table_builder_options(
      rep->ioptions, rep->mutable_cf_options, rep->internal_comparator,
      &collector_factories, compression_type, rep->ioptions.compression_opts,
      nullptr /* compression_dict */, rep->skip_filters,
      rep->column_family_name, unknown_level);
  rep->file_writer.reset(
      new WritableFileWriter(std::move(sst_file), rep->env_options));

  // TODO(tec) : If table_factory is using compressed block cache, we will
  // be adding the external sst file blocks into it, which is wasteful.
  rep->builder.reset(rep->ioptions.table_factory->NewTableBuilder(
      table_builder_options, cf_id, rep->file_writer.get()));

  // Start from a clean description of the new file.
  ExternalSstFileInfo& info = rep->file_info;
  info.file_path = file_path;
  info.file_size = 0;
  info.num_entries = 0;
  info.sequence_number = 0;
  info.version = 2;
  return s;
}

// Adds `user_key` -> `value` as a kTypeValue entry; does the same thing as
// Put().
Status SstFileWriter::Add(const Slice& user_key, const Slice& value) {
  const ValueType entry_type = ValueType::kTypeValue;
  return rep_->Add(user_key, value, entry_type);
}
215

216 217 218
// Adds `user_key` -> `value` as a kTypeValue entry.
Status SstFileWriter::Put(const Slice& user_key, const Slice& value) {
  const ValueType entry_type = ValueType::kTypeValue;
  return rep_->Add(user_key, value, entry_type);
}
219

220 221 222
// Adds `user_key` -> `value` as a kTypeMerge entry.
Status SstFileWriter::Merge(const Slice& user_key, const Slice& value) {
  const ValueType entry_type = ValueType::kTypeMerge;
  return rep_->Add(user_key, value, entry_type);
}
223

224 225
// Adds a kTypeDeletion entry for `user_key` with an empty value.
Status SstFileWriter::Delete(const Slice& user_key) {
  const Slice empty_value;
  return rep_->Add(user_key, empty_value, ValueType::kTypeDeletion);
}

// Finalizes the table, then syncs and closes the file.
//
// Returns InvalidArgument (leaving the builder in place) if no file is open
// or no entry was added. Otherwise returns the first failure among the
// builder's Finish(), the file Sync() and the file Close(); on any such
// failure the file is deleted, ignoring the result of the delete. When
// `file_info` is non-null it receives a copy of the writer's file info even
// if an error is returned. The builder is released in all non-early-return
// cases.
Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) {
  Rep* const rep = rep_.get();
  if (!rep->builder) {
    return Status::InvalidArgument("File is not opened");
  }
  if (rep->file_info.num_entries == 0) {
    return Status::InvalidArgument("Cannot create sst file with no entries");
  }

  Status status = rep->builder->Finish();
  // Record the final size regardless of whether Finish() succeeded.
  rep->file_info.file_size = rep->builder->FileSize();

  if (status.ok()) {
    status = rep->file_writer->Sync(rep->ioptions.use_fsync);
    // Page cache invalidation runs whether or not the sync succeeded.
    rep->InvalidatePageCache(true /* closing */);
    if (status.ok()) {
      status = rep->file_writer->Close();
    }
  }

  const bool failed = !status.ok();
  if (failed) {
    rep->ioptions.env->DeleteFile(rep->file_info.file_path);
  }

  if (file_info != nullptr) {
    *file_info = rep->file_info;
  }

  rep->builder.reset();
  return status;
}
D
Ding Ma 已提交
258 259 260 261

uint64_t SstFileWriter::FileSize() {
  return rep_->file_info.file_size;
}
262 263
#endif  // !ROCKSDB_LITE

264
}  // namespace rocksdb