db_iter.cc 32.5 KB
Newer Older
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 3 4 5
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
J
jorlow@chromium.org 已提交
6 7 8 9 10
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/db_iter.h"
11
#include <stdexcept>
12
#include <deque>
S
Stanislau Hlebik 已提交
13
#include <string>
S
Stanislau Hlebik 已提交
14
#include <limits>
J
jorlow@chromium.org 已提交
15 16

#include "db/dbformat.h"
17
#include "db/filename.h"
18
#include "db/merge_context.h"
19
#include "db/merge_helper.h"
20
#include "db/pinned_iterators_manager.h"
S
sdong 已提交
21
#include "port/port.h"
22 23 24
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/merge_operator.h"
25
#include "rocksdb/options.h"
S
sdong 已提交
26
#include "table/internal_iterator.h"
27
#include "util/arena.h"
J
jorlow@chromium.org 已提交
28 29
#include "util/logging.h"
#include "util/mutexlock.h"
30
#include "util/perf_context_imp.h"
31
#include "util/string_util.h"
J
jorlow@chromium.org 已提交
32

33
namespace rocksdb {
J
jorlow@chromium.org 已提交
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54

#if 0
static void DumpInternalIter(Iterator* iter) {
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    ParsedInternalKey k;
    if (!ParseInternalKey(iter->key(), &k)) {
      fprintf(stderr, "Corrupt '%s'\n", EscapeString(iter->key()).c_str());
    } else {
      fprintf(stderr, "@ '%s'\n", k.DebugString().c_str());
    }
  }
}
#endif

// Memtables and sstables that make the DB representation contain
// (userkey,seq,type) => uservalue entries.  DBIter
// combines multiple entries for the same userkey found in the DB
// representation into a single entry while accounting for sequence
// numbers, deletion markers, overwrites, etc.
class DBIter: public Iterator {
 public:
55
  // The following is grossly complicated. TODO: clean it up
J
jorlow@chromium.org 已提交
56 57 58 59 60 61 62 63 64 65
  // Which direction is the iterator currently moving?
  // (1) When moving forward, the internal iterator is positioned at
  //     the exact entry that yields this->key(), this->value()
  // (2) When moving backwards, the internal iterator is positioned
  //     just before all entries whose user key == this->key().
  enum Direction {
    kForward,
    kReverse
  };

66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
  // LocalStatistics contain Statistics counters that will be aggregated per
  // each iterator instance and then will be sent to the global statistics when
  // the iterator is destroyed.
  //
  // The purpose of this approach is to avoid perf regression happening
  // when multiple threads bump the atomic counters from a DBIter::Next().
  struct LocalStatistics {
    explicit LocalStatistics() { ResetCounters(); }

    void ResetCounters() {
      next_count_ = 0;
      next_found_count_ = 0;
      prev_count_ = 0;
      prev_found_count_ = 0;
      bytes_read_ = 0;
    }

    void BumpGlobalStatistics(Statistics* global_statistics) {
      RecordTick(global_statistics, NUMBER_DB_NEXT, next_count_);
      RecordTick(global_statistics, NUMBER_DB_NEXT_FOUND, next_found_count_);
      RecordTick(global_statistics, NUMBER_DB_PREV, prev_count_);
      RecordTick(global_statistics, NUMBER_DB_PREV_FOUND, prev_found_count_);
      RecordTick(global_statistics, ITER_BYTES_READ, bytes_read_);
      ResetCounters();
    }

    // Map to Tickers::NUMBER_DB_NEXT
    uint64_t next_count_;
    // Map to Tickers::NUMBER_DB_NEXT_FOUND
    uint64_t next_found_count_;
    // Map to Tickers::NUMBER_DB_PREV
    uint64_t prev_count_;
    // Map to Tickers::NUMBER_DB_PREV_FOUND
    uint64_t prev_found_count_;
    // Map to Tickers::ITER_BYTES_READ
    uint64_t bytes_read_;
  };

S
sdong 已提交
104 105
  DBIter(Env* env, const ImmutableCFOptions& ioptions, const Comparator* cmp,
         InternalIterator* iter, SequenceNumber s, bool arena_mode,
106
         uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
107
         const Slice* iterate_upper_bound = nullptr,
108
         bool prefix_same_as_start = false, bool pin_data = false)
109 110
      : arena_mode_(arena_mode),
        env_(env),
111
        logger_(ioptions.info_log),
J
jorlow@chromium.org 已提交
112
        user_comparator_(cmp),
113
        merge_operator_(ioptions.merge_operator),
J
jorlow@chromium.org 已提交
114 115
        iter_(iter),
        sequence_(s),
J
jorlow@chromium.org 已提交
116
        direction_(kForward),
117
        valid_(false),
118
        current_entry_is_merged_(false),
A
Aaron Gao 已提交
119
        prefix_is_saved_(false),
120
        statistics_(ioptions.statistics),
121
        version_number_(version_number),
122
        iterate_upper_bound_(iterate_upper_bound),
123
        prefix_same_as_start_(prefix_same_as_start),
124
        pin_thru_lifetime_(pin_data) {
L
Lei Jin 已提交
125
    RecordTick(statistics_, NO_ITERATORS);
126 127
    prefix_extractor_ = ioptions.prefix_extractor;
    max_skip_ = max_sequential_skip_in_iterations;
128 129 130 131 132 133
    if (pin_thru_lifetime_) {
      pinned_iters_mgr_.StartPinning();
    }
    if (iter_) {
      iter_->SetPinnedItersMgr(&pinned_iters_mgr_);
    }
J
jorlow@chromium.org 已提交
134 135
  }
  virtual ~DBIter() {
136
    // Release pinned data if any
137 138 139
    if (pinned_iters_mgr_.PinningEnabled()) {
      pinned_iters_mgr_.ReleasePinnedData();
    }
140
    RecordTick(statistics_, NO_ITERATORS, -1);
141
    local_stats_.BumpGlobalStatistics(statistics_);
142 143 144
    if (!arena_mode_) {
      delete iter_;
    } else {
S
sdong 已提交
145
      iter_->~InternalIterator();
146 147
    }
  }
S
sdong 已提交
148
  virtual void SetIter(InternalIterator* iter) {
149 150
    assert(iter_ == nullptr);
    iter_ = iter;
151
    iter_->SetPinnedItersMgr(&pinned_iters_mgr_);
J
jorlow@chromium.org 已提交
152
  }
I
Igor Sugak 已提交
153 154
  virtual bool Valid() const override { return valid_; }
  virtual Slice key() const override {
J
jorlow@chromium.org 已提交
155
    assert(valid_);
156
    return saved_key_.GetKey();
J
jorlow@chromium.org 已提交
157
  }
I
Igor Sugak 已提交
158
  virtual Slice value() const override {
J
jorlow@chromium.org 已提交
159
    assert(valid_);
160
    if (current_entry_is_merged_) {
161 162 163
      // If pinned_value_ is set then the result of merge operator is one of
      // the merge operands and we should return it.
      return pinned_value_.data() ? pinned_value_ : saved_value_;
164 165 166 167 168
    } else if (direction_ == kReverse) {
      return pinned_value_;
    } else {
      return iter_->value();
    }
J
jorlow@chromium.org 已提交
169
  }
I
Igor Sugak 已提交
170
  virtual Status status() const override {
J
jorlow@chromium.org 已提交
171 172 173 174 175 176
    if (status_.ok()) {
      return iter_->status();
    } else {
      return status_;
    }
  }
177 178 179 180 181 182

