//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/db_iter.h"
#include <stdexcept>
#include <deque>
#include <string>
#include <limits>

#include "db/dbformat.h"
#include "db/filename.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/pinned_iterators_manager.h"
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/options.h"
#include "table/internal_iterator.h"
#include "util/arena.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/perf_context_imp.h"
#include "util/string_util.h"

namespace rocksdb {

#if 0
// Debugging aid (compiled out): walks `iter` from its first entry to its last
// and prints every internal key to stderr. Entries that fail to parse are
// printed escaped and flagged as corrupt.
static void DumpInternalIter(Iterator* iter) {
  iter->SeekToFirst();
  while (iter->Valid()) {
    ParsedInternalKey parsed;
    if (ParseInternalKey(iter->key(), &parsed)) {
      fprintf(stderr, "@ '%s'\n", parsed.DebugString().c_str());
    } else {
      fprintf(stderr, "Corrupt '%s'\n", EscapeString(iter->key()).c_str());
    }
    iter->Next();
  }
}
#endif

// Memtables and sstables that make the DB representation contain
// (userkey,seq,type) => uservalue entries.  DBIter
// combines multiple entries for the same userkey found in the DB
// representation into a single entry while accounting for sequence
// numbers, deletion markers, overwrites, etc.
class DBIter: public Iterator {
 public:
55
  // The following is grossly complicated. TODO: clean it up
J
jorlow@chromium.org 已提交
56 57 58 59 60 61 62 63 64 65
  // Which direction is the iterator currently moving?
  // (1) When moving forward, the internal iterator is positioned at
  //     the exact entry that yields this->key(), this->value()
  // (2) When moving backwards, the internal iterator is positioned
  //     just before all entries whose user key == this->key().
  enum Direction {
    kForward,
    kReverse
  };

66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
  // LocalStatistics contain Statistics counters that will be aggregated per
  // each iterator instance and then will be sent to the global statistics when
  // the iterator is destroyed.
  //
  // The purpose of this approach is to avoid perf regression happening
  // when multiple threads bump the atomic counters from a DBIter::Next().
  struct LocalStatistics {
    explicit LocalStatistics() { ResetCounters(); }

    void ResetCounters() {
      next_count_ = 0;
      next_found_count_ = 0;
      prev_count_ = 0;
      prev_found_count_ = 0;
      bytes_read_ = 0;
    }

    void BumpGlobalStatistics(Statistics* global_statistics) {
      RecordTick(global_statistics, NUMBER_DB_NEXT, next_count_);
      RecordTick(global_statistics, NUMBER_DB_NEXT_FOUND, next_found_count_);
      RecordTick(global_statistics, NUMBER_DB_PREV, prev_count_);
      RecordTick(global_statistics, NUMBER_DB_PREV_FOUND, prev_found_count_);
      RecordTick(global_statistics, ITER_BYTES_READ, bytes_read_);
      ResetCounters();
    }

    // Map to Tickers::NUMBER_DB_NEXT
    uint64_t next_count_;
    // Map to Tickers::NUMBER_DB_NEXT_FOUND
    uint64_t next_found_count_;
    // Map to Tickers::NUMBER_DB_PREV
    uint64_t prev_count_;
    // Map to Tickers::NUMBER_DB_PREV_FOUND
    uint64_t prev_found_count_;
    // Map to Tickers::ITER_BYTES_READ
    uint64_t bytes_read_;
  };

S
sdong 已提交
104 105
  DBIter(Env* env, const ImmutableCFOptions& ioptions, const Comparator* cmp,
         InternalIterator* iter, SequenceNumber s, bool arena_mode,
106
         uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
107
         const Slice* iterate_upper_bound = nullptr,
108
         bool prefix_same_as_start = false, bool pin_data = false,
109 110
         bool total_order_seek = false,
         uint64_t max_skippable_internal_keys = 0)
111 112
      : arena_mode_(arena_mode),
        env_(env),
113
        logger_(ioptions.info_log),
J
jorlow@chromium.org 已提交
114
        user_comparator_(cmp),
115
        merge_operator_(ioptions.merge_operator),
J
jorlow@chromium.org 已提交
116 117
        iter_(iter),
        sequence_(s),
J
jorlow@chromium.org 已提交
118
        direction_(kForward),
119
        valid_(false),
120
        current_entry_is_merged_(false),
121
        statistics_(ioptions.statistics),
122
        version_number_(version_number),
123
        iterate_upper_bound_(iterate_upper_bound),
124
        prefix_same_as_start_(prefix_same_as_start),
125
        pin_thru_lifetime_(pin_data),
A
Andrew Kryczka 已提交
126
        total_order_seek_(total_order_seek),
A
Andrew Kryczka 已提交
127 128
        range_del_agg_(ioptions.internal_comparator, s,
                       true /* collapse_deletions */) {
L
Lei Jin 已提交
129
    RecordTick(statistics_, NO_ITERATORS);
130 131
    prefix_extractor_ = ioptions.prefix_extractor;
    max_skip_ = max_sequential_skip_in_iterations;
132
    max_skippable_internal_keys_ = max_skippable_internal_keys;
133 134 135 136 137 138
    if (pin_thru_lifetime_) {
      pinned_iters_mgr_.StartPinning();
    }
    if (iter_) {
      iter_->SetPinnedItersMgr(&pinned_iters_mgr_);
    }
J
jorlow@chromium.org 已提交
139 140
  }
  virtual ~DBIter() {
141
    // Release pinned data if any
142 143 144
    if (pinned_iters_mgr_.PinningEnabled()) {
      pinned_iters_mgr_.ReleasePinnedData();
    }
145
    RecordTick(statistics_, NO_ITERATORS, -1);
146
    local_stats_.BumpGlobalStatistics(statistics_);
147 148 149
    if (!arena_mode_) {
      delete iter_;
    } else {
S
sdong 已提交
150
      iter_->~InternalIterator();
151 152
    }
  }
S
sdong 已提交
153
  virtual void SetIter(InternalIterator* iter) {
154 155
    assert(iter_ == nullptr);
    iter_ = iter;
156
    iter_->SetPinnedItersMgr(&pinned_iters_mgr_);
J
jorlow@chromium.org 已提交
157
  }
A
Andrew Kryczka 已提交
158 159 160 161
  virtual RangeDelAggregator* GetRangeDelAggregator() {
    return &range_del_agg_;
  }

I
Igor Sugak 已提交
162 163
  virtual bool Valid() const override { return valid_; }
  virtual Slice key() const override {
J
jorlow@chromium.org 已提交
164
    assert(valid_);
165
    return saved_key_.GetKey();
J
jorlow@chromium.org 已提交
166
  }
I
Igor Sugak 已提交
167
  virtual Slice value() const override {
J
jorlow@chromium.org 已提交
168
    assert(valid_);
169
    if (current_entry_is_merged_) {
170 171 172
      // If pinned_value_ is set then the result of merge operator is one of
      // the merge operands and we should return it.
      return pinned_value_.data() ? pinned_value_ : saved_value_;
173 174 175 176 177
    } else if (direction_ == kReverse) {
      return pinned_value_;
    } else {
      return iter_->value();
    }
J
jorlow@chromium.org 已提交
178
  }
I
Igor Sugak 已提交
179
  virtual Status status() const override {
J
jorlow@chromium.org 已提交
180 181 182 183 184 185
    if (status_.ok()) {
      return iter_->status();
    } else {
      return status_;
    }
  }
186 187 188 189 190 191

