db_iter.cc 42.3 KB
Newer Older
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
S
Siying Dong 已提交
2 3 4
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
5
//
J
jorlow@chromium.org 已提交
6 7 8 9 10
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/db_iter.h"
11
#include <stdexcept>
12
#include <deque>
S
Stanislau Hlebik 已提交
13
#include <string>
S
Stanislau Hlebik 已提交
14
#include <limits>
J
jorlow@chromium.org 已提交
15 16

#include "db/dbformat.h"
17
#include "db/merge_context.h"
18
#include "db/merge_helper.h"
19
#include "db/pinned_iterators_manager.h"
20
#include "monitoring/perf_context_imp.h"
S
sdong 已提交
21
#include "port/port.h"
22 23 24
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/merge_operator.h"
25
#include "rocksdb/options.h"
S
sdong 已提交
26
#include "table/internal_iterator.h"
27
#include "util/arena.h"
28
#include "util/filename.h"
J
jorlow@chromium.org 已提交
29 30
#include "util/logging.h"
#include "util/mutexlock.h"
31
#include "util/string_util.h"
J
jorlow@chromium.org 已提交
32

33
namespace rocksdb {
J
jorlow@chromium.org 已提交
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54

#if 0
static void DumpInternalIter(Iterator* iter) {
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    ParsedInternalKey k;
    if (!ParseInternalKey(iter->key(), &k)) {
      fprintf(stderr, "Corrupt '%s'\n", EscapeString(iter->key()).c_str());
    } else {
      fprintf(stderr, "@ '%s'\n", k.DebugString().c_str());
    }
  }
}
#endif

// Memtables and sstables that make the DB representation contain
// (userkey,seq,type) => uservalue entries.  DBIter
// combines multiple entries for the same userkey found in the DB
// representation into a single entry while accounting for sequence
// numbers, deletion markers, overwrites, etc.
class DBIter: public Iterator {
 public:
55
  // The following is grossly complicated. TODO: clean it up
J
jorlow@chromium.org 已提交
56 57 58 59 60 61 62 63 64 65
  // Which direction is the iterator currently moving?
  // (1) When moving forward, the internal iterator is positioned at
  //     the exact entry that yields this->key(), this->value()
  // (2) When moving backwards, the internal iterator is positioned
  //     just before all entries whose user key == this->key().
  enum Direction {
    kForward,
    kReverse
  };

66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
  // LocalStatistics contain Statistics counters that will be aggregated per
  // each iterator instance and then will be sent to the global statistics when
  // the iterator is destroyed.
  //
  // The purpose of this approach is to avoid perf regression happening
  // when multiple threads bump the atomic counters from a DBIter::Next().
  struct LocalStatistics {
    explicit LocalStatistics() { ResetCounters(); }

    void ResetCounters() {
      next_count_ = 0;
      next_found_count_ = 0;
      prev_count_ = 0;
      prev_found_count_ = 0;
      bytes_read_ = 0;
    }

    void BumpGlobalStatistics(Statistics* global_statistics) {
      RecordTick(global_statistics, NUMBER_DB_NEXT, next_count_);
      RecordTick(global_statistics, NUMBER_DB_NEXT_FOUND, next_found_count_);
      RecordTick(global_statistics, NUMBER_DB_PREV, prev_count_);
      RecordTick(global_statistics, NUMBER_DB_PREV_FOUND, prev_found_count_);
      RecordTick(global_statistics, ITER_BYTES_READ, bytes_read_);
89
      PERF_COUNTER_ADD(iter_read_bytes, bytes_read_);
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
      ResetCounters();
    }

    // Map to Tickers::NUMBER_DB_NEXT
    uint64_t next_count_;
    // Map to Tickers::NUMBER_DB_NEXT_FOUND
    uint64_t next_found_count_;
    // Map to Tickers::NUMBER_DB_PREV
    uint64_t prev_count_;
    // Map to Tickers::NUMBER_DB_PREV_FOUND
    uint64_t prev_found_count_;
    // Map to Tickers::ITER_BYTES_READ
    uint64_t bytes_read_;
  };

S
Siying Dong 已提交
105
  DBIter(Env* _env, const ReadOptions& read_options,
106
         const ImmutableCFOptions& cf_options, const Comparator* cmp,
S
sdong 已提交
107
         InternalIterator* iter, SequenceNumber s, bool arena_mode,
S
Siying Dong 已提交
108
         uint64_t max_sequential_skip_in_iterations)
109
      : arena_mode_(arena_mode),
S
Siying Dong 已提交
110
        env_(_env),
111
        logger_(cf_options.info_log),
J
jorlow@chromium.org 已提交
112
        user_comparator_(cmp),
113
        merge_operator_(cf_options.merge_operator),
J
jorlow@chromium.org 已提交
114 115
        iter_(iter),
        sequence_(s),
J
jorlow@chromium.org 已提交
116
        direction_(kForward),
117
        valid_(false),
118
        current_entry_is_merged_(false),
119 120 121 122 123 124
        statistics_(cf_options.statistics),
        iterate_upper_bound_(read_options.iterate_upper_bound),
        prefix_same_as_start_(read_options.prefix_same_as_start),
        pin_thru_lifetime_(read_options.pin_data),
        total_order_seek_(read_options.total_order_seek),
        range_del_agg_(cf_options.internal_comparator, s,
A
Andrew Kryczka 已提交
125
                       true /* collapse_deletions */) {
L
Lei Jin 已提交
126
    RecordTick(statistics_, NO_ITERATORS);
127
    prefix_extractor_ = cf_options.prefix_extractor;
128
    max_skip_ = max_sequential_skip_in_iterations;
129
    max_skippable_internal_keys_ = read_options.max_skippable_internal_keys;
130 131 132 133 134 135
    if (pin_thru_lifetime_) {
      pinned_iters_mgr_.StartPinning();
    }
    if (iter_) {
      iter_->SetPinnedItersMgr(&pinned_iters_mgr_);
    }
J
jorlow@chromium.org 已提交
136 137
  }
  virtual ~DBIter() {
138
    // Release pinned data if any
139 140 141
    if (pinned_iters_mgr_.PinningEnabled()) {
      pinned_iters_mgr_.ReleasePinnedData();
    }
142
    RecordTick(statistics_, NO_ITERATORS, -1);
143
    local_stats_.BumpGlobalStatistics(statistics_);
144 145 146
    if (!arena_mode_) {
      delete iter_;
    } else {
S
sdong 已提交
147
      iter_->~InternalIterator();
148 149
    }
  }
S
sdong 已提交
150
  virtual void SetIter(InternalIterator* iter) {
151 152
    assert(iter_ == nullptr);
    iter_ = iter;
153
    iter_->SetPinnedItersMgr(&pinned_iters_mgr_);
J
jorlow@chromium.org 已提交
154
  }
A
Andrew Kryczka 已提交
155 156 157 158
  virtual RangeDelAggregator* GetRangeDelAggregator() {
    return &range_del_agg_;
  }

I
Igor Sugak 已提交
159 160
  virtual bool Valid() const override { return valid_; }
  virtual Slice key() const override {
J
jorlow@chromium.org 已提交
161
    assert(valid_);
162
    return saved_key_.GetUserKey();
J
jorlow@chromium.org 已提交
163
  }
I
Igor Sugak 已提交
164
  virtual Slice value() const override {
J
jorlow@chromium.org 已提交
165
    assert(valid_);
166
    if (current_entry_is_merged_) {
167 168 169
      // If pinned_value_ is set then the result of merge operator is one of
      // the merge operands and we should return it.
      return pinned_value_.data() ? pinned_value_ : saved_value_;
170 171 172 173 174
    } else if (direction_ == kReverse) {
      return pinned_value_;
    } else {
      return iter_->value();
    }
J
jorlow@chromium.org 已提交
175
  }
I
Igor Sugak 已提交
176
  virtual Status status() const override {
J
jorlow@chromium.org 已提交
177 178 179 180 181 182
    if (status_.ok()) {
      return iter_->status();
    } else {
      return status_;
    }
  }
183 184 185 186 187 188

