builder.cc 10.2 KB
Newer Older
1 2 3 4 5
//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
J
jorlow@chromium.org 已提交
6 7 8 9 10 11
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/builder.h"

12
#include <vector>
J
jorlow@chromium.org 已提交
13
#include "db/dbformat.h"
K
kailiu 已提交
14
#include "db/filename.h"
15
#include "db/merge_helper.h"
J
jorlow@chromium.org 已提交
16 17
#include "db/table_cache.h"
#include "db/version_edit.h"
18 19 20
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
S
Siying Dong 已提交
21
#include "rocksdb/options.h"
K
kailiu 已提交
22
#include "rocksdb/table.h"
S
Siying Dong 已提交
23
#include "table/block_based_table_builder.h"
24 25
#include "util/iostats_context_imp.h"
#include "util/thread_status_util.h"
26
#include "util/stop_watch.h"
J
jorlow@chromium.org 已提交
27

28
namespace rocksdb {
J
jorlow@chromium.org 已提交
29

S
Siying Dong 已提交
30 31
class TableFactory;

32 33 34 35 36 37 38 39 40 41 42 43
TableBuilder* NewTableBuilder(
    const ImmutableCFOptions& ioptions,
    const InternalKeyComparator& internal_comparator,
    const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
        int_tbl_prop_collector_factories,
    WritableFile* file, const CompressionType compression_type,
    const CompressionOptions& compression_opts, const bool skip_filters) {
  return ioptions.table_factory->NewTableBuilder(
      TableBuilderOptions(ioptions, internal_comparator,
                          int_tbl_prop_collector_factories, compression_type,
                          compression_opts, skip_filters),
      file);
S
Siying Dong 已提交
44 45
}

46 47 48 49 50 51 52 53 54
Status BuildTable(
    const std::string& dbname, Env* env, const ImmutableCFOptions& ioptions,
    const EnvOptions& env_options, TableCache* table_cache, Iterator* iter,
    FileMetaData* meta, const InternalKeyComparator& internal_comparator,
    const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
        int_tbl_prop_collector_factories,
    const SequenceNumber newest_snapshot,
    const SequenceNumber earliest_seqno_in_memtable,
    const CompressionType compression,
55
    const CompressionOptions& compression_opts, bool paranoid_file_checks,
56
    const Env::IOPriority io_priority, TableProperties* table_properties) {
57 58
  // Reports the IOStats for flush for every following bytes.
  const size_t kReportFlushIOStatsEvery = 1048576;
J
jorlow@chromium.org 已提交
59
  Status s;
60
  meta->fd.file_size = 0;
61
  meta->smallest_seqno = meta->largest_seqno = 0;
J
jorlow@chromium.org 已提交
62 63
  iter->SeekToFirst();

64 65 66
  // If the sequence number of the smallest entry in the memtable is
  // smaller than the most recent snapshot, then we do not trigger
  // removal of duplicate/deleted keys as part of this builder.
L
Lei Jin 已提交
67
  bool purge = ioptions.purge_redundant_kvs_while_flush;
68 69 70 71
  if (earliest_seqno_in_memtable <= newest_snapshot) {
    purge = false;
  }

L
Lei Jin 已提交
72
  std::string fname = TableFileName(ioptions.db_paths, meta->fd.GetNumber(),
73
                                    meta->fd.GetPathId());
J
jorlow@chromium.org 已提交
74
  if (iter->Valid()) {
75
    unique_ptr<WritableFile> file;
L
Lei Jin 已提交
76
    s = env->NewWritableFile(fname, &file, env_options);
J
jorlow@chromium.org 已提交
77 78 79
    if (!s.ok()) {
      return s;
    }
L
Lei Jin 已提交
80
    file->SetIOPriority(io_priority);
S
Siying Dong 已提交
81

L
Lei Jin 已提交
82
    TableBuilder* builder = NewTableBuilder(
83 84
        ioptions, internal_comparator, int_tbl_prop_collector_factories,
        file.get(), compression, compression_opts);
85

I
Igor Canadi 已提交
86 87 88 89 90 91 92
    {
      // the first key is the smallest key
      Slice key = iter->key();
      meta->smallest.DecodeFrom(key);
      meta->smallest_seqno = GetInternalKeySeqno(key);
      meta->largest_seqno = meta->smallest_seqno;
    }
93

94
    MergeHelper merge(internal_comparator.user_comparator(),
L
Lei Jin 已提交
95 96
                      ioptions.merge_operator, ioptions.info_log,
                      ioptions.min_partial_merge_operands,
97 98
                      true /* internal key corruption is not ok */);

99
    if (purge) {
100
      // Ugly walkaround to avoid compiler error for release build
101 102 103
      bool ok __attribute__((unused)) = true;

      // Will write to builder if current key != prev key
104 105
      ParsedInternalKey prev_ikey;
      std::string prev_key;
106
      bool is_first_key = true;    // Also write if this is the very first key
107

108 109
      while (iter->Valid()) {
        bool iterator_at_next = false;
110 111

        // Get current key
112 113
        ParsedInternalKey this_ikey;
        Slice key = iter->key();
114 115 116 117
        Slice value = iter->value();

        // In-memory key corruption is not ok;
        // TODO: find a clean way to treat in memory key corruption
118 119
        ok = ParseInternalKey(key, &this_ikey);
        assert(ok);
120 121
        assert(this_ikey.sequence >= earliest_seqno_in_memtable);

122 123 124
        // If the key is the same as the previous key (and it is not the
        // first key), then we skip it, since it is an older version.
        // Otherwise we output the key and mark it as the "new" previous key.
125 126
        if (!is_first_key && !internal_comparator.user_comparator()->Compare(
                                  prev_ikey.user_key, this_ikey.user_key)) {
127 128 129 130 131
          // seqno within the same key are in decreasing order
          assert(this_ikey.sequence < prev_ikey.sequence);
        } else {
          is_first_key = false;

132
          if (this_ikey.type == kTypeMerge) {
133 134 135 136 137 138 139
            // TODO(tbd): Add a check here to prevent RocksDB from crash when
            // reopening a DB w/o properly specifying the merge operator.  But
            // currently we observed a memory leak on failing in RocksDB
            // recovery, so we decide to let it crash instead of causing
            // memory leak for now before we have identified the real cause
            // of the memory leak.

140
            // Handle merge-type keys using the MergeHelper
141
            // TODO: pass statistics to MergeUntil
142 143
            merge.MergeUntil(iter, 0 /* don't worry about snapshot */);
            iterator_at_next = true;
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
            if (merge.IsSuccess()) {
              // Merge completed correctly.
              // Add the resulting merge key/value and continue to next
              builder->Add(merge.key(), merge.value());
              prev_key.assign(merge.key().data(), merge.key().size());
              ok = ParseInternalKey(Slice(prev_key), &prev_ikey);
              assert(ok);
            } else {
              // Merge did not find a Put/Delete.
              // Can not compact these merges into a kValueType.
              // Write them out one-by-one. (Proceed back() to front())
              const std::deque<std::string>& keys = merge.keys();
              const std::deque<std::string>& values = merge.values();
              assert(keys.size() == values.size() && keys.size() >= 1);
              std::deque<std::string>::const_reverse_iterator key_iter;
              std::deque<std::string>::const_reverse_iterator value_iter;
              for (key_iter=keys.rbegin(), value_iter = values.rbegin();
                   key_iter != keys.rend() && value_iter != values.rend();
                   ++key_iter, ++value_iter) {

                builder->Add(Slice(*key_iter), Slice(*value_iter));
              }

              // Sanity check. Both iterators should end at the same time
              assert(key_iter == keys.rend() && value_iter == values.rend());

              prev_key.assign(keys.front());
              ok = ParseInternalKey(Slice(prev_key), &prev_ikey);
              assert(ok);
            }
174
          } else {
175 176
            // Handle Put/Delete-type keys by simply writing them
            builder->Add(key, value);
177 178 179 180
            prev_key.assign(key.data(), key.size());
            ok = ParseInternalKey(Slice(prev_key), &prev_ikey);
            assert(ok);
          }
181
        }
182

183 184 185 186 187 188 189
        if (io_priority == Env::IO_HIGH &&
            IOSTATS(bytes_written) >= kReportFlushIOStatsEvery) {
          ThreadStatusUtil::IncreaseThreadOperationProperty(
              ThreadStatus::FLUSH_BYTES_WRITTEN,
              IOSTATS(bytes_written));
          IOSTATS_RESET(bytes_written);
        }
190
        if (!iterator_at_next) iter->Next();
191
      }
192 193

      // The last key is the largest key
194
      meta->largest.DecodeFrom(Slice(prev_key));
195 196 197
      SequenceNumber seqno = GetInternalKeySeqno(Slice(prev_key));
      meta->smallest_seqno = std::min(meta->smallest_seqno, seqno);
      meta->largest_seqno = std::max(meta->largest_seqno, seqno);
198 199 200 201 202 203

    } else {
      for (; iter->Valid(); iter->Next()) {
        Slice key = iter->key();
        meta->largest.DecodeFrom(key);
        builder->Add(key, iter->value());
204 205 206
        SequenceNumber seqno = GetInternalKeySeqno(key);
        meta->smallest_seqno = std::min(meta->smallest_seqno, seqno);
        meta->largest_seqno = std::max(meta->largest_seqno, seqno);
207 208 209 210 211 212 213
        if (io_priority == Env::IO_HIGH &&
            IOSTATS(bytes_written) >= kReportFlushIOStatsEvery) {
          ThreadStatusUtil::IncreaseThreadOperationProperty(
              ThreadStatus::FLUSH_BYTES_WRITTEN,
              IOSTATS(bytes_written));
          IOSTATS_RESET(bytes_written);
        }
214
      }
J
jorlow@chromium.org 已提交
215 216 217 218 219 220 221 222
    }

    // Finish and check for builder errors
    if (s.ok()) {
      s = builder->Finish();
    } else {
      builder->Abandon();
    }
223 224
    if (s.ok()) {
      meta->fd.file_size = builder->FileSize();
225
      meta->marked_for_compaction = builder->NeedCompact();
226 227 228 229 230
      assert(meta->fd.GetFileSize() > 0);
      if (table_properties) {
        *table_properties = builder->GetTableProperties();
      }
    }
J
jorlow@chromium.org 已提交
231 232 233
    delete builder;

    // Finish and check for file errors
L
Lei Jin 已提交
234 235 236
    if (s.ok() && !ioptions.disable_data_sync) {
      if (ioptions.use_fsync) {
        StopWatch sw(env, ioptions.statistics, TABLE_SYNC_MICROS);
237 238
        s = file->Fsync();
      } else {
L
Lei Jin 已提交
239
        StopWatch sw(env, ioptions.statistics, TABLE_SYNC_MICROS);
240 241
        s = file->Sync();
      }
J
jorlow@chromium.org 已提交
242 243 244 245 246 247 248
    }
    if (s.ok()) {
      s = file->Close();
    }

    if (s.ok()) {
      // Verify that the table is usable
L
Lei Jin 已提交
249
      Iterator* it = table_cache->NewIterator(ReadOptions(), env_options,
250
                                              internal_comparator, meta->fd);
J
jorlow@chromium.org 已提交
251
      s = it->status();
252 253 254 255 256
      if (s.ok() && paranoid_file_checks) {
        for (it->SeekToFirst(); it->Valid(); it->Next()) {}
        s = it->status();
      }

J
jorlow@chromium.org 已提交
257 258 259 260 261 262 263 264 265
      delete it;
    }
  }

  // Check for input iterator errors
  if (!iter->status().ok()) {
    s = iter->status();
  }

266
  if (s.ok() && meta->fd.GetFileSize() > 0) {
267
    // Keep it
J
jorlow@chromium.org 已提交
268 269 270 271 272 273
  } else {
    env->DeleteFile(fname);
  }
  return s;
}

274
}  // namespace rocksdb