builder.cc 4.2 KB
Newer Older
J
jorlow@chromium.org 已提交
1 2 3 4 5 6 7 8 9 10
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/builder.h"

#include "db/filename.h"
#include "db/dbformat.h"
#include "db/table_cache.h"
#include "db/version_edit.h"
11 12 13
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/iterator.h"
J
jorlow@chromium.org 已提交
14 15 16 17 18 19 20 21

namespace leveldb {

Status BuildTable(const std::string& dbname,
                  Env* env,
                  const Options& options,
                  TableCache* table_cache,
                  Iterator* iter,
22 23 24 25
                  FileMetaData* meta,
                  const Comparator* user_comparator,
                  const SequenceNumber newest_snapshot,
                  const SequenceNumber earliest_seqno_in_memtable) {
J
jorlow@chromium.org 已提交
26 27 28 29
  Status s;
  meta->file_size = 0;
  iter->SeekToFirst();

30 31 32 33 34 35 36 37
  // If the sequence number of the smallest entry in the memtable is
  // smaller than the most recent snapshot, then we do not trigger
  // removal of duplicate/deleted keys as part of this builder.
  bool purge = options.purge_redundant_kvs_while_flush;
  if (earliest_seqno_in_memtable <= newest_snapshot) {
    purge = false;
  }

J
jorlow@chromium.org 已提交
38 39
  std::string fname = TableFileName(dbname, meta->number);
  if (iter->Valid()) {
40
    unique_ptr<WritableFile> file;
J
jorlow@chromium.org 已提交
41 42 43 44
    s = env->NewWritableFile(fname, &file);
    if (!s.ok()) {
      return s;
    }
45
    TableBuilder* builder = new TableBuilder(options, file.get(), 0);
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91

    // the first key is the smallest key
    Slice key = iter->key();
    meta->smallest.DecodeFrom(key);

    if (purge) {
      ParsedInternalKey prev_ikey;
      std::string prev_value;
      std::string prev_key;

      // store first key-value
      prev_key.assign(key.data(), key.size());
      prev_value.assign(iter->value().data(), iter->value().size());
      ParseInternalKey(Slice(prev_key), &prev_ikey);
      assert(prev_ikey.sequence >= earliest_seqno_in_memtable);

      for (iter->Next(); iter->Valid(); iter->Next()) {
        ParsedInternalKey this_ikey;
        Slice key = iter->key();
        ParseInternalKey(key, &this_ikey);
        assert(this_ikey.sequence >= earliest_seqno_in_memtable);

        if (user_comparator->Compare(prev_ikey.user_key, this_ikey.user_key)) {
          // This key is different from previous key.
          // Output prev key and remember current key
          builder->Add(Slice(prev_key), Slice(prev_value));
          prev_key.assign(key.data(), key.size());
          prev_value.assign(iter->value().data(), iter->value().size());
          ParseInternalKey(Slice(prev_key), &prev_ikey);
        } else {
          // seqno within the same key are in decreasing order
          assert(this_ikey.sequence < prev_ikey.sequence);
          // This key is an earlier version of the same key in prev_key.
          // Skip current key.
        }
      }
      // output last key
      builder->Add(Slice(prev_key), Slice(prev_value));
      meta->largest.DecodeFrom(Slice(prev_key));

    } else {
      for (; iter->Valid(); iter->Next()) {
        Slice key = iter->key();
        meta->largest.DecodeFrom(key);
        builder->Add(key, iter->value());
      }
J
jorlow@chromium.org 已提交
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
    }

    // Finish and check for builder errors
    if (s.ok()) {
      s = builder->Finish();
      if (s.ok()) {
        meta->file_size = builder->FileSize();
        assert(meta->file_size > 0);
      }
    } else {
      builder->Abandon();
    }
    delete builder;

    // Finish and check for file errors
107
    if (s.ok() && !options.disableDataSync) {
108 109 110 111 112
      if (options.use_fsync) {
        s = file->Fsync();
      } else {
        s = file->Sync();
      }
J
jorlow@chromium.org 已提交
113 114 115 116 117 118 119
    }
    if (s.ok()) {
      s = file->Close();
    }

    if (s.ok()) {
      // Verify that the table is usable
J
jorlow@chromium.org 已提交
120 121 122
      Iterator* it = table_cache->NewIterator(ReadOptions(),
                                              meta->number,
                                              meta->file_size);
J
jorlow@chromium.org 已提交
123 124 125 126 127 128 129 130 131 132 133
      s = it->status();
      delete it;
    }
  }

  // Check for input iterator errors
  if (!iter->status().ok()) {
    s = iter->status();
  }

  if (s.ok() && meta->file_size > 0) {
134
    // Keep it
J
jorlow@chromium.org 已提交
135 136 137 138 139 140
  } else {
    env->DeleteFile(fname);
  }
  return s;
}

H
Hans Wennborg 已提交
141
}  // namespace leveldb