  virtual Status GetProperty(std::string prop_name,
                             std::string* prop) override {
    if (prop == nullptr) {
      return Status::InvalidArgument("prop is nullptr");
    }
183
    if (prop_name == "rocksdb.iterator.super-version-number") {
184 185 186 187 188 189
      // First try to pass the value returned from inner iterator.
      if (!iter_->GetProperty(prop_name, prop).ok()) {
        *prop = ToString(version_number_);
      }
      return Status::OK();
    } else if (prop_name == "rocksdb.iterator.is-key-pinned") {
190
      if (valid_) {
191
        *prop = (pin_thru_lifetime_ && saved_key_.IsKeyPinned()) ? "1" : "0";
192 193 194 195 196 197
      } else {
        *prop = "Iterator is not valid.";
      }
      return Status::OK();
    }
    return Status::InvalidArgument("Undentified property.");
198
  }
J
jorlow@chromium.org 已提交
199

I
Igor Sugak 已提交
200 201 202 203 204
  virtual void Next() override;
  virtual void Prev() override;
  virtual void Seek(const Slice& target) override;
  virtual void SeekToFirst() override;
  virtual void SeekToLast() override;
J
jorlow@chromium.org 已提交
205

J
jorlow@chromium.org 已提交
206
 private:
A
Aaron Gao 已提交
207
  void ReverseToForward();
208
  void ReverseToBackward();
S
Stanislau Hlebik 已提交
209 210 211 212 213 214
  void PrevInternal();
  void FindParseableKey(ParsedInternalKey* ikey, Direction direction);
  bool FindValueForCurrentKey();
  bool FindValueForCurrentKeyUsingSeek();
  void FindPrevUserKey();
  void FindNextUserKey();
215 216
  inline void FindNextUserEntry(bool skipping, bool prefix_check);
  void FindNextUserEntryInternal(bool skipping, bool prefix_check);
J
jorlow@chromium.org 已提交
217
  bool ParseKey(ParsedInternalKey* key);
218
  void MergeValuesNewToOld();
J
jorlow@chromium.org 已提交
219

220 221 222 223 224 225 226 227 228 229
  // Temporarily pin the blocks that we encounter until ReleaseTempPinnedData()
  // is called
  void TempPinData() {
    if (!pin_thru_lifetime_) {
      pinned_iters_mgr_.StartPinning();
    }
  }

