//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/db_iter.h"
11
#include <stdexcept>
12
#include <deque>
S
Stanislau Hlebik 已提交
13
#include <string>
S
Stanislau Hlebik 已提交
14
#include <limits>
J
jorlow@chromium.org 已提交
15 16

#include "db/dbformat.h"
17
#include "db/filename.h"
18
#include "db/merge_context.h"
19
#include "db/merge_helper.h"
20
#include "db/pinned_iterators_manager.h"
S
sdong 已提交
21
#include "port/port.h"
22 23 24
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/merge_operator.h"
25
#include "rocksdb/options.h"
S
sdong 已提交
26
#include "table/internal_iterator.h"
27
#include "util/arena.h"
J
jorlow@chromium.org 已提交
28 29
#include "util/logging.h"
#include "util/mutexlock.h"
30
#include "util/perf_context_imp.h"
31
#include "util/string_util.h"
J
jorlow@chromium.org 已提交
32

33
namespace rocksdb {
J
jorlow@chromium.org 已提交
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54

#if 0
// Debugging aid (compiled out): prints every entry of an internal iterator to
// stderr, flagging keys that do not parse as internal keys. It takes an
// InternalIterator because that is the type DBIter actually wraps (iter_),
// so it can be enabled and pointed straight at DBIter::iter_.
static void DumpInternalIter(InternalIterator* iter) {
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    ParsedInternalKey k;
    if (!ParseInternalKey(iter->key(), &k)) {
      fprintf(stderr, "Corrupt '%s'\n", EscapeString(iter->key()).c_str());
    } else {
      fprintf(stderr, "@ '%s'\n", k.DebugString().c_str());
    }
  }
}
#endif

// Memtables and sstables that make the DB representation contain
// (userkey,seq,type) => uservalue entries.  DBIter
// combines multiple entries for the same userkey found in the DB
// representation into a single entry while accounting for sequence
// numbers, deletion markers, overwrites, etc.
class DBIter: public Iterator {
 public:
55
  // The following is grossly complicated. TODO: clean it up
J
jorlow@chromium.org 已提交
56 57 58 59 60 61 62 63 64 65
  // Which direction is the iterator currently moving?
  // (1) When moving forward, the internal iterator is positioned at
  //     the exact entry that yields this->key(), this->value()
  // (2) When moving backwards, the internal iterator is positioned
  //     just before all entries whose user key == this->key().
  enum Direction {
    kForward,
    kReverse
  };

66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
  // LocalStatistics contain Statistics counters that will be aggregated per
  // each iterator instance and then will be sent to the global statistics when
  // the iterator is destroyed.
  //
  // The purpose of this approach is to avoid perf regression happening
  // when multiple threads bump the atomic counters from a DBIter::Next().
  struct LocalStatistics {
    explicit LocalStatistics() { ResetCounters(); }

    void ResetCounters() {
      next_count_ = 0;
      next_found_count_ = 0;
      prev_count_ = 0;
      prev_found_count_ = 0;
      bytes_read_ = 0;
    }

    void BumpGlobalStatistics(Statistics* global_statistics) {
      RecordTick(global_statistics, NUMBER_DB_NEXT, next_count_);
      RecordTick(global_statistics, NUMBER_DB_NEXT_FOUND, next_found_count_);
      RecordTick(global_statistics, NUMBER_DB_PREV, prev_count_);
      RecordTick(global_statistics, NUMBER_DB_PREV_FOUND, prev_found_count_);
      RecordTick(global_statistics, ITER_BYTES_READ, bytes_read_);
      ResetCounters();
    }

    // Map to Tickers::NUMBER_DB_NEXT
    uint64_t next_count_;
    // Map to Tickers::NUMBER_DB_NEXT_FOUND
    uint64_t next_found_count_;
    // Map to Tickers::NUMBER_DB_PREV
    uint64_t prev_count_;
    // Map to Tickers::NUMBER_DB_PREV_FOUND
    uint64_t prev_found_count_;
    // Map to Tickers::ITER_BYTES_READ
    uint64_t bytes_read_;
  };

S
sdong 已提交
104 105
  DBIter(Env* env, const ImmutableCFOptions& ioptions, const Comparator* cmp,
         InternalIterator* iter, SequenceNumber s, bool arena_mode,
106
         uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
107
         const Slice* iterate_upper_bound = nullptr,
108
         bool prefix_same_as_start = false, bool pin_data = false)
109 110
      : arena_mode_(arena_mode),
        env_(env),
111
        logger_(ioptions.info_log),
J
jorlow@chromium.org 已提交
112
        user_comparator_(cmp),
113
        merge_operator_(ioptions.merge_operator),
J
jorlow@chromium.org 已提交
114 115
        iter_(iter),
        sequence_(s),
J
jorlow@chromium.org 已提交
116
        direction_(kForward),
117
        valid_(false),
118
        current_entry_is_merged_(false),
119
        statistics_(ioptions.statistics),
120
        version_number_(version_number),
121
        iterate_upper_bound_(iterate_upper_bound),
122
        prefix_same_as_start_(prefix_same_as_start),
123
        pin_thru_lifetime_(pin_data) {
L
Lei Jin 已提交
124
    RecordTick(statistics_, NO_ITERATORS);
125 126
    prefix_extractor_ = ioptions.prefix_extractor;
    max_skip_ = max_sequential_skip_in_iterations;
127 128 129 130 131 132
    if (pin_thru_lifetime_) {
      pinned_iters_mgr_.StartPinning();
    }
    if (iter_) {
      iter_->SetPinnedItersMgr(&pinned_iters_mgr_);
    }
J
jorlow@chromium.org 已提交
133 134
  }
  virtual ~DBIter() {
135
    // Release pinned data if any
136 137 138
    if (pinned_iters_mgr_.PinningEnabled()) {
      pinned_iters_mgr_.ReleasePinnedData();
    }
139
    RecordTick(statistics_, NO_ITERATORS, -1);
140
    local_stats_.BumpGlobalStatistics(statistics_);
141 142 143
    if (!arena_mode_) {
      delete iter_;
    } else {
S
sdong 已提交
144
      iter_->~InternalIterator();
145 146
    }
  }
S
sdong 已提交
147
  virtual void SetIter(InternalIterator* iter) {
148 149
    assert(iter_ == nullptr);
    iter_ = iter;
150
    iter_->SetPinnedItersMgr(&pinned_iters_mgr_);
J
jorlow@chromium.org 已提交
151
  }
I
Igor Sugak 已提交
152 153
  virtual bool Valid() const override { return valid_; }
  virtual Slice key() const override {
J
jorlow@chromium.org 已提交
154
    assert(valid_);
155
    return saved_key_.GetKey();
J
jorlow@chromium.org 已提交
156
  }
I
Igor Sugak 已提交
157
  virtual Slice value() const override {
J
jorlow@chromium.org 已提交
158
    assert(valid_);
159
    if (current_entry_is_merged_) {
160 161 162
      // If pinned_value_ is set then the result of merge operator is one of
      // the merge operands and we should return it.
      return pinned_value_.data() ? pinned_value_ : saved_value_;
163 164 165 166 167
    } else if (direction_ == kReverse) {
      return pinned_value_;
    } else {
      return iter_->value();
    }
J
jorlow@chromium.org 已提交
168
  }
I
Igor Sugak 已提交
169
  virtual Status status() const override {
J
jorlow@chromium.org 已提交
170 171 172 173 174 175
    if (status_.ok()) {
      return iter_->status();
    } else {
      return status_;
    }
  }
176 177 178 179 180 181

