//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/builder.h"

#include <algorithm>
#include <deque>
#include <string>

#include "db/dbformat.h"
#include "db/filename.h"
#include "db/merge_helper.h"
#include "db/table_cache.h"
#include "db/version_edit.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "rocksdb/table.h"
#include "table/block_based_table_builder.h"
#include "util/stop_watch.h"

namespace rocksdb {

class TableFactory;

// Creates a TableBuilder by delegating to the table factory configured in
// the given options.  The caller owns the returned builder and must call
// Finish() or Abandon() on it before deleting it.
//
// options:              source of the table_factory and build parameters.
// internal_comparator:  key ordering the table must be written in.
// file:                 destination file; not owned by the builder.
// compression_type:     block compression to apply while building.
TableBuilder* NewTableBuilder(const Options& options,
                              const InternalKeyComparator& internal_comparator,
                              WritableFile* file,
                              CompressionType compression_type) {
  return options.table_factory->NewTableBuilder(options, internal_comparator,
                                                file, compression_type);
}

// Builds an SSTable file from the contents of *iter and fills in *meta with
// the resulting file size and key/seqno range.  On success the file is
// synced, closed, and verified to be readable through the table cache; on
// failure (or if the input iterator is empty) the partially written file is
// deleted and the error Status is returned.
//
// dbname, env, options, soptions: environment and configuration for file I/O.
// table_cache: used to re-open the finished file as a usability check.
// iter: source of internal key/value pairs, assumed sorted by
//       internal_comparator (duplicate user keys in decreasing seqno order).
// meta: in/out — file number/path come in; file_size, smallest/largest keys
//       and smallest_seqno/largest_seqno are produced here.
// newest_snapshot, earliest_seqno_in_memtable: duplicate/deleted keys are
//       only purged when no live snapshot could still observe them.
// compression, io_priority: forwarded to the table builder and output file.
Status BuildTable(const std::string& dbname, Env* env, const Options& options,
                  const EnvOptions& soptions, TableCache* table_cache,
                  Iterator* iter, FileMetaData* meta,
                  const InternalKeyComparator& internal_comparator,
                  const SequenceNumber newest_snapshot,
                  const SequenceNumber earliest_seqno_in_memtable,
                  const CompressionType compression,
                  const Env::IOPriority io_priority) {
  Status s;
  meta->fd.file_size = 0;
  meta->smallest_seqno = meta->largest_seqno = 0;
  iter->SeekToFirst();

  // If the sequence number of the smallest entry in the memtable is
  // smaller than the most recent snapshot, then we do not trigger
  // removal of duplicate/deleted keys as part of this builder.
  bool purge = options.purge_redundant_kvs_while_flush;
  if (earliest_seqno_in_memtable <= newest_snapshot) {
    purge = false;
  }

  std::string fname = TableFileName(options.db_paths, meta->fd.GetNumber(),
                                    meta->fd.GetPathId());
  if (iter->Valid()) {
    unique_ptr<WritableFile> file;
    s = env->NewWritableFile(fname, &file, soptions);
    if (!s.ok()) {
      return s;
    }
    file->SetIOPriority(io_priority);

    TableBuilder* builder =
        NewTableBuilder(options, internal_comparator, file.get(), compression);

    // the first key is the smallest key
    Slice key = iter->key();
    meta->smallest.DecodeFrom(key);
    meta->smallest_seqno = GetInternalKeySeqno(key);
    meta->largest_seqno = meta->smallest_seqno;

    MergeHelper merge(internal_comparator.user_comparator(),
                      options.merge_operator.get(), options.info_log.get(),
                      options.min_partial_merge_operands,
                      true /* internal key corruption is not ok */);

    if (purge) {
      // Ugly walkaround to avoid compiler error for release build
      // (the asserts below are the only readers of `ok`).
      bool ok __attribute__((unused)) = true;

      // Will write to builder if current key != prev key
      ParsedInternalKey prev_ikey;
      std::string prev_key;
      bool is_first_key = true;    // Also write if this is the very first key

      while (iter->Valid()) {
        // Set when MergeUntil has already advanced the iterator past the
        // current entry, so the loop must not call Next() again.
        bool iterator_at_next = false;

        // Get current key
        ParsedInternalKey this_ikey;
        Slice key = iter->key();
        Slice value = iter->value();

        // In-memory key corruption is not ok;
        // TODO: find a clean way to treat in memory key corruption
        ok = ParseInternalKey(key, &this_ikey);
        assert(ok);
        assert(this_ikey.sequence >= earliest_seqno_in_memtable);

        // If the key is the same as the previous key (and it is not the
        // first key), then we skip it, since it is an older version.
        // Otherwise we output the key and mark it as the "new" previous key.
        if (!is_first_key && !internal_comparator.user_comparator()->Compare(
                                  prev_ikey.user_key, this_ikey.user_key)) {
          // seqno within the same key are in decreasing order
          assert(this_ikey.sequence < prev_ikey.sequence);
        } else {
          is_first_key = false;

          if (this_ikey.type == kTypeMerge) {
            // TODO(tbd): Add a check here to prevent RocksDB from crash when
            // reopening a DB w/o properly specifying the merge operator.  But
            // currently we observed a memory leak on failing in RocksDB
            // recovery, so we decide to let it crash instead of causing
            // memory leak for now before we have identified the real cause
            // of the memory leak.

            // Handle merge-type keys using the MergeHelper
            // TODO: pass statistics to MergeUntil
            merge.MergeUntil(iter, 0 /* don't worry about snapshot */);
            iterator_at_next = true;
            if (merge.IsSuccess()) {
              // Merge completed correctly.
              // Add the resulting merge key/value and continue to next
              builder->Add(merge.key(), merge.value());
              prev_key.assign(merge.key().data(), merge.key().size());
              ok = ParseInternalKey(Slice(prev_key), &prev_ikey);
              assert(ok);
            } else {
              // Merge did not find a Put/Delete.
              // Can not compact these merges into a kValueType.
              // Write them out one-by-one. (Proceed back() to front())
              const std::deque<std::string>& keys = merge.keys();
              const std::deque<std::string>& values = merge.values();
              assert(keys.size() == values.size() && keys.size() >= 1);
              std::deque<std::string>::const_reverse_iterator key_iter;
              std::deque<std::string>::const_reverse_iterator value_iter;
              for (key_iter=keys.rbegin(), value_iter = values.rbegin();
                   key_iter != keys.rend() && value_iter != values.rend();
                   ++key_iter, ++value_iter) {

                builder->Add(Slice(*key_iter), Slice(*value_iter));
              }

              // Sanity check. Both iterators should end at the same time
              assert(key_iter == keys.rend() && value_iter == values.rend());

              prev_key.assign(keys.front());
              ok = ParseInternalKey(Slice(prev_key), &prev_ikey);
              assert(ok);
            }
          } else {
            // Handle Put/Delete-type keys by simply writing them
            builder->Add(key, value);
            prev_key.assign(key.data(), key.size());
            ok = ParseInternalKey(Slice(prev_key), &prev_ikey);
            assert(ok);
          }
        }

        if (!iterator_at_next) iter->Next();
      }

      // The last key is the largest key
      meta->largest.DecodeFrom(Slice(prev_key));
      SequenceNumber seqno = GetInternalKeySeqno(Slice(prev_key));
      meta->smallest_seqno = std::min(meta->smallest_seqno, seqno);
      meta->largest_seqno = std::max(meta->largest_seqno, seqno);

    } else {
      // No purging: write every entry through unchanged, tracking the
      // largest key and the observed seqno range as we go.
      for (; iter->Valid(); iter->Next()) {
        Slice key = iter->key();
        meta->largest.DecodeFrom(key);
        builder->Add(key, iter->value());
        SequenceNumber seqno = GetInternalKeySeqno(key);
        meta->smallest_seqno = std::min(meta->smallest_seqno, seqno);
        meta->largest_seqno = std::max(meta->largest_seqno, seqno);
      }
    }

    // Finish and check for builder errors
    if (s.ok()) {
      s = builder->Finish();
      if (s.ok()) {
        meta->fd.file_size = builder->FileSize();
        assert(meta->fd.GetFileSize() > 0);
      }
    } else {
      builder->Abandon();
    }
    delete builder;

    // Finish and check for file errors
    if (s.ok() && !options.disableDataSync) {
      if (options.use_fsync) {
        StopWatch sw(env, options.statistics.get(), TABLE_SYNC_MICROS);
        s = file->Fsync();
      } else {
        StopWatch sw(env, options.statistics.get(), TABLE_SYNC_MICROS);
        s = file->Sync();
      }
    }
    if (s.ok()) {
      s = file->Close();
    }

    if (s.ok()) {
      // Verify that the table is usable
      Iterator* it = table_cache->NewIterator(ReadOptions(), soptions,
                                              internal_comparator, meta->fd);
      s = it->status();
      delete it;
    }
  }

  // Check for input iterator errors
  if (!iter->status().ok()) {
    s = iter->status();
  }

  if (s.ok() && meta->fd.GetFileSize() > 0) {
    // Keep it
  } else {
    env->DeleteFile(fname);
  }
  return s;
}

}  // namespace rocksdb