  // Release blocks pinned by TempPinData()
  void ReleaseTempPinnedData() {
230 231
    if (!pin_thru_lifetime_ && pinned_iters_mgr_.PinningEnabled()) {
      pinned_iters_mgr_.ReleasePinnedData();
232 233 234
    }
  }

J
jorlow@chromium.org 已提交
235 236 237 238 239 240 241 242 243
  inline void ClearSavedValue() {
    if (saved_value_.capacity() > 1048576) {
      std::string empty;
      swap(empty, saved_value_);
    } else {
      saved_value_.clear();
    }
  }

244
  const SliceTransform* prefix_extractor_;
245
  bool arena_mode_;
J
jorlow@chromium.org 已提交
246
  Env* const env_;
I
Igor Canadi 已提交
247
  Logger* logger_;
J
jorlow@chromium.org 已提交
248
  const Comparator* const user_comparator_;
249
  const MergeOperator* const merge_operator_;
S
sdong 已提交
250
  InternalIterator* iter_;
J
jorlow@chromium.org 已提交
251
  SequenceNumber const sequence_;
J
jorlow@chromium.org 已提交
252

J
jorlow@chromium.org 已提交
253
  Status status_;
S
Stanislau Hlebik 已提交
254 255
  IterKey saved_key_;
  std::string saved_value_;
256
  Slice pinned_value_;
J
jorlow@chromium.org 已提交
257
  Direction direction_;
J
jorlow@chromium.org 已提交
258
  bool valid_;
259
  bool current_entry_is_merged_;
A
Aaron Gao 已提交
260 261
  // for prefix seek mode to support prev()
  bool prefix_is_saved_;
262
  Statistics* statistics_;
263
  uint64_t max_skip_;
264
  uint64_t version_number_;
265
  const Slice* iterate_upper_bound_;
266 267 268
  IterKey prefix_start_buf_;
  Slice prefix_start_key_;
  const bool prefix_same_as_start_;
269 270 271
  // Means that we will pin all data blocks we read as long the Iterator
  // is not deleted, will be true if ReadOptions::pin_data is true
  const bool pin_thru_lifetime_;
272
  // List of operands for merge operator.
273
  MergeContext merge_context_;
274
  LocalStatistics local_stats_;
275
  PinnedIteratorsManager pinned_iters_mgr_;
J
jorlow@chromium.org 已提交
276 277 278 279 280 281 282 283 284

  // No copying allowed
  DBIter(const DBIter&);
  void operator=(const DBIter&);
};

inline bool DBIter::ParseKey(ParsedInternalKey* ikey) {
  if (!ParseInternalKey(iter_->key(), ikey)) {
    status_ = Status::Corruption("corrupted internal key in DBIter");
285 286
    Log(InfoLogLevel::ERROR_LEVEL,
        logger_, "corrupted internal key in DBIter: %s",
287
        iter_->key().ToString(true).c_str());
J
jorlow@chromium.org 已提交
288 289 290 291 292 293
    return false;
  } else {
    return true;
  }
}

J
jorlow@chromium.org 已提交
294 295 296
void DBIter::Next() {
  assert(valid_);

297 298
  // Release temporarily pinned blocks from last operation
  ReleaseTempPinnedData();
S
Stanislau Hlebik 已提交
299
  if (direction_ == kReverse) {
A
Aaron Gao 已提交
300
    ReverseToForward();
301 302 303 304 305 306 307
  } else if (iter_->Valid() && !current_entry_is_merged_) {
    // If the current value is not a merge, the iter position is the
    // current key, which is already returned. We can safely issue a
    // Next() without checking the current key.
    // If the current key is a merge, very likely iter already points
    // to the next internal position.
    iter_->Next();
308
    PERF_COUNTER_ADD(internal_key_skipped_count, 1);
J
jorlow@chromium.org 已提交
309
  }
J
jorlow@chromium.org 已提交
310

311 312 313
  if (statistics_ != nullptr) {
    local_stats_.next_count_++;
  }
314 315
  // Now we point to the next internal position, for both of merge and
  // not merge cases.
316 317 318 319
  if (!iter_->Valid()) {
    valid_ = false;
    return;
  }
320
  FindNextUserEntry(true /* skipping the current user key */, prefix_same_as_start_);
321 322 323 324
  if (statistics_ != nullptr && valid_) {
    local_stats_.next_found_count_++;
    local_stats_.bytes_read_ += (key().size() + value().size());
  }
J
jorlow@chromium.org 已提交
325 326
}

327 328 329 330 331 332 333 334
// PRE: saved_key_ has the current user key if skipping
// POST: saved_key_ should have the next user key if valid_,
//       if the current entry is a result of merge
//           current_entry_is_merged_ => true
//           saved_value_             => the merged value
//
// NOTE: In between, saved_key_ can point to a user key that has
//       a delete marker
335 336 337 338 339 340
//
// The prefix_check parameter controls whether we check the iterated
// keys against the prefix of the seeked key. Set to false when
// performing a seek without a key (e.g. SeekToFirst). Set to
// prefix_same_as_start_ for other iterations.
inline void DBIter::FindNextUserEntry(bool skipping, bool prefix_check) {
341
  PERF_TIMER_GUARD(find_next_user_entry_time);
342
  FindNextUserEntryInternal(skipping, prefix_check);
343 344 345
}

// Actual implementation of DBIter::FindNextUserEntry()
346
void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
J
jorlow@chromium.org 已提交
347 348 349
  // Loop until we hit an acceptable entry to yield
  assert(iter_->Valid());
  assert(direction_ == kForward);
350
  current_entry_is_merged_ = false;
351
  uint64_t num_skipped = 0;