  virtual Status GetProperty(std::string prop_name,
                             std::string* prop) override {
    if (prop == nullptr) {
      return Status::InvalidArgument("prop is nullptr");
    }
189
    if (prop_name == "rocksdb.iterator.super-version-number") {
190
      // First try to pass the value returned from inner iterator.
S
Siying Dong 已提交
191
      return iter_->GetProperty(prop_name, prop);
192
    } else if (prop_name == "rocksdb.iterator.is-key-pinned") {
193
      if (valid_) {
194
        *prop = (pin_thru_lifetime_ && saved_key_.IsKeyPinned()) ? "1" : "0";
195 196 197 198 199 200
      } else {
        *prop = "Iterator is not valid.";
      }
      return Status::OK();
    }
    return Status::InvalidArgument("Undentified property.");
201
  }
J
jorlow@chromium.org 已提交
202

I
Igor Sugak 已提交
203 204 205
  virtual void Next() override;
  virtual void Prev() override;
  virtual void Seek(const Slice& target) override;
A
Aaron Gao 已提交
206
  virtual void SeekForPrev(const Slice& target) override;
I
Igor Sugak 已提交
207 208
  virtual void SeekToFirst() override;
  virtual void SeekToLast() override;
S
Siying Dong 已提交
209 210 211
  Env* env() { return env_; }
  void set_sequence(uint64_t s) { sequence_ = s; }
  void set_valid(bool v) { valid_ = v; }
J
jorlow@chromium.org 已提交
212

J
jorlow@chromium.org 已提交
213
 private:
214
  void ReverseToForward();
215
  void ReverseToBackward();
S
Stanislau Hlebik 已提交
216 217 218 219 220 221
  void PrevInternal();
  void FindParseableKey(ParsedInternalKey* ikey, Direction direction);
  bool FindValueForCurrentKey();
  bool FindValueForCurrentKeyUsingSeek();
  void FindPrevUserKey();
  void FindNextUserKey();
222 223
  inline void FindNextUserEntry(bool skipping, bool prefix_check);
  void FindNextUserEntryInternal(bool skipping, bool prefix_check);
J
jorlow@chromium.org 已提交
224
  bool ParseKey(ParsedInternalKey* key);
225
  void MergeValuesNewToOld();
226
  bool TooManyInternalKeysSkipped(bool increment = true);
J
jorlow@chromium.org 已提交
227

228 229 230 231 232 233 234 235 236 237
  // Temporarily pin the blocks that we encounter until ReleaseTempPinnedData()
  // is called
  void TempPinData() {
    if (!pin_thru_lifetime_) {
      pinned_iters_mgr_.StartPinning();
    }
  }

  // Release blocks pinned by TempPinData()
  void ReleaseTempPinnedData() {
238 239
    if (!pin_thru_lifetime_ && pinned_iters_mgr_.PinningEnabled()) {
      pinned_iters_mgr_.ReleasePinnedData();
240 241 242
    }
  }

J
jorlow@chromium.org 已提交
243 244 245 246 247 248 249 250 251
  inline void ClearSavedValue() {
    if (saved_value_.capacity() > 1048576) {
      std::string empty;
      swap(empty, saved_value_);
    } else {
      saved_value_.clear();
    }
  }

252 253 254 255
  inline void ResetInternalKeysSkippedCounter() {
    num_internal_keys_skipped_ = 0;
  }

256
  const SliceTransform* prefix_extractor_;
257
  bool arena_mode_;
J
jorlow@chromium.org 已提交
258
  Env* const env_;
I
Igor Canadi 已提交
259
  Logger* logger_;
J
jorlow@chromium.org 已提交
260
  const Comparator* const user_comparator_;
261
  const MergeOperator* const merge_operator_;
S
sdong 已提交
262
  InternalIterator* iter_;
S
Siying Dong 已提交
263
  SequenceNumber sequence_;
J
jorlow@chromium.org 已提交
264

J
jorlow@chromium.org 已提交
265
  Status status_;
S
Stanislau Hlebik 已提交
266 267
  IterKey saved_key_;
  std::string saved_value_;
268
  Slice pinned_value_;
J
jorlow@chromium.org 已提交
269
  Direction direction_;
J
jorlow@chromium.org 已提交
270
  bool valid_;
271
  bool current_entry_is_merged_;
272
  // for prefix seek mode to support prev()
273
  Statistics* statistics_;
274
  uint64_t max_skip_;
275 276
  uint64_t max_skippable_internal_keys_;
  uint64_t num_internal_keys_skipped_;
277
  const Slice* iterate_upper_bound_;
278 279 280
  IterKey prefix_start_buf_;
  Slice prefix_start_key_;
  const bool prefix_same_as_start_;
281 282 283
  // Means that we will pin all data blocks we read as long the Iterator
  // is not deleted, will be true if ReadOptions::pin_data is true
  const bool pin_thru_lifetime_;
284
  const bool total_order_seek_;
285
  // List of operands for merge operator.
286
  MergeContext merge_context_;
A
Andrew Kryczka 已提交
287
  RangeDelAggregator range_del_agg_;
288
  LocalStatistics local_stats_;
289
  PinnedIteratorsManager pinned_iters_mgr_;
J
jorlow@chromium.org 已提交
290 291 292 293 294 295 296 297 298

  // No copying allowed
  DBIter(const DBIter&);
  void operator=(const DBIter&);
};

inline bool DBIter::ParseKey(ParsedInternalKey* ikey) {
  if (!ParseInternalKey(iter_->key(), ikey)) {
    status_ = Status::Corruption("corrupted internal key in DBIter");
299 300
    ROCKS_LOG_ERROR(logger_, "corrupted internal key in DBIter: %s",
                    iter_->key().ToString(true).c_str());
J
jorlow@chromium.org 已提交
301 302 303 304 305 306
    return false;
  } else {
    return true;
  }
}