  virtual Status GetProperty(std::string prop_name,
                             std::string* prop) override {
    if (prop == nullptr) {
      return Status::InvalidArgument("prop is nullptr");
    }
182
    if (prop_name == "rocksdb.iterator.super-version-number") {
183 184 185 186 187 188
      // First try to pass the value returned from inner iterator.
      if (!iter_->GetProperty(prop_name, prop).ok()) {
        *prop = ToString(version_number_);
      }
      return Status::OK();
    } else if (prop_name == "rocksdb.iterator.is-key-pinned") {
189
      if (valid_) {
190
        *prop = (pin_thru_lifetime_ && saved_key_.IsKeyPinned()) ? "1" : "0";
191 192 193 194 195 196
      } else {
        *prop = "Iterator is not valid.";
      }
      return Status::OK();
    }
    return Status::InvalidArgument("Undentified property.");
197
  }
J
jorlow@chromium.org 已提交
198

I
Igor Sugak 已提交
199 200 201
  virtual void Next() override;
  virtual void Prev() override;
  virtual void Seek(const Slice& target) override;
A
Aaron Gao 已提交
202
  virtual void SeekForPrev(const Slice& target) override;
I
Igor Sugak 已提交
203 204
  virtual void SeekToFirst() override;
  virtual void SeekToLast() override;
J
jorlow@chromium.org 已提交
205

J
jorlow@chromium.org 已提交
206
 private:
207
  void ReverseToBackward();
S
Stanislau Hlebik 已提交
208 209 210 211 212 213
  void PrevInternal();
  void FindParseableKey(ParsedInternalKey* ikey, Direction direction);
  bool FindValueForCurrentKey();
  bool FindValueForCurrentKeyUsingSeek();
  void FindPrevUserKey();
  void FindNextUserKey();
214 215
  inline void FindNextUserEntry(bool skipping, bool prefix_check);
  void FindNextUserEntryInternal(bool skipping, bool prefix_check);
J
jorlow@chromium.org 已提交
216
  bool ParseKey(ParsedInternalKey* key);
217
  void MergeValuesNewToOld();
J
jorlow@chromium.org 已提交
218

219 220 221 222 223 224 225 226 227 228
  // Temporarily pin the blocks that we encounter until ReleaseTempPinnedData()
  // is called
  void TempPinData() {
    if (!pin_thru_lifetime_) {
      pinned_iters_mgr_.StartPinning();
    }
  }

  // Release blocks pinned by TempPinData()
  void ReleaseTempPinnedData() {
229 230
    if (!pin_thru_lifetime_ && pinned_iters_mgr_.PinningEnabled()) {
      pinned_iters_mgr_.ReleasePinnedData();
231 232 233
    }
  }

J
jorlow@chromium.org 已提交
234 235 236 237 238 239 240 241 242
  inline void ClearSavedValue() {
    if (saved_value_.capacity() > 1048576) {
      std::string empty;
      swap(empty, saved_value_);
    } else {
      saved_value_.clear();
    }
  }

243
  const SliceTransform* prefix_extractor_;
244
  bool arena_mode_;
J
jorlow@chromium.org 已提交
245
  Env* const env_;
I
Igor Canadi 已提交
246
  Logger* logger_;
J
jorlow@chromium.org 已提交
247
  const Comparator* const user_comparator_;
248
  const MergeOperator* const merge_operator_;
S
sdong 已提交
249
  InternalIterator* iter_;
J
jorlow@chromium.org 已提交
250
  SequenceNumber const sequence_;
J
jorlow@chromium.org 已提交
251

J
jorlow@chromium.org 已提交
252
  Status status_;
S
Stanislau Hlebik 已提交
253 254
  IterKey saved_key_;
  std::string saved_value_;
255
  Slice pinned_value_;
J
jorlow@chromium.org 已提交
256
  Direction direction_;
J
jorlow@chromium.org 已提交
257
  bool valid_;
258
  bool current_entry_is_merged_;
259
  Statistics* statistics_;
260
  uint64_t max_skip_;
261
  uint64_t version_number_;
262
  const Slice* iterate_upper_bound_;
263 264 265
  IterKey prefix_start_buf_;
  Slice prefix_start_key_;
  const bool prefix_same_as_start_;
266 267 268
  // Means that we will pin all data blocks we read as long the Iterator
  // is not deleted, will be true if ReadOptions::pin_data is true
  const bool pin_thru_lifetime_;
269
  // List of operands for merge operator.
270
  MergeContext merge_context_;
271
  LocalStatistics local_stats_;
272
  PinnedIteratorsManager pinned_iters_mgr_;
J
jorlow@chromium.org 已提交
273 274 275 276 277 278 279 280 281

  // No copying allowed
  DBIter(const DBIter&);
  void operator=(const DBIter&);
};

// Decodes iter_->key() into *ikey. On failure, records a Corruption status in
// status_, logs the offending key (ToString(true)) at ERROR level, and
// returns false; callers decide whether to skip the entry.
inline bool DBIter::ParseKey(ParsedInternalKey* ikey) {
  const bool parsed_ok = ParseInternalKey(iter_->key(), ikey);
  if (!parsed_ok) {
    status_ = Status::Corruption("corrupted internal key in DBIter");
    Log(InfoLogLevel::ERROR_LEVEL, logger_,
        "corrupted internal key in DBIter: %s",
        iter_->key().ToString(true).c_str());
  }
  return parsed_ok;
}

J
jorlow@chromium.org 已提交
291 292 293
void DBIter::Next() {
  assert(valid_);

294 295
  // Release temporarily pinned blocks from last operation
  ReleaseTempPinnedData();
S
Stanislau Hlebik 已提交
296
  if (direction_ == kReverse) {
297 298 299 300 301
    FindNextUserKey();
    direction_ = kForward;
    if (!iter_->Valid()) {
      iter_->SeekToFirst();
    }
302 303 304 305 306 307 308
  } else if (iter_->Valid() && !current_entry_is_merged_) {
    // If the current value is not a merge, the iter position is the
    // current key, which is already returned. We can safely issue a
    // Next() without checking the current key.
    // If the current key is a merge, very likely iter already points
    // to the next internal position.
    iter_->Next();
309
    PERF_COUNTER_ADD(internal_key_skipped_count, 1);
J
jorlow@chromium.org 已提交
310
  }
J
jorlow@chromium.org 已提交
311

312 313 314
  if (statistics_ != nullptr) {
    local_stats_.next_count_++;
  }
315 316
  // Now we point to the next internal position, for both of merge and
  // not merge cases.
317 318 319 320
  if (!iter_->Valid()) {
    valid_ = false;
    return;
  }
321
  FindNextUserEntry(true /* skipping the current user key */, prefix_same_as_start_);
322 323 324 325
  if (statistics_ != nullptr && valid_) {
    local_stats_.next_found_count_++;
    local_stats_.bytes_read_ += (key().size() + value().size());
  }
J
jorlow@chromium.org 已提交
326 327
}

328 329 330 331 332 333 334 335
// PRE: saved_key_ has the current user key if skipping
// POST: saved_key_ should have the next user key if valid_,
//       if the current entry is a result of merge
//           current_entry_is_merged_ => true
//           saved_value_             => the merged value
//
// NOTE: In between, saved_key_ can point to a user key that has
//       a delete marker
336 337 338 339 340 341
//
// The prefix_check parameter controls whether we check the iterated
// keys against the prefix of the seeked key. Set to false when
// performing a seek without a key (e.g. SeekToFirst). Set to
// prefix_same_as_start_ for other iterations.
inline void DBIter::FindNextUserEntry(bool skipping, bool prefix_check) {
342
  PERF_TIMER_GUARD(find_next_user_entry_time);
343
  FindNextUserEntryInternal(skipping, prefix_check);
344 345 346
}

// Actual implementation of DBIter::FindNextUserEntry()
347
void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
J
jorlow@chromium.org 已提交
348 349 350
  // Loop until we hit an acceptable entry to yield
  assert(iter_->Valid());
  assert(direction_ == kForward);
351
  current_entry_is_merged_ = false;
352
  uint64_t num_skipped = 0;
J
jorlow@chromium.org 已提交
353
  do {
J
jorlow@chromium.org 已提交
354
    ParsedInternalKey ikey;
355 356 357

    if (ParseKey(&ikey)) {
      if (iterate_upper_bound_ != nullptr &&
358
          user_comparator_->Compare(ikey.user_key, *iterate_upper_bound_) >= 0) {
359 360 361
        break;
      }

362 363 364 365 366
      if (prefix_extractor_ && prefix_check &&
          prefix_extractor_->Transform(ikey.user_key).compare(prefix_start_key_) != 0) {
        break;
      }

367 368 369 370 371 372 373 374
      if (ikey.sequence <= sequence_) {
        if (skipping &&
           user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0) {
          num_skipped++;  // skip this entry
          PERF_COUNTER_ADD(internal_key_skipped_count, 1);
        } else {
          switch (ikey.type) {
            case kTypeDeletion:
A
Andres Noetzli 已提交
375
            case kTypeSingleDeletion:
376 377
              // Arrange to skip all upcoming entries for this key since
              // they are hidden by this deletion.
378 379 380
              saved_key_.SetKey(
                  ikey.user_key,
                  !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
381 382 383 384 385 386
              skipping = true;
              num_skipped = 0;
              PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
              break;
            case kTypeValue:
              valid_ = true;
387 388 389
              saved_key_.SetKey(
                  ikey.user_key,
                  !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
390 391 392
              return;
            case kTypeMerge:
              // By now, we are sure the current ikey is going to yield a value
393 394 395
              saved_key_.SetKey(
                  ikey.user_key,
                  !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
396 397 398 399 400 401 402 403
              current_entry_is_merged_ = true;
              valid_ = true;
              MergeValuesNewToOld();  // Go to a different state machine
              return;
            default:
              assert(false);
              break;
          }
404
        }
J
jorlow@chromium.org 已提交
405
      }
J
jorlow@chromium.org 已提交
406
    }
407 408
    // If we have sequentially iterated via numerous keys and still not
    // found the next user-key, then it is better to seek so that we can
C
clark.kang 已提交
409
    // avoid too many key comparisons. We seek to the last occurrence of
410 411
    // our current key by looking for sequence number 0 and type deletion
    // (the smallest type).
412 413 414
    if (skipping && num_skipped > max_skip_) {
      num_skipped = 0;
      std::string last_key;
415
      AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetKey(), 0,
416
                                                     kTypeDeletion));
417 418 419 420 421
      iter_->Seek(last_key);
      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
    } else {
      iter_->Next();
    }
J
jorlow@chromium.org 已提交
422 423
  } while (iter_->Valid());
  valid_ = false;
J
jorlow@chromium.org 已提交
424 425
}

426 427 428 429 430 431 432
// Merge values of the same user key starting from the current iter_ position
// Scan from the newer entries to older entries.
// PRE: iter_->key() points to the first merge type entry
//      saved_key_ stores the user key
// POST: saved_value_ has the merged value for the user key
//       iter_ points to the next entry (or invalid)
void DBIter::MergeValuesNewToOld() {
433
  if (!merge_operator_) {
434 435
    Log(InfoLogLevel::ERROR_LEVEL,
        logger_, "Options::merge_operator is null.");
436
    status_ = Status::InvalidArgument("merge_operator_ must be set.");
437 438
    valid_ = false;
    return;
D
Deon Nicholas 已提交
439
  }
440

441 442
  // Temporarily pin the blocks that hold merge operands
  TempPinData();
443
  merge_context_.Clear();
444
  // Start the merge process by pushing the first operand
445 446
  merge_context_.PushOperand(iter_->value(),
                             iter_->IsValuePinned() /* operand_pinned */);
447 448 449 450 451 452 453 454

  ParsedInternalKey ikey;
  for (iter_->Next(); iter_->Valid(); iter_->Next()) {
    if (!ParseKey(&ikey)) {
      // skip corrupted key
      continue;
    }

455
    if (!user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
456 457
      // hit the next user key, stop right here
      break;
A
Andres Noetzli 已提交
458
    } else if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type) {
459 460 461 462
      // hit a delete with the same user key, stop right here
      // iter_ is positioned after delete
      iter_->Next();
      break;
A
Andres Noetzli 已提交
463
    } else if (kTypeValue == ikey.type) {
464 465 466
      // hit a put, merge the put value with operands and store the
      // final result in saved_value_. We are done!
      // ignore corruption if there is any.
I
Igor Canadi 已提交
467
      const Slice val = iter_->value();
468 469
      MergeHelper::TimedFullMerge(merge_operator_, ikey.user_key, &val,
                                  merge_context_.GetOperands(), &saved_value_,
470
                                  logger_, statistics_, env_, &pinned_value_);
471 472 473
      // iter_ is positioned after put
      iter_->Next();
      return;
A
Andres Noetzli 已提交
474
    } else if (kTypeMerge == ikey.type) {
475 476
      // hit a merge, add the value as an operand and run associative merge.
      // when complete, add result to operands and continue.
477 478
      merge_context_.PushOperand(iter_->value(),
                                 iter_->IsValuePinned() /* operand_pinned */);
A
Andres Noetzli 已提交
479 480
    } else {
      assert(false);
481 482 483
    }
  }

484 485 486 487 488 489
  // we either exhausted all internal keys under this user key, or hit
  // a deletion marker.
  // feed null as the existing value to the merge operator, such that
  // client can differentiate this scenario and do things accordingly.
  MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), nullptr,
                              merge_context_.GetOperands(), &saved_value_,
490
                              logger_, statistics_, env_, &pinned_value_);
491 492
}

J
jorlow@chromium.org 已提交
493 494
void DBIter::Prev() {
  assert(valid_);
495
  ReleaseTempPinnedData();
S
Stanislau Hlebik 已提交
496
  if (direction_ == kForward) {
497
    ReverseToBackward();
S
Stanislau Hlebik 已提交
498 499
  }
  PrevInternal();
M
Manuel Ung 已提交
500
  if (statistics_ != nullptr) {
501
    local_stats_.prev_count_++;
M
Manuel Ung 已提交
502
    if (valid_) {
503 504
      local_stats_.prev_found_count_++;
      local_stats_.bytes_read_ += (key().size() + value().size());
M
Manuel Ung 已提交
505 506
    }
  }
S
Stanislau Hlebik 已提交
507
}
J
jorlow@chromium.org 已提交
508

509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535
void DBIter::ReverseToBackward() {
  if (current_entry_is_merged_) {
    // Not placed in the same key. Need to call Prev() until finding the
    // previous key.
    if (!iter_->Valid()) {
      iter_->SeekToLast();
    }
    ParsedInternalKey ikey;
    FindParseableKey(&ikey, kReverse);
    while (iter_->Valid() &&
           user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) > 0) {
      iter_->Prev();
      FindParseableKey(&ikey, kReverse);
    }
  }
#ifndef NDEBUG
  if (iter_->Valid()) {
    ParsedInternalKey ikey;
    assert(ParseKey(&ikey));
    assert(user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0);
  }
#endif

  FindPrevUserKey();
  direction_ = kReverse;
}

S
Stanislau Hlebik 已提交
536 537 538 539
void DBIter::PrevInternal() {
  if (!iter_->Valid()) {
    valid_ = false;
    return;
540 541
  }

S
Stanislau Hlebik 已提交
542
  ParsedInternalKey ikey;
A
Aaron Gao 已提交
543
  bool match_prefix = true;
S
Stanislau Hlebik 已提交
544 545

  while (iter_->Valid()) {
546
    saved_key_.SetKey(ExtractUserKey(iter_->key()),
547
                      !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
S
Stanislau Hlebik 已提交
548 549
    if (FindValueForCurrentKey()) {
      valid_ = true;
J
jorlow@chromium.org 已提交
550 551 552
      if (!iter_->Valid()) {
        return;
      }
S
Stanislau Hlebik 已提交
553
      FindParseableKey(&ikey, kReverse);
554
      if (user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
S
Stanislau Hlebik 已提交
555
        FindPrevUserKey();
J
jorlow@chromium.org 已提交
556
      }
A
Aaron Gao 已提交
557 558 559 560 561 562
      if (valid_ && prefix_extractor_ && prefix_same_as_start_ &&
          prefix_extractor_->Transform(saved_key_.GetKey())
                  .compare(prefix_start_key_) != 0) {
        match_prefix = false;
        break;
      }
S
Stanislau Hlebik 已提交
563
      return;
J
jorlow@chromium.org 已提交
564
    }
S
Stanislau Hlebik 已提交
565 566 567 568
    if (!iter_->Valid()) {
      break;
    }
    FindParseableKey(&ikey, kReverse);
569
    if (user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
S
Stanislau Hlebik 已提交
570 571 572 573
      FindPrevUserKey();
    }
  }
  // We haven't found any key - iterator is not valid
A
Aaron Gao 已提交
574 575
  // Or the prefix is different than start prefix
  assert(!iter_->Valid() || !match_prefix);
S
Stanislau Hlebik 已提交
576
  valid_ = false;
J
jorlow@chromium.org 已提交
577 578
}

S
Stanislau Hlebik 已提交
579
// This function checks, if the entry with biggest sequence_number <= sequence_
A
Andres Noetzli 已提交
580 581
// is non kTypeDeletion or kTypeSingleDeletion. If it's not, we save value in
// saved_value_
S
Stanislau Hlebik 已提交
582 583
bool DBIter::FindValueForCurrentKey() {
  assert(iter_->Valid());
584
  merge_context_.Clear();
585
  current_entry_is_merged_ = false;
A
Andres Noetzli 已提交
586 587
  // last entry before merge (could be kTypeDeletion, kTypeSingleDeletion or
  // kTypeValue)
S
Stanislau Hlebik 已提交
588 589
  ValueType last_not_merge_type = kTypeDeletion;
  ValueType last_key_entry_type = kTypeDeletion;
J
jorlow@chromium.org 已提交
590

S
Stanislau Hlebik 已提交
591 592 593
  ParsedInternalKey ikey;
  FindParseableKey(&ikey, kReverse);

594 595 596
  // Temporarily pin blocks that hold (merge operands / the value)
  ReleaseTempPinnedData();
  TempPinData();
S
Stanislau Hlebik 已提交
597 598
  size_t num_skipped = 0;
  while (iter_->Valid() && ikey.sequence <= sequence_ &&
599
         user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
S
Stanislau Hlebik 已提交
600 601 602 603 604 605 606 607
    // We iterate too much: let's use Seek() to avoid too much key comparisons
    if (num_skipped >= max_skip_) {
      return FindValueForCurrentKeyUsingSeek();
    }

    last_key_entry_type = ikey.type;
    switch (last_key_entry_type) {
      case kTypeValue:
608
        merge_context_.Clear();
609
        assert(iter_->IsValuePinned());
610
        pinned_value_ = iter_->value();
S
Stanislau Hlebik 已提交
611 612 613
        last_not_merge_type = kTypeValue;
        break;
      case kTypeDeletion:
A
Andres Noetzli 已提交
614
      case kTypeSingleDeletion:
615
        merge_context_.Clear();
A
Andres Noetzli 已提交
616
        last_not_merge_type = last_key_entry_type;
617
        PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
S
Stanislau Hlebik 已提交
618 619
        break;
      case kTypeMerge:
620
        assert(merge_operator_ != nullptr);
621 622
        merge_context_.PushOperandBack(
            iter_->value(), iter_->IsValuePinned() /* operand_pinned */);
S
Stanislau Hlebik 已提交
623 624 625 626 627
        break;
      default:
        assert(false);
    }

628
    PERF_COUNTER_ADD(internal_key_skipped_count, 1);
629
    assert(user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()));
S
Stanislau Hlebik 已提交
630 631 632 633 634 635 636
    iter_->Prev();
    ++num_skipped;
    FindParseableKey(&ikey, kReverse);
  }

  switch (last_key_entry_type) {
    case kTypeDeletion:
A
Andres Noetzli 已提交
637
    case kTypeSingleDeletion:
S
Stanislau Hlebik 已提交
638 639 640
      valid_ = false;
      return false;
    case kTypeMerge:
641
      current_entry_is_merged_ = true;
S
Stanislau Hlebik 已提交
642
      if (last_not_merge_type == kTypeDeletion) {
643 644
        MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(),
                                    nullptr, merge_context_.GetOperands(),
645 646
                                    &saved_value_, logger_, statistics_, env_,
                                    &pinned_value_);
647
      } else {
S
Stanislau Hlebik 已提交
648
        assert(last_not_merge_type == kTypeValue);
649 650 651
        MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(),
                                    &pinned_value_,
                                    merge_context_.GetOperands(), &saved_value_,
652
                                    logger_, statistics_, env_, &pinned_value_);
653
      }
S
Stanislau Hlebik 已提交
654 655 656 657 658 659 660
      break;
    case kTypeValue:
      // do nothing - we've already has value in saved_value_
      break;
    default:
      assert(false);
      break;
J
jorlow@chromium.org 已提交
661
  }
S
Stanislau Hlebik 已提交
662 663 664
  valid_ = true;
  return true;
}
J
jorlow@chromium.org 已提交
665

S
Stanislau Hlebik 已提交
666 667 668
// This function is used in FindValueForCurrentKey.
// We use Seek() function instead of Prev() to find necessary value
bool DBIter::FindValueForCurrentKeyUsingSeek() {
669 670 671
  // FindValueForCurrentKey will enable pinning before calling
  // FindValueForCurrentKeyUsingSeek()
  assert(pinned_iters_mgr_.PinningEnabled());
S
Stanislau Hlebik 已提交
672 673 674 675 676 677 678 679 680 681
  std::string last_key;
  AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetKey(), sequence_,
                                                 kValueTypeForSeek));
  iter_->Seek(last_key);
  RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);

  // assume there is at least one parseable key for this user key
  ParsedInternalKey ikey;
  FindParseableKey(&ikey, kForward);

A
Andres Noetzli 已提交
682 683
  if (ikey.type == kTypeValue || ikey.type == kTypeDeletion ||
      ikey.type == kTypeSingleDeletion) {
S
Stanislau Hlebik 已提交
684
    if (ikey.type == kTypeValue) {
685
      assert(iter_->IsValuePinned());
686
      pinned_value_ = iter_->value();
S
Stanislau Hlebik 已提交
687 688 689
      valid_ = true;
      return true;
    }
J
jorlow@chromium.org 已提交
690
    valid_ = false;
S
Stanislau Hlebik 已提交
691 692 693 694 695
    return false;
  }

  // kTypeMerge. We need to collect all kTypeMerge values and save them
  // in operands
696
  current_entry_is_merged_ = true;
697
  merge_context_.Clear();
S
Stanislau Hlebik 已提交
698
  while (iter_->Valid() &&
699
         user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()) &&
S
Stanislau Hlebik 已提交
700
         ikey.type == kTypeMerge) {
701 702
    merge_context_.PushOperand(iter_->value(),
                               iter_->IsValuePinned() /* operand_pinned */);
S
Stanislau Hlebik 已提交
703 704 705 706 707
    iter_->Next();
    FindParseableKey(&ikey, kForward);
  }

  if (!iter_->Valid() ||
708
      !user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()) ||
A
Andres Noetzli 已提交
709
      ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion) {
710 711
    MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), nullptr,
                                merge_context_.GetOperands(), &saved_value_,
712
                                logger_, statistics_, env_, &pinned_value_);
S
Stanislau Hlebik 已提交
713 714
    // Make iter_ valid and point to saved_key_
    if (!iter_->Valid() ||
715
        !user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
S
Stanislau Hlebik 已提交
716 717 718
      iter_->Seek(last_key);
      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
    }
J
jorlow@chromium.org 已提交
719
    valid_ = true;
S
Stanislau Hlebik 已提交
720 721 722
    return true;
  }

I
Igor Canadi 已提交
723
  const Slice& val = iter_->value();
724 725
  MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), &val,
                              merge_context_.GetOperands(), &saved_value_,
726
                              logger_, statistics_, env_, &pinned_value_);
S
Stanislau Hlebik 已提交
727 728 729 730 731 732 733 734 735 736 737 738 739 740 741
  valid_ = true;
  return true;
}

// Used by Next() when switching from reverse to forward. It advances iter_
// until it reaches an entry whose user key equals saved_key_, or until iter_
// is exhausted. Next() is used rather than Seek() because the target is
// expected to be only a few entries away.
void DBIter::FindNextUserKey() {
  if (!iter_->Valid()) {
    return;
  }
  ParsedInternalKey ikey;
  for (FindParseableKey(&ikey, kForward);
       iter_->Valid() &&
       !user_comparator_->Equal(ikey.user_key, saved_key_.GetKey());
       FindParseableKey(&ikey, kForward)) {
    iter_->Next();
  }
}

// Go to previous user_key
void DBIter::FindPrevUserKey() {
  if (!iter_->Valid()) {
    return;
  }
  size_t num_skipped = 0;
  ParsedInternalKey ikey;
  FindParseableKey(&ikey, kReverse);
756 757 758 759 760 761 762 763 764 765 766 767 768 769 770
  int cmp;
  while (iter_->Valid() && ((cmp = user_comparator_->Compare(
                                 ikey.user_key, saved_key_.GetKey())) == 0 ||
                            (cmp > 0 && ikey.sequence > sequence_))) {
    if (cmp == 0) {
      if (num_skipped >= max_skip_) {
        num_skipped = 0;
        IterKey last_key;
        last_key.SetInternalKey(ParsedInternalKey(
            saved_key_.GetKey(), kMaxSequenceNumber, kValueTypeForSeek));
        iter_->Seek(last_key.GetKey());
        RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
      } else {
        ++num_skipped;
      }
S
Stanislau Hlebik 已提交
771 772 773 774 775 776 777 778 779 780 781 782 783 784
    }
    iter_->Prev();
    FindParseableKey(&ikey, kReverse);
  }
}

// Moves iter_ in `direction` (Prev() for kReverse, Next() otherwise) until
// ParseKey(ikey) succeeds on the current entry or iter_ becomes invalid.
void DBIter::FindParseableKey(ParsedInternalKey* ikey, Direction direction) {
  const bool step_back = (direction == kReverse);
  while (iter_->Valid()) {
    if (ParseKey(ikey)) {
      return;
    }
    if (step_back) {
      iter_->Prev();
    } else {
      iter_->Next();
    }
  }
}
J
jorlow@chromium.org 已提交
787

J
jorlow@chromium.org 已提交
788
void DBIter::Seek(const Slice& target) {
L
Lei Jin 已提交
789
  StopWatch sw(env_, statistics_, DB_SEEK);
790
  ReleaseTempPinnedData();
791 792 793
  saved_key_.Clear();
  // now savved_key is used to store internal key.
  saved_key_.SetInternalKey(target, sequence_);
794 795 796 797 798

  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    iter_->Seek(saved_key_.GetKey());
  }
799

M
Manuel Ung 已提交
800
  RecordTick(statistics_, NUMBER_DB_SEEK);
J
jorlow@chromium.org 已提交
801
  if (iter_->Valid()) {
802 803 804
    if (prefix_extractor_ && prefix_same_as_start_) {
      prefix_start_key_ = prefix_extractor_->Transform(target);
    }
805 806
    direction_ = kForward;
    ClearSavedValue();
807 808 809 810
    FindNextUserEntry(false /* not skipping */, prefix_same_as_start_);
    if (!valid_) {
      prefix_start_key_.clear();
    }
M
Manuel Ung 已提交
811 812 813 814 815 816
    if (statistics_ != nullptr) {
      if (valid_) {
        RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
        RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
      }
    }
J
jorlow@chromium.org 已提交
817 818 819
  } else {
    valid_ = false;
  }
820
  if (valid_ && prefix_extractor_ && prefix_same_as_start_) {
821 822
    prefix_start_buf_.SetKey(prefix_start_key_);
    prefix_start_key_ = prefix_start_buf_.GetKey();
823
  }
J
jorlow@chromium.org 已提交
824
}
J
jorlow@chromium.org 已提交
825

A
Aaron Gao 已提交
826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864
// Positions the iterator at the last visible user entry at or before
// `target`. Mirrors Seek(), but issues SeekForPrev() on the internal iterator
// and then walks backward with PrevInternal().
void DBIter::SeekForPrev(const Slice& target) {
  StopWatch sw(env_, statistics_, DB_SEEK);
  ReleaseTempPinnedData();

  // saved_key_ temporarily holds the internal seek key for `target`.
  saved_key_.Clear();
  saved_key_.SetInternalKey(target, 0 /* sequence_number */,
                            kValueTypeForSeekForPrev);

  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    iter_->SeekForPrev(saved_key_.GetKey());
  }
  RecordTick(statistics_, NUMBER_DB_SEEK);

  if (!iter_->Valid()) {
    valid_ = false;
    return;
  }

  if (prefix_extractor_ && prefix_same_as_start_) {
    // Temporarily points into `target`; made durable below on success.
    prefix_start_key_ = prefix_extractor_->Transform(target);
  }
  direction_ = kReverse;
  ClearSavedValue();
  PrevInternal();

  if (!valid_) {
    prefix_start_key_.clear();
    return;
  }

  if (statistics_ != nullptr) {
    RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
    RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
  }

  if (prefix_extractor_ && prefix_same_as_start_) {
    prefix_start_buf_.SetKey(prefix_start_key_);
    prefix_start_key_ = prefix_start_buf_.GetKey();
  }
}

J
jorlow@chromium.org 已提交
865
void DBIter::SeekToFirst() {
S
Stanislau Hlebik 已提交
866
  // Don't use iter_::Seek() if we set a prefix extractor
867
  // because prefix seek will be used.
868
  if (prefix_extractor_ != nullptr) {
S
Stanislau Hlebik 已提交
869 870
    max_skip_ = std::numeric_limits<uint64_t>::max();
  }
J
jorlow@chromium.org 已提交
871
  direction_ = kForward;
872
  ReleaseTempPinnedData();
J
jorlow@chromium.org 已提交
873
  ClearSavedValue();
874 875 876 877 878 879

  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    iter_->SeekToFirst();
  }

M
Manuel Ung 已提交
880
  RecordTick(statistics_, NUMBER_DB_SEEK);
J
jorlow@chromium.org 已提交
881
  if (iter_->Valid()) {
882
    FindNextUserEntry(false /* not skipping */, false /* no prefix check */);
M
Manuel Ung 已提交
883 884 885 886 887 888
    if (statistics_ != nullptr) {
      if (valid_) {
        RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
        RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
      }
    }
J
jorlow@chromium.org 已提交
889 890
  } else {
    valid_ = false;
J
jorlow@chromium.org 已提交
891
  }
892
  if (valid_ && prefix_extractor_ && prefix_same_as_start_) {
893 894
    prefix_start_buf_.SetKey(prefix_extractor_->Transform(saved_key_.GetKey()));
    prefix_start_key_ = prefix_start_buf_.GetKey();
895
  }
J
jorlow@chromium.org 已提交
896 897
}

J
jorlow@chromium.org 已提交
898
void DBIter::SeekToLast() {
S
Stanislau Hlebik 已提交
899
  // Don't use iter_::Seek() if we set a prefix extractor
900
  // because prefix seek will be used.
901
  if (prefix_extractor_ != nullptr) {
S
Stanislau Hlebik 已提交
902 903
    max_skip_ = std::numeric_limits<uint64_t>::max();
  }
J
jorlow@chromium.org 已提交
904
  direction_ = kReverse;
905
  ReleaseTempPinnedData();
J
jorlow@chromium.org 已提交
906
  ClearSavedValue();
907 908 909 910 911

  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    iter_->SeekToLast();
  }
912 913 914 915
  // When the iterate_upper_bound is set to a value,
  // it will seek to the last key before the
  // ReadOptions.iterate_upper_bound
  if (iter_->Valid() && iterate_upper_bound_ != nullptr) {
916
    saved_key_.SetKey(*iterate_upper_bound_, false /* copy */);
917 918 919 920 921 922
    std::string last_key;
    AppendInternalKey(&last_key,
                      ParsedInternalKey(saved_key_.GetKey(), kMaxSequenceNumber,
                                        kValueTypeForSeek));

    iter_->Seek(last_key);
S
Stanislau Hlebik 已提交
923

924 925 926 927 928 929 930 931 932 933
    if (!iter_->Valid()) {
      iter_->SeekToLast();
    } else {
      iter_->Prev();
      if (!iter_->Valid()) {
        valid_ = false;
        return;
      }
    }
  }
S
Stanislau Hlebik 已提交
934
  PrevInternal();
M
Manuel Ung 已提交
935 936 937 938 939 940 941
  if (statistics_ != nullptr) {
    RecordTick(statistics_, NUMBER_DB_SEEK);
    if (valid_) {
      RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
      RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
    }
  }
942
  if (valid_ && prefix_extractor_ && prefix_same_as_start_) {
943 944
    prefix_start_buf_.SetKey(prefix_extractor_->Transform(saved_key_.GetKey()));
    prefix_start_key_ = prefix_start_buf_.GetKey();
945
  }
J
jorlow@chromium.org 已提交
946 947
}

948
// Heap-allocates a DBIter over `internal_iter`, passing `false` for the
// constructor argument that NewArenaWrappedDbIterator() sets to `true`.
// The caller owns the returned iterator.
Iterator* NewDBIterator(Env* env, const ImmutableCFOptions& ioptions,
                        const Comparator* user_key_comparator,
                        InternalIterator* internal_iter,
                        const SequenceNumber& sequence,
                        uint64_t max_sequential_skip_in_iterations,
                        uint64_t version_number,
                        const Slice* iterate_upper_bound,
                        bool prefix_same_as_start, bool pin_data) {
  return new DBIter(env, ioptions, user_key_comparator, internal_iter,
                    sequence, false, max_sequential_skip_in_iterations,
                    version_number, iterate_upper_bound, prefix_same_as_start,
                    pin_data);
}

I
Igor Canadi 已提交
963
ArenaWrappedDBIter::~ArenaWrappedDBIter() { db_iter_->~DBIter(); }
964 965 966

void ArenaWrappedDBIter::SetDBIter(DBIter* iter) { db_iter_ = iter; }

S
sdong 已提交
967
void ArenaWrappedDBIter::SetIterUnderDBIter(InternalIterator* iter) {
968 969 970 971 972 973 974 975 976
  static_cast<DBIter*>(db_iter_)->SetIter(iter);
}

inline bool ArenaWrappedDBIter::Valid() const { return db_iter_->Valid(); }
inline void ArenaWrappedDBIter::SeekToFirst() { db_iter_->SeekToFirst(); }
inline void ArenaWrappedDBIter::SeekToLast() { db_iter_->SeekToLast(); }
inline void ArenaWrappedDBIter::Seek(const Slice& target) {
  db_iter_->Seek(target);
}
A
Aaron Gao 已提交
977 978 979
inline void ArenaWrappedDBIter::SeekForPrev(const Slice& target) {
  db_iter_->SeekForPrev(target);
}
980 981 982 983 984
inline void ArenaWrappedDBIter::Next() { db_iter_->Next(); }
inline void ArenaWrappedDBIter::Prev() { db_iter_->Prev(); }
inline Slice ArenaWrappedDBIter::key() const { return db_iter_->key(); }
inline Slice ArenaWrappedDBIter::value() const { return db_iter_->value(); }
inline Status ArenaWrappedDBIter::status() const { return db_iter_->status(); }
985 986 987 988
inline Status ArenaWrappedDBIter::GetProperty(std::string prop_name,
                                              std::string* prop) {
  return db_iter_->GetProperty(prop_name, prop);
}
989 990 991 992
void ArenaWrappedDBIter::RegisterCleanup(CleanupFunction function, void* arg1,
                                         void* arg2) {
  db_iter_->RegisterCleanup(function, arg1, arg2);
}
J
jorlow@chromium.org 已提交
993

994
// Creates an ArenaWrappedDBIter and constructs its DBIter in the wrapper's
// arena via placement new. The DBIter starts with no internal iterator
// (nullptr); one is presumably installed later via SetIterUnderDBIter().
// The caller owns the returned wrapper, whose destructor runs ~DBIter().
ArenaWrappedDBIter* NewArenaWrappedDbIterator(
    Env* env, const ImmutableCFOptions& ioptions,
    const Comparator* user_key_comparator, const SequenceNumber& sequence,
    uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
    const Slice* iterate_upper_bound, bool prefix_same_as_start,
    bool pin_data) {
  ArenaWrappedDBIter* wrapper = new ArenaWrappedDBIter();
  auto storage = wrapper->GetArena()->AllocateAligned(sizeof(DBIter));
  wrapper->SetDBIter(new (storage) DBIter(
      env, ioptions, user_key_comparator, nullptr /* internal_iter */,
      sequence, true, max_sequential_skip_in_iterations, version_number,
      iterate_upper_bound, prefix_same_as_start, pin_data));
  return wrapper;
}

1013
}  // namespace rocksdb