J
jorlow@chromium.org 已提交
352
  do {
J
jorlow@chromium.org 已提交
353
    ParsedInternalKey ikey;
354 355 356

    if (ParseKey(&ikey)) {
      if (iterate_upper_bound_ != nullptr &&
357
          user_comparator_->Compare(ikey.user_key, *iterate_upper_bound_) >= 0) {
358 359 360
        break;
      }

361 362 363 364 365
      if (prefix_extractor_ && prefix_check &&
          prefix_extractor_->Transform(ikey.user_key).compare(prefix_start_key_) != 0) {
        break;
      }

366 367 368 369 370 371 372 373
      if (ikey.sequence <= sequence_) {
        if (skipping &&
           user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0) {
          num_skipped++;  // skip this entry
          PERF_COUNTER_ADD(internal_key_skipped_count, 1);
        } else {
          switch (ikey.type) {
            case kTypeDeletion:
A
Andres Noetzli 已提交
374
            case kTypeSingleDeletion:
375 376
              // Arrange to skip all upcoming entries for this key since
              // they are hidden by this deletion.
377 378 379
              saved_key_.SetKey(
                  ikey.user_key,
                  !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
380 381 382 383 384 385
              skipping = true;
              num_skipped = 0;
              PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
              break;
            case kTypeValue:
              valid_ = true;
386 387 388
              saved_key_.SetKey(
                  ikey.user_key,
                  !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
389 390 391
              return;
            case kTypeMerge:
              // By now, we are sure the current ikey is going to yield a value
392 393 394
              saved_key_.SetKey(
                  ikey.user_key,
                  !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
395 396 397 398 399 400 401 402
              current_entry_is_merged_ = true;
              valid_ = true;
              MergeValuesNewToOld();  // Go to a different state machine
              return;
            default:
              assert(false);
              break;
          }
403
        }
J
jorlow@chromium.org 已提交
404
      }
J
jorlow@chromium.org 已提交
405
    }
406 407
    // If we have sequentially iterated via numerous keys and still not
    // found the next user-key, then it is better to seek so that we can
C
clark.kang 已提交
408
    // avoid too many key comparisons. We seek to the last occurrence of
409 410
    // our current key by looking for sequence number 0 and type deletion
    // (the smallest type).
411 412 413
    if (skipping && num_skipped > max_skip_) {
      num_skipped = 0;
      std::string last_key;
414
      AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetKey(), 0,
415
                                                     kTypeDeletion));
416 417 418 419 420
      iter_->Seek(last_key);
      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
    } else {
      iter_->Next();
    }
J
jorlow@chromium.org 已提交
421 422
  } while (iter_->Valid());
  valid_ = false;
J
jorlow@chromium.org 已提交
423 424
}

425 426 427 428 429 430 431
// Merge values of the same user key starting from the current iter_ position
// Scan from the newer entries to older entries.
// PRE: iter_->key() points to the first merge type entry
//      saved_key_ stores the user key
// POST: saved_value_ has the merged value for the user key
//       iter_ points to the next entry (or invalid)
void DBIter::MergeValuesNewToOld() {
432
  if (!merge_operator_) {
433 434
    Log(InfoLogLevel::ERROR_LEVEL,
        logger_, "Options::merge_operator is null.");
435
    status_ = Status::InvalidArgument("merge_operator_ must be set.");
436 437
    valid_ = false;
    return;
D
Deon Nicholas 已提交
438
  }
439

440 441
  // Temporarily pin the blocks that hold merge operands
  TempPinData();
442
  merge_context_.Clear();
443
  // Start the merge process by pushing the first operand
444 445
  merge_context_.PushOperand(iter_->value(),
                             iter_->IsValuePinned() /* operand_pinned */);
446 447 448 449 450 451 452 453

  ParsedInternalKey ikey;
  for (iter_->Next(); iter_->Valid(); iter_->Next()) {
    if (!ParseKey(&ikey)) {
      // skip corrupted key
      continue;
    }

454
    if (!user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
455 456
      // hit the next user key, stop right here
      break;
A
Andres Noetzli 已提交
457
    } else if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type) {
458 459 460 461
      // hit a delete with the same user key, stop right here
      // iter_ is positioned after delete
      iter_->Next();
      break;
A
Andres Noetzli 已提交
462
    } else if (kTypeValue == ikey.type) {
463 464 465
      // hit a put, merge the put value with operands and store the
      // final result in saved_value_. We are done!
      // ignore corruption if there is any.
I
Igor Canadi 已提交
466
      const Slice val = iter_->value();
467 468
      MergeHelper::TimedFullMerge(merge_operator_, ikey.user_key, &val,
                                  merge_context_.GetOperands(), &saved_value_,
469
                                  logger_, statistics_, env_, &pinned_value_);
470 471 472
      // iter_ is positioned after put
      iter_->Next();
      return;
A
Andres Noetzli 已提交
473
    } else if (kTypeMerge == ikey.type) {
474 475
      // hit a merge, add the value as an operand and run associative merge.
      // when complete, add result to operands and continue.
476 477
      merge_context_.PushOperand(iter_->value(),
                                 iter_->IsValuePinned() /* operand_pinned */);
A
Andres Noetzli 已提交
478 479
    } else {
      assert(false);
480 481 482
    }
  }

483 484 485 486 487 488
  // we either exhausted all internal keys under this user key, or hit
  // a deletion marker.
  // feed null as the existing value to the merge operator, such that
  // client can differentiate this scenario and do things accordingly.
  MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), nullptr,
                              merge_context_.GetOperands(), &saved_value_,
489
                              logger_, statistics_, env_, &pinned_value_);
490 491
}

J
jorlow@chromium.org 已提交
492 493
void DBIter::Prev() {
  assert(valid_);
494
  ReleaseTempPinnedData();
S
Stanislau Hlebik 已提交
495
  if (direction_ == kForward) {
496
    ReverseToBackward();
S
Stanislau Hlebik 已提交
497 498
  }
  PrevInternal();
M
Manuel Ung 已提交
499
  if (statistics_ != nullptr) {
500
    local_stats_.prev_count_++;
M
Manuel Ung 已提交
501
    if (valid_) {
502 503
      local_stats_.prev_found_count_++;
      local_stats_.bytes_read_ += (key().size() + value().size());
M
Manuel Ung 已提交
504 505
    }
  }
506 507
  if (valid_ && prefix_extractor_ && prefix_same_as_start_ &&
      prefix_extractor_->Transform(saved_key_.GetKey())
508
              .compare(prefix_start_key_) != 0) {
509 510
    valid_ = false;
  }
S
Stanislau Hlebik 已提交
511
}
J
jorlow@chromium.org 已提交
512

