//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "table/merger.h"

12 13
#include "rocksdb/comparator.h"
#include "rocksdb/iterator.h"
14
#include "rocksdb/options.h"
15
#include "table/iter_heap.h"
J
jorlow@chromium.org 已提交
16
#include "table/iterator_wrapper.h"
17 18
#include "util/stop_watch.h"
#include "util/perf_context_imp.h"
J
jorlow@chromium.org 已提交
19

H
Haobo Xu 已提交
20 21
#include <vector>

22
namespace rocksdb {
J
jorlow@chromium.org 已提交
23 24

namespace {
25

J
jorlow@chromium.org 已提交
26 27
class MergingIterator : public Iterator {
 public:
28 29
  MergingIterator(Env* const env, const Comparator* comparator,
                  Iterator** children, int n)
J
jorlow@chromium.org 已提交
30
      : comparator_(comparator),
H
Haobo Xu 已提交
31
        children_(n),
A
Abhishek Kona 已提交
32
        current_(nullptr),
33
        use_heap_(true),
34
        env_(env),
35 36 37
        direction_(kForward),
        maxHeap_(NewMaxIterHeap(comparator_)),
        minHeap_ (NewMinIterHeap(comparator_)) {
J
jorlow@chromium.org 已提交
38 39 40
    for (int i = 0; i < n; i++) {
      children_[i].Set(children[i]);
    }
H
Haobo Xu 已提交
41 42 43
    for (auto& child : children_) {
      if (child.Valid()) {
        minHeap_.push(&child);
44 45
      }
    }
J
jorlow@chromium.org 已提交
46 47
  }

H
Haobo Xu 已提交
48
  virtual ~MergingIterator() { }
J
jorlow@chromium.org 已提交
49 50

  virtual bool Valid() const {
A
Abhishek Kona 已提交
51
    return (current_ != nullptr);
J
jorlow@chromium.org 已提交
52 53 54
  }

  virtual void SeekToFirst() {
55
    ClearHeaps();
H
Haobo Xu 已提交
56 57 58 59
    for (auto& child : children_) {
      child.SeekToFirst();
      if (child.Valid()) {
        minHeap_.push(&child);
60
      }
J
jorlow@chromium.org 已提交
61 62
    }
    FindSmallest();
J
jorlow@chromium.org 已提交
63
    direction_ = kForward;
J
jorlow@chromium.org 已提交
64 65 66
  }

  virtual void SeekToLast() {
67
    ClearHeaps();
H
Haobo Xu 已提交
68 69 70 71
    for (auto& child : children_) {
      child.SeekToLast();
      if (child.Valid()) {
        maxHeap_.push(&child);
72
      }
J
jorlow@chromium.org 已提交
73 74
    }
    FindLargest();
J
jorlow@chromium.org 已提交
75
    direction_ = kReverse;
J
jorlow@chromium.org 已提交
76 77 78
  }

  virtual void Seek(const Slice& target) {
79 80 81
    // Invalidate the heap.
    use_heap_ = false;
    IteratorWrapper* first_child = nullptr;
82 83
    StopWatchNano child_seek_timer(env_, false);
    StopWatchNano min_heap_timer(env_, false);
H
Haobo Xu 已提交
84
    for (auto& child : children_) {
85
      StartPerfTimer(&child_seek_timer);
H
Haobo Xu 已提交
86
      child.Seek(target);
87 88 89
      BumpPerfTime(&perf_context.seek_child_seek_time, &child_seek_timer);
      BumpPerfCount(&perf_context.seek_child_seek_count);

H
Haobo Xu 已提交
90
      if (child.Valid()) {
91 92 93 94 95 96 97 98 99
        // This child has valid key
        if (!use_heap_) {
          if (first_child == nullptr) {
            // It's the first child has valid key. Only put it int
            // current_. Now the values in the heap should be invalid.
            first_child = &child;
          } else {
            // We have more than one children with valid keys. Initialize
            // the heap and put the first child into the heap.
100
            StartPerfTimer(&min_heap_timer);
101
            ClearHeaps();
102 103
            BumpPerfTime(&perf_context.seek_min_heap_time, &child_seek_timer);
            StartPerfTimer(&min_heap_timer);
104
            minHeap_.push(first_child);
105
            BumpPerfTime(&perf_context.seek_min_heap_time, &child_seek_timer);
106 107 108
          }
        }
        if (use_heap_) {
109
          StartPerfTimer(&min_heap_timer);
110
          minHeap_.push(&child);
111
          BumpPerfTime(&perf_context.seek_min_heap_time, &child_seek_timer);
112
        }
113
      }
J
jorlow@chromium.org 已提交
114
    }
115 116
    if (use_heap_) {
      // If heap is valid, need to put the smallest key to curent_.
117
      StartPerfTimer(&min_heap_timer);
118
      FindSmallest();
119
      BumpPerfTime(&perf_context.seek_min_heap_time, &child_seek_timer);
120 121 122 123 124
    } else {
      // The heap is not valid, then the current_ iterator is the first
      // one, or null if there is no first child.
      current_ = first_child;
    }
J
jorlow@chromium.org 已提交
125 126 127 128
  }

