//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/builder.h"

#include "db/filename.h"
#include "db/dbformat.h"
14
#include "db/merge_helper.h"
J
jorlow@chromium.org 已提交
15 16
#include "db/table_cache.h"
#include "db/version_edit.h"
17 18 19
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
20
#include "util/stop_watch.h"
J
jorlow@chromium.org 已提交
21

22
namespace rocksdb {
J
jorlow@chromium.org 已提交
23 24 25 26

// Builds a table file from the entries produced by *iter and records the
// resulting file's metadata in *meta.
//
// The file name is TableFileName(dbname, meta->number). While entries are
// written, meta->smallest / meta->largest are set to the first and last
// internal keys added, and meta->smallest_seqno / meta->largest_seqno to the
// range of sequence numbers over every entry added. meta->file_size is set
// only after the table has been finished successfully; otherwise it stays 0.
//
// If options.purge_redundant_kvs_while_flush is set and every memtable entry
// is newer than newest_snapshot, older versions of a user key are dropped
// and merge operands are collapsed through MergeHelper where possible.
//
// If *iter yields no entries, no file is created. Whenever the result is not
// OK or the file size is 0, env->DeleteFile() is invoked on the file name.
// Exception to the above: if creating the writable file fails, that status
// is returned immediately without further cleanup.
//
// Returns the first error encountered, with an input-iterator error taking
// precedence over table-building errors.
Status BuildTable(const std::string& dbname,
                  Env* env,
                  const Options& options,
                  const EnvOptions& soptions,
                  TableCache* table_cache,
                  Iterator* iter,
                  FileMetaData* meta,
                  const Comparator* user_comparator,
                  const SequenceNumber newest_snapshot,
                  const SequenceNumber earliest_seqno_in_memtable) {
  Status s;
  meta->file_size = 0;
  meta->smallest_seqno = meta->largest_seqno = 0;
  iter->SeekToFirst();

  // If the sequence number of the smallest entry in the memtable is not
  // larger than the most recent snapshot, then we do not trigger removal of
  // duplicate/deleted keys as part of this builder.
  const bool purge = options.purge_redundant_kvs_while_flush &&
                     earliest_seqno_in_memtable > newest_snapshot;

  const std::string fname = TableFileName(dbname, meta->number);
  if (iter->Valid()) {
    unique_ptr<WritableFile> file;
    s = env->NewWritableFile(fname, &file, soptions);
    if (!s.ok()) {
      return s;
    }
    unique_ptr<TableBuilder> builder(
        new TableBuilder(options, file.get(), 0));

    // Appends one entry to the table and widens the recorded sequence-number
    // range to cover it. Entries are ordered by key, not by sequence number,
    // so the range has to be tracked for every entry rather than derived
    // from the first and last keys only.
    auto add_entry = [&builder, meta](const Slice& k, const Slice& v) {
      builder->Add(k, v);
      const SequenceNumber seqno = GetInternalKeySeqno(k);
      meta->smallest_seqno = std::min(meta->smallest_seqno, seqno);
      meta->largest_seqno = std::max(meta->largest_seqno, seqno);
    };

    // The first key is the smallest key; it also seeds the seqno range.
    const Slice first_key = iter->key();
    meta->smallest.DecodeFrom(first_key);
    meta->smallest_seqno = GetInternalKeySeqno(first_key);
    meta->largest_seqno = meta->smallest_seqno;

    MergeHelper merge(user_comparator, options.merge_operator.get(),
                      options.info_log.get(),
                      true /* internal key corruption is not ok */);

    if (purge) {
      // Workaround to avoid an unused-variable compiler error in release
      // builds, where the assert() calls below compile away.
      bool ok __attribute__((unused)) = true;

      // An entry is written only if its user key differs from the previous
      // one written (or if it is the very first key).
      ParsedInternalKey prev_ikey;
      std::string prev_key;  // backing storage for prev_ikey.user_key
      bool is_first_key = true;

      while (iter->Valid()) {
        bool iterator_at_next = false;

        // Get current key
        ParsedInternalKey this_ikey;
        const Slice key = iter->key();
        const Slice value = iter->value();

        // In-memory key corruption is not ok;
        // TODO: find a clean way to treat in memory key corruption
        ok = ParseInternalKey(key, &this_ikey);
        assert(ok);
        assert(this_ikey.sequence >= earliest_seqno_in_memtable);

        const bool same_user_key =
            !is_first_key &&
            user_comparator->Compare(prev_ikey.user_key,
                                     this_ikey.user_key) == 0;
        if (same_user_key) {
          // Older version of a key that was already written: skip it.
          // seqno within the same key are in decreasing order
          assert(this_ikey.sequence < prev_ikey.sequence);
        } else {
          is_first_key = false;

          if (this_ikey.type == kTypeMerge) {
            // Handle merge-type keys using the MergeHelper. MergeUntil
            // advances the iterator itself, so it must not be advanced
            // again at the bottom of the loop.
            merge.MergeUntil(iter, 0 /* don't worry about snapshot */);
            iterator_at_next = true;
            if (merge.IsSuccess()) {
              // Merge completed correctly.
              // Add the resulting merge key/value and continue to next
              add_entry(merge.key(), merge.value());
              prev_key.assign(merge.key().data(), merge.key().size());
            } else {
              // Merge did not find a Put/Delete.
              // Can not compact these merges into a kValueType.
              // Write them out one-by-one. (Proceed back() to front())
              const std::deque<std::string>& keys = merge.keys();
              const std::deque<std::string>& values = merge.values();
              assert(keys.size() == values.size() && keys.size() >= 1);
              auto key_iter = keys.rbegin();
              auto value_iter = values.rbegin();
              for (; key_iter != keys.rend() && value_iter != values.rend();
                   ++key_iter, ++value_iter) {
                add_entry(Slice(*key_iter), Slice(*value_iter));
              }

              // Sanity check. Both iterators should end at the same time
              assert(key_iter == keys.rend() && value_iter == values.rend());

              prev_key.assign(keys.front());
            }
          } else {
            // Handle Put/Delete-type keys by simply writing them
            add_entry(key, value);
            prev_key.assign(key.data(), key.size());
          }

          // Re-parse from prev_key so that prev_ikey refers to storage that
          // stays valid after the iterator moves on.
          ok = ParseInternalKey(Slice(prev_key), &prev_ikey);
          assert(ok);
        }

        if (!iterator_at_next) iter->Next();
      }

      // The last key written is the largest key
      meta->largest.DecodeFrom(Slice(prev_key));
    } else {
      for (; iter->Valid(); iter->Next()) {
        const Slice key = iter->key();
        meta->largest.DecodeFrom(key);
        add_entry(key, iter->value());
      }
    }

    // An input iterator error means the table would be discarded anyway, so
    // surface it now and abandon the builder instead of finishing, syncing
    // and verifying a file that is about to be deleted.
    if (!iter->status().ok()) {
      s = iter->status();
    }

    // Finish and check for builder errors
    if (s.ok()) {
      s = builder->Finish();
      if (s.ok()) {
        meta->file_size = builder->FileSize();
        assert(meta->file_size > 0);
      }
    } else {
      builder->Abandon();
    }
    // Release the builder before the file is synced and closed.
    builder.reset();

    // Finish and check for file errors
    if (s.ok() && !options.disableDataSync) {
      StopWatch sw(env, options.statistics, TABLE_SYNC_MICROS);
      s = options.use_fsync ? file->Fsync() : file->Sync();
    }
    if (s.ok()) {
      s = file->Close();
    }

    if (s.ok()) {
      // Verify that the table is usable
      unique_ptr<Iterator> it(table_cache->NewIterator(ReadOptions(),
                                                       soptions,
                                                       meta->number,
                                                       meta->file_size));
      s = it->status();
    }
  }

  // Check for input iterator errors (also covers the case where the
  // iterator produced no entries at all).
  if (!iter->status().ok()) {
    s = iter->status();
  }

  // Keep the file only if everything succeeded and something was written.
  if (!s.ok() || meta->file_size == 0) {
    env->DeleteFile(fname);
  }
  return s;
}

212
}  // namespace rocksdb