A
Aaron Gao 已提交
513 514 515 516 517 518 519 520
void DBIter::ReverseToForward() {
  FindNextUserKey();
  direction_ = kForward;
  if (!iter_->Valid()) {
    iter_->SeekToFirst();
  }
}

521
void DBIter::ReverseToBackward() {
A
Aaron Gao 已提交
522 523 524 525 526
  if (prefix_extractor_ != nullptr) {
    Slice prefix = prefix_extractor_->Transform(key());
    iter_->ResetPrefix(&prefix);
    prefix_is_saved_ = true;
  }
527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552
  if (current_entry_is_merged_) {
    // Not placed in the same key. Need to call Prev() until finding the
    // previous key.
    if (!iter_->Valid()) {
      iter_->SeekToLast();
    }
    ParsedInternalKey ikey;
    FindParseableKey(&ikey, kReverse);
    while (iter_->Valid() &&
           user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) > 0) {
      iter_->Prev();
      FindParseableKey(&ikey, kReverse);
    }
  }
#ifndef NDEBUG
  if (iter_->Valid()) {
    ParsedInternalKey ikey;
    assert(ParseKey(&ikey));
    assert(user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0);
  }
#endif

  FindPrevUserKey();
  direction_ = kReverse;
}

S
Stanislau Hlebik 已提交
553 554 555 556
void DBIter::PrevInternal() {
  if (!iter_->Valid()) {
    valid_ = false;
    return;
557 558
  }

S
Stanislau Hlebik 已提交
559 560 561
  ParsedInternalKey ikey;

  while (iter_->Valid()) {
562
    saved_key_.SetKey(ExtractUserKey(iter_->key()),
563
                      !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
S
Stanislau Hlebik 已提交
564 565
    if (FindValueForCurrentKey()) {
      valid_ = true;
J
jorlow@chromium.org 已提交
566 567 568
      if (!iter_->Valid()) {
        return;
      }
S
Stanislau Hlebik 已提交
569
      FindParseableKey(&ikey, kReverse);
570
      if (user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
S
Stanislau Hlebik 已提交
571
        FindPrevUserKey();
J
jorlow@chromium.org 已提交
572
      }
S
Stanislau Hlebik 已提交
573
      return;
J
jorlow@chromium.org 已提交
574
    }
S
Stanislau Hlebik 已提交
575 576 577 578
    if (!iter_->Valid()) {
      break;
    }
    FindParseableKey(&ikey, kReverse);
579
    if (user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
S
Stanislau Hlebik 已提交
580 581 582 583 584 585
      FindPrevUserKey();
    }
  }
  // We haven't found any key - iterator is not valid
  assert(!iter_->Valid());
  valid_ = false;
J
jorlow@chromium.org 已提交
586 587
}

S
Stanislau Hlebik 已提交
588
// This function checks, if the entry with biggest sequence_number <= sequence_
A
Andres Noetzli 已提交
589 590
// is non kTypeDeletion or kTypeSingleDeletion. If it's not, we save value in
// saved_value_
S
Stanislau Hlebik 已提交
591 592
bool DBIter::FindValueForCurrentKey() {
  assert(iter_->Valid());
593
  merge_context_.Clear();
594
  current_entry_is_merged_ = false;
A
Andres Noetzli 已提交
595 596
  // last entry before merge (could be kTypeDeletion, kTypeSingleDeletion or
  // kTypeValue)
S
Stanislau Hlebik 已提交
597 598
  ValueType last_not_merge_type = kTypeDeletion;
  ValueType last_key_entry_type = kTypeDeletion;
J
jorlow@chromium.org 已提交
599

S
Stanislau Hlebik 已提交
600 601 602
  ParsedInternalKey ikey;
  FindParseableKey(&ikey, kReverse);

603 604 605
  // Temporarily pin blocks that hold (merge operands / the value)
  ReleaseTempPinnedData();
  TempPinData();
S
Stanislau Hlebik 已提交
606 607
  size_t num_skipped = 0;
  while (iter_->Valid() && ikey.sequence <= sequence_ &&
608
         user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
S
Stanislau Hlebik 已提交
609 610 611 612 613 614 615 616
    // We iterate too much: let's use Seek() to avoid too much key comparisons
    if (num_skipped >= max_skip_) {
      return FindValueForCurrentKeyUsingSeek();
    }

    last_key_entry_type = ikey.type;
    switch (last_key_entry_type) {
      case kTypeValue:
617
        merge_context_.Clear();
618
        assert(iter_->IsValuePinned());
619
        pinned_value_ = iter_->value();
S
Stanislau Hlebik 已提交
620 621 622
        last_not_merge_type = kTypeValue;
        break;
      case kTypeDeletion:
A
Andres Noetzli 已提交
623
      case kTypeSingleDeletion:
624
        merge_context_.Clear();
A
Andres Noetzli 已提交
625
        last_not_merge_type = last_key_entry_type;
626
        PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
S
Stanislau Hlebik 已提交
627 628
        break;
      case kTypeMerge:
629
        assert(merge_operator_ != nullptr);
630 631
        merge_context_.PushOperandBack(
            iter_->value(), iter_->IsValuePinned() /* operand_pinned */);
S
Stanislau Hlebik 已提交
632 633 634 635 636
        break;
      default:
        assert(false);
    }

637
    PERF_COUNTER_ADD(internal_key_skipped_count, 1);
638
    assert(user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()));
S
Stanislau Hlebik 已提交
639 640 641 642 643 644 645
    iter_->Prev();
    ++num_skipped;
    FindParseableKey(&ikey, kReverse);
  }

  switch (last_key_entry_type) {
    case kTypeDeletion:
A
Andres Noetzli 已提交
646
    case kTypeSingleDeletion:
S
Stanislau Hlebik 已提交
647 648 649
      valid_ = false;
      return false;
    case kTypeMerge:
650
      current_entry_is_merged_ = true;
S
Stanislau Hlebik 已提交
651
      if (last_not_merge_type == kTypeDeletion) {
652 653
        MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(),
                                    nullptr, merge_context_.GetOperands(),
654 655
                                    &saved_value_, logger_, statistics_, env_,
                                    &pinned_value_);
656
      } else {
S
Stanislau Hlebik 已提交
657
        assert(last_not_merge_type == kTypeValue);
658 659 660
        MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(),
                                    &pinned_value_,
                                    merge_context_.GetOperands(), &saved_value_,
661
                                    logger_, statistics_, env_, &pinned_value_);
662
      }