  virtual void Next() {
    assert(Valid());
J
jorlow@chromium.org 已提交
129 130 131 132 133 134 135

    // Ensure that all children are positioned after key().
    // If we are moving in the forward direction, it is already
    // true for all of the non-current_ children since current_ is
    // the smallest child and key() == current_->key().  Otherwise,
    // we explicitly position the non-current_ children.
    if (direction_ != kForward) {
136
      ClearHeaps();
H
Haobo Xu 已提交
137 138 139 140 141 142
      for (auto& child : children_) {
        if (&child != current_) {
          child.Seek(key());
          if (child.Valid() &&
              comparator_->Compare(key(), child.key()) == 0) {
            child.Next();
J
jorlow@chromium.org 已提交
143
          }
H
Haobo Xu 已提交
144 145
          if (child.Valid()) {
            minHeap_.push(&child);
146
          }
J
jorlow@chromium.org 已提交
147 148 149 150 151
        }
      }
      direction_ = kForward;
    }

152 153
    // as the current points to the current record. move the iterator forward.
    // and if it is valid add it to the heap.
J
jorlow@chromium.org 已提交
154
    current_->Next();
155 156 157 158 159 160 161
    if (use_heap_) {
      if (current_->Valid()) {
        minHeap_.push(current_);
      }
      FindSmallest();
    } else if (!current_->Valid()) {
      current_ = nullptr;
162
    }
J
jorlow@chromium.org 已提交
163 164 165 166
  }

  virtual void Prev() {
    assert(Valid());
J
jorlow@chromium.org 已提交
167 168 169 170 171 172
    // Ensure that all children are positioned before key().
    // If we are moving in the reverse direction, it is already
    // true for all of the non-current_ children since current_ is
    // the largest child and key() == current_->key().  Otherwise,
    // we explicitly position the non-current_ children.
    if (direction_ != kReverse) {
173
      ClearHeaps();
H
Haobo Xu 已提交
174 175 176 177
      for (auto& child : children_) {
        if (&child != current_) {
          child.Seek(key());
          if (child.Valid()) {
J
jorlow@chromium.org 已提交
178
            // Child is at first entry >= key().  Step back one to be < key()
H
Haobo Xu 已提交
179
            child.Prev();
J
jorlow@chromium.org 已提交
180 181
          } else {
            // Child has no entries >= key().  Position at last entry.
H
Haobo Xu 已提交
182
            child.SeekToLast();
J
jorlow@chromium.org 已提交
183
          }
H
Haobo Xu 已提交
184 185
          if (child.Valid()) {
            maxHeap_.push(&child);
186
          }
J
jorlow@chromium.org 已提交
187 188 189 190 191
        }
      }
      direction_ = kReverse;
    }

J
jorlow@chromium.org 已提交
192
    current_->Prev();
193 194 195
    if (current_->Valid()) {
      maxHeap_.push(current_);
    }
J
jorlow@chromium.org 已提交
196 197 198 199 200 201 202 203 204 205 206 207 208 209 210
    FindLargest();
  }

  virtual Slice key() const {
    assert(Valid());
    return current_->key();
  }

  virtual Slice value() const {
    assert(Valid());
    return current_->value();
  }

  virtual Status status() const {
    Status status;
H
Haobo Xu 已提交
211 212
    for (auto& child : children_) {
      status = child.status();
J
jorlow@chromium.org 已提交
213 214 215 216 217 218 219 220 221 222
      if (!status.ok()) {
        break;
      }
    }
    return status;
  }

 private:
  void FindSmallest();
  void FindLargest();
223
  void ClearHeaps();
J
jorlow@chromium.org 已提交
224 225

  const Comparator* comparator_;
H
Haobo Xu 已提交
226
  std::vector<IteratorWrapper> children_;
J
jorlow@chromium.org 已提交
227
  IteratorWrapper* current_;
228 229 230 231
  // If the value is true, both of iterators in the heap and current_
  // contain valid rows. If it is false, only current_ can possibly contain
  // valid rows.
  bool use_heap_;
232
  Env* const env_;
J
jorlow@chromium.org 已提交
233 234 235 236 237 238
  // Which direction is the iterator moving?
  enum Direction {
    kForward,
    kReverse
  };
  Direction direction_;
239 240
  MaxIterHeap maxHeap_;
  MinIterHeap minHeap_;
J
jorlow@chromium.org 已提交
241 242 243
};

// Moves the child on top of minHeap_ into current_, removing it from the
// heap. When the heap is empty, current_ becomes null (iterator not valid).
// Must only be called while the heap is in use.
void MergingIterator::FindSmallest() {
  assert(use_heap_);
  if (!minHeap_.empty()) {
    current_ = minHeap_.top();
    // Only valid children are ever pushed onto the heap.
    assert(current_->Valid());
    minHeap_.pop();
    return;
  }
  current_ = nullptr;
}

// Moves the child on top of maxHeap_ into current_, removing it from the
// heap. When the heap is empty, current_ becomes null (iterator not valid).
// Must only be called while the heap is in use.
void MergingIterator::FindLargest() {
  assert(use_heap_);
  if (!maxHeap_.empty()) {
    current_ = maxHeap_.top();
    // Only valid children are ever pushed onto the heap.
    assert(current_->Valid());
    maxHeap_.pop();
    return;
  }
  current_ = nullptr;
}

void MergingIterator::ClearHeaps() {
266
  use_heap_ = true;
267 268
  maxHeap_ = NewMaxIterHeap(comparator_);
  minHeap_ = NewMinIterHeap(comparator_);
J
jorlow@chromium.org 已提交
269
}
H
Hans Wennborg 已提交
270
}  // namespace
J
jorlow@chromium.org 已提交
271

272 273
// Builds an iterator over the `n` iterators in `list`.
//
// With no inputs an empty iterator is returned; with exactly one input that
// iterator itself is handed back unchanged; otherwise the inputs are wrapped
// in a newly allocated MergingIterator. `n` must not be negative.
Iterator* NewMergingIterator(Env* const env, const Comparator* cmp,
                             Iterator** list, int n) {
  assert(n >= 0);
  switch (n) {
    case 0:
      return NewEmptyIterator();
    case 1:
      // Nothing to merge: avoid the wrapper overhead entirely.
      return list[0];
    default:
      return new MergingIterator(env, cmp, list, n);
  }
}

284
}  // namespace rocksdb