db_iter.cc 31.9 KB
Newer Older
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 3 4 5
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
J
jorlow@chromium.org 已提交
6 7 8 9 10
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/db_iter.h"
11
#include <stdexcept>
12
#include <deque>
S
Stanislau Hlebik 已提交
13
#include <string>
S
Stanislau Hlebik 已提交
14
#include <limits>
J
jorlow@chromium.org 已提交
15 16

#include "db/dbformat.h"
17
#include "db/filename.h"
18
#include "db/merge_context.h"
19
#include "db/merge_helper.h"
20
#include "db/pinned_iterators_manager.h"
S
sdong 已提交
21
#include "port/port.h"
22 23 24
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/merge_operator.h"
25
#include "rocksdb/options.h"
S
sdong 已提交
26
#include "table/internal_iterator.h"
27
#include "util/arena.h"
J
jorlow@chromium.org 已提交
28 29
#include "util/logging.h"
#include "util/mutexlock.h"
30
#include "util/perf_context_imp.h"
31
#include "util/string_util.h"
J
jorlow@chromium.org 已提交
32

33
namespace rocksdb {
J
jorlow@chromium.org 已提交
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54

#if 0
// Debug helper (compiled out by the surrounding #if 0): walks `iter` from its
// first entry and prints every internal key to stderr. Keys that fail to
// parse are printed escaped with a "Corrupt" prefix.
static void DumpInternalIter(Iterator* iter) {
  iter->SeekToFirst();
  while (iter->Valid()) {
    ParsedInternalKey parsed;
    const bool parse_ok = ParseInternalKey(iter->key(), &parsed);
    if (parse_ok) {
      fprintf(stderr, "@ '%s'\n", parsed.DebugString().c_str());
    } else {
      fprintf(stderr, "Corrupt '%s'\n", EscapeString(iter->key()).c_str());
    }
    iter->Next();
  }
}
#endif

// Memtables and sstables that make the DB representation contain
// (userkey,seq,type) => uservalue entries.  DBIter
// combines multiple entries for the same userkey found in the DB
// representation into a single entry while accounting for sequence
// numbers, deletion markers, overwrites, etc.
class DBIter: public Iterator {
 public:
55
  // The following is grossly complicated. TODO: clean it up
J
jorlow@chromium.org 已提交
56 57 58 59 60 61 62 63 64 65
  // Which direction is the iterator currently moving?
  // (1) When moving forward, the internal iterator is positioned at
  //     the exact entry that yields this->key(), this->value()
  // (2) When moving backwards, the internal iterator is positioned
  //     just before all entries whose user key == this->key().
  enum Direction {
    kForward,
    kReverse
  };

66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
  // Per-iterator tallies for the ticker statistics touched by Next()/Prev().
  // They are accumulated locally and only pushed to the shared Statistics
  // object through BumpGlobalStatistics(), so that many threads stepping
  // their iterators do not all bump the global atomic counters on every
  // DBIter::Next().
  struct LocalStatistics {
    explicit LocalStatistics() { ResetCounters(); }

    // Sets every local counter back to zero.
    void ResetCounters() {
      next_count_ = next_found_count_ = 0;
      prev_count_ = prev_found_count_ = 0;
      bytes_read_ = 0;
    }

    // Adds each local counter to its ticker in `global_statistics` and then
    // clears the local counters so nothing is reported twice.
    void BumpGlobalStatistics(Statistics* global_statistics) {
      RecordTick(global_statistics, NUMBER_DB_NEXT, next_count_);
      RecordTick(global_statistics, NUMBER_DB_NEXT_FOUND, next_found_count_);
      RecordTick(global_statistics, NUMBER_DB_PREV, prev_count_);
      RecordTick(global_statistics, NUMBER_DB_PREV_FOUND, prev_found_count_);
      RecordTick(global_statistics, ITER_BYTES_READ, bytes_read_);
      ResetCounters();
    }

    uint64_t next_count_;        // reported as Tickers::NUMBER_DB_NEXT
    uint64_t next_found_count_;  // reported as Tickers::NUMBER_DB_NEXT_FOUND
    uint64_t prev_count_;        // reported as Tickers::NUMBER_DB_PREV
    uint64_t prev_found_count_;  // reported as Tickers::NUMBER_DB_PREV_FOUND
    uint64_t bytes_read_;        // reported as Tickers::ITER_BYTES_READ
  };

S
sdong 已提交
104 105
  // Creates an iterator over `iter` that exposes entries visible at sequence
  // number `s`.
  //   iter        - internal iterator; may be nullptr here and attached later
  //                 through SetIter().
  //   arena_mode  - when true the destructor only runs iter's destructor
  //                 instead of deleting it.
  //   max_sequential_skip_in_iterations - number of sequential skips after
  //                 which the scan falls back to a re-seek.
  //   version_number - reported for the super-version-number property.
  //   iterate_upper_bound - optional exclusive upper bound on user keys.
  //   prefix_same_as_start - restrict iteration to the prefix of the seek key.
  //   pin_data    - keep pinning enabled for the whole lifetime of the
  //                 iterator.
  DBIter(Env* env, const ImmutableCFOptions& ioptions, const Comparator* cmp,
         InternalIterator* iter, SequenceNumber s, bool arena_mode,
         uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
         const Slice* iterate_upper_bound = nullptr,
         bool prefix_same_as_start = false, bool pin_data = false)
      : prefix_extractor_(ioptions.prefix_extractor),
        arena_mode_(arena_mode),
        env_(env),
        logger_(ioptions.info_log),
        user_comparator_(cmp),
        merge_operator_(ioptions.merge_operator),
        iter_(iter),
        sequence_(s),
        direction_(kForward),
        valid_(false),
        current_entry_is_merged_(false),
        statistics_(ioptions.statistics),
        max_skip_(max_sequential_skip_in_iterations),
        version_number_(version_number),
        iterate_upper_bound_(iterate_upper_bound),
        prefix_same_as_start_(prefix_same_as_start),
        pin_thru_lifetime_(pin_data) {
    // Count this iterator as live; the destructor undoes the tick.
    RecordTick(statistics_, NO_ITERATORS);
    if (pin_thru_lifetime_) {
      pinned_iters_mgr_.StartPinning();
    }
    if (iter_ != nullptr) {
      iter_->SetPinnedItersMgr(&pinned_iters_mgr_);
    }
  }
  // Releases any pinned data, flushes the locally accumulated statistics and
  // disposes of the wrapped internal iterator.
  virtual ~DBIter() {
    // Release pinned data if any
    if (pinned_iters_mgr_.PinningEnabled()) {
      pinned_iters_mgr_.ReleasePinnedData();
    }
    RecordTick(statistics_, NO_ITERATORS, -1);
    local_stats_.BumpGlobalStatistics(statistics_);
    if (!arena_mode_) {
      delete iter_;
    } else if (iter_ != nullptr) {
      // In arena mode only the destructor is run; the memory itself is not
      // freed here (presumably it is owned by the arena -- confirm with the
      // caller that created the iterator). iter_ is still nullptr when the
      // object was built without an iterator and SetIter() was never called,
      // so guard against calling a destructor through a null pointer.
      iter_->~InternalIterator();
    }
  }
S
sdong 已提交
147
  // Attaches the internal iterator after construction. May only be called
  // while no iterator is attached yet (asserted). The new iterator is told
  // about this DBIter's pinned-iterators manager.
  virtual void SetIter(InternalIterator* iter) {
    assert(iter_ == nullptr);
    iter->SetPinnedItersMgr(&pinned_iters_mgr_);
    iter_ = iter;
  }
I
Igor Sugak 已提交
152 153
  // Returns the valid_ flag maintained by the seek/step operations.
  virtual bool Valid() const override { return valid_; }
  // Returns the key stored in saved_key_ for the current entry.
  // REQUIRES: Valid()
  virtual Slice key() const override {
    assert(valid_);
    const Slice current_key = saved_key_.GetKey();
    return current_key;
  }
I
Igor Sugak 已提交
157
  // Returns the value of the current entry.
  // REQUIRES: Valid()
  virtual Slice value() const override {
    assert(valid_);
    if (!current_entry_is_merged_) {
      // Plain (non-merged) entry: when moving backwards the value was
      // captured in pinned_value_, otherwise it is read straight from the
      // internal iterator.
      if (direction_ == kReverse) {
        return pinned_value_;
      }
      return iter_->value();
    }
    // Merged entry: if pinned_value_ is set then the result of the merge
    // operator is one of the merge operands and that is what we return;
    // otherwise the merged result lives in saved_value_.
    if (pinned_value_.data()) {
      return pinned_value_;
    }
    return saved_value_;
  }
I
Igor Sugak 已提交
169
  // Returns the first error recorded by this DBIter; when none was recorded,
  // returns the status of the underlying internal iterator.
  virtual Status status() const override {
    if (!status_.ok()) {
      return status_;
    }
    return iter_->status();
  }
176 177 178 179 180 181

  // Looks up an iterator property by name and stores its textual value in
  // *prop.
  //
  // Supported names:
  //   "rocksdb.iterator.super-version-number" - the value reported by the
  //       internal iterator if it supplies one, otherwise version_number_.
  //   "rocksdb.iterator.is-key-pinned" - "1" or "0" when the iterator is
  //       valid, or the text "Iterator is not valid." otherwise.
  //
  // Returns InvalidArgument when `prop` is nullptr or the name is not one of
  // the above, OK otherwise.
  virtual Status GetProperty(std::string prop_name,
                             std::string* prop) override {
    if (prop == nullptr) {
      return Status::InvalidArgument("prop is nullptr");
    }
    if (prop_name == "rocksdb.iterator.super-version-number") {
      // First try to pass the value returned from inner iterator.
      if (!iter_->GetProperty(prop_name, prop).ok()) {
        *prop = ToString(version_number_);
      }
      return Status::OK();
    } else if (prop_name == "rocksdb.iterator.is-key-pinned") {
      if (valid_) {
        *prop = (pin_thru_lifetime_ && saved_key_.IsKeyPinned()) ? "1" : "0";
      } else {
        *prop = "Iterator is not valid.";
      }
      return Status::OK();
    }
    // Message spelling corrected (was "Undentified property.").
    return Status::InvalidArgument("Unidentified property.");
  }
J
jorlow@chromium.org 已提交
198

I
Igor Sugak 已提交
199 200 201 202 203
  virtual void Next() override;
  virtual void Prev() override;
  virtual void Seek(const Slice& target) override;
  virtual void SeekToFirst() override;
  virtual void SeekToLast() override;
J
jorlow@chromium.org 已提交
204

J
jorlow@chromium.org 已提交
205
 private:
206
  void ReverseToBackward();
S
Stanislau Hlebik 已提交
207 208 209 210 211 212
  void PrevInternal();
  void FindParseableKey(ParsedInternalKey* ikey, Direction direction);
  bool FindValueForCurrentKey();
  bool FindValueForCurrentKeyUsingSeek();
  void FindPrevUserKey();
  void FindNextUserKey();
213 214
  inline void FindNextUserEntry(bool skipping, bool prefix_check);
  void FindNextUserEntryInternal(bool skipping, bool prefix_check);
J
jorlow@chromium.org 已提交
215
  bool ParseKey(ParsedInternalKey* key);
216
  void MergeValuesNewToOld();
J
jorlow@chromium.org 已提交
217

218 219 220 221 222 223 224 225 226 227
  // Starts pinning the blocks we encounter; they stay pinned until
  // ReleaseTempPinnedData() is called. Does nothing when data is pinned for
  // the iterator's whole lifetime (pin_thru_lifetime_).
  void TempPinData() {
    if (pin_thru_lifetime_) {
      return;
    }
    pinned_iters_mgr_.StartPinning();
  }

  // Release blocks pinned by TempPinData()
  void ReleaseTempPinnedData() {
228 229
    if (!pin_thru_lifetime_ && pinned_iters_mgr_.PinningEnabled()) {
      pinned_iters_mgr_.ReleasePinnedData();
230 231 232
    }
  }

J
jorlow@chromium.org 已提交
233 234 235 236 237 238 239 240 241
  // Empties saved_value_. A buffer that has grown beyond 1 MiB of capacity is
  // swapped with an empty string instead of cleared, so that the large
  // allocation is not kept around.
  inline void ClearSavedValue() {
    if (saved_value_.capacity() <= 1048576) {
      saved_value_.clear();
      return;
    }
    std::string().swap(saved_value_);
  }

242
  const SliceTransform* prefix_extractor_;
243
  bool arena_mode_;
J
jorlow@chromium.org 已提交
244
  Env* const env_;
I
Igor Canadi 已提交
245
  Logger* logger_;
J
jorlow@chromium.org 已提交
246
  const Comparator* const user_comparator_;
247
  const MergeOperator* const merge_operator_;
S
sdong 已提交
248
  InternalIterator* iter_;
J
jorlow@chromium.org 已提交
249
  SequenceNumber const sequence_;
J
jorlow@chromium.org 已提交
250

J
jorlow@chromium.org 已提交
251
  Status status_;
S
Stanislau Hlebik 已提交
252 253
  IterKey saved_key_;
  std::string saved_value_;
254
  Slice pinned_value_;
J
jorlow@chromium.org 已提交
255
  Direction direction_;
J
jorlow@chromium.org 已提交
256
  bool valid_;
257
  bool current_entry_is_merged_;
258
  Statistics* statistics_;
259
  uint64_t max_skip_;
260
  uint64_t version_number_;
261
  const Slice* iterate_upper_bound_;
262 263 264
  IterKey prefix_start_buf_;
  Slice prefix_start_key_;
  const bool prefix_same_as_start_;
265 266 267
  // Means that we will pin all data blocks we read for as long as the
  // Iterator is not deleted; will be true if ReadOptions::pin_data is true
  const bool pin_thru_lifetime_;
268
  // List of operands for merge operator.
269
  MergeContext merge_context_;
270
  LocalStatistics local_stats_;
271
  PinnedIteratorsManager pinned_iters_mgr_;
J
jorlow@chromium.org 已提交
272 273 274 275 276 277 278 279 280

  // No copying allowed
  DBIter(const DBIter&);
  void operator=(const DBIter&);
};

// Parses the internal key at iter_'s current position into *ikey.
// Returns true on success. On failure, records a Corruption status in
// status_, logs the offending key at error level and returns false.
inline bool DBIter::ParseKey(ParsedInternalKey* ikey) {
  if (ParseInternalKey(iter_->key(), ikey)) {
    return true;
  }
  status_ = Status::Corruption("corrupted internal key in DBIter");
  Log(InfoLogLevel::ERROR_LEVEL, logger_,
      "corrupted internal key in DBIter: %s",
      iter_->key().ToString(true).c_str());
  return false;
}

J
jorlow@chromium.org 已提交
290 291 292
void DBIter::Next() {
  assert(valid_);

293 294
  // Release temporarily pinned blocks from last operation
  ReleaseTempPinnedData();
S
Stanislau Hlebik 已提交
295
  if (direction_ == kReverse) {
296 297 298 299 300
    FindNextUserKey();
    direction_ = kForward;
    if (!iter_->Valid()) {
      iter_->SeekToFirst();
    }
301 302 303 304 305 306 307
  } else if (iter_->Valid() && !current_entry_is_merged_) {
    // If the current value is not a merge, the iter position is the
    // current key, which is already returned. We can safely issue a
    // Next() without checking the current key.
    // If the current key is a merge, very likely iter already points
    // to the next internal position.
    iter_->Next();
308
    PERF_COUNTER_ADD(internal_key_skipped_count, 1);
J
jorlow@chromium.org 已提交
309
  }
J
jorlow@chromium.org 已提交
310

311 312 313
  if (statistics_ != nullptr) {
    local_stats_.next_count_++;
  }
314 315
  // Now we point to the next internal position, for both of merge and
  // not merge cases.
316 317 318 319
  if (!iter_->Valid()) {
    valid_ = false;
    return;
  }
320
  FindNextUserEntry(true /* skipping the current user key */, prefix_same_as_start_);
321 322 323 324
  if (statistics_ != nullptr && valid_) {
    local_stats_.next_found_count_++;
    local_stats_.bytes_read_ += (key().size() + value().size());
  }
J
jorlow@chromium.org 已提交
325 326
}

327 328 329 330 331 332 333 334
// PRE: saved_key_ has the current user key if skipping
// POST: saved_key_ should have the next user key if valid_,
//       if the current entry is a result of merge
//           current_entry_is_merged_ => true
//           saved_value_             => the merged value
//
// NOTE: In between, saved_key_ can point to a user key that has
//       a delete marker
335 336 337 338 339 340
//
// The prefix_check parameter controls whether we check the iterated
// keys against the prefix of the seeked key. Set to false when
// performing a seek without a key (e.g. SeekToFirst). Set to
// prefix_same_as_start_ for other iterations.
inline void DBIter::FindNextUserEntry(bool skipping, bool prefix_check) {
341
  PERF_TIMER_GUARD(find_next_user_entry_time);
342
  FindNextUserEntryInternal(skipping, prefix_check);
343 344 345
}

// Actual implementation of DBIter::FindNextUserEntry()
346
void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
J
jorlow@chromium.org 已提交
347 348 349
  // Loop until we hit an acceptable entry to yield
  assert(iter_->Valid());
  assert(direction_ == kForward);
350
  current_entry_is_merged_ = false;
351
  uint64_t num_skipped = 0;
J
jorlow@chromium.org 已提交
352
  do {
J
jorlow@chromium.org 已提交
353
    ParsedInternalKey ikey;
354 355 356

    if (ParseKey(&ikey)) {
      if (iterate_upper_bound_ != nullptr &&
357
          user_comparator_->Compare(ikey.user_key, *iterate_upper_bound_) >= 0) {
358 359 360
        break;
      }

361 362 363 364 365
      if (prefix_extractor_ && prefix_check &&
          prefix_extractor_->Transform(ikey.user_key).compare(prefix_start_key_) != 0) {
        break;
      }

366 367 368 369 370 371 372 373
      if (ikey.sequence <= sequence_) {
        if (skipping &&
           user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0) {
          num_skipped++;  // skip this entry
          PERF_COUNTER_ADD(internal_key_skipped_count, 1);
        } else {
          switch (ikey.type) {
            case kTypeDeletion:
A
Andres Noetzli 已提交
374
            case kTypeSingleDeletion:
375 376
              // Arrange to skip all upcoming entries for this key since
              // they are hidden by this deletion.
377 378 379
              saved_key_.SetKey(
                  ikey.user_key,
                  !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
380 381 382 383 384 385
              skipping = true;
              num_skipped = 0;
              PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
              break;
            case kTypeValue:
              valid_ = true;
386 387 388
              saved_key_.SetKey(
                  ikey.user_key,
                  !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
389 390 391
              return;
            case kTypeMerge:
              // By now, we are sure the current ikey is going to yield a value
392 393 394
              saved_key_.SetKey(
                  ikey.user_key,
                  !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
395 396 397 398 399 400 401 402
              current_entry_is_merged_ = true;
              valid_ = true;
              MergeValuesNewToOld();  // Go to a different state machine
              return;
            default:
              assert(false);
              break;
          }
403
        }
J
jorlow@chromium.org 已提交
404
      }
J
jorlow@chromium.org 已提交
405
    }
406 407
    // If we have sequentially iterated via numerous keys and still not
    // found the next user-key, then it is better to seek so that we can
C
clark.kang 已提交
408
    // avoid too many key comparisons. We seek to the last occurrence of
409 410
    // our current key by looking for sequence number 0 and type deletion
    // (the smallest type).
411 412 413
    if (skipping && num_skipped > max_skip_) {
      num_skipped = 0;
      std::string last_key;
414
      AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetKey(), 0,
415
                                                     kTypeDeletion));
416 417 418 419 420
      iter_->Seek(last_key);
      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
    } else {
      iter_->Next();
    }
J
jorlow@chromium.org 已提交
421 422
  } while (iter_->Valid());
  valid_ = false;
J
jorlow@chromium.org 已提交
423 424
}

425 426 427 428 429 430 431
// Merge values of the same user key starting from the current iter_ position
// Scan from the newer entries to older entries.
// PRE: iter_->key() points to the first merge type entry
//      saved_key_ stores the user key
// POST: saved_value_ has the merged value for the user key
//       iter_ points to the next entry (or invalid)
void DBIter::MergeValuesNewToOld() {
432
  if (!merge_operator_) {
433 434
    Log(InfoLogLevel::ERROR_LEVEL,
        logger_, "Options::merge_operator is null.");
435
    status_ = Status::InvalidArgument("merge_operator_ must be set.");
436 437
    valid_ = false;
    return;
D
Deon Nicholas 已提交
438
  }
439

440 441
  // Temporarily pin the blocks that hold merge operands
  TempPinData();
442
  merge_context_.Clear();
443
  // Start the merge process by pushing the first operand
444 445
  merge_context_.PushOperand(iter_->value(),
                             iter_->IsValuePinned() /* operand_pinned */);
446 447 448 449 450 451 452 453

  ParsedInternalKey ikey;
  for (iter_->Next(); iter_->Valid(); iter_->Next()) {
    if (!ParseKey(&ikey)) {
      // skip corrupted key
      continue;
    }

454
    if (!user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
455 456
      // hit the next user key, stop right here
      break;
A
Andres Noetzli 已提交
457
    } else if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type) {
458 459 460 461
      // hit a delete with the same user key, stop right here
      // iter_ is positioned after delete
      iter_->Next();
      break;
A
Andres Noetzli 已提交
462
    } else if (kTypeValue == ikey.type) {
463 464 465
      // hit a put, merge the put value with operands and store the
      // final result in saved_value_. We are done!
      // ignore corruption if there is any.
I
Igor Canadi 已提交
466
      const Slice val = iter_->value();
467 468
      MergeHelper::TimedFullMerge(merge_operator_, ikey.user_key, &val,
                                  merge_context_.GetOperands(), &saved_value_,
469
                                  logger_, statistics_, env_, &pinned_value_);
470 471 472
      // iter_ is positioned after put
      iter_->Next();
      return;
A
Andres Noetzli 已提交
473
    } else if (kTypeMerge == ikey.type) {
474 475
      // hit a merge, add the value as an operand and run associative merge.
      // when complete, add result to operands and continue.
476 477
      merge_context_.PushOperand(iter_->value(),
                                 iter_->IsValuePinned() /* operand_pinned */);
A
Andres Noetzli 已提交
478 479
    } else {
      assert(false);
480 481 482
    }
  }

483 484 485 486 487 488
  // we either exhausted all internal keys under this user key, or hit
  // a deletion marker.
  // feed null as the existing value to the merge operator, such that
  // client can differentiate this scenario and do things accordingly.
  MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), nullptr,
                              merge_context_.GetOperands(), &saved_value_,
489
                              logger_, statistics_, env_, &pinned_value_);
490 491
}

J
jorlow@chromium.org 已提交
492 493
void DBIter::Prev() {
  assert(valid_);
494
  ReleaseTempPinnedData();
S
Stanislau Hlebik 已提交
495
  if (direction_ == kForward) {
496
    ReverseToBackward();
S
Stanislau Hlebik 已提交
497 498
  }
  PrevInternal();
M
Manuel Ung 已提交
499
  if (statistics_ != nullptr) {
500
    local_stats_.prev_count_++;
M
Manuel Ung 已提交
501
    if (valid_) {
502 503
      local_stats_.prev_found_count_++;
      local_stats_.bytes_read_ += (key().size() + value().size());
M
Manuel Ung 已提交
504 505
    }
  }
506 507
  if (valid_ && prefix_extractor_ && prefix_same_as_start_ &&
      prefix_extractor_->Transform(saved_key_.GetKey())
508
              .compare(prefix_start_key_) != 0) {
509 510
    valid_ = false;
  }
S
Stanislau Hlebik 已提交
511
}
J
jorlow@chromium.org 已提交
512

513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539
// Switches the iterator from forward to backward movement: repositions iter_
// relative to the user key in saved_key_ and sets direction_ to kReverse.
void DBIter::ReverseToBackward() {
  if (current_entry_is_merged_) {
    // After a merge iter_ is not placed on the current key, so step back
    // with Prev() until it no longer points past saved_key_.
    if (!iter_->Valid()) {
      iter_->SeekToLast();
    }
    ParsedInternalKey ikey;
    for (FindParseableKey(&ikey, kReverse);
         iter_->Valid() &&
         user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) > 0;
         FindParseableKey(&ikey, kReverse)) {
      iter_->Prev();
    }
  }
#ifndef NDEBUG
  // Debug-only sanity check: iter_ must not be positioned past saved_key_.
  if (iter_->Valid()) {
    ParsedInternalKey ikey;
    assert(ParseKey(&ikey));
    assert(user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0);
  }
#endif

  FindPrevUserKey();
  direction_ = kReverse;
}

S
Stanislau Hlebik 已提交
540 541 542 543
void DBIter::PrevInternal() {
  if (!iter_->Valid()) {
    valid_ = false;
    return;
544 545
  }

S
Stanislau Hlebik 已提交
546 547 548
  ParsedInternalKey ikey;

  while (iter_->Valid()) {
549
    saved_key_.SetKey(ExtractUserKey(iter_->key()),
550
                      !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
S
Stanislau Hlebik 已提交
551 552
    if (FindValueForCurrentKey()) {
      valid_ = true;
J
jorlow@chromium.org 已提交
553 554 555
      if (!iter_->Valid()) {
        return;
      }
S
Stanislau Hlebik 已提交
556
      FindParseableKey(&ikey, kReverse);
557
      if (user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
S
Stanislau Hlebik 已提交
558
        FindPrevUserKey();
J
jorlow@chromium.org 已提交
559
      }
S
Stanislau Hlebik 已提交
560
      return;
J
jorlow@chromium.org 已提交
561
    }
S
Stanislau Hlebik 已提交
562 563 564 565
    if (!iter_->Valid()) {
      break;
    }
    FindParseableKey(&ikey, kReverse);
566
    if (user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
S
Stanislau Hlebik 已提交
567 568 569 570 571 572
      FindPrevUserKey();
    }
  }
  // We haven't found any key - iterator is not valid
  assert(!iter_->Valid());
  valid_ = false;
J
jorlow@chromium.org 已提交
573 574
}

S
Stanislau Hlebik 已提交
575
// This function checks, if the entry with biggest sequence_number <= sequence_
A
Andres Noetzli 已提交
576 577
// is non kTypeDeletion or kTypeSingleDeletion. If it's not, we save value in
// saved_value_
S
Stanislau Hlebik 已提交
578 579
bool DBIter::FindValueForCurrentKey() {
  assert(iter_->Valid());
580
  merge_context_.Clear();
581
  current_entry_is_merged_ = false;
A
Andres Noetzli 已提交
582 583
  // last entry before merge (could be kTypeDeletion, kTypeSingleDeletion or
  // kTypeValue)
S
Stanislau Hlebik 已提交
584 585
  ValueType last_not_merge_type = kTypeDeletion;
  ValueType last_key_entry_type = kTypeDeletion;
J
jorlow@chromium.org 已提交
586

S
Stanislau Hlebik 已提交
587 588 589
  ParsedInternalKey ikey;
  FindParseableKey(&ikey, kReverse);

590 591 592
  // Temporarily pin blocks that hold (merge operands / the value)
  ReleaseTempPinnedData();
  TempPinData();
S
Stanislau Hlebik 已提交
593 594
  size_t num_skipped = 0;
  while (iter_->Valid() && ikey.sequence <= sequence_ &&
595
         user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
S
Stanislau Hlebik 已提交
596 597 598 599 600 601 602 603
    // We iterate too much: let's use Seek() to avoid too much key comparisons
    if (num_skipped >= max_skip_) {
      return FindValueForCurrentKeyUsingSeek();
    }

    last_key_entry_type = ikey.type;
    switch (last_key_entry_type) {
      case kTypeValue:
604
        merge_context_.Clear();
605
        assert(iter_->IsValuePinned());
606
        pinned_value_ = iter_->value();
S
Stanislau Hlebik 已提交
607 608 609
        last_not_merge_type = kTypeValue;
        break;
      case kTypeDeletion:
A
Andres Noetzli 已提交
610
      case kTypeSingleDeletion:
611
        merge_context_.Clear();
A
Andres Noetzli 已提交
612
        last_not_merge_type = last_key_entry_type;
613
        PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
S
Stanislau Hlebik 已提交
614 615
        break;
      case kTypeMerge:
616
        assert(merge_operator_ != nullptr);
617 618
        merge_context_.PushOperandBack(
            iter_->value(), iter_->IsValuePinned() /* operand_pinned */);
S
Stanislau Hlebik 已提交
619 620 621 622 623
        break;
      default:
        assert(false);
    }

624
    PERF_COUNTER_ADD(internal_key_skipped_count, 1);
625
    assert(user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()));
S
Stanislau Hlebik 已提交
626 627 628 629 630 631 632
    iter_->Prev();
    ++num_skipped;
    FindParseableKey(&ikey, kReverse);
  }

  switch (last_key_entry_type) {
    case kTypeDeletion:
A
Andres Noetzli 已提交
633
    case kTypeSingleDeletion:
S
Stanislau Hlebik 已提交
634 635 636
      valid_ = false;
      return false;
    case kTypeMerge:
637
      current_entry_is_merged_ = true;
S
Stanislau Hlebik 已提交
638
      if (last_not_merge_type == kTypeDeletion) {
639 640
        MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(),
                                    nullptr, merge_context_.GetOperands(),
641 642
                                    &saved_value_, logger_, statistics_, env_,
                                    &pinned_value_);
643
      } else {
S
Stanislau Hlebik 已提交
644
        assert(last_not_merge_type == kTypeValue);
645 646 647
        MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(),
                                    &pinned_value_,
                                    merge_context_.GetOperands(), &saved_value_,
648
                                    logger_, statistics_, env_, &pinned_value_);
649
      }
S
Stanislau Hlebik 已提交
650 651 652 653 654 655 656
      break;
    case kTypeValue:
      // do nothing - we've already has value in saved_value_
      break;
    default:
      assert(false);
      break;
J
jorlow@chromium.org 已提交
657
  }
S
Stanislau Hlebik 已提交
658 659 660
  valid_ = true;
  return true;
}
J
jorlow@chromium.org 已提交
661

S
Stanislau Hlebik 已提交
662 663 664
// This function is used in FindValueForCurrentKey.
// We use Seek() function instead of Prev() to find necessary value
bool DBIter::FindValueForCurrentKeyUsingSeek() {
665 666 667
  // FindValueForCurrentKey will enable pinning before calling
  // FindValueForCurrentKeyUsingSeek()
  assert(pinned_iters_mgr_.PinningEnabled());
S
Stanislau Hlebik 已提交
668 669 670 671 672 673 674 675 676 677
  std::string last_key;
  AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetKey(), sequence_,
                                                 kValueTypeForSeek));
  iter_->Seek(last_key);
  RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);

  // assume there is at least one parseable key for this user key
  ParsedInternalKey ikey;
  FindParseableKey(&ikey, kForward);

A
Andres Noetzli 已提交
678 679
  if (ikey.type == kTypeValue || ikey.type == kTypeDeletion ||
      ikey.type == kTypeSingleDeletion) {
S
Stanislau Hlebik 已提交
680
    if (ikey.type == kTypeValue) {
681
      assert(iter_->IsValuePinned());
682
      pinned_value_ = iter_->value();
S
Stanislau Hlebik 已提交
683 684 685
      valid_ = true;
      return true;
    }
J
jorlow@chromium.org 已提交
686
    valid_ = false;
S
Stanislau Hlebik 已提交
687 688 689 690 691
    return false;
  }

  // kTypeMerge. We need to collect all kTypeMerge values and save them
  // in operands
692
  current_entry_is_merged_ = true;
693
  merge_context_.Clear();
S
Stanislau Hlebik 已提交
694
  while (iter_->Valid() &&
695
         user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()) &&
S
Stanislau Hlebik 已提交
696
         ikey.type == kTypeMerge) {
697 698
    merge_context_.PushOperand(iter_->value(),
                               iter_->IsValuePinned() /* operand_pinned */);
S
Stanislau Hlebik 已提交
699 700 701 702 703
    iter_->Next();
    FindParseableKey(&ikey, kForward);
  }

  if (!iter_->Valid() ||
704
      !user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()) ||
A
Andres Noetzli 已提交
705
      ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion) {
706 707
    MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), nullptr,
                                merge_context_.GetOperands(), &saved_value_,
708
                                logger_, statistics_, env_, &pinned_value_);
S
Stanislau Hlebik 已提交
709 710
    // Make iter_ valid and point to saved_key_
    if (!iter_->Valid() ||
711
        !user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
S
Stanislau Hlebik 已提交
712 713 714
      iter_->Seek(last_key);
      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
    }
J
jorlow@chromium.org 已提交
715
    valid_ = true;
S
Stanislau Hlebik 已提交
716 717 718
    return true;
  }

I
Igor Canadi 已提交
719
  const Slice& val = iter_->value();
720 721
  MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), &val,
                              merge_context_.GetOperands(), &saved_value_,
722
                              logger_, statistics_, env_, &pinned_value_);
S
Stanislau Hlebik 已提交
723 724 725 726 727 728 729 730 731 732 733 734 735 736 737
  valid_ = true;
  return true;
}

// Used by Next() when changing direction. Steps iter_ forward until it
// reaches an entry whose user key equals saved_key_ or becomes invalid.
// Plain Next() calls are used rather than Seek() because the target is
// expected to be very close.
void DBIter::FindNextUserKey() {
  if (!iter_->Valid()) {
    return;
  }
  ParsedInternalKey ikey;
  FindParseableKey(&ikey, kForward);
  while (iter_->Valid()) {
    if (user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
      break;
    }
    iter_->Next();
    FindParseableKey(&ikey, kForward);
  }
}

// Moves iter_ backwards to the previous user key: steps over every entry
// whose user key equals saved_key_, and over entries of a larger user key
// whose sequence number is greater than sequence_.
void DBIter::FindPrevUserKey() {
  if (!iter_->Valid()) {
    return;
  }
  size_t num_skipped = 0;
  ParsedInternalKey ikey;
  FindParseableKey(&ikey, kReverse);
  while (iter_->Valid()) {
    const int cmp =
        user_comparator_->Compare(ikey.user_key, saved_key_.GetKey());
    const bool same_user_key = (cmp == 0);
    if (!same_user_key && !(cmp > 0 && ikey.sequence > sequence_)) {
      break;
    }

    if (same_user_key) {
      if (num_skipped < max_skip_) {
        ++num_skipped;
      } else {
        // Too many sequential steps on this key: seek to the internal key
        // built from (saved_key_, kMaxSequenceNumber, kValueTypeForSeek);
        // the Prev() below then steps back from there.
        num_skipped = 0;
        IterKey last_key;
        last_key.SetInternalKey(ParsedInternalKey(
            saved_key_.GetKey(), kMaxSequenceNumber, kValueTypeForSeek));
        iter_->Seek(last_key.GetKey());
        RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
      }
    }

    iter_->Prev();
    FindParseableKey(&ikey, kReverse);
  }
}

// Skips unparseable keys: moves iter_ in `direction` until the key at its
// position parses into *ikey or iter_ becomes invalid.
void DBIter::FindParseableKey(ParsedInternalKey* ikey, Direction direction) {
  while (iter_->Valid()) {
    if (ParseKey(ikey)) {
      return;
    }
    if (direction == kReverse) {
      iter_->Prev();
    } else {
      iter_->Next();
    }
  }
}
J
jorlow@chromium.org 已提交
783

J
jorlow@chromium.org 已提交
784
void DBIter::Seek(const Slice& target) {
L
Lei Jin 已提交
785
  StopWatch sw(env_, statistics_, DB_SEEK);
786
  ReleaseTempPinnedData();
787 788 789
  saved_key_.Clear();
  // now savved_key is used to store internal key.
  saved_key_.SetInternalKey(target, sequence_);
790 791 792 793 794

  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    iter_->Seek(saved_key_.GetKey());
  }
795

M
Manuel Ung 已提交
796
  RecordTick(statistics_, NUMBER_DB_SEEK);
J
jorlow@chromium.org 已提交
797
  if (iter_->Valid()) {
798 799 800
    if (prefix_extractor_ && prefix_same_as_start_) {
      prefix_start_key_ = prefix_extractor_->Transform(target);
    }
801 802
    direction_ = kForward;
    ClearSavedValue();
803 804 805 806
    FindNextUserEntry(false /* not skipping */, prefix_same_as_start_);
    if (!valid_) {
      prefix_start_key_.clear();
    }
M
Manuel Ung 已提交
807 808 809 810 811 812
    if (statistics_ != nullptr) {
      if (valid_) {
        RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
        RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
      }
    }
J
jorlow@chromium.org 已提交
813 814 815
  } else {
    valid_ = false;
  }
816
  if (valid_ && prefix_extractor_ && prefix_same_as_start_) {
817 818
    prefix_start_buf_.SetKey(prefix_start_key_);
    prefix_start_key_ = prefix_start_buf_.GetKey();
819
  }
J
jorlow@chromium.org 已提交
820
}
J
jorlow@chromium.org 已提交
821

J
jorlow@chromium.org 已提交
822
void DBIter::SeekToFirst() {
S
Stanislau Hlebik 已提交
823
  // Don't use iter_::Seek() if we set a prefix extractor
824
  // because prefix seek will be used.
825
  if (prefix_extractor_ != nullptr) {
S
Stanislau Hlebik 已提交
826 827
    max_skip_ = std::numeric_limits<uint64_t>::max();
  }
J
jorlow@chromium.org 已提交
828
  direction_ = kForward;
829
  ReleaseTempPinnedData();
J
jorlow@chromium.org 已提交
830
  ClearSavedValue();
831 832 833 834 835 836

  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    iter_->SeekToFirst();
  }

M
Manuel Ung 已提交
837
  RecordTick(statistics_, NUMBER_DB_SEEK);
J
jorlow@chromium.org 已提交
838
  if (iter_->Valid()) {
839
    FindNextUserEntry(false /* not skipping */, false /* no prefix check */);
M
Manuel Ung 已提交
840 841 842 843 844 845
    if (statistics_ != nullptr) {
      if (valid_) {
        RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
        RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
      }
    }
J
jorlow@chromium.org 已提交
846 847
  } else {
    valid_ = false;
J
jorlow@chromium.org 已提交
848
  }
849
  if (valid_ && prefix_extractor_ && prefix_same_as_start_) {
850 851
    prefix_start_buf_.SetKey(prefix_extractor_->Transform(saved_key_.GetKey()));
    prefix_start_key_ = prefix_start_buf_.GetKey();
852
  }
J
jorlow@chromium.org 已提交
853 854
}

J
jorlow@chromium.org 已提交
855
void DBIter::SeekToLast() {
S
Stanislau Hlebik 已提交
856
  // Don't use iter_::Seek() if we set a prefix extractor
857
  // because prefix seek will be used.
858
  if (prefix_extractor_ != nullptr) {
S
Stanislau Hlebik 已提交
859 860
    max_skip_ = std::numeric_limits<uint64_t>::max();
  }
J
jorlow@chromium.org 已提交
861
  direction_ = kReverse;
862
  ReleaseTempPinnedData();
J
jorlow@chromium.org 已提交
863
  ClearSavedValue();
864 865 866 867 868

  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    iter_->SeekToLast();
  }
869 870 871 872
  // When the iterate_upper_bound is set to a value,
  // it will seek to the last key before the
  // ReadOptions.iterate_upper_bound
  if (iter_->Valid() && iterate_upper_bound_ != nullptr) {
873
    saved_key_.SetKey(*iterate_upper_bound_, false /* copy */);
874 875 876 877 878 879
    std::string last_key;
    AppendInternalKey(&last_key,
                      ParsedInternalKey(saved_key_.GetKey(), kMaxSequenceNumber,
                                        kValueTypeForSeek));

    iter_->Seek(last_key);
S
Stanislau Hlebik 已提交
880

881 882 883 884 885 886 887 888 889 890
    if (!iter_->Valid()) {
      iter_->SeekToLast();
    } else {
      iter_->Prev();
      if (!iter_->Valid()) {
        valid_ = false;
        return;
      }
    }
  }
S
Stanislau Hlebik 已提交
891
  PrevInternal();
M
Manuel Ung 已提交
892 893 894 895 896 897 898
  if (statistics_ != nullptr) {
    RecordTick(statistics_, NUMBER_DB_SEEK);
    if (valid_) {
      RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
      RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
    }
  }
899
  if (valid_ && prefix_extractor_ && prefix_same_as_start_) {
900 901
    prefix_start_buf_.SetKey(prefix_extractor_->Transform(saved_key_.GetKey()));
    prefix_start_key_ = prefix_start_buf_.GetKey();
902
  }
J
jorlow@chromium.org 已提交
903 904
}

905
// Heap-allocates a DBIter over `internal_iter` and returns it as an Iterator*.
// The object is created with plain `new`; every argument is forwarded to the
// DBIter constructor unchanged.
Iterator* NewDBIterator(Env* env, const ImmutableCFOptions& ioptions,
                        const Comparator* user_key_comparator,
                        InternalIterator* internal_iter,
                        const SequenceNumber& sequence,
                        uint64_t max_sequential_skip_in_iterations,
                        uint64_t version_number,
                        const Slice* iterate_upper_bound,
                        bool prefix_same_as_start, bool pin_data) {
  // The literal `false` is the flag that NewArenaWrappedDbIterator() passes as
  // `true` (presumably "arena mode" -- confirm against the DBIter ctor).
  return new DBIter(env, ioptions, user_key_comparator, internal_iter,
                    sequence, false, max_sequential_skip_in_iterations,
                    version_number, iterate_upper_bound, prefix_same_as_start,
                    pin_data);
}

I
Igor Canadi 已提交
920
// The wrapped DBIter is placement-constructed in arena memory (see
// NewArenaWrappedDbIterator), so it is torn down with an explicit destructor
// call rather than `delete`.
ArenaWrappedDBIter::~ArenaWrappedDBIter() {
  db_iter_->~DBIter();
}
921 922 923

// Stores the DBIter that every forwarding method of this wrapper delegates to.
void ArenaWrappedDBIter::SetDBIter(DBIter* iter) {
  db_iter_ = iter;
}

S
sdong 已提交
924
// Hands `iter` to the wrapped DBIter as its underlying internal iterator.
void ArenaWrappedDBIter::SetIterUnderDBIter(InternalIterator* iter) {
  // db_iter_ already has type DBIter*, so no cast is needed.
  db_iter_->SetIter(iter);
}

// Forwarding methods: each one delegates to the wrapped DBIter.
//
// These are intentionally NOT declared `inline`. They are defined only in this
// translation unit, and an inline function must be defined in every
// translation unit that odr-uses it; a direct (e.g. devirtualized) call from
// another file could otherwise fail to link.
bool ArenaWrappedDBIter::Valid() const { return db_iter_->Valid(); }
void ArenaWrappedDBIter::SeekToFirst() { db_iter_->SeekToFirst(); }
void ArenaWrappedDBIter::SeekToLast() { db_iter_->SeekToLast(); }
void ArenaWrappedDBIter::Seek(const Slice& target) { db_iter_->Seek(target); }
void ArenaWrappedDBIter::Next() { db_iter_->Next(); }
void ArenaWrappedDBIter::Prev() { db_iter_->Prev(); }
Slice ArenaWrappedDBIter::key() const { return db_iter_->key(); }
Slice ArenaWrappedDBIter::value() const { return db_iter_->value(); }
Status ArenaWrappedDBIter::status() const { return db_iter_->status(); }

// Looks up the property `prop_name` on the wrapped DBIter, writing the result
// to *prop and returning the wrapped iterator's Status.
Status ArenaWrappedDBIter::GetProperty(std::string prop_name,
                                       std::string* prop) {
  return db_iter_->GetProperty(prop_name, prop);
}
943 944 945 946
// Registers `function` (with its two opaque arguments) on the wrapped DBIter
// rather than on this wrapper.
void ArenaWrappedDBIter::RegisterCleanup(CleanupFunction function, void* arg1,
                                         void* arg2) {
  DBIter* const wrapped = db_iter_;
  wrapped->RegisterCleanup(function, arg1, arg2);
}
J
jorlow@chromium.org 已提交
947

948
// Creates an ArenaWrappedDBIter and placement-constructs its DBIter inside the
// wrapper's own arena. The DBIter is built with a null internal iterator; one
// is expected to be attached afterwards (e.g. via SetIterUnderDBIter()).
ArenaWrappedDBIter* NewArenaWrappedDbIterator(
    Env* env, const ImmutableCFOptions& ioptions,
    const Comparator* user_key_comparator, const SequenceNumber& sequence,
    uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
    const Slice* iterate_upper_bound, bool prefix_same_as_start,
    bool pin_data) {
  ArenaWrappedDBIter* wrapper = new ArenaWrappedDBIter();

  // Memory for the DBIter comes from the wrapper's arena.
  auto storage = wrapper->GetArena()->AllocateAligned(sizeof(DBIter));

  // The literal `true` is the flag that NewDBIterator() passes as `false`
  // (presumably "arena mode" -- confirm against the DBIter ctor).
  wrapper->SetDBIter(new (storage) DBIter(
      env, ioptions, user_key_comparator, nullptr, sequence, true,
      max_sequential_skip_in_iterations, version_number, iterate_upper_bound,
      prefix_same_as_start, pin_data));

  return wrapper;
}

967
}  // namespace rocksdb