J
jorlow@chromium.org 已提交
307 308 309
void DBIter::Next() {
  assert(valid_);

310 311
  // Release temporarily pinned blocks from last operation
  ReleaseTempPinnedData();
312
  ResetInternalKeysSkippedCounter();
S
Stanislau Hlebik 已提交
313
  if (direction_ == kReverse) {
314
    ReverseToForward();
315 316 317 318 319 320 321
  } else if (iter_->Valid() && !current_entry_is_merged_) {
    // If the current value is not a merge, the iter position is the
    // current key, which is already returned. We can safely issue a
    // Next() without checking the current key.
    // If the current key is a merge, very likely iter already points
    // to the next internal position.
    iter_->Next();
322
    PERF_COUNTER_ADD(internal_key_skipped_count, 1);
J
jorlow@chromium.org 已提交
323
  }
J
jorlow@chromium.org 已提交
324

325 326 327
  if (statistics_ != nullptr) {
    local_stats_.next_count_++;
  }
328 329
  // Now we point to the next internal position, for both of merge and
  // not merge cases.
330 331 332 333
  if (!iter_->Valid()) {
    valid_ = false;
    return;
  }
334
  FindNextUserEntry(true /* skipping the current user key */, prefix_same_as_start_);
335 336 337 338
  if (statistics_ != nullptr && valid_) {
    local_stats_.next_found_count_++;
    local_stats_.bytes_read_ += (key().size() + value().size());
  }
J
jorlow@chromium.org 已提交
339 340
}

341 342 343 344 345 346 347
// PRE: saved_key_ has the current user key if skipping
// POST: saved_key_ should have the next user key if valid_,
//       if the current entry is a result of merge
//           current_entry_is_merged_ => true
//           saved_value_             => the merged value
//
// NOTE: In between, saved_key_ can point to a user key that has
348
//       a delete marker or a sequence number higher than sequence_
349
//       saved_key_ MUST have a proper user_key before calling this function
350 351 352 353 354 355
//
// The prefix_check parameter controls whether we check the iterated
// keys against the prefix of the seeked key. Set to false when
// performing a seek without a key (e.g. SeekToFirst). Set to
// prefix_same_as_start_ for other iterations.
inline void DBIter::FindNextUserEntry(bool skipping, bool prefix_check) {
356
  PERF_TIMER_GUARD(find_next_user_entry_time);
357
  FindNextUserEntryInternal(skipping, prefix_check);
358 359 360
}

// Actual implementation of DBIter::FindNextUserEntry()
361
void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
J
jorlow@chromium.org 已提交
362 363 364
  // Loop until we hit an acceptable entry to yield
  assert(iter_->Valid());
  assert(direction_ == kForward);
365
  current_entry_is_merged_ = false;
366 367 368 369 370 371 372 373 374 375 376

  // How many times in a row we have skipped an entry with user key less than
  // or equal to saved_key_. We could skip these entries either because
  // sequence numbers were too high or because skipping = true.
  // What saved_key_ contains throughout this method:
  //  - if skipping        : saved_key_ contains the key that we need to skip,
  //                         and we haven't seen any keys greater than that,
  //  - if num_skipped > 0 : saved_key_ contains the key that we have skipped
  //                         num_skipped times, and we haven't seen any keys
  //                         greater than that,
  //  - none of the above  : saved_key_ can contain anything, it doesn't matter.
377
  uint64_t num_skipped = 0;
378

J
jorlow@chromium.org 已提交
379
  do {
J
jorlow@chromium.org 已提交
380
    ParsedInternalKey ikey;
381

382 383 384 385 386
    if (!ParseKey(&ikey)) {
      // Skip corrupted keys.
      iter_->Next();
      continue;
    }
387

388 389 390 391
    if (iterate_upper_bound_ != nullptr &&
        user_comparator_->Compare(ikey.user_key, *iterate_upper_bound_) >= 0) {
      break;
    }
392

393 394 395 396 397 398
    if (prefix_extractor_ && prefix_check &&
        prefix_extractor_->Transform(ikey.user_key)
          .compare(prefix_start_key_) != 0) {
      break;
    }

399 400 401 402
    if (TooManyInternalKeysSkipped()) {
      return;
    }

403 404
    if (ikey.sequence <= sequence_) {
      if (skipping &&
405 406
          user_comparator_->Compare(ikey.user_key, saved_key_.GetUserKey()) <=
              0) {
407 408 409 410 411 412 413 414 415
        num_skipped++;  // skip this entry
        PERF_COUNTER_ADD(internal_key_skipped_count, 1);
      } else {
        num_skipped = 0;
        switch (ikey.type) {
          case kTypeDeletion:
          case kTypeSingleDeletion:
            // Arrange to skip all upcoming entries for this key since
            // they are hidden by this deletion.
416
            saved_key_.SetUserKey(
417 418 419 420 421 422
                ikey.user_key,
                !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
            skipping = true;
            PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
            break;
          case kTypeValue:
423
            saved_key_.SetUserKey(
424 425
                ikey.user_key,
                !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
426 427 428
            if (range_del_agg_.ShouldDelete(
                    ikey, RangeDelAggregator::RangePositioningMode::
                              kForwardTraversal)) {
429 430 431 432 433
              // Arrange to skip all upcoming entries for this key since
              // they are hidden by this deletion.
              skipping = true;
              num_skipped = 0;
              PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
434 435 436 437 438 439
            } else {
              valid_ = true;
              return;
            }
            break;
          case kTypeMerge:
440
            saved_key_.SetUserKey(
441 442
                ikey.user_key,
                !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
443 444 445
            if (range_del_agg_.ShouldDelete(
                    ikey, RangeDelAggregator::RangePositioningMode::
                              kForwardTraversal)) {
446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462
              // Arrange to skip all upcoming entries for this key since
              // they are hidden by this deletion.
              skipping = true;
              num_skipped = 0;
              PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
            } else {
              // By now, we are sure the current ikey is going to yield a
              // value
              current_entry_is_merged_ = true;
              valid_ = true;
              MergeValuesNewToOld();  // Go to a different state machine
              return;
            }
            break;
          default:
            assert(false);
            break;
463
        }
J
jorlow@chromium.org 已提交
464
      }
465 466 467 468 469 470
    } else {
      // This key was inserted after our snapshot was taken.
      PERF_COUNTER_ADD(internal_recent_skipped_count, 1);

      // Here saved_key_ may contain some old key, or the default empty key, or
      // key assigned by some random other method. We don't care.
471 472
      if (user_comparator_->Compare(ikey.user_key, saved_key_.GetUserKey()) <=
          0) {
473 474
        num_skipped++;
      } else {
475 476 477
        saved_key_.SetUserKey(
            ikey.user_key,
            !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
478 479 480
        skipping = false;
        num_skipped = 0;
      }
J
jorlow@chromium.org 已提交
481
    }
482 483 484 485

    // If we have sequentially iterated via numerous equal keys, then it's
    // better to seek so that we can avoid too many key comparisons.
    if (num_skipped > max_skip_) {
486 487
      num_skipped = 0;
      std::string last_key;
488 489 490 491
      if (skipping) {
        // We're looking for the next user-key but all we see are the same
        // user-key with decreasing sequence numbers. Fast forward to
        // sequence number 0 and type deletion (the smallest type).
492 493
        AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetUserKey(),
                                                       0, kTypeDeletion));
494 495 496 497 498 499 500 501 502
        // Don't set skipping = false because we may still see more user-keys
        // equal to saved_key_.
      } else {
        // We saw multiple entries with this user key and sequence numbers
        // higher than sequence_. Fast forward to sequence_.
        // Note that this only covers a case when a higher key was overwritten
        // many times since our snapshot was taken, not the case when a lot of
        // different keys were inserted after our snapshot was taken.
        AppendInternalKey(&last_key,
503
                          ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
504 505
                                            kValueTypeForSeek));
      }
506 507 508 509 510
      iter_->Seek(last_key);
      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
    } else {
      iter_->Next();
    }
J
jorlow@chromium.org 已提交
511 512
  } while (iter_->Valid());
  valid_ = false;
J
jorlow@chromium.org 已提交
513 514
}

515 516 517 518 519 520 521
// Merge values of the same user key starting from the current iter_ position
// Scan from the newer entries to older entries.
// PRE: iter_->key() points to the first merge type entry
//      saved_key_ stores the user key
// POST: saved_value_ has the merged value for the user key
//       iter_ points to the next entry (or invalid)
void DBIter::MergeValuesNewToOld() {
522
  if (!merge_operator_) {
523
    ROCKS_LOG_ERROR(logger_, "Options::merge_operator is null.");
524
    status_ = Status::InvalidArgument("merge_operator_ must be set.");
525 526
    valid_ = false;
    return;
D
Deon Nicholas 已提交
527
  }
528

529 530
  // Temporarily pin the blocks that hold merge operands
  TempPinData();
531
  merge_context_.Clear();
532
  // Start the merge process by pushing the first operand
533 534
  merge_context_.PushOperand(iter_->value(),
                             iter_->IsValuePinned() /* operand_pinned */);
535 536

  ParsedInternalKey ikey;
537
  Status s;
538 539 540 541 542 543
  for (iter_->Next(); iter_->Valid(); iter_->Next()) {
    if (!ParseKey(&ikey)) {
      // skip corrupted key
      continue;
    }

544
    if (!user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
545 546
      // hit the next user key, stop right here
      break;
A
Andrew Kryczka 已提交
547
    } else if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type ||
548 549 550
               range_del_agg_.ShouldDelete(
                   ikey, RangeDelAggregator::RangePositioningMode::
                             kForwardTraversal)) {
551 552 553 554
      // hit a delete with the same user key, stop right here
      // iter_ is positioned after delete
      iter_->Next();
      break;
A
Andres Noetzli 已提交
555
    } else if (kTypeValue == ikey.type) {
556 557 558
      // hit a put, merge the put value with operands and store the
      // final result in saved_value_. We are done!
      // ignore corruption if there is any.
I
Igor Canadi 已提交
559
      const Slice val = iter_->value();
560 561
      s = MergeHelper::TimedFullMerge(
          merge_operator_, ikey.user_key, &val, merge_context_.GetOperands(),
562
          &saved_value_, logger_, statistics_, env_, &pinned_value_, true);
563 564 565
      if (!s.ok()) {
        status_ = s;
      }
566 567 568
      // iter_ is positioned after put
      iter_->Next();
      return;
A
Andres Noetzli 已提交
569
    } else if (kTypeMerge == ikey.type) {
570 571
      // hit a merge, add the value as an operand and run associative merge.
      // when complete, add result to operands and continue.
572 573
      merge_context_.PushOperand(iter_->value(),
                                 iter_->IsValuePinned() /* operand_pinned */);
574
      PERF_COUNTER_ADD(internal_merge_count, 1);
A
Andres Noetzli 已提交
575 576
    } else {
      assert(false);
577 578 579
    }
  }

580 581 582 583
  // we either exhausted all internal keys under this user key, or hit
  // a deletion marker.
  // feed null as the existing value to the merge operator, such that
  // client can differentiate this scenario and do things accordingly.
584 585 586
  s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetUserKey(),
                                  nullptr, merge_context_.GetOperands(),
                                  &saved_value_, logger_, statistics_, env_,
587
                                  &pinned_value_, true);
588 589 590
  if (!s.ok()) {
    status_ = s;
  }
591 592
}

J
jorlow@chromium.org 已提交
593 594
void DBIter::Prev() {
  assert(valid_);
595
  ReleaseTempPinnedData();
596
  ResetInternalKeysSkippedCounter();
S
Stanislau Hlebik 已提交
597
  if (direction_ == kForward) {
598
    ReverseToBackward();
S
Stanislau Hlebik 已提交
599 600
  }
  PrevInternal();
M
Manuel Ung 已提交
601
  if (statistics_ != nullptr) {
602
    local_stats_.prev_count_++;
M
Manuel Ung 已提交
603
    if (valid_) {
604 605
      local_stats_.prev_found_count_++;
      local_stats_.bytes_read_ += (key().size() + value().size());
M
Manuel Ung 已提交
606 607
    }
  }
S
Stanislau Hlebik 已提交
608
}
J
jorlow@chromium.org 已提交
609

610 611 612 613
void DBIter::ReverseToForward() {
  if (prefix_extractor_ != nullptr && !total_order_seek_) {
    IterKey last_key;
    last_key.SetInternalKey(ParsedInternalKey(
614 615
        saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
    iter_->Seek(last_key.GetInternalKey());
616 617 618 619 620
  }
  FindNextUserKey();
  direction_ = kForward;
  if (!iter_->Valid()) {
    iter_->SeekToFirst();
621
    range_del_agg_.InvalidateTombstoneMapPositions();
622 623 624
  }
}

625
void DBIter::ReverseToBackward() {
626 627
  if (prefix_extractor_ != nullptr && !total_order_seek_) {
    IterKey last_key;
628 629 630
    last_key.SetInternalKey(ParsedInternalKey(saved_key_.GetUserKey(), 0,
                                              kValueTypeForSeekForPrev));
    iter_->SeekForPrev(last_key.GetInternalKey());
631
  }
632 633 634 635 636
  if (current_entry_is_merged_) {
    // Not placed in the same key. Need to call Prev() until finding the
    // previous key.
    if (!iter_->Valid()) {
      iter_->SeekToLast();
637
      range_del_agg_.InvalidateTombstoneMapPositions();
638 639 640 641
    }
    ParsedInternalKey ikey;
    FindParseableKey(&ikey, kReverse);
    while (iter_->Valid() &&
642 643
           user_comparator_->Compare(ikey.user_key, saved_key_.GetUserKey()) >
               0) {
S
Siying Dong 已提交
644
      assert(ikey.sequence != kMaxSequenceNumber);
645 646 647 648 649
      if (ikey.sequence > sequence_) {
        PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
      } else {
        PERF_COUNTER_ADD(internal_key_skipped_count, 1);
      }
650 651 652 653 654 655 656 657
      iter_->Prev();
      FindParseableKey(&ikey, kReverse);
    }
  }
#ifndef NDEBUG
  if (iter_->Valid()) {
    ParsedInternalKey ikey;
    assert(ParseKey(&ikey));
658 659
    assert(user_comparator_->Compare(ikey.user_key, saved_key_.GetUserKey()) <=
           0);
660 661 662 663 664 665 666
  }
#endif

  FindPrevUserKey();
  direction_ = kReverse;
}

S
Stanislau Hlebik 已提交
667 668 669 670
void DBIter::PrevInternal() {
  if (!iter_->Valid()) {
    valid_ = false;
    return;
671 672
  }

S
Stanislau Hlebik 已提交
673 674 675
  ParsedInternalKey ikey;

  while (iter_->Valid()) {
676 677 678
    saved_key_.SetUserKey(
        ExtractUserKey(iter_->key()),
        !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
679

S
Stanislau Hlebik 已提交
680 681
    if (FindValueForCurrentKey()) {
      valid_ = true;
J
jorlow@chromium.org 已提交
682 683 684
      if (!iter_->Valid()) {
        return;
      }
S
Stanislau Hlebik 已提交
685
      FindParseableKey(&ikey, kReverse);
686
      if (user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
S
Stanislau Hlebik 已提交
687
        FindPrevUserKey();
J
jorlow@chromium.org 已提交
688
      }
A
Aaron Gao 已提交
689
      if (valid_ && prefix_extractor_ && prefix_same_as_start_ &&
690
          prefix_extractor_->Transform(saved_key_.GetUserKey())
A
Aaron Gao 已提交
691
                  .compare(prefix_start_key_) != 0) {
692
        valid_ = false;
A
Aaron Gao 已提交
693
      }
S
Stanislau Hlebik 已提交
694
      return;
J
jorlow@chromium.org 已提交
695
    }
696 697 698 699 700

    if (TooManyInternalKeysSkipped(false)) {
      return;
    }

S
Stanislau Hlebik 已提交
701 702 703 704
    if (!iter_->Valid()) {
      break;
    }
    FindParseableKey(&ikey, kReverse);
705
    if (user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
S
Stanislau Hlebik 已提交
706 707 708 709
      FindPrevUserKey();
    }
  }
  // We haven't found any key - iterator is not valid
A
Aaron Gao 已提交
710
  // Or the prefix is different than start prefix
711
  assert(!iter_->Valid());
S
Stanislau Hlebik 已提交
712
  valid_ = false;
J
jorlow@chromium.org 已提交
713 714
}

S
Stanislau Hlebik 已提交
715
// This function checks, if the entry with biggest sequence_number <= sequence_
A
Andres Noetzli 已提交
716 717
// is non kTypeDeletion or kTypeSingleDeletion. If it's not, we save value in
// saved_value_
S
Stanislau Hlebik 已提交
718 719
bool DBIter::FindValueForCurrentKey() {
  assert(iter_->Valid());
720
  merge_context_.Clear();
721
  current_entry_is_merged_ = false;
A
Andres Noetzli 已提交
722 723
  // last entry before merge (could be kTypeDeletion, kTypeSingleDeletion or
  // kTypeValue)
S
Stanislau Hlebik 已提交
724 725
  ValueType last_not_merge_type = kTypeDeletion;
  ValueType last_key_entry_type = kTypeDeletion;
J
jorlow@chromium.org 已提交
726

S
Stanislau Hlebik 已提交
727 728 729
  ParsedInternalKey ikey;
  FindParseableKey(&ikey, kReverse);

730 731 732
  // Temporarily pin blocks that hold (merge operands / the value)
  ReleaseTempPinnedData();
  TempPinData();
S
Stanislau Hlebik 已提交
733 734
  size_t num_skipped = 0;
  while (iter_->Valid() && ikey.sequence <= sequence_ &&
735
         user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
736 737 738 739
    if (TooManyInternalKeysSkipped()) {
      return false;
    }

S
Stanislau Hlebik 已提交
740 741 742 743 744 745 746 747
    // We iterate too much: let's use Seek() to avoid too much key comparisons
    if (num_skipped >= max_skip_) {
      return FindValueForCurrentKeyUsingSeek();
    }

    last_key_entry_type = ikey.type;
    switch (last_key_entry_type) {
      case kTypeValue:
748 749 750
        if (range_del_agg_.ShouldDelete(
                ikey,
                RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
A
Andrew Kryczka 已提交
751 752 753 754 755 756
          last_key_entry_type = kTypeRangeDeletion;
          PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
        } else {
          assert(iter_->IsValuePinned());
          pinned_value_ = iter_->value();
        }
757
        merge_context_.Clear();
A
Andrew Kryczka 已提交
758
        last_not_merge_type = last_key_entry_type;
S
Stanislau Hlebik 已提交
759 760
        break;
      case kTypeDeletion:
A
Andres Noetzli 已提交
761
      case kTypeSingleDeletion:
762
        merge_context_.Clear();
A
Andres Noetzli 已提交
763
        last_not_merge_type = last_key_entry_type;
764
        PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
S
Stanislau Hlebik 已提交
765 766
        break;
      case kTypeMerge:
767 768 769
        if (range_del_agg_.ShouldDelete(
                ikey,
                RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
A
Andrew Kryczka 已提交
770 771 772 773 774 775 776 777
          merge_context_.Clear();
          last_key_entry_type = kTypeRangeDeletion;
          last_not_merge_type = last_key_entry_type;
          PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
        } else {
          assert(merge_operator_ != nullptr);
          merge_context_.PushOperandBack(
              iter_->value(), iter_->IsValuePinned() /* operand_pinned */);
778
          PERF_COUNTER_ADD(internal_merge_count, 1);
A
Andrew Kryczka 已提交
779
        }
S
Stanislau Hlebik 已提交
780 781 782 783 784
        break;
      default:
        assert(false);
    }

785
    PERF_COUNTER_ADD(internal_key_skipped_count, 1);
786
    assert(user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey()));
S
Stanislau Hlebik 已提交
787 788 789 790 791
    iter_->Prev();
    ++num_skipped;
    FindParseableKey(&ikey, kReverse);
  }

792
  Status s;
S
Stanislau Hlebik 已提交
793 794
  switch (last_key_entry_type) {
    case kTypeDeletion:
A
Andres Noetzli 已提交
795
    case kTypeSingleDeletion:
A
Andrew Kryczka 已提交
796
    case kTypeRangeDeletion:
S
Stanislau Hlebik 已提交
797 798 799
      valid_ = false;
      return false;
    case kTypeMerge:
800
      current_entry_is_merged_ = true;
A
Aaron Gao 已提交
801
      if (last_not_merge_type == kTypeDeletion ||
A
Andrew Kryczka 已提交
802 803
          last_not_merge_type == kTypeSingleDeletion ||
          last_not_merge_type == kTypeRangeDeletion) {
804 805 806
        s = MergeHelper::TimedFullMerge(
            merge_operator_, saved_key_.GetUserKey(), nullptr,
            merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
807
            env_, &pinned_value_, true);
808
      } else {
S
Stanislau Hlebik 已提交
809
        assert(last_not_merge_type == kTypeValue);
810
        s = MergeHelper::TimedFullMerge(
811
            merge_operator_, saved_key_.GetUserKey(), &pinned_value_,
812
            merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
813
            env_, &pinned_value_, true);
814
      }
S
Stanislau Hlebik 已提交
815 816 817 818 819 820 821
      break;
    case kTypeValue:
      // do nothing - we've already has value in saved_value_
      break;
    default:
      assert(false);
      break;
J
jorlow@chromium.org 已提交
822
  }
S
Stanislau Hlebik 已提交
823
  valid_ = true;
824 825 826
  if (!s.ok()) {
    status_ = s;
  }
S
Stanislau Hlebik 已提交
827 828
  return true;
}
J
jorlow@chromium.org 已提交
829

S
Stanislau Hlebik 已提交
830 831 832
// This function is used in FindValueForCurrentKey.
// We use Seek() function instead of Prev() to find necessary value
bool DBIter::FindValueForCurrentKeyUsingSeek() {
833 834 835
  // FindValueForCurrentKey will enable pinning before calling
  // FindValueForCurrentKeyUsingSeek()
  assert(pinned_iters_mgr_.PinningEnabled());
S
Stanislau Hlebik 已提交
836
  std::string last_key;
837 838
  AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetUserKey(),
                                                 sequence_, kValueTypeForSeek));
S
Stanislau Hlebik 已提交
839 840 841 842 843 844 845
  iter_->Seek(last_key);
  RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);

  // assume there is at least one parseable key for this user key
  ParsedInternalKey ikey;
  FindParseableKey(&ikey, kForward);

A
Andrew Kryczka 已提交
846
  if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
847 848
      range_del_agg_.ShouldDelete(
          ikey, RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
J
jorlow@chromium.org 已提交
849
    valid_ = false;
S
Stanislau Hlebik 已提交
850 851
    return false;
  }
A
Andrew Kryczka 已提交
852 853 854 855 856 857
  if (ikey.type == kTypeValue) {
    assert(iter_->IsValuePinned());
    pinned_value_ = iter_->value();
    valid_ = true;
    return true;
  }
S
Stanislau Hlebik 已提交
858 859 860

  // kTypeMerge. We need to collect all kTypeMerge values and save them
  // in operands
861
  current_entry_is_merged_ = true;
862
  merge_context_.Clear();
863 864
  while (
      iter_->Valid() &&
865
      user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey()) &&
866 867 868
      ikey.type == kTypeMerge &&
      !range_del_agg_.ShouldDelete(
          ikey, RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
869 870
    merge_context_.PushOperand(iter_->value(),
                               iter_->IsValuePinned() /* operand_pinned */);
871
    PERF_COUNTER_ADD(internal_merge_count, 1);
S
Stanislau Hlebik 已提交
872 873 874 875
    iter_->Next();
    FindParseableKey(&ikey, kForward);
  }

876
  Status s;
S
Stanislau Hlebik 已提交
877
  if (!iter_->Valid() ||
878
      !user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey()) ||
A
Andrew Kryczka 已提交
879
      ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
880 881
      range_del_agg_.ShouldDelete(
          ikey, RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
882
    s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetUserKey(),
883 884
                                    nullptr, merge_context_.GetOperands(),
                                    &saved_value_, logger_, statistics_, env_,
885
                                    &pinned_value_, true);
S
Stanislau Hlebik 已提交
886 887
    // Make iter_ valid and point to saved_key_
    if (!iter_->Valid() ||
888
        !user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
S
Stanislau Hlebik 已提交
889 890 891
      iter_->Seek(last_key);
      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
    }
J
jorlow@chromium.org 已提交
892
    valid_ = true;
893 894 895
    if (!s.ok()) {
      status_ = s;
    }
S
Stanislau Hlebik 已提交
896 897 898
    return true;
  }

I
Igor Canadi 已提交
899
  const Slice& val = iter_->value();
900 901 902
  s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetUserKey(),
                                  &val, merge_context_.GetOperands(),
                                  &saved_value_, logger_, statistics_, env_,
903
                                  &pinned_value_, true);
S
Stanislau Hlebik 已提交
904
  valid_ = true;
905 906 907
  if (!s.ok()) {
    status_ = s;
  }
S
Stanislau Hlebik 已提交
908 909 910 911 912 913 914 915 916 917 918 919 920 921
  return true;
}

// Used in Next to change directions
// Go to next user key
// Don't use Seek(),
// because next user key will be very close
void DBIter::FindNextUserKey() {
  if (!iter_->Valid()) {
    return;
  }
  ParsedInternalKey ikey;
  FindParseableKey(&ikey, kForward);
  while (iter_->Valid() &&
922
         !user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
S
Stanislau Hlebik 已提交
923 924 925 926 927 928 929 930 931 932 933 934 935
    iter_->Next();
    FindParseableKey(&ikey, kForward);
  }
}

// Go to previous user_key
void DBIter::FindPrevUserKey() {
  if (!iter_->Valid()) {
    return;
  }
  size_t num_skipped = 0;
  ParsedInternalKey ikey;
  FindParseableKey(&ikey, kReverse);
936
  int cmp;
937 938 939 940
  while (iter_->Valid() &&
         ((cmp = user_comparator_->Compare(ikey.user_key,
                                           saved_key_.GetUserKey())) == 0 ||
          (cmp > 0 && ikey.sequence > sequence_))) {
941 942 943 944
    if (TooManyInternalKeysSkipped()) {
      return;
    }

945 946 947 948 949
    if (cmp == 0) {
      if (num_skipped >= max_skip_) {
        num_skipped = 0;
        IterKey last_key;
        last_key.SetInternalKey(ParsedInternalKey(
950 951
            saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
        iter_->Seek(last_key.GetInternalKey());
952 953 954 955
        RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
      } else {
        ++num_skipped;
      }
S
Stanislau Hlebik 已提交
956
    }
S
Siying Dong 已提交
957
    assert(ikey.sequence != kMaxSequenceNumber);
958 959 960 961 962
    if (ikey.sequence > sequence_) {
      PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
    } else {
      PERF_COUNTER_ADD(internal_key_skipped_count, 1);
    }
S
Stanislau Hlebik 已提交
963 964 965 966 967
    iter_->Prev();
    FindParseableKey(&ikey, kReverse);
  }
}

968 969 970 971 972 973 974 975 976 977 978 979
bool DBIter::TooManyInternalKeysSkipped(bool increment) {
  if ((max_skippable_internal_keys_ > 0) &&
      (num_internal_keys_skipped_ > max_skippable_internal_keys_)) {
    valid_ = false;
    status_ = Status::Incomplete("Too many internal keys skipped.");
    return true;
  } else if (increment) {
    num_internal_keys_skipped_++;
  }
  return false;
}

S
Stanislau Hlebik 已提交
980 981 982 983 984 985 986 987
// Skip all unparseable keys
void DBIter::FindParseableKey(ParsedInternalKey* ikey, Direction direction) {
  while (iter_->Valid() && !ParseKey(ikey)) {
    if (direction == kReverse) {
      iter_->Prev();
    } else {
      iter_->Next();
    }
J
jorlow@chromium.org 已提交
988 989
  }
}
J
jorlow@chromium.org 已提交
990

J
jorlow@chromium.org 已提交
991
void DBIter::Seek(const Slice& target) {
L
Lei Jin 已提交
992
  StopWatch sw(env_, statistics_, DB_SEEK);
993
  ReleaseTempPinnedData();
994
  ResetInternalKeysSkippedCounter();
995 996
  saved_key_.Clear();
  saved_key_.SetInternalKey(target, sequence_);
997 998 999

  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
1000
    iter_->Seek(saved_key_.GetInternalKey());
1001
    range_del_agg_.InvalidateTombstoneMapPositions();
1002
  }
M
Manuel Ung 已提交
1003
  RecordTick(statistics_, NUMBER_DB_SEEK);
J
jorlow@chromium.org 已提交
1004
  if (iter_->Valid()) {
1005 1006 1007
    if (prefix_extractor_ && prefix_same_as_start_) {
      prefix_start_key_ = prefix_extractor_->Transform(target);
    }
1008 1009
    direction_ = kForward;
    ClearSavedValue();
1010 1011 1012 1013
    FindNextUserEntry(false /* not skipping */, prefix_same_as_start_);
    if (!valid_) {
      prefix_start_key_.clear();
    }
M
Manuel Ung 已提交
1014 1015 1016 1017
    if (statistics_ != nullptr) {
      if (valid_) {
        RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
        RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
1018
        PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
M
Manuel Ung 已提交
1019 1020
      }
    }
J
jorlow@chromium.org 已提交
1021 1022 1023
  } else {
    valid_ = false;
  }
1024

1025
  if (valid_ && prefix_extractor_ && prefix_same_as_start_) {
1026 1027
    prefix_start_buf_.SetUserKey(prefix_start_key_);
    prefix_start_key_ = prefix_start_buf_.GetUserKey();
1028
  }
J
jorlow@chromium.org 已提交
1029
}
J
jorlow@chromium.org 已提交
1030

A
Aaron Gao 已提交
1031 1032 1033
void DBIter::SeekForPrev(const Slice& target) {
  StopWatch sw(env_, statistics_, DB_SEEK);
  ReleaseTempPinnedData();
1034
  ResetInternalKeysSkippedCounter();
A
Aaron Gao 已提交
1035 1036 1037 1038 1039 1040 1041
  saved_key_.Clear();
  // now saved_key is used to store internal key.
  saved_key_.SetInternalKey(target, 0 /* sequence_number */,
                            kValueTypeForSeekForPrev);

  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
1042
    iter_->SeekForPrev(saved_key_.GetInternalKey());
1043
    range_del_agg_.InvalidateTombstoneMapPositions();
A
Aaron Gao 已提交
1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060
  }

  RecordTick(statistics_, NUMBER_DB_SEEK);
  if (iter_->Valid()) {
    if (prefix_extractor_ && prefix_same_as_start_) {
      prefix_start_key_ = prefix_extractor_->Transform(target);
    }
    direction_ = kReverse;
    ClearSavedValue();
    PrevInternal();
    if (!valid_) {
      prefix_start_key_.clear();
    }
    if (statistics_ != nullptr) {
      if (valid_) {
        RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
        RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
1061
        PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
A
Aaron Gao 已提交
1062 1063 1064 1065 1066 1067
      }
    }
  } else {
    valid_ = false;
  }
  if (valid_ && prefix_extractor_ && prefix_same_as_start_) {
1068 1069
    prefix_start_buf_.SetUserKey(prefix_start_key_);
    prefix_start_key_ = prefix_start_buf_.GetUserKey();
A
Aaron Gao 已提交
1070 1071 1072
  }
}

J
jorlow@chromium.org 已提交
1073
void DBIter::SeekToFirst() {
S
Stanislau Hlebik 已提交
1074
  // Don't use iter_::Seek() if we set a prefix extractor
1075
  // because prefix seek will be used.
1076
  if (prefix_extractor_ != nullptr) {
S
Stanislau Hlebik 已提交
1077 1078
    max_skip_ = std::numeric_limits<uint64_t>::max();
  }
J
jorlow@chromium.org 已提交
1079
  direction_ = kForward;
1080
  ReleaseTempPinnedData();
1081
  ResetInternalKeysSkippedCounter();
J
jorlow@chromium.org 已提交
1082
  ClearSavedValue();
1083 1084 1085 1086

  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    iter_->SeekToFirst();
1087
    range_del_agg_.InvalidateTombstoneMapPositions();
1088 1089
  }

M
Manuel Ung 已提交
1090
  RecordTick(statistics_, NUMBER_DB_SEEK);
J
jorlow@chromium.org 已提交
1091
  if (iter_->Valid()) {
1092 1093 1094
    saved_key_.SetUserKey(
        ExtractUserKey(iter_->key()),
        !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
1095
    FindNextUserEntry(false /* not skipping */, false /* no prefix check */);
M
Manuel Ung 已提交
1096 1097 1098 1099
    if (statistics_ != nullptr) {
      if (valid_) {
        RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
        RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
1100
        PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
M
Manuel Ung 已提交
1101 1102
      }
    }
J
jorlow@chromium.org 已提交
1103 1104
  } else {
    valid_ = false;
J
jorlow@chromium.org 已提交
1105
  }
1106
  if (valid_ && prefix_extractor_ && prefix_same_as_start_) {
1107 1108 1109
    prefix_start_buf_.SetUserKey(
        prefix_extractor_->Transform(saved_key_.GetUserKey()));
    prefix_start_key_ = prefix_start_buf_.GetUserKey();
1110
  }
J
jorlow@chromium.org 已提交
1111 1112
}

J
jorlow@chromium.org 已提交
1113
void DBIter::SeekToLast() {
S
Stanislau Hlebik 已提交
1114
  // Don't use iter_::Seek() if we set a prefix extractor
1115
  // because prefix seek will be used.
1116
  if (prefix_extractor_ != nullptr) {
S
Stanislau Hlebik 已提交
1117 1118
    max_skip_ = std::numeric_limits<uint64_t>::max();
  }
J
jorlow@chromium.org 已提交
1119
  direction_ = kReverse;
1120
  ReleaseTempPinnedData();
1121
  ResetInternalKeysSkippedCounter();
J
jorlow@chromium.org 已提交
1122
  ClearSavedValue();
1123 1124 1125 1126

  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    iter_->SeekToLast();
1127
    range_del_agg_.InvalidateTombstoneMapPositions();
1128
  }
1129 1130 1131 1132
  // When the iterate_upper_bound is set to a value,
  // it will seek to the last key before the
  // ReadOptions.iterate_upper_bound
  if (iter_->Valid() && iterate_upper_bound_ != nullptr) {
1133
    SeekForPrev(*iterate_upper_bound_);
1134
    range_del_agg_.InvalidateTombstoneMapPositions();
1135 1136 1137 1138
    if (!Valid()) {
      return;
    } else if (user_comparator_->Equal(*iterate_upper_bound_, key())) {
      Prev();
1139
    }
1140 1141
  } else {
    PrevInternal();
1142
  }
M
Manuel Ung 已提交
1143 1144 1145 1146 1147
  if (statistics_ != nullptr) {
    RecordTick(statistics_, NUMBER_DB_SEEK);
    if (valid_) {
      RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
      RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
1148
      PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
M
Manuel Ung 已提交
1149 1150
    }
  }
1151
  if (valid_ && prefix_extractor_ && prefix_same_as_start_) {
1152 1153 1154
    prefix_start_buf_.SetUserKey(
        prefix_extractor_->Transform(saved_key_.GetUserKey()));
    prefix_start_key_ = prefix_start_buf_.GetUserKey();
1155
  }
J
jorlow@chromium.org 已提交
1156 1157
}

1158 1159 1160 1161 1162
Iterator* NewDBIterator(Env* env, const ReadOptions& read_options,
                        const ImmutableCFOptions& cf_options,
                        const Comparator* user_key_comparator,
                        InternalIterator* internal_iter,
                        const SequenceNumber& sequence,
S
Siying Dong 已提交
1163 1164 1165 1166
                        uint64_t max_sequential_skip_in_iterations) {
  DBIter* db_iter = new DBIter(env, read_options, cf_options,
                               user_key_comparator, internal_iter, sequence,
                               false, max_sequential_skip_in_iterations);
1167
  return db_iter;
1168 1169
}

I
Igor Canadi 已提交
1170
ArenaWrappedDBIter::~ArenaWrappedDBIter() { db_iter_->~DBIter(); }
1171

A
Andrew Kryczka 已提交
1172 1173 1174 1175
RangeDelAggregator* ArenaWrappedDBIter::GetRangeDelAggregator() {
  return db_iter_->GetRangeDelAggregator();
}

S
sdong 已提交
1176
void ArenaWrappedDBIter::SetIterUnderDBIter(InternalIterator* iter) {
1177 1178 1179 1180 1181 1182 1183 1184 1185
  static_cast<DBIter*>(db_iter_)->SetIter(iter);
}

inline bool ArenaWrappedDBIter::Valid() const { return db_iter_->Valid(); }
inline void ArenaWrappedDBIter::SeekToFirst() { db_iter_->SeekToFirst(); }
inline void ArenaWrappedDBIter::SeekToLast() { db_iter_->SeekToLast(); }
inline void ArenaWrappedDBIter::Seek(const Slice& target) {
  db_iter_->Seek(target);
}
A
Aaron Gao 已提交
1186 1187 1188
inline void ArenaWrappedDBIter::SeekForPrev(const Slice& target) {
  db_iter_->SeekForPrev(target);
}
1189 1190 1191 1192 1193
inline void ArenaWrappedDBIter::Next() { db_iter_->Next(); }
inline void ArenaWrappedDBIter::Prev() { db_iter_->Prev(); }
inline Slice ArenaWrappedDBIter::key() const { return db_iter_->key(); }
inline Slice ArenaWrappedDBIter::value() const { return db_iter_->value(); }
inline Status ArenaWrappedDBIter::status() const { return db_iter_->status(); }
1194 1195
inline Status ArenaWrappedDBIter::GetProperty(std::string prop_name,
                                              std::string* prop) {
S
Siying Dong 已提交
1196 1197 1198 1199 1200 1201 1202
  if (prop_name == "rocksdb.iterator.super-version-number") {
    // First try to pass the value returned from inner iterator.
    if (!db_iter_->GetProperty(prop_name, prop).ok()) {
      *prop = ToString(sv_number_);
    }
    return Status::OK();
  }
1203 1204
  return db_iter_->GetProperty(prop_name, prop);
}
S
Siying Dong 已提交
1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243

void ArenaWrappedDBIter::Init(Env* env, const ReadOptions& read_options,
                              const ImmutableCFOptions& cf_options,
                              const SequenceNumber& sequence,
                              uint64_t max_sequential_skip_in_iteration,
                              uint64_t version_number) {
  auto mem = arena_.AllocateAligned(sizeof(DBIter));
  db_iter_ = new (mem)
      DBIter(env, read_options, cf_options, cf_options.user_comparator, nullptr,
             sequence, true, max_sequential_skip_in_iteration);
  sv_number_ = version_number;
}

Status ArenaWrappedDBIter::Refresh() {
  if (cfd_ == nullptr || db_impl_ == nullptr) {
    return Status::NotSupported("Creating renew iterator is not allowed.");
  }
  assert(db_iter_ != nullptr);
  SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber();
  uint64_t cur_sv_number = cfd_->GetSuperVersionNumber();
  if (sv_number_ != cur_sv_number) {
    Env* env = db_iter_->env();
    db_iter_->~DBIter();
    arena_.~Arena();
    new (&arena_) Arena();

    SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_->mutex());
    Init(env, read_options_, *(cfd_->ioptions()), latest_seq,
         sv->mutable_cf_options.max_sequential_skip_in_iterations,
         cur_sv_number);

    InternalIterator* internal_iter = db_impl_->NewInternalIterator(
        read_options_, cfd_, sv, &arena_, db_iter_->GetRangeDelAggregator());
    SetIterUnderDBIter(internal_iter);
  } else {
    db_iter_->set_sequence(latest_seq);
    db_iter_->set_valid(false);
  }
  return Status::OK();
1244
}
J
jorlow@chromium.org 已提交
1245

1246
ArenaWrappedDBIter* NewArenaWrappedDbIterator(
1247
    Env* env, const ReadOptions& read_options,
S
Siying Dong 已提交
1248 1249 1250
    const ImmutableCFOptions& cf_options, const SequenceNumber& sequence,
    uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
    DBImpl* db_impl, ColumnFamilyData* cfd) {
1251
  ArenaWrappedDBIter* iter = new ArenaWrappedDBIter();
S
Siying Dong 已提交
1252 1253 1254 1255 1256
  iter->Init(env, read_options, cf_options, sequence,
             max_sequential_skip_in_iterations, version_number);
  if (db_impl != nullptr && cfd != nullptr) {
    iter->StoreRefreshInfo(read_options, db_impl, cfd);
  }
1257

1258
  return iter;
J
jorlow@chromium.org 已提交
1259 1260
}

1261
}  // namespace rocksdb