// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/builder.h"

#include "db/filename.h"
#include "db/dbformat.h"
#include "db/merge_helper.h"
#include "db/table_cache.h"
#include "db/version_edit.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/iterator.h"
#include "util/stop_watch.h"

namespace leveldb {

// Builds a table file from the entries produced by *iter.
//
// The file is named TableFileName(dbname, meta->number) and is created with
// env->NewWritableFile(..., soptions). *iter must yield internal keys in
// sorted order (NOTE(review): presumably a memtable iterator, given the
// earliest_seqno_in_memtable parameter -- confirm against callers).
//
// When *iter yields at least one entry:
//   meta->smallest  - the first internal key yielded by *iter.
//   meta->largest   - the last internal key written to the table.
//   meta->file_size - size of the finished table.
// The file is then synced (unless options.disableDataSync; Fsync() when
// options.use_fsync, Sync() otherwise), closed, and opened once through
// table_cache to verify that it is readable.
//
// If options.purge_redundant_kvs_while_flush is set and
// earliest_seqno_in_memtable > newest_snapshot, only the newest entry of each
// user key is written, and a run of merge operands is collapsed through
// MergeHelper (options.merge_operator).
//
// If any step fails, if *iter reports an error, or if nothing was written,
// the file is deleted. Callers must rely on the returned status rather than
// on meta->file_size alone.
Status BuildTable(const std::string& dbname,
                  Env* env,
                  const Options& options,
                  const EnvOptions& soptions,
                  TableCache* table_cache,
                  Iterator* iter,
                  FileMetaData* meta,
                  const Comparator* user_comparator,
                  const SequenceNumber newest_snapshot,
                  const SequenceNumber earliest_seqno_in_memtable) {
  Status s;
  meta->file_size = 0;
  iter->SeekToFirst();

  // If the sequence number of the smallest entry in the input is not larger
  // than the most recent snapshot, do not trigger removal of
  // duplicate/deleted keys as part of this builder: that snapshot may still
  // need older versions of a key.
  bool purge = options.purge_redundant_kvs_while_flush;
  if (earliest_seqno_in_memtable <= newest_snapshot) {
    purge = false;
  }

  std::string fname = TableFileName(dbname, meta->number);
  if (iter->Valid()) {
    unique_ptr<WritableFile> file;
    s = env->NewWritableFile(fname, &file, soptions);
    if (!s.ok()) {
      return s;
    }
    // Destroyed explicitly (builder.reset()) before the file is synced.
    unique_ptr<TableBuilder> builder(new TableBuilder(options, file.get(), 0));

    // The first key is the smallest key.
    meta->smallest.DecodeFrom(iter->key());

    MergeHelper merge(user_comparator, options.merge_operator,
                      options.info_log.get(),
                      true /* internal key corruption is not ok */);

    if (purge) {
      // Copy of the internal key most recently added to the table, and its
      // parsed form (last_ikey.user_key points into last_key).
      std::string last_key;
      ParsedInternalKey last_ikey;
      bool added_any = false;

      ParsedInternalKey ikey;
      // Ugly workaround to avoid a compiler error for release builds, where
      // the asserts below are compiled out.
      // TODO: find a clean way to treat in-memory key corruption.
      ikey.type = kTypeValue;

      while (iter->Valid()) {
        const Slice key = iter->key();
        bool ok = ParseInternalKey(key, &ikey);
        // In-memory key corruption is not ok.
        assert(ok);
        assert(ikey.sequence >= earliest_seqno_in_memtable);

        if (added_any &&
            user_comparator->Compare(last_ikey.user_key, ikey.user_key) == 0) {
          // Sequence numbers within the same user key are in decreasing
          // order, so this is an earlier version of the key just added.
          // Skip it.
          assert(ikey.sequence < last_ikey.sequence);
          iter->Next();
          continue;
        }

        // Newest entry of a new user key: emit it.
        if (ikey.type == kTypeMerge) {
          // Merge the operands of this user key. MergeUntil leaves *iter
          // positioned after the entries it consumed.
          merge.MergeUntil(iter, 0 /* don't worry about snapshot */);
          last_key.assign(merge.key().data(), merge.key().size());
          builder->Add(merge.key(), merge.value());
        } else {
          last_key.assign(key.data(), key.size());
          builder->Add(key, iter->value());
          iter->Next();
        }
        ok = ParseInternalKey(Slice(last_key), &last_ikey);
        assert(ok);
        (void)ok;  // Only checked by assert; silence release-build warnings.
        added_any = true;
      }
      // The last key added is the largest key.
      meta->largest.DecodeFrom(Slice(last_key));
    } else {
      for (; iter->Valid(); iter->Next()) {
        const Slice key = iter->key();
        meta->largest.DecodeFrom(key);
        builder->Add(key, iter->value());
      }
    }

    // An input error means the table is incomplete and will be deleted
    // below, so abandon it instead of finishing, syncing and verifying it.
    if (s.ok()) {
      s = iter->status();
    }

    // Finish and check for builder errors.
    if (s.ok()) {
      s = builder->Finish();
      if (s.ok()) {
        meta->file_size = builder->FileSize();
        assert(meta->file_size > 0);
      }
    } else {
      builder->Abandon();
    }
    builder.reset();

    // Finish and check for file errors.
    if (s.ok() && !options.disableDataSync) {
      StopWatch sw(env, options.statistics, TABLE_SYNC_MICROS);
      if (options.use_fsync) {
        s = file->Fsync();
      } else {
        s = file->Sync();
      }
    }
    if (s.ok()) {
      s = file->Close();
    }

    if (s.ok()) {
      // Verify that the table is usable.
      unique_ptr<Iterator> it(table_cache->NewIterator(ReadOptions(),
                                                       soptions,
                                                       meta->number,
                                                       meta->file_size));
      s = it->status();
    }
  }

  // Check for input iterator errors. This also covers an iterator that
  // failed before yielding any entry.
  if (!iter->status().ok()) {
    s = iter->status();
  }

  if (!s.ok() || meta->file_size == 0) {
    // Best effort: the table is incomplete, unusable or empty.
    env->DeleteFile(fname);
  }
  return s;
}

}  // namespace leveldb