S
Stanislau Hlebik 已提交
663 664 665 666 667 668 669
      break;
    case kTypeValue:
      // do nothing - we've already has value in saved_value_
      break;
    default:
      assert(false);
      break;
J
jorlow@chromium.org 已提交
670
  }
S
Stanislau Hlebik 已提交
671 672 673
  valid_ = true;
  return true;
}
J
jorlow@chromium.org 已提交
674

S
Stanislau Hlebik 已提交
675 676 677
// This function is used in FindValueForCurrentKey.
// We use Seek() function instead of Prev() to find necessary value
bool DBIter::FindValueForCurrentKeyUsingSeek() {
678 679 680
  // FindValueForCurrentKey will enable pinning before calling
  // FindValueForCurrentKeyUsingSeek()
  assert(pinned_iters_mgr_.PinningEnabled());
S
Stanislau Hlebik 已提交
681 682 683 684 685 686 687 688 689 690
  std::string last_key;
  AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetKey(), sequence_,
                                                 kValueTypeForSeek));
  iter_->Seek(last_key);
  RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);

  // assume there is at least one parseable key for this user key
  ParsedInternalKey ikey;
  FindParseableKey(&ikey, kForward);

A
Andres Noetzli 已提交
691 692
  if (ikey.type == kTypeValue || ikey.type == kTypeDeletion ||
      ikey.type == kTypeSingleDeletion) {
S
Stanislau Hlebik 已提交
693
    if (ikey.type == kTypeValue) {
694
      assert(iter_->IsValuePinned());
695
      pinned_value_ = iter_->value();
S
Stanislau Hlebik 已提交
696 697 698
      valid_ = true;
      return true;
    }
J
jorlow@chromium.org 已提交
699
    valid_ = false;
S
Stanislau Hlebik 已提交
700 701 702 703 704
    return false;
  }

  // kTypeMerge. We need to collect all kTypeMerge values and save them
  // in operands
705
  current_entry_is_merged_ = true;
706
  merge_context_.Clear();
S
Stanislau Hlebik 已提交
707
  while (iter_->Valid() &&
708
         user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()) &&
S
Stanislau Hlebik 已提交
709
         ikey.type == kTypeMerge) {
710 711
    merge_context_.PushOperand(iter_->value(),
                               iter_->IsValuePinned() /* operand_pinned */);
S
Stanislau Hlebik 已提交
712 713 714 715 716
    iter_->Next();
    FindParseableKey(&ikey, kForward);
  }

  if (!iter_->Valid() ||
717
      !user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()) ||
A
Andres Noetzli 已提交
718
      ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion) {
719 720
    MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), nullptr,
                                merge_context_.GetOperands(), &saved_value_,
721
                                logger_, statistics_, env_, &pinned_value_);
S
Stanislau Hlebik 已提交
722 723
    // Make iter_ valid and point to saved_key_
    if (!iter_->Valid() ||
724
        !user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
S
Stanislau Hlebik 已提交
725 726 727
      iter_->Seek(last_key);
      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
    }
J
jorlow@chromium.org 已提交
728
    valid_ = true;
S
Stanislau Hlebik 已提交
729 730 731
    return true;
  }

I
Igor Canadi 已提交
732
  const Slice& val = iter_->value();
733 734
  MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), &val,
                              merge_context_.GetOperands(), &saved_value_,
735
                              logger_, statistics_, env_, &pinned_value_);
S
Stanislau Hlebik 已提交
736 737 738 739 740 741 742 743 744
  valid_ = true;
  return true;
}

// Used in Next to change directions
// Go to next user key
// Don't use Seek(),
// because next user key will be very close
void DBIter::FindNextUserKey() {
A
Aaron Gao 已提交
745 746 747 748 749
  if (prefix_extractor_ != nullptr) {
    Slice prefix = prefix_extractor_->Transform(key());
    iter_->ResetPrefix(&prefix);
    prefix_is_saved_ = true;
  }
S
Stanislau Hlebik 已提交
750 751 752 753 754 755
  if (!iter_->Valid()) {
    return;
  }
  ParsedInternalKey ikey;
  FindParseableKey(&ikey, kForward);
  while (iter_->Valid() &&
756
         !user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
S
Stanislau Hlebik 已提交
757 758 759 760 761 762 763 764 765 766 767 768 769
    iter_->Next();
    FindParseableKey(&ikey, kForward);
  }
}

// Go to previous user_key
void DBIter::FindPrevUserKey() {
  if (!iter_->Valid()) {
    return;
  }
  size_t num_skipped = 0;
  ParsedInternalKey ikey;
  FindParseableKey(&ikey, kReverse);
770 771 772 773 774 775 776 777 778 779 780 781 782 783 784
  int cmp;
  while (iter_->Valid() && ((cmp = user_comparator_->Compare(
                                 ikey.user_key, saved_key_.GetKey())) == 0 ||
                            (cmp > 0 && ikey.sequence > sequence_))) {
    if (cmp == 0) {
      if (num_skipped >= max_skip_) {
        num_skipped = 0;
        IterKey last_key;
        last_key.SetInternalKey(ParsedInternalKey(
            saved_key_.GetKey(), kMaxSequenceNumber, kValueTypeForSeek));
        iter_->Seek(last_key.GetKey());
        RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
      } else {
        ++num_skipped;
      }
S
Stanislau Hlebik 已提交
785 786 787 788 789 790 791 792 793 794 795 796 797 798
    }
    iter_->Prev();
    FindParseableKey(&ikey, kReverse);
  }
}

// Skip all unparseable keys
void DBIter::FindParseableKey(ParsedInternalKey* ikey, Direction direction) {
  while (iter_->Valid() && !ParseKey(ikey)) {
    if (direction == kReverse) {
      iter_->Prev();
    } else {
      iter_->Next();
    }
J
jorlow@chromium.org 已提交
799 800
  }
}
J
jorlow@chromium.org 已提交
801

J
jorlow@chromium.org 已提交
802
void DBIter::Seek(const Slice& target) {
L
Lei Jin 已提交
803
  StopWatch sw(env_, statistics_, DB_SEEK);
804
  ReleaseTempPinnedData();
805 806 807
  saved_key_.Clear();
  // now savved_key is used to store internal key.
  saved_key_.SetInternalKey(target, sequence_);
808 809 810 811 812

  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    iter_->Seek(saved_key_.GetKey());
  }
M
Manuel Ung 已提交
813
  RecordTick(statistics_, NUMBER_DB_SEEK);
J
jorlow@chromium.org 已提交
814
  if (iter_->Valid()) {
815 816 817
    if (prefix_extractor_ && prefix_same_as_start_) {
      prefix_start_key_ = prefix_extractor_->Transform(target);
    }
818 819
    direction_ = kForward;
    ClearSavedValue();
820 821 822 823
    FindNextUserEntry(false /* not skipping */, prefix_same_as_start_);
    if (!valid_) {
      prefix_start_key_.clear();
    }
M
Manuel Ung 已提交
824 825 826 827 828 829
    if (statistics_ != nullptr) {
      if (valid_) {
        RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
        RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
      }
    }
J
jorlow@chromium.org 已提交
830 831 832
  } else {
    valid_ = false;
  }
A
Aaron Gao 已提交
833 834 835 836 837
  // Need to reset prefix if change direction
  if (prefix_is_saved_) {
    iter_->ResetPrefix();
    prefix_is_saved_ = false;
  }
838
  if (valid_ && prefix_extractor_ && prefix_same_as_start_) {
839 840
    prefix_start_buf_.SetKey(prefix_start_key_);
    prefix_start_key_ = prefix_start_buf_.GetKey();
841
  }
J
jorlow@chromium.org 已提交
842
}
J
jorlow@chromium.org 已提交
843

J
jorlow@chromium.org 已提交
844
void DBIter::SeekToFirst() {
S
Stanislau Hlebik 已提交
845
  // Don't use iter_::Seek() if we set a prefix extractor
846
  // because prefix seek will be used.
847
  if (prefix_extractor_ != nullptr) {
S
Stanislau Hlebik 已提交
848 849
    max_skip_ = std::numeric_limits<uint64_t>::max();
  }
J
jorlow@chromium.org 已提交
850
  direction_ = kForward;
851
  ReleaseTempPinnedData();
J
jorlow@chromium.org 已提交
852
  ClearSavedValue();
853 854 855 856 857 858

  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    iter_->SeekToFirst();
  }

M
Manuel Ung 已提交
859
  RecordTick(statistics_, NUMBER_DB_SEEK);
J
jorlow@chromium.org 已提交
860
  if (iter_->Valid()) {
861
    FindNextUserEntry(false /* not skipping */, false /* no prefix check */);
M
Manuel Ung 已提交
862 863 864 865 866 867
    if (statistics_ != nullptr) {
      if (valid_) {
        RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
        RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
      }
    }
J
jorlow@chromium.org 已提交
868 869
  } else {
    valid_ = false;
J
jorlow@chromium.org 已提交
870
  }
871
  if (valid_ && prefix_extractor_ && prefix_same_as_start_) {
872 873
    prefix_start_buf_.SetKey(prefix_extractor_->Transform(saved_key_.GetKey()));
    prefix_start_key_ = prefix_start_buf_.GetKey();
874
  }
J
jorlow@chromium.org 已提交
875 876
}

J
jorlow@chromium.org 已提交
877
void DBIter::SeekToLast() {
S
Stanislau Hlebik 已提交
878
  // Don't use iter_::Seek() if we set a prefix extractor
879
  // because prefix seek will be used.
880
  if (prefix_extractor_ != nullptr) {
S
Stanislau Hlebik 已提交
881 882
    max_skip_ = std::numeric_limits<uint64_t>::max();
  }
J
jorlow@chromium.org 已提交
883
  direction_ = kReverse;
884
  ReleaseTempPinnedData();
J
jorlow@chromium.org 已提交
885
  ClearSavedValue();
886 887 888 889 890

  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    iter_->SeekToLast();
  }
891 892 893 894
  // When the iterate_upper_bound is set to a value,
  // it will seek to the last key before the
  // ReadOptions.iterate_upper_bound
  if (iter_->Valid() && iterate_upper_bound_ != nullptr) {
895
    saved_key_.SetKey(*iterate_upper_bound_, false /* copy */);
896 897 898 899 900 901
    std::string last_key;
    AppendInternalKey(&last_key,
                      ParsedInternalKey(saved_key_.GetKey(), kMaxSequenceNumber,
                                        kValueTypeForSeek));

    iter_->Seek(last_key);
S
Stanislau Hlebik 已提交
902

903 904 905 906 907 908 909 910 911 912
    if (!iter_->Valid()) {
      iter_->SeekToLast();
    } else {
      iter_->Prev();
      if (!iter_->Valid()) {
        valid_ = false;
        return;
      }
    }
  }
S
Stanislau Hlebik 已提交
913
  PrevInternal();
M
Manuel Ung 已提交
914 915 916 917 918 919 920
  if (statistics_ != nullptr) {
    RecordTick(statistics_, NUMBER_DB_SEEK);
    if (valid_) {
      RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
      RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
    }
  }
921
  if (valid_ && prefix_extractor_ && prefix_same_as_start_) {
922 923
    prefix_start_buf_.SetKey(prefix_extractor_->Transform(saved_key_.GetKey()));
    prefix_start_key_ = prefix_start_buf_.GetKey();
924
  }
J
jorlow@chromium.org 已提交
925 926
}

927
Iterator* NewDBIterator(Env* env, const ImmutableCFOptions& ioptions,
928
                        const Comparator* user_key_comparator,
S
sdong 已提交
929
                        InternalIterator* internal_iter,
930
                        const SequenceNumber& sequence,
931
                        uint64_t max_sequential_skip_in_iterations,
932
                        uint64_t version_number,
933
                        const Slice* iterate_upper_bound,
934 935 936
                        bool prefix_same_as_start, bool pin_data) {
  DBIter* db_iter =
      new DBIter(env, ioptions, user_key_comparator, internal_iter, sequence,
937
                 false, max_sequential_skip_in_iterations, version_number,
938
                 iterate_upper_bound, prefix_same_as_start, pin_data);
939
  return db_iter;
940 941
}

I
Igor Canadi 已提交
942
ArenaWrappedDBIter::~ArenaWrappedDBIter() { db_iter_->~DBIter(); }
943 944 945

void ArenaWrappedDBIter::SetDBIter(DBIter* iter) { db_iter_ = iter; }

S
sdong 已提交
946
void ArenaWrappedDBIter::SetIterUnderDBIter(InternalIterator* iter) {
947 948 949 950 951 952 953 954 955 956 957 958 959 960
  static_cast<DBIter*>(db_iter_)->SetIter(iter);
}

inline bool ArenaWrappedDBIter::Valid() const { return db_iter_->Valid(); }
inline void ArenaWrappedDBIter::SeekToFirst() { db_iter_->SeekToFirst(); }
inline void ArenaWrappedDBIter::SeekToLast() { db_iter_->SeekToLast(); }
inline void ArenaWrappedDBIter::Seek(const Slice& target) {
  db_iter_->Seek(target);
}
inline void ArenaWrappedDBIter::Next() { db_iter_->Next(); }
inline void ArenaWrappedDBIter::Prev() { db_iter_->Prev(); }
inline Slice ArenaWrappedDBIter::key() const { return db_iter_->key(); }
inline Slice ArenaWrappedDBIter::value() const { return db_iter_->value(); }
inline Status ArenaWrappedDBIter::status() const { return db_iter_->status(); }
961 962 963 964
inline Status ArenaWrappedDBIter::GetProperty(std::string prop_name,
                                              std::string* prop) {
  return db_iter_->GetProperty(prop_name, prop);
}
965 966 967 968
void ArenaWrappedDBIter::RegisterCleanup(CleanupFunction function, void* arg1,
                                         void* arg2) {
  db_iter_->RegisterCleanup(function, arg1, arg2);
}
J
jorlow@chromium.org 已提交
969

970
ArenaWrappedDBIter* NewArenaWrappedDbIterator(
971
    Env* env, const ImmutableCFOptions& ioptions,
972
    const Comparator* user_key_comparator, const SequenceNumber& sequence,
973
    uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
974 975
    const Slice* iterate_upper_bound, bool prefix_same_as_start,
    bool pin_data) {
976 977 978
  ArenaWrappedDBIter* iter = new ArenaWrappedDBIter();
  Arena* arena = iter->GetArena();
  auto mem = arena->AllocateAligned(sizeof(DBIter));
979 980
  DBIter* db_iter =
      new (mem) DBIter(env, ioptions, user_key_comparator, nullptr, sequence,
981
                       true, max_sequential_skip_in_iterations, version_number,
982
                       iterate_upper_bound, prefix_same_as_start, pin_data);
983

984
  iter->SetDBIter(db_iter);
985

986
  return iter;
J
jorlow@chromium.org 已提交
987 988
}

989
}  // namespace rocksdb