  virtual Status GetProperty(std::string prop_name,
                             std::string* prop) override {
    if (prop == nullptr) {
      return Status::InvalidArgument("prop is nullptr");
    }
192
    if (prop_name == "rocksdb.iterator.super-version-number") {
193 194 195 196 197 198
      // First try to pass the value returned from inner iterator.
      if (!iter_->GetProperty(prop_name, prop).ok()) {
        *prop = ToString(version_number_);
      }
      return Status::OK();
    } else if (prop_name == "rocksdb.iterator.is-key-pinned") {
199
      if (valid_) {
200
        *prop = (pin_thru_lifetime_ && saved_key_.IsKeyPinned()) ? "1" : "0";
201 202 203 204 205 206
      } else {
        *prop = "Iterator is not valid.";
      }
      return Status::OK();
    }
    return Status::InvalidArgument("Undentified property.");
207
  }
J
jorlow@chromium.org 已提交
208

I
Igor Sugak 已提交
209 210 211
  virtual void Next() override;
  virtual void Prev() override;
  virtual void Seek(const Slice& target) override;
A
Aaron Gao 已提交
212
  virtual void SeekForPrev(const Slice& target) override;
I
Igor Sugak 已提交
213 214
  virtual void SeekToFirst() override;
  virtual void SeekToLast() override;
J
jorlow@chromium.org 已提交
215

J
jorlow@chromium.org 已提交
216
 private:
217
  void ReverseToForward();
218
  void ReverseToBackward();
S
Stanislau Hlebik 已提交
219 220 221 222 223 224
  void PrevInternal();
  void FindParseableKey(ParsedInternalKey* ikey, Direction direction);
  bool FindValueForCurrentKey();
  bool FindValueForCurrentKeyUsingSeek();
  void FindPrevUserKey();
  void FindNextUserKey();
225 226
  inline void FindNextUserEntry(bool skipping, bool prefix_check);
  void FindNextUserEntryInternal(bool skipping, bool prefix_check);
J
jorlow@chromium.org 已提交
227
  bool ParseKey(ParsedInternalKey* key);
228
  void MergeValuesNewToOld();
229
  bool TooManyInternalKeysSkipped(bool increment = true);
J
jorlow@chromium.org 已提交
230

231 232 233 234 235 236 237 238 239 240
  // Temporarily pin the blocks that we encounter until ReleaseTempPinnedData()
  // is called
  void TempPinData() {
    if (!pin_thru_lifetime_) {
      pinned_iters_mgr_.StartPinning();
    }
  }

  // Release blocks pinned by TempPinData()
  void ReleaseTempPinnedData() {
241 242
    if (!pin_thru_lifetime_ && pinned_iters_mgr_.PinningEnabled()) {
      pinned_iters_mgr_.ReleasePinnedData();
243 244 245
    }
  }

J
jorlow@chromium.org 已提交
246 247 248 249 250 251 252 253 254
  inline void ClearSavedValue() {
    if (saved_value_.capacity() > 1048576) {
      std::string empty;
      swap(empty, saved_value_);
    } else {
      saved_value_.clear();
    }
  }

255 256 257 258
  inline void ResetInternalKeysSkippedCounter() {
    num_internal_keys_skipped_ = 0;
  }

259
  const SliceTransform* prefix_extractor_;
260
  bool arena_mode_;
J
jorlow@chromium.org 已提交
261
  Env* const env_;
I
Igor Canadi 已提交
262
  Logger* logger_;
J
jorlow@chromium.org 已提交
263
  const Comparator* const user_comparator_;
264
  const MergeOperator* const merge_operator_;
S
sdong 已提交
265
  InternalIterator* iter_;
J
jorlow@chromium.org 已提交
266
  SequenceNumber const sequence_;
J
jorlow@chromium.org 已提交
267

J
jorlow@chromium.org 已提交
268
  Status status_;
S
Stanislau Hlebik 已提交
269 270
  IterKey saved_key_;
  std::string saved_value_;
271
  Slice pinned_value_;
J
jorlow@chromium.org 已提交
272
  Direction direction_;
J
jorlow@chromium.org 已提交
273
  bool valid_;
274
  bool current_entry_is_merged_;
275
  // for prefix seek mode to support prev()
276
  Statistics* statistics_;
277
  uint64_t max_skip_;
278 279
  uint64_t max_skippable_internal_keys_;
  uint64_t num_internal_keys_skipped_;
280
  uint64_t version_number_;
281
  const Slice* iterate_upper_bound_;
282 283 284
  IterKey prefix_start_buf_;
  Slice prefix_start_key_;
  const bool prefix_same_as_start_;
285 286 287
  // Means that we will pin all data blocks we read as long the Iterator
  // is not deleted, will be true if ReadOptions::pin_data is true
  const bool pin_thru_lifetime_;
288
  const bool total_order_seek_;
289
  // List of operands for merge operator.
290
  MergeContext merge_context_;
A
Andrew Kryczka 已提交
291
  RangeDelAggregator range_del_agg_;
292
  LocalStatistics local_stats_;
293
  PinnedIteratorsManager pinned_iters_mgr_;
J
jorlow@chromium.org 已提交
294 295 296 297 298 299 300 301 302

  // No copying allowed
  DBIter(const DBIter&);
  void operator=(const DBIter&);
};

// Parses the internal key at the current position of iter_ into *ikey.
// Returns true on success. On failure, records a Corruption status in
// status_, logs the offending key (hex-encoded) and returns false.
inline bool DBIter::ParseKey(ParsedInternalKey* ikey) {
  if (ParseInternalKey(iter_->key(), ikey)) {
    return true;
  }
  status_ = Status::Corruption("corrupted internal key in DBIter");
  ROCKS_LOG_ERROR(logger_, "corrupted internal key in DBIter: %s",
                  iter_->key().ToString(true).c_str());
  return false;
}

void DBIter::Next() {
  assert(valid_);

314 315
  // Release temporarily pinned blocks from last operation
  ReleaseTempPinnedData();
316
  ResetInternalKeysSkippedCounter();
S
Stanislau Hlebik 已提交
317
  if (direction_ == kReverse) {
318
    ReverseToForward();
319 320 321 322 323 324 325
  } else if (iter_->Valid() && !current_entry_is_merged_) {
    // If the current value is not a merge, the iter position is the
    // current key, which is already returned. We can safely issue a
    // Next() without checking the current key.
    // If the current key is a merge, very likely iter already points
    // to the next internal position.
    iter_->Next();
326
    PERF_COUNTER_ADD(internal_key_skipped_count, 1);
J
jorlow@chromium.org 已提交
327
  }
J
jorlow@chromium.org 已提交
328

329 330 331
  if (statistics_ != nullptr) {
    local_stats_.next_count_++;
  }
332 333
  // Now we point to the next internal position, for both of merge and
  // not merge cases.
334 335 336 337
  if (!iter_->Valid()) {
    valid_ = false;
    return;
  }
338
  FindNextUserEntry(true /* skipping the current user key */, prefix_same_as_start_);
339 340 341 342
  if (statistics_ != nullptr && valid_) {
    local_stats_.next_found_count_++;
    local_stats_.bytes_read_ += (key().size() + value().size());
  }
J
jorlow@chromium.org 已提交
343 344
}

// PRE: saved_key_ has the current user key if skipping
// POST: saved_key_ should have the next user key if valid_,
//       if the current entry is a result of merge
//           current_entry_is_merged_ => true
//           saved_value_             => the merged value
//
// NOTE: In between, saved_key_ can point to a user key that has
//       a delete marker or a sequence number higher than sequence_
//       saved_key_ MUST have a proper user_key before calling this function
//
// The prefix_check parameter controls whether we check the iterated
// keys against the prefix of the seeked key. Set to false when
// performing a seek without a key (e.g. SeekToFirst). Set to
// prefix_same_as_start_ for other iterations.
inline void DBIter::FindNextUserEntry(bool skipping, bool prefix_check) {
360
  PERF_TIMER_GUARD(find_next_user_entry_time);
361
  FindNextUserEntryInternal(skipping, prefix_check);
362 363 364
}

// Actual implementation of DBIter::FindNextUserEntry()
void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
J
jorlow@chromium.org 已提交
366 367 368
  // Loop until we hit an acceptable entry to yield
  assert(iter_->Valid());
  assert(direction_ == kForward);
369
  current_entry_is_merged_ = false;
370 371 372 373 374 375 376 377 378 379 380

  // How many times in a row we have skipped an entry with user key less than
  // or equal to saved_key_. We could skip these entries either because
  // sequence numbers were too high or because skipping = true.
  // What saved_key_ contains throughout this method:
  //  - if skipping        : saved_key_ contains the key that we need to skip,
  //                         and we haven't seen any keys greater than that,
  //  - if num_skipped > 0 : saved_key_ contains the key that we have skipped
  //                         num_skipped times, and we haven't seen any keys
  //                         greater than that,
  //  - none of the above  : saved_key_ can contain anything, it doesn't matter.
381
  uint64_t num_skipped = 0;
382

J
jorlow@chromium.org 已提交
383
  do {
J
jorlow@chromium.org 已提交
384
    ParsedInternalKey ikey;
385

386 387 388 389 390
    if (!ParseKey(&ikey)) {
      // Skip corrupted keys.
      iter_->Next();
      continue;
    }
391

392 393 394 395
    if (iterate_upper_bound_ != nullptr &&
        user_comparator_->Compare(ikey.user_key, *iterate_upper_bound_) >= 0) {
      break;
    }
396

397 398 399 400 401 402
    if (prefix_extractor_ && prefix_check &&
        prefix_extractor_->Transform(ikey.user_key)
          .compare(prefix_start_key_) != 0) {
      break;
    }

403 404 405 406
    if (TooManyInternalKeysSkipped()) {
      return;
    }

407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428
    if (ikey.sequence <= sequence_) {
      if (skipping &&
          user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0) {
        num_skipped++;  // skip this entry
        PERF_COUNTER_ADD(internal_key_skipped_count, 1);
      } else {
        num_skipped = 0;
        switch (ikey.type) {
          case kTypeDeletion:
          case kTypeSingleDeletion:
            // Arrange to skip all upcoming entries for this key since
            // they are hidden by this deletion.
            saved_key_.SetKey(
                ikey.user_key,
                !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
            skipping = true;
            PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
            break;
          case kTypeValue:
            saved_key_.SetKey(
                ikey.user_key,
                !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
429 430 431
            if (range_del_agg_.ShouldDelete(
                    ikey, RangeDelAggregator::RangePositioningMode::
                              kForwardTraversal)) {
432 433 434 435 436
              // Arrange to skip all upcoming entries for this key since
              // they are hidden by this deletion.
              skipping = true;
              num_skipped = 0;
              PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
437 438 439 440 441 442 443 444 445
            } else {
              valid_ = true;
              return;
            }
            break;
          case kTypeMerge:
            saved_key_.SetKey(
                ikey.user_key,
                !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
446 447 448
            if (range_del_agg_.ShouldDelete(
                    ikey, RangeDelAggregator::RangePositioningMode::
                              kForwardTraversal)) {
449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465
              // Arrange to skip all upcoming entries for this key since
              // they are hidden by this deletion.
              skipping = true;
              num_skipped = 0;
              PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
            } else {
              // By now, we are sure the current ikey is going to yield a
              // value
              current_entry_is_merged_ = true;
              valid_ = true;
              MergeValuesNewToOld();  // Go to a different state machine
              return;
            }
            break;
          default:
            assert(false);
            break;
466
        }
J
jorlow@chromium.org 已提交
467
      }
468 469 470 471 472 473 474 475 476 477 478 479 480 481 482
    } else {
      // This key was inserted after our snapshot was taken.
      PERF_COUNTER_ADD(internal_recent_skipped_count, 1);

      // Here saved_key_ may contain some old key, or the default empty key, or
      // key assigned by some random other method. We don't care.
      if (user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0) {
        num_skipped++;
      } else {
        saved_key_.SetKey(
                ikey.user_key,
                !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
        skipping = false;
        num_skipped = 0;
      }
J
jorlow@chromium.org 已提交
483
    }
484 485 486 487

    // If we have sequentially iterated via numerous equal keys, then it's
    // better to seek so that we can avoid too many key comparisons.
    if (num_skipped > max_skip_) {
488 489
      num_skipped = 0;
      std::string last_key;
490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507
      if (skipping) {
        // We're looking for the next user-key but all we see are the same
        // user-key with decreasing sequence numbers. Fast forward to
        // sequence number 0 and type deletion (the smallest type).
        AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetKey(), 0,
                                                       kTypeDeletion));
        // Don't set skipping = false because we may still see more user-keys
        // equal to saved_key_.
      } else {
        // We saw multiple entries with this user key and sequence numbers
        // higher than sequence_. Fast forward to sequence_.
        // Note that this only covers a case when a higher key was overwritten
        // many times since our snapshot was taken, not the case when a lot of
        // different keys were inserted after our snapshot was taken.
        AppendInternalKey(&last_key,
                          ParsedInternalKey(saved_key_.GetKey(), sequence_,
                                            kValueTypeForSeek));
      }
508 509 510 511 512
      iter_->Seek(last_key);
      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
    } else {
      iter_->Next();
    }
J
jorlow@chromium.org 已提交
513 514
  } while (iter_->Valid());
  valid_ = false;
J
jorlow@chromium.org 已提交
515 516
}

// Merge values of the same user key starting from the current iter_ position
// Scan from the newer entries to older entries.
// PRE: iter_->key() points to the first merge type entry
//      saved_key_ stores the user key
// POST: saved_value_ has the merged value for the user key
//       iter_ points to the next entry (or invalid)
void DBIter::MergeValuesNewToOld() {
524
  if (!merge_operator_) {
525
    ROCKS_LOG_ERROR(logger_, "Options::merge_operator is null.");
526
    status_ = Status::InvalidArgument("merge_operator_ must be set.");
527 528
    valid_ = false;
    return;
D
Deon Nicholas 已提交
529
  }
530

531 532
  // Temporarily pin the blocks that hold merge operands
  TempPinData();
533
  merge_context_.Clear();
534
  // Start the merge process by pushing the first operand
535 536
  merge_context_.PushOperand(iter_->value(),
                             iter_->IsValuePinned() /* operand_pinned */);
537 538

  ParsedInternalKey ikey;
539
  Status s;
540 541 542 543 544 545
  for (iter_->Next(); iter_->Valid(); iter_->Next()) {
    if (!ParseKey(&ikey)) {
      // skip corrupted key
      continue;
    }

546
    if (!user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
547 548
      // hit the next user key, stop right here
      break;
A
Andrew Kryczka 已提交
549
    } else if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type ||
550 551 552
               range_del_agg_.ShouldDelete(
                   ikey, RangeDelAggregator::RangePositioningMode::
                             kForwardTraversal)) {
553 554 555 556
      // hit a delete with the same user key, stop right here
      // iter_ is positioned after delete
      iter_->Next();
      break;
A
Andres Noetzli 已提交
557
    } else if (kTypeValue == ikey.type) {
558 559 560
      // hit a put, merge the put value with operands and store the
      // final result in saved_value_. We are done!
      // ignore corruption if there is any.
I
Igor Canadi 已提交
561
      const Slice val = iter_->value();
562 563 564 565 566 567
      s = MergeHelper::TimedFullMerge(
          merge_operator_, ikey.user_key, &val, merge_context_.GetOperands(),
          &saved_value_, logger_, statistics_, env_, &pinned_value_);
      if (!s.ok()) {
        status_ = s;
      }
568 569 570
      // iter_ is positioned after put
      iter_->Next();
      return;
A
Andres Noetzli 已提交
571
    } else if (kTypeMerge == ikey.type) {
572 573
      // hit a merge, add the value as an operand and run associative merge.
      // when complete, add result to operands and continue.
574 575
      merge_context_.PushOperand(iter_->value(),
                                 iter_->IsValuePinned() /* operand_pinned */);
576
      PERF_COUNTER_ADD(internal_merge_count, 1);
A
Andres Noetzli 已提交
577 578
    } else {
      assert(false);
579 580 581
    }
  }

582 583 584 585
  // we either exhausted all internal keys under this user key, or hit
  // a deletion marker.
  // feed null as the existing value to the merge operator, such that
  // client can differentiate this scenario and do things accordingly.
586 587 588 589 590 591
  s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), nullptr,
                                  merge_context_.GetOperands(), &saved_value_,
                                  logger_, statistics_, env_, &pinned_value_);
  if (!s.ok()) {
    status_ = s;
  }
592 593
}

J
jorlow@chromium.org 已提交
594 595
void DBIter::Prev() {
  assert(valid_);
596
  ReleaseTempPinnedData();
597
  ResetInternalKeysSkippedCounter();
S
Stanislau Hlebik 已提交
598
  if (direction_ == kForward) {
599
    ReverseToBackward();
S
Stanislau Hlebik 已提交
600 601
  }
  PrevInternal();
M
Manuel Ung 已提交
602
  if (statistics_ != nullptr) {
603
    local_stats_.prev_count_++;
M
Manuel Ung 已提交
604
    if (valid_) {
605 606
      local_stats_.prev_found_count_++;
      local_stats_.bytes_read_ += (key().size() + value().size());
M
Manuel Ung 已提交
607 608
    }
  }
S
Stanislau Hlebik 已提交
609
}
J
jorlow@chromium.org 已提交
610

611 612 613 614 615
void DBIter::ReverseToForward() {
  if (prefix_extractor_ != nullptr && !total_order_seek_) {
    IterKey last_key;
    last_key.SetInternalKey(ParsedInternalKey(
        saved_key_.GetKey(), kMaxSequenceNumber, kValueTypeForSeek));
A
Aaron Gao 已提交
616
    iter_->Seek(last_key.GetKey());
617 618 619 620 621
  }
  FindNextUserKey();
  direction_ = kForward;
  if (!iter_->Valid()) {
    iter_->SeekToFirst();
622
    range_del_agg_.InvalidateTombstoneMapPositions();
623 624 625
  }
}

626
void DBIter::ReverseToBackward() {
627 628 629 630
  if (prefix_extractor_ != nullptr && !total_order_seek_) {
    IterKey last_key;
    last_key.SetInternalKey(
        ParsedInternalKey(saved_key_.GetKey(), 0, kValueTypeForSeekForPrev));
A
Aaron Gao 已提交
631
    iter_->SeekForPrev(last_key.GetKey());
632
  }
633 634 635 636 637
  if (current_entry_is_merged_) {
    // Not placed in the same key. Need to call Prev() until finding the
    // previous key.
    if (!iter_->Valid()) {
      iter_->SeekToLast();
638
      range_del_agg_.InvalidateTombstoneMapPositions();
639 640 641 642 643
    }
    ParsedInternalKey ikey;
    FindParseableKey(&ikey, kReverse);
    while (iter_->Valid() &&
           user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) > 0) {
644 645 646 647 648
      if (ikey.sequence > sequence_) {
        PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
      } else {
        PERF_COUNTER_ADD(internal_key_skipped_count, 1);
      }
649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664
      iter_->Prev();
      FindParseableKey(&ikey, kReverse);
    }
  }
#ifndef NDEBUG
  if (iter_->Valid()) {
    ParsedInternalKey ikey;
    assert(ParseKey(&ikey));
    assert(user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0);
  }
#endif

  FindPrevUserKey();
  direction_ = kReverse;
}

S
Stanislau Hlebik 已提交
665 666 667 668
void DBIter::PrevInternal() {
  if (!iter_->Valid()) {
    valid_ = false;
    return;
669 670
  }

S
Stanislau Hlebik 已提交
671 672 673
  ParsedInternalKey ikey;

  while (iter_->Valid()) {
674
    saved_key_.SetKey(ExtractUserKey(iter_->key()),
675
                      !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
676

S
Stanislau Hlebik 已提交
677 678
    if (FindValueForCurrentKey()) {
      valid_ = true;
J
jorlow@chromium.org 已提交
679 680 681
      if (!iter_->Valid()) {
        return;
      }
S
Stanislau Hlebik 已提交
682
      FindParseableKey(&ikey, kReverse);
683
      if (user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
S
Stanislau Hlebik 已提交
684
        FindPrevUserKey();
J
jorlow@chromium.org 已提交
685
      }
A
Aaron Gao 已提交
686 687 688
      if (valid_ && prefix_extractor_ && prefix_same_as_start_ &&
          prefix_extractor_->Transform(saved_key_.GetKey())
                  .compare(prefix_start_key_) != 0) {
689
        valid_ = false;
A
Aaron Gao 已提交
690
      }
S
Stanislau Hlebik 已提交
691
      return;
J
jorlow@chromium.org 已提交
692
    }
693 694 695 696 697

    if (TooManyInternalKeysSkipped(false)) {
      return;
    }

S
Stanislau Hlebik 已提交
698 699 700 701
    if (!iter_->Valid()) {
      break;
    }
    FindParseableKey(&ikey, kReverse);
702
    if (user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
S
Stanislau Hlebik 已提交
703 704 705 706
      FindPrevUserKey();
    }
  }
  // We haven't found any key - iterator is not valid
A
Aaron Gao 已提交
707
  // Or the prefix is different than start prefix
708
  assert(!iter_->Valid());
S
Stanislau Hlebik 已提交
709
  valid_ = false;
J
jorlow@chromium.org 已提交
710 711
}

// This function checks whether the entry with the biggest sequence number
// <= sequence_ is something other than kTypeDeletion or kTypeSingleDeletion.
// If it is not a deletion, we save the value in saved_value_
bool DBIter::FindValueForCurrentKey() {
  assert(iter_->Valid());
717
  merge_context_.Clear();
718
  current_entry_is_merged_ = false;
A
Andres Noetzli 已提交
719 720
  // last entry before merge (could be kTypeDeletion, kTypeSingleDeletion or
  // kTypeValue)
S
Stanislau Hlebik 已提交
721 722
  ValueType last_not_merge_type = kTypeDeletion;
  ValueType last_key_entry_type = kTypeDeletion;
J
jorlow@chromium.org 已提交
723

S
Stanislau Hlebik 已提交
724 725 726
  ParsedInternalKey ikey;
  FindParseableKey(&ikey, kReverse);

727 728 729
  // Temporarily pin blocks that hold (merge operands / the value)
  ReleaseTempPinnedData();
  TempPinData();
S
Stanislau Hlebik 已提交
730 731
  size_t num_skipped = 0;
  while (iter_->Valid() && ikey.sequence <= sequence_ &&
732
         user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
733 734 735 736
    if (TooManyInternalKeysSkipped()) {
      return false;
    }

S
Stanislau Hlebik 已提交
737 738 739 740 741 742 743 744
    // We iterate too much: let's use Seek() to avoid too much key comparisons
    if (num_skipped >= max_skip_) {
      return FindValueForCurrentKeyUsingSeek();
    }

    last_key_entry_type = ikey.type;
    switch (last_key_entry_type) {
      case kTypeValue:
745 746 747
        if (range_del_agg_.ShouldDelete(
                ikey,
                RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
A
Andrew Kryczka 已提交
748 749 750 751 752 753
          last_key_entry_type = kTypeRangeDeletion;
          PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
        } else {
          assert(iter_->IsValuePinned());
          pinned_value_ = iter_->value();
        }
754
        merge_context_.Clear();
A
Andrew Kryczka 已提交
755
        last_not_merge_type = last_key_entry_type;
S
Stanislau Hlebik 已提交
756 757
        break;
      case kTypeDeletion:
A
Andres Noetzli 已提交
758
      case kTypeSingleDeletion:
759
        merge_context_.Clear();
A
Andres Noetzli 已提交
760
        last_not_merge_type = last_key_entry_type;
761
        PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
S
Stanislau Hlebik 已提交
762 763
        break;
      case kTypeMerge:
764 765 766
        if (range_del_agg_.ShouldDelete(
                ikey,
                RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
A
Andrew Kryczka 已提交
767 768 769 770 771 772 773 774
          merge_context_.Clear();
          last_key_entry_type = kTypeRangeDeletion;
          last_not_merge_type = last_key_entry_type;
          PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
        } else {
          assert(merge_operator_ != nullptr);
          merge_context_.PushOperandBack(
              iter_->value(), iter_->IsValuePinned() /* operand_pinned */);
775
          PERF_COUNTER_ADD(internal_merge_count, 1);
A
Andrew Kryczka 已提交
776
        }
S
Stanislau Hlebik 已提交
777 778 779 780 781
        break;
      default:
        assert(false);
    }

782
    PERF_COUNTER_ADD(internal_key_skipped_count, 1);
783
    assert(user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()));
S
Stanislau Hlebik 已提交
784 785 786 787 788
    iter_->Prev();
    ++num_skipped;
    FindParseableKey(&ikey, kReverse);
  }

789
  Status s;
S
Stanislau Hlebik 已提交
790 791
  switch (last_key_entry_type) {
    case kTypeDeletion:
A
Andres Noetzli 已提交
792
    case kTypeSingleDeletion:
A
Andrew Kryczka 已提交
793
    case kTypeRangeDeletion:
S
Stanislau Hlebik 已提交
794 795 796
      valid_ = false;
      return false;
    case kTypeMerge:
797
      current_entry_is_merged_ = true;
A
Aaron Gao 已提交
798
      if (last_not_merge_type == kTypeDeletion ||
A
Andrew Kryczka 已提交
799 800
          last_not_merge_type == kTypeSingleDeletion ||
          last_not_merge_type == kTypeRangeDeletion) {
801 802 803 804
        s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(),
                                        nullptr, merge_context_.GetOperands(),
                                        &saved_value_, logger_, statistics_,
                                        env_, &pinned_value_);
805
      } else {
S
Stanislau Hlebik 已提交
806
        assert(last_not_merge_type == kTypeValue);
807 808 809 810
        s = MergeHelper::TimedFullMerge(
            merge_operator_, saved_key_.GetKey(), &pinned_value_,
            merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
            env_, &pinned_value_);
811
      }
S
Stanislau Hlebik 已提交
812 813 814 815 816 817 818
      break;
    case kTypeValue:
      // do nothing - we've already has value in saved_value_
      break;
    default:
      assert(false);
      break;
J
jorlow@chromium.org 已提交
819
  }
S
Stanislau Hlebik 已提交
820
  valid_ = true;
821 822 823
  if (!s.ok()) {
    status_ = s;
  }
S
Stanislau Hlebik 已提交
824 825
  return true;
}
J
jorlow@chromium.org 已提交
826

S
Stanislau Hlebik 已提交
827 828 829
// This function is used in FindValueForCurrentKey.
// We use Seek() function instead of Prev() to find necessary value
bool DBIter::FindValueForCurrentKeyUsingSeek() {
830 831 832
  // FindValueForCurrentKey will enable pinning before calling
  // FindValueForCurrentKeyUsingSeek()
  assert(pinned_iters_mgr_.PinningEnabled());
S
Stanislau Hlebik 已提交
833 834 835 836 837 838 839 840 841 842
  std::string last_key;
  AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetKey(), sequence_,
                                                 kValueTypeForSeek));
  iter_->Seek(last_key);
  RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);

  // assume there is at least one parseable key for this user key
  ParsedInternalKey ikey;
  FindParseableKey(&ikey, kForward);

A
Andrew Kryczka 已提交
843
  if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
844 845
      range_del_agg_.ShouldDelete(
          ikey, RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
J
jorlow@chromium.org 已提交
846
    valid_ = false;
S
Stanislau Hlebik 已提交
847 848
    return false;
  }
A
Andrew Kryczka 已提交
849 850 851 852 853 854
  if (ikey.type == kTypeValue) {
    assert(iter_->IsValuePinned());
    pinned_value_ = iter_->value();
    valid_ = true;
    return true;
  }
S
Stanislau Hlebik 已提交
855 856 857

  // kTypeMerge. We need to collect all kTypeMerge values and save them
  // in operands
858
  current_entry_is_merged_ = true;
859
  merge_context_.Clear();
860 861 862 863 864 865
  while (
      iter_->Valid() &&
      user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()) &&
      ikey.type == kTypeMerge &&
      !range_del_agg_.ShouldDelete(
          ikey, RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
866 867
    merge_context_.PushOperand(iter_->value(),
                               iter_->IsValuePinned() /* operand_pinned */);
868
    PERF_COUNTER_ADD(internal_merge_count, 1);
S
Stanislau Hlebik 已提交
869 870 871 872
    iter_->Next();
    FindParseableKey(&ikey, kForward);
  }

873
  Status s;
S
Stanislau Hlebik 已提交
874
  if (!iter_->Valid() ||
875
      !user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()) ||
A
Andrew Kryczka 已提交
876
      ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
877 878
      range_del_agg_.ShouldDelete(
          ikey, RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
879 880 881 882
    s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(),
                                    nullptr, merge_context_.GetOperands(),
                                    &saved_value_, logger_, statistics_, env_,
                                    &pinned_value_);
S
Stanislau Hlebik 已提交
883 884
    // Make iter_ valid and point to saved_key_
    if (!iter_->Valid() ||
885
        !user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
S
Stanislau Hlebik 已提交
886 887 888
      iter_->Seek(last_key);
      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
    }
J
jorlow@chromium.org 已提交
889
    valid_ = true;
890 891 892
    if (!s.ok()) {
      status_ = s;
    }
S
Stanislau Hlebik 已提交
893 894 895
    return true;
  }

I
Igor Canadi 已提交
896
  const Slice& val = iter_->value();
897 898 899
  s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), &val,
                                  merge_context_.GetOperands(), &saved_value_,
                                  logger_, statistics_, env_, &pinned_value_);
S
Stanislau Hlebik 已提交
900
  valid_ = true;
901 902 903
  if (!s.ok()) {
    status_ = s;
  }
S
Stanislau Hlebik 已提交
904 905 906 907 908 909 910 911 912 913 914 915 916 917
  return true;
}

// Used in Next to change directions
// Go to next user key
// Don't use Seek(),
// because next user key will be very close
void DBIter::FindNextUserKey() {
  if (!iter_->Valid()) {
    return;
  }
  ParsedInternalKey ikey;
  FindParseableKey(&ikey, kForward);
  while (iter_->Valid() &&
918
         !user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
S
Stanislau Hlebik 已提交
919 920 921 922 923 924 925 926 927 928 929 930 931
    iter_->Next();
    FindParseableKey(&ikey, kForward);
  }
}

// Go to previous user_key
void DBIter::FindPrevUserKey() {
  if (!iter_->Valid()) {
    return;
  }
  size_t num_skipped = 0;
  ParsedInternalKey ikey;
  FindParseableKey(&ikey, kReverse);
932 933 934 935
  int cmp;
  while (iter_->Valid() && ((cmp = user_comparator_->Compare(
                                 ikey.user_key, saved_key_.GetKey())) == 0 ||
                            (cmp > 0 && ikey.sequence > sequence_))) {
936 937 938 939
    if (TooManyInternalKeysSkipped()) {
      return;
    }

940 941 942 943 944 945 946 947 948 949 950
    if (cmp == 0) {
      if (num_skipped >= max_skip_) {
        num_skipped = 0;
        IterKey last_key;
        last_key.SetInternalKey(ParsedInternalKey(
            saved_key_.GetKey(), kMaxSequenceNumber, kValueTypeForSeek));
        iter_->Seek(last_key.GetKey());
        RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
      } else {
        ++num_skipped;
      }
S
Stanislau Hlebik 已提交
951
    }
952 953 954 955 956
    if (ikey.sequence > sequence_) {
      PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
    } else {
      PERF_COUNTER_ADD(internal_key_skipped_count, 1);
    }
S
Stanislau Hlebik 已提交
957 958 959 960 961
    iter_->Prev();
    FindParseableKey(&ikey, kReverse);
  }
}

962 963 964 965 966 967 968 969 970 971 972 973
// Enforces the limit on skipped internal keys.
// When max_skippable_internal_keys_ is non-zero and the running count already
// exceeds it, invalidates the iterator, sets status_ to Incomplete and returns
// true. Otherwise returns false, bumping the running count first when
// `increment` is true.
bool DBIter::TooManyInternalKeysSkipped(bool increment) {
  const bool limit_enabled = max_skippable_internal_keys_ > 0;
  if (limit_enabled &&
      num_internal_keys_skipped_ > max_skippable_internal_keys_) {
    valid_ = false;
    status_ = Status::Incomplete("Too many internal keys skipped.");
    return true;
  }
  if (increment) {
    num_internal_keys_skipped_++;
  }
  return false;
}

S
Stanislau Hlebik 已提交
974 975 976 977 978 979 980 981
// Skip all unparseable keys.
// Moves iter_ in `direction` until ParseKey() succeeds on the current entry
// (filling *ikey) or iter_ becomes invalid. If iter_ ends up invalid, *ikey
// does not describe a current entry.
void DBIter::FindParseableKey(ParsedInternalKey* ikey, Direction direction) {
  const bool backwards = (direction == kReverse);
  while (iter_->Valid() && !ParseKey(ikey)) {
    if (backwards) {
      iter_->Prev();
    } else {
      iter_->Next();
    }
  }
}
J
jorlow@chromium.org 已提交
984

J
jorlow@chromium.org 已提交
985
void DBIter::Seek(const Slice& target) {
L
Lei Jin 已提交
986
  StopWatch sw(env_, statistics_, DB_SEEK);
987
  ReleaseTempPinnedData();
988
  ResetInternalKeysSkippedCounter();
989 990
  saved_key_.Clear();
  saved_key_.SetInternalKey(target, sequence_);
991 992 993 994

  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    iter_->Seek(saved_key_.GetKey());
995
    range_del_agg_.InvalidateTombstoneMapPositions();
996
  }
M
Manuel Ung 已提交
997
  RecordTick(statistics_, NUMBER_DB_SEEK);
J
jorlow@chromium.org 已提交
998
  if (iter_->Valid()) {
999 1000 1001
    if (prefix_extractor_ && prefix_same_as_start_) {
      prefix_start_key_ = prefix_extractor_->Transform(target);
    }
1002 1003
    direction_ = kForward;
    ClearSavedValue();
1004 1005 1006
    // convert the InternalKey to UserKey in saved_key_ before
    // passed to FindNextUserEntry
    saved_key_.Reserve(saved_key_.Size() - 8);
1007 1008 1009 1010
    FindNextUserEntry(false /* not skipping */, prefix_same_as_start_);
    if (!valid_) {
      prefix_start_key_.clear();
    }
M
Manuel Ung 已提交
1011 1012 1013 1014 1015 1016
    if (statistics_ != nullptr) {
      if (valid_) {
        RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
        RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
      }
    }
J
jorlow@chromium.org 已提交
1017 1018 1019
  } else {
    valid_ = false;
  }
1020

1021
  if (valid_ && prefix_extractor_ && prefix_same_as_start_) {
1022 1023
    prefix_start_buf_.SetKey(prefix_start_key_);
    prefix_start_key_ = prefix_start_buf_.GetKey();
1024
  }
J
jorlow@chromium.org 已提交
1025
}
J
jorlow@chromium.org 已提交
1026

A
Aaron Gao 已提交
1027 1028 1029
void DBIter::SeekForPrev(const Slice& target) {
  StopWatch sw(env_, statistics_, DB_SEEK);
  ReleaseTempPinnedData();
1030
  ResetInternalKeysSkippedCounter();
A
Aaron Gao 已提交
1031 1032 1033 1034 1035 1036 1037 1038
  saved_key_.Clear();
  // now saved_key is used to store internal key.
  saved_key_.SetInternalKey(target, 0 /* sequence_number */,
                            kValueTypeForSeekForPrev);

  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    iter_->SeekForPrev(saved_key_.GetKey());
1039
    range_del_agg_.InvalidateTombstoneMapPositions();
A
Aaron Gao 已提交
1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067
  }

  RecordTick(statistics_, NUMBER_DB_SEEK);
  if (iter_->Valid()) {
    if (prefix_extractor_ && prefix_same_as_start_) {
      prefix_start_key_ = prefix_extractor_->Transform(target);
    }
    direction_ = kReverse;
    ClearSavedValue();
    PrevInternal();
    if (!valid_) {
      prefix_start_key_.clear();
    }
    if (statistics_ != nullptr) {
      if (valid_) {
        RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
        RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
      }
    }
  } else {
    valid_ = false;
  }
  if (valid_ && prefix_extractor_ && prefix_same_as_start_) {
    prefix_start_buf_.SetKey(prefix_start_key_);
    prefix_start_key_ = prefix_start_buf_.GetKey();
  }
}

J
jorlow@chromium.org 已提交
1068
void DBIter::SeekToFirst() {
S
Stanislau Hlebik 已提交
1069
  // Don't use iter_::Seek() if we set a prefix extractor
1070
  // because prefix seek will be used.
1071
  if (prefix_extractor_ != nullptr) {
S
Stanislau Hlebik 已提交
1072 1073
    max_skip_ = std::numeric_limits<uint64_t>::max();
  }
J
jorlow@chromium.org 已提交
1074
  direction_ = kForward;
1075
  ReleaseTempPinnedData();
1076
  ResetInternalKeysSkippedCounter();
J
jorlow@chromium.org 已提交
1077
  ClearSavedValue();
1078 1079 1080 1081

  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    iter_->SeekToFirst();
1082
    range_del_agg_.InvalidateTombstoneMapPositions();
1083 1084
  }

M
Manuel Ung 已提交
1085
  RecordTick(statistics_, NUMBER_DB_SEEK);
J
jorlow@chromium.org 已提交
1086
  if (iter_->Valid()) {
1087 1088
    saved_key_.SetKey(ExtractUserKey(iter_->key()),
                      !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
1089
    FindNextUserEntry(false /* not skipping */, false /* no prefix check */);
M
Manuel Ung 已提交
1090 1091 1092 1093 1094 1095
    if (statistics_ != nullptr) {
      if (valid_) {
        RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
        RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
      }
    }
J
jorlow@chromium.org 已提交
1096 1097
  } else {
    valid_ = false;
J
jorlow@chromium.org 已提交
1098
  }
1099
  if (valid_ && prefix_extractor_ && prefix_same_as_start_) {
1100 1101
    prefix_start_buf_.SetKey(prefix_extractor_->Transform(saved_key_.GetKey()));
    prefix_start_key_ = prefix_start_buf_.GetKey();
1102
  }
J
jorlow@chromium.org 已提交
1103 1104
}

J
jorlow@chromium.org 已提交
1105
void DBIter::SeekToLast() {
S
Stanislau Hlebik 已提交
1106
  // Don't use iter_::Seek() if we set a prefix extractor
1107
  // because prefix seek will be used.
1108
  if (prefix_extractor_ != nullptr) {
S
Stanislau Hlebik 已提交
1109 1110
    max_skip_ = std::numeric_limits<uint64_t>::max();
  }
J
jorlow@chromium.org 已提交
1111
  direction_ = kReverse;
1112
  ReleaseTempPinnedData();
1113
  ResetInternalKeysSkippedCounter();
J
jorlow@chromium.org 已提交
1114
  ClearSavedValue();
1115 1116 1117 1118

  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    iter_->SeekToLast();
1119
    range_del_agg_.InvalidateTombstoneMapPositions();
1120
  }
1121 1122 1123 1124
  // When the iterate_upper_bound is set to a value,
  // it will seek to the last key before the
  // ReadOptions.iterate_upper_bound
  if (iter_->Valid() && iterate_upper_bound_ != nullptr) {
1125
    SeekForPrev(*iterate_upper_bound_);
1126
    range_del_agg_.InvalidateTombstoneMapPositions();
1127 1128 1129 1130
    if (!Valid()) {
      return;
    } else if (user_comparator_->Equal(*iterate_upper_bound_, key())) {
      Prev();
1131
    }
1132 1133
  } else {
    PrevInternal();
1134
  }
M
Manuel Ung 已提交
1135 1136 1137 1138 1139 1140 1141
  if (statistics_ != nullptr) {
    RecordTick(statistics_, NUMBER_DB_SEEK);
    if (valid_) {
      RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
      RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
    }
  }
1142
  if (valid_ && prefix_extractor_ && prefix_same_as_start_) {
1143 1144
    prefix_start_buf_.SetKey(prefix_extractor_->Transform(saved_key_.GetKey()));
    prefix_start_key_ = prefix_start_buf_.GetKey();
1145
  }
J
jorlow@chromium.org 已提交
1146 1147
}

1148 1149 1150 1151 1152
// Allocates a DBIter with `new` over `internal_iter`, passing `false` for the
// constructor's sixth argument, and returns it as an Iterator*.
Iterator* NewDBIterator(
    Env* env, const ImmutableCFOptions& ioptions,
    const Comparator* user_key_comparator, InternalIterator* internal_iter,
    const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations,
    uint64_t version_number, const Slice* iterate_upper_bound,
    bool prefix_same_as_start, bool pin_data, bool total_order_seek,
    uint64_t max_skippable_internal_keys) {
  return new DBIter(env, ioptions, user_key_comparator, internal_iter,
                    sequence, false, max_sequential_skip_in_iterations,
                    version_number, iterate_upper_bound, prefix_same_as_start,
                    pin_data, total_order_seek, max_skippable_internal_keys);
}

I
Igor Canadi 已提交
1163
// Runs the wrapped DBIter's destructor explicitly; no `delete` is issued
// here (NewArenaWrappedDbIterator() placement-constructs it in arena memory).
ArenaWrappedDBIter::~ArenaWrappedDBIter() {
  db_iter_->~DBIter();
}
1164 1165 1166

// Stores the DBIter that all other calls on this wrapper are forwarded to.
void ArenaWrappedDBIter::SetDBIter(DBIter* iter) {
  db_iter_ = iter;
}

A
Andrew Kryczka 已提交
1167 1168 1169 1170
// Returns the range-deletion aggregator of the wrapped DBIter.
RangeDelAggregator* ArenaWrappedDBIter::GetRangeDelAggregator() {
  RangeDelAggregator* const aggregator = db_iter_->GetRangeDelAggregator();
  return aggregator;
}

S
sdong 已提交
1171
// Hands the internal iterator to the wrapped DBIter via SetIter().
void ArenaWrappedDBIter::SetIterUnderDBIter(InternalIterator* iter) {
  DBIter* const wrapped = static_cast<DBIter*>(db_iter_);
  wrapped->SetIter(iter);
}

// Iterator interface of ArenaWrappedDBIter: every call is forwarded unchanged
// to the wrapped DBIter (db_iter_).
//
// These are deliberately ordinary out-of-line definitions without `inline`.
// They are defined only in this file, and an inline function must be defined
// in every translation unit that odr-uses it, so marking them `inline` here
// would leave other translation units without a definition.
bool ArenaWrappedDBIter::Valid() const { return db_iter_->Valid(); }

void ArenaWrappedDBIter::SeekToFirst() { db_iter_->SeekToFirst(); }

void ArenaWrappedDBIter::SeekToLast() { db_iter_->SeekToLast(); }

void ArenaWrappedDBIter::Seek(const Slice& target) { db_iter_->Seek(target); }

void ArenaWrappedDBIter::SeekForPrev(const Slice& target) {
  db_iter_->SeekForPrev(target);
}

void ArenaWrappedDBIter::Next() { db_iter_->Next(); }

void ArenaWrappedDBIter::Prev() { db_iter_->Prev(); }

Slice ArenaWrappedDBIter::key() const { return db_iter_->key(); }

Slice ArenaWrappedDBIter::value() const { return db_iter_->value(); }

Status ArenaWrappedDBIter::status() const { return db_iter_->status(); }

Status ArenaWrappedDBIter::GetProperty(std::string prop_name,
                                       std::string* prop) {
  return db_iter_->GetProperty(prop_name, prop);
}
1193 1194 1195 1196
// Registers `function(arg1, arg2)` as a cleanup on the wrapped DBIter.
void ArenaWrappedDBIter::RegisterCleanup(CleanupFunction function, void* arg1,
                                         void* arg2) {
  DBIter* const wrapped = db_iter_;
  wrapped->RegisterCleanup(function, arg1, arg2);
}
J
jorlow@chromium.org 已提交
1197

1198
// Creates an ArenaWrappedDBIter and placement-constructs its DBIter inside
// memory taken from the wrapper's own arena. The DBIter is built with a null
// internal iterator (to be supplied later through SetIterUnderDBIter()) and
// with `true` for the constructor's sixth argument.
ArenaWrappedDBIter* NewArenaWrappedDbIterator(
    Env* env, const ImmutableCFOptions& ioptions,
    const Comparator* user_key_comparator, const SequenceNumber& sequence,
    uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
    const Slice* iterate_upper_bound, bool prefix_same_as_start, bool pin_data,
    bool total_order_seek, uint64_t max_skippable_internal_keys) {
  ArenaWrappedDBIter* wrapper = new ArenaWrappedDBIter();
  auto storage = wrapper->GetArena()->AllocateAligned(sizeof(DBIter));

  wrapper->SetDBIter(new (storage) DBIter(
      env, ioptions, user_key_comparator, nullptr, sequence, true,
      max_sequential_skip_in_iterations, version_number, iterate_upper_bound,
      prefix_same_as_start, pin_data, total_order_seek,
      max_skippable_internal_keys));

  return wrapper;
}

1218
}  // namespace rocksdb