db_iter.cc 31.8 KB
Newer Older
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 3 4 5
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
J
jorlow@chromium.org 已提交
6 7 8 9 10
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/db_iter.h"
11
#include <stdexcept>
12
#include <deque>
S
Stanislau Hlebik 已提交
13
#include <string>
S
Stanislau Hlebik 已提交
14
#include <limits>
J
jorlow@chromium.org 已提交
15 16

#include "db/dbformat.h"
17
#include "db/filename.h"
18
#include "db/merge_context.h"
19
#include "db/merge_helper.h"
20
#include "db/pinned_iterators_manager.h"
S
sdong 已提交
21
#include "port/port.h"
22 23 24
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/merge_operator.h"
25
#include "rocksdb/options.h"
S
sdong 已提交
26
#include "table/internal_iterator.h"
27
#include "util/arena.h"
J
jorlow@chromium.org 已提交
28 29
#include "util/logging.h"
#include "util/mutexlock.h"
30
#include "util/perf_context_imp.h"
31
#include "util/string_util.h"
J
jorlow@chromium.org 已提交
32

33
namespace rocksdb {
J
jorlow@chromium.org 已提交
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54

#if 0
static void DumpInternalIter(Iterator* iter) {
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    ParsedInternalKey k;
    if (!ParseInternalKey(iter->key(), &k)) {
      fprintf(stderr, "Corrupt '%s'\n", EscapeString(iter->key()).c_str());
    } else {
      fprintf(stderr, "@ '%s'\n", k.DebugString().c_str());
    }
  }
}
#endif

// Memtables and sstables that make the DB representation contain
// (userkey,seq,type) => uservalue entries.  DBIter
// combines multiple entries for the same userkey found in the DB
// representation into a single entry while accounting for sequence
// numbers, deletion markers, overwrites, etc.
class DBIter: public Iterator {
 public:
55
  // The following is grossly complicated. TODO: clean it up
J
jorlow@chromium.org 已提交
56 57 58 59 60 61 62 63 64 65
  // Which direction is the iterator currently moving?
  // (1) When moving forward, the internal iterator is positioned at
  //     the exact entry that yields this->key(), this->value()
  // (2) When moving backwards, the internal iterator is positioned
  //     just before all entries whose user key == this->key().
  enum Direction {
    kForward,
    kReverse
  };

66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
  // LocalStatistics contain Statistics counters that will be aggregated per
  // each iterator instance and then will be sent to the global statistics when
  // the iterator is destroyed.
  //
  // The purpose of this approach is to avoid perf regression happening
  // when multiple threads bump the atomic counters from a DBIter::Next().
  struct LocalStatistics {
    explicit LocalStatistics() { ResetCounters(); }

    void ResetCounters() {
      next_count_ = 0;
      next_found_count_ = 0;
      prev_count_ = 0;
      prev_found_count_ = 0;
      bytes_read_ = 0;
    }

    void BumpGlobalStatistics(Statistics* global_statistics) {
      RecordTick(global_statistics, NUMBER_DB_NEXT, next_count_);
      RecordTick(global_statistics, NUMBER_DB_NEXT_FOUND, next_found_count_);
      RecordTick(global_statistics, NUMBER_DB_PREV, prev_count_);
      RecordTick(global_statistics, NUMBER_DB_PREV_FOUND, prev_found_count_);
      RecordTick(global_statistics, ITER_BYTES_READ, bytes_read_);
      ResetCounters();
    }

    // Map to Tickers::NUMBER_DB_NEXT
    uint64_t next_count_;
    // Map to Tickers::NUMBER_DB_NEXT_FOUND
    uint64_t next_found_count_;
    // Map to Tickers::NUMBER_DB_PREV
    uint64_t prev_count_;
    // Map to Tickers::NUMBER_DB_PREV_FOUND
    uint64_t prev_found_count_;
    // Map to Tickers::ITER_BYTES_READ
    uint64_t bytes_read_;
  };

S
sdong 已提交
104 105
  DBIter(Env* env, const ImmutableCFOptions& ioptions, const Comparator* cmp,
         InternalIterator* iter, SequenceNumber s, bool arena_mode,
106
         uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
107
         const Slice* iterate_upper_bound = nullptr,
108
         bool prefix_same_as_start = false, bool pin_data = false)
109 110
      : arena_mode_(arena_mode),
        env_(env),
111
        logger_(ioptions.info_log),
J
jorlow@chromium.org 已提交
112
        user_comparator_(cmp),
113
        merge_operator_(ioptions.merge_operator),
J
jorlow@chromium.org 已提交
114 115
        iter_(iter),
        sequence_(s),
J
jorlow@chromium.org 已提交
116
        direction_(kForward),
117
        valid_(false),
118
        current_entry_is_merged_(false),
119
        statistics_(ioptions.statistics),
120
        version_number_(version_number),
121
        iterate_upper_bound_(iterate_upper_bound),
122
        prefix_same_as_start_(prefix_same_as_start),
123
        pin_thru_lifetime_(pin_data) {
L
Lei Jin 已提交
124
    RecordTick(statistics_, NO_ITERATORS);
125 126
    prefix_extractor_ = ioptions.prefix_extractor;
    max_skip_ = max_sequential_skip_in_iterations;
127 128 129 130 131 132
    if (pin_thru_lifetime_) {
      pinned_iters_mgr_.StartPinning();
    }
    if (iter_) {
      iter_->SetPinnedItersMgr(&pinned_iters_mgr_);
    }
J
jorlow@chromium.org 已提交
133 134
  }
  virtual ~DBIter() {
135 136
    // Release pinned data if any
    pinned_iters_mgr_.ReleasePinnedIterators();
137
    RecordTick(statistics_, NO_ITERATORS, -1);
138
    local_stats_.BumpGlobalStatistics(statistics_);
139 140 141
    if (!arena_mode_) {
      delete iter_;
    } else {
S
sdong 已提交
142
      iter_->~InternalIterator();
143 144
    }
  }
S
sdong 已提交
145
  virtual void SetIter(InternalIterator* iter) {
146 147
    assert(iter_ == nullptr);
    iter_ = iter;
148
    iter_->SetPinnedItersMgr(&pinned_iters_mgr_);
J
jorlow@chromium.org 已提交
149
  }
I
Igor Sugak 已提交
150 151
  virtual bool Valid() const override { return valid_; }
  virtual Slice key() const override {
J
jorlow@chromium.org 已提交
152
    assert(valid_);
153
    return saved_key_.GetKey();
J
jorlow@chromium.org 已提交
154
  }
I
Igor Sugak 已提交
155
  virtual Slice value() const override {
J
jorlow@chromium.org 已提交
156
    assert(valid_);
157
    if (current_entry_is_merged_) {
158 159 160
      // If pinned_value_ is set then the result of merge operator is one of
      // the merge operands and we should return it.
      return pinned_value_.data() ? pinned_value_ : saved_value_;
161 162 163 164 165
    } else if (direction_ == kReverse) {
      return pinned_value_;
    } else {
      return iter_->value();
    }
J
jorlow@chromium.org 已提交
166
  }
I
Igor Sugak 已提交
167
  virtual Status status() const override {
J
jorlow@chromium.org 已提交
168 169 170 171 172 173
    if (status_.ok()) {
      return iter_->status();
    } else {
      return status_;
    }
  }
174 175 176 177 178 179

  virtual Status GetProperty(std::string prop_name,
                             std::string* prop) override {
    if (prop == nullptr) {
      return Status::InvalidArgument("prop is nullptr");
    }
180
    if (prop_name == "rocksdb.iterator.super-version-number") {
181 182 183 184 185 186
      // First try to pass the value returned from inner iterator.
      if (!iter_->GetProperty(prop_name, prop).ok()) {
        *prop = ToString(version_number_);
      }
      return Status::OK();
    } else if (prop_name == "rocksdb.iterator.is-key-pinned") {
187
      if (valid_) {
188
        *prop = (pin_thru_lifetime_ && saved_key_.IsKeyPinned()) ? "1" : "0";
189 190 191 192 193 194
      } else {
        *prop = "Iterator is not valid.";
      }
      return Status::OK();
    }
    return Status::InvalidArgument("Undentified property.");
195
  }
J
jorlow@chromium.org 已提交
196

I
Igor Sugak 已提交
197 198 199 200 201
  virtual void Next() override;
  virtual void Prev() override;
  virtual void Seek(const Slice& target) override;
  virtual void SeekToFirst() override;
  virtual void SeekToLast() override;
J
jorlow@chromium.org 已提交
202

J
jorlow@chromium.org 已提交
203
 private:
204
  void ReverseToBackward();
S
Stanislau Hlebik 已提交
205 206 207 208 209 210
  void PrevInternal();
  void FindParseableKey(ParsedInternalKey* ikey, Direction direction);
  bool FindValueForCurrentKey();
  bool FindValueForCurrentKeyUsingSeek();
  void FindPrevUserKey();
  void FindNextUserKey();
211 212
  inline void FindNextUserEntry(bool skipping, bool prefix_check);
  void FindNextUserEntryInternal(bool skipping, bool prefix_check);
J
jorlow@chromium.org 已提交
213
  bool ParseKey(ParsedInternalKey* key);
214
  void MergeValuesNewToOld();
J
jorlow@chromium.org 已提交
215

216 217 218 219 220 221 222 223 224 225 226 227 228 229 230
  // Temporarily pin the blocks that we encounter until ReleaseTempPinnedData()
  // is called
  void TempPinData() {
    if (!pin_thru_lifetime_) {
      pinned_iters_mgr_.StartPinning();
    }
  }

  // Release blocks pinned by TempPinData()
  void ReleaseTempPinnedData() {
    if (!pin_thru_lifetime_) {
      pinned_iters_mgr_.ReleasePinnedIterators();
    }
  }

J
jorlow@chromium.org 已提交
231 232 233 234 235 236 237 238 239
  inline void ClearSavedValue() {
    if (saved_value_.capacity() > 1048576) {
      std::string empty;
      swap(empty, saved_value_);
    } else {
      saved_value_.clear();
    }
  }

240
  const SliceTransform* prefix_extractor_;
241
  bool arena_mode_;
J
jorlow@chromium.org 已提交
242
  Env* const env_;
I
Igor Canadi 已提交
243
  Logger* logger_;
J
jorlow@chromium.org 已提交
244
  const Comparator* const user_comparator_;
245
  const MergeOperator* const merge_operator_;
S
sdong 已提交
246
  InternalIterator* iter_;
J
jorlow@chromium.org 已提交
247
  SequenceNumber const sequence_;
J
jorlow@chromium.org 已提交
248

J
jorlow@chromium.org 已提交
249
  Status status_;
S
Stanislau Hlebik 已提交
250 251
  IterKey saved_key_;
  std::string saved_value_;
252
  Slice pinned_value_;
J
jorlow@chromium.org 已提交
253
  Direction direction_;
J
jorlow@chromium.org 已提交
254
  bool valid_;
255
  bool current_entry_is_merged_;
256
  Statistics* statistics_;
257
  uint64_t max_skip_;
258
  uint64_t version_number_;
259
  const Slice* iterate_upper_bound_;
260 261 262
  IterKey prefix_start_buf_;
  Slice prefix_start_key_;
  const bool prefix_same_as_start_;
263 264 265
  // Means that we will pin all data blocks we read as long the Iterator
  // is not deleted, will be true if ReadOptions::pin_data is true
  const bool pin_thru_lifetime_;
266
  // List of operands for merge operator.
267
  MergeContext merge_context_;
268
  LocalStatistics local_stats_;
269
  PinnedIteratorsManager pinned_iters_mgr_;
J
jorlow@chromium.org 已提交
270 271 272 273 274 275 276 277 278

  // No copying allowed
  DBIter(const DBIter&);
  void operator=(const DBIter&);
};

inline bool DBIter::ParseKey(ParsedInternalKey* ikey) {
  if (!ParseInternalKey(iter_->key(), ikey)) {
    status_ = Status::Corruption("corrupted internal key in DBIter");
279 280
    Log(InfoLogLevel::ERROR_LEVEL,
        logger_, "corrupted internal key in DBIter: %s",
281
        iter_->key().ToString(true).c_str());
J
jorlow@chromium.org 已提交
282 283 284 285 286 287
    return false;
  } else {
    return true;
  }
}

J
jorlow@chromium.org 已提交
288 289 290
void DBIter::Next() {
  assert(valid_);

291 292
  // Release temporarily pinned blocks from last operation
  ReleaseTempPinnedData();
S
Stanislau Hlebik 已提交
293 294
  if (direction_ == kReverse) {
    FindNextUserKey();
J
jorlow@chromium.org 已提交
295 296 297
    direction_ = kForward;
    if (!iter_->Valid()) {
      iter_->SeekToFirst();
J
jorlow@chromium.org 已提交
298
    }
299 300 301 302 303 304 305
  } else if (iter_->Valid() && !current_entry_is_merged_) {
    // If the current value is not a merge, the iter position is the
    // current key, which is already returned. We can safely issue a
    // Next() without checking the current key.
    // If the current key is a merge, very likely iter already points
    // to the next internal position.
    iter_->Next();
306
    PERF_COUNTER_ADD(internal_key_skipped_count, 1);
J
jorlow@chromium.org 已提交
307
  }
J
jorlow@chromium.org 已提交
308

309 310 311
  if (statistics_ != nullptr) {
    local_stats_.next_count_++;
  }
312 313
  // Now we point to the next internal position, for both of merge and
  // not merge cases.
314 315 316 317
  if (!iter_->Valid()) {
    valid_ = false;
    return;
  }
318
  FindNextUserEntry(true /* skipping the current user key */, prefix_same_as_start_);
319 320 321 322
  if (statistics_ != nullptr && valid_) {
    local_stats_.next_found_count_++;
    local_stats_.bytes_read_ += (key().size() + value().size());
  }
J
jorlow@chromium.org 已提交
323 324
}

325 326 327 328 329 330 331 332
// PRE: saved_key_ has the current user key if skipping
// POST: saved_key_ should have the next user key if valid_,
//       if the current entry is a result of merge
//           current_entry_is_merged_ => true
//           saved_value_             => the merged value
//
// NOTE: In between, saved_key_ can point to a user key that has
//       a delete marker
333 334 335 336 337 338
//
// The prefix_check parameter controls whether we check the iterated
// keys against the prefix of the seeked key. Set to false when
// performing a seek without a key (e.g. SeekToFirst). Set to
// prefix_same_as_start_ for other iterations.
inline void DBIter::FindNextUserEntry(bool skipping, bool prefix_check) {
339
  PERF_TIMER_GUARD(find_next_user_entry_time);
340
  FindNextUserEntryInternal(skipping, prefix_check);
341 342 343
}

// Actual implementation of DBIter::FindNextUserEntry()
344
void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
J
jorlow@chromium.org 已提交
345 346 347
  // Loop until we hit an acceptable entry to yield
  assert(iter_->Valid());
  assert(direction_ == kForward);
348
  current_entry_is_merged_ = false;
349
  uint64_t num_skipped = 0;
J
jorlow@chromium.org 已提交
350
  do {
J
jorlow@chromium.org 已提交
351
    ParsedInternalKey ikey;
352 353 354

    if (ParseKey(&ikey)) {
      if (iterate_upper_bound_ != nullptr &&
355
          user_comparator_->Compare(ikey.user_key, *iterate_upper_bound_) >= 0) {
356 357 358
        break;
      }

359 360 361 362 363
      if (prefix_extractor_ && prefix_check &&
          prefix_extractor_->Transform(ikey.user_key).compare(prefix_start_key_) != 0) {
        break;
      }

364 365 366 367 368 369 370 371
      if (ikey.sequence <= sequence_) {
        if (skipping &&
           user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0) {
          num_skipped++;  // skip this entry
          PERF_COUNTER_ADD(internal_key_skipped_count, 1);
        } else {
          switch (ikey.type) {
            case kTypeDeletion:
A
Andres Noetzli 已提交
372
            case kTypeSingleDeletion:
373 374
              // Arrange to skip all upcoming entries for this key since
              // they are hidden by this deletion.
375 376 377
              saved_key_.SetKey(
                  ikey.user_key,
                  !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
378 379 380 381 382 383
              skipping = true;
              num_skipped = 0;
              PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
              break;
            case kTypeValue:
              valid_ = true;
384 385 386
              saved_key_.SetKey(
                  ikey.user_key,
                  !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
387 388 389
              return;
            case kTypeMerge:
              // By now, we are sure the current ikey is going to yield a value
390 391 392
              saved_key_.SetKey(
                  ikey.user_key,
                  !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
393 394 395 396 397 398 399 400
              current_entry_is_merged_ = true;
              valid_ = true;
              MergeValuesNewToOld();  // Go to a different state machine
              return;
            default:
              assert(false);
              break;
          }
401
        }
J
jorlow@chromium.org 已提交
402
      }
J
jorlow@chromium.org 已提交
403
    }
404 405
    // If we have sequentially iterated via numerous keys and still not
    // found the next user-key, then it is better to seek so that we can
C
clark.kang 已提交
406
    // avoid too many key comparisons. We seek to the last occurrence of
407 408
    // our current key by looking for sequence number 0 and type deletion
    // (the smallest type).
409 410 411
    if (skipping && num_skipped > max_skip_) {
      num_skipped = 0;
      std::string last_key;
412
      AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetKey(), 0,
413
                                                     kTypeDeletion));
414 415 416 417 418
      iter_->Seek(last_key);
      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
    } else {
      iter_->Next();
    }
J
jorlow@chromium.org 已提交
419 420
  } while (iter_->Valid());
  valid_ = false;
J
jorlow@chromium.org 已提交
421 422
}

423 424 425 426 427 428 429
// Merge values of the same user key starting from the current iter_ position
// Scan from the newer entries to older entries.
// PRE: iter_->key() points to the first merge type entry
//      saved_key_ stores the user key
// POST: saved_value_ has the merged value for the user key
//       iter_ points to the next entry (or invalid)
void DBIter::MergeValuesNewToOld() {
430
  if (!merge_operator_) {
431 432
    Log(InfoLogLevel::ERROR_LEVEL,
        logger_, "Options::merge_operator is null.");
433
    status_ = Status::InvalidArgument("merge_operator_ must be set.");
434 435
    valid_ = false;
    return;
D
Deon Nicholas 已提交
436
  }
437

438 439
  // Temporarily pin the blocks that hold merge operands
  TempPinData();
440
  merge_context_.Clear();
441
  // Start the merge process by pushing the first operand
442 443
  merge_context_.PushOperand(iter_->value(),
                             iter_->IsValuePinned() /* operand_pinned */);
444 445 446 447 448 449 450 451

  ParsedInternalKey ikey;
  for (iter_->Next(); iter_->Valid(); iter_->Next()) {
    if (!ParseKey(&ikey)) {
      // skip corrupted key
      continue;
    }

452
    if (!user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
453 454
      // hit the next user key, stop right here
      break;
A
Andres Noetzli 已提交
455
    } else if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type) {
456 457 458 459
      // hit a delete with the same user key, stop right here
      // iter_ is positioned after delete
      iter_->Next();
      break;
A
Andres Noetzli 已提交
460
    } else if (kTypeValue == ikey.type) {
461 462 463
      // hit a put, merge the put value with operands and store the
      // final result in saved_value_. We are done!
      // ignore corruption if there is any.
I
Igor Canadi 已提交
464
      const Slice val = iter_->value();
465 466
      MergeHelper::TimedFullMerge(merge_operator_, ikey.user_key, &val,
                                  merge_context_.GetOperands(), &saved_value_,
467
                                  logger_, statistics_, env_, &pinned_value_);
468 469 470
      // iter_ is positioned after put
      iter_->Next();
      return;
A
Andres Noetzli 已提交
471
    } else if (kTypeMerge == ikey.type) {
472 473
      // hit a merge, add the value as an operand and run associative merge.
      // when complete, add result to operands and continue.
474 475
      merge_context_.PushOperand(iter_->value(),
                                 iter_->IsValuePinned() /* operand_pinned */);
A
Andres Noetzli 已提交
476 477
    } else {
      assert(false);
478 479 480
    }
  }

481 482 483 484 485 486
  // we either exhausted all internal keys under this user key, or hit
  // a deletion marker.
  // feed null as the existing value to the merge operator, such that
  // client can differentiate this scenario and do things accordingly.
  MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), nullptr,
                              merge_context_.GetOperands(), &saved_value_,
487
                              logger_, statistics_, env_, &pinned_value_);
488 489
}

J
jorlow@chromium.org 已提交
490 491
void DBIter::Prev() {
  assert(valid_);
492
  ReleaseTempPinnedData();
S
Stanislau Hlebik 已提交
493
  if (direction_ == kForward) {
494
    ReverseToBackward();
S
Stanislau Hlebik 已提交
495 496
  }
  PrevInternal();
M
Manuel Ung 已提交
497
  if (statistics_ != nullptr) {
498
    local_stats_.prev_count_++;
M
Manuel Ung 已提交
499
    if (valid_) {
500 501
      local_stats_.prev_found_count_++;
      local_stats_.bytes_read_ += (key().size() + value().size());
M
Manuel Ung 已提交
502 503
    }
  }
504 505
  if (valid_ && prefix_extractor_ && prefix_same_as_start_ &&
      prefix_extractor_->Transform(saved_key_.GetKey())
506
              .compare(prefix_start_key_) != 0) {
507 508
    valid_ = false;
  }
S
Stanislau Hlebik 已提交
509
}
J
jorlow@chromium.org 已提交
510

511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537
void DBIter::ReverseToBackward() {
  if (current_entry_is_merged_) {
    // Not placed in the same key. Need to call Prev() until finding the
    // previous key.
    if (!iter_->Valid()) {
      iter_->SeekToLast();
    }
    ParsedInternalKey ikey;
    FindParseableKey(&ikey, kReverse);
    while (iter_->Valid() &&
           user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) > 0) {
      iter_->Prev();
      FindParseableKey(&ikey, kReverse);
    }
  }
#ifndef NDEBUG
  if (iter_->Valid()) {
    ParsedInternalKey ikey;
    assert(ParseKey(&ikey));
    assert(user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0);
  }
#endif

  FindPrevUserKey();
  direction_ = kReverse;
}

S
Stanislau Hlebik 已提交
538 539 540 541
void DBIter::PrevInternal() {
  if (!iter_->Valid()) {
    valid_ = false;
    return;
542 543
  }

S
Stanislau Hlebik 已提交
544 545 546
  ParsedInternalKey ikey;

  while (iter_->Valid()) {
547
    saved_key_.SetKey(ExtractUserKey(iter_->key()),
548
                      !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
S
Stanislau Hlebik 已提交
549 550
    if (FindValueForCurrentKey()) {
      valid_ = true;
J
jorlow@chromium.org 已提交
551 552 553
      if (!iter_->Valid()) {
        return;
      }
S
Stanislau Hlebik 已提交
554
      FindParseableKey(&ikey, kReverse);
555
      if (user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
S
Stanislau Hlebik 已提交
556
        FindPrevUserKey();
J
jorlow@chromium.org 已提交
557
      }
S
Stanislau Hlebik 已提交
558
      return;
J
jorlow@chromium.org 已提交
559
    }
S
Stanislau Hlebik 已提交
560 561 562 563
    if (!iter_->Valid()) {
      break;
    }
    FindParseableKey(&ikey, kReverse);
564
    if (user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
S
Stanislau Hlebik 已提交
565 566 567 568 569 570
      FindPrevUserKey();
    }
  }
  // We haven't found any key - iterator is not valid
  assert(!iter_->Valid());
  valid_ = false;
J
jorlow@chromium.org 已提交
571 572
}

S
Stanislau Hlebik 已提交
573
// This function checks, if the entry with biggest sequence_number <= sequence_
A
Andres Noetzli 已提交
574 575
// is non kTypeDeletion or kTypeSingleDeletion. If it's not, we save value in
// saved_value_
S
Stanislau Hlebik 已提交
576 577
bool DBIter::FindValueForCurrentKey() {
  assert(iter_->Valid());
578
  merge_context_.Clear();
579
  current_entry_is_merged_ = false;
A
Andres Noetzli 已提交
580 581
  // last entry before merge (could be kTypeDeletion, kTypeSingleDeletion or
  // kTypeValue)
S
Stanislau Hlebik 已提交
582 583
  ValueType last_not_merge_type = kTypeDeletion;
  ValueType last_key_entry_type = kTypeDeletion;
J
jorlow@chromium.org 已提交
584

S
Stanislau Hlebik 已提交
585 586 587
  ParsedInternalKey ikey;
  FindParseableKey(&ikey, kReverse);

588 589 590
  // Temporarily pin blocks that hold (merge operands / the value)
  ReleaseTempPinnedData();
  TempPinData();
S
Stanislau Hlebik 已提交
591 592
  size_t num_skipped = 0;
  while (iter_->Valid() && ikey.sequence <= sequence_ &&
593
         user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
S
Stanislau Hlebik 已提交
594 595 596 597 598 599 600 601
    // We iterate too much: let's use Seek() to avoid too much key comparisons
    if (num_skipped >= max_skip_) {
      return FindValueForCurrentKeyUsingSeek();
    }

    last_key_entry_type = ikey.type;
    switch (last_key_entry_type) {
      case kTypeValue:
602
        merge_context_.Clear();
603
        assert(iter_->IsValuePinned());
604
        pinned_value_ = iter_->value();
S
Stanislau Hlebik 已提交
605 606 607
        last_not_merge_type = kTypeValue;
        break;
      case kTypeDeletion:
A
Andres Noetzli 已提交
608
      case kTypeSingleDeletion:
609
        merge_context_.Clear();
A
Andres Noetzli 已提交
610
        last_not_merge_type = last_key_entry_type;
611
        PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
S
Stanislau Hlebik 已提交
612 613
        break;
      case kTypeMerge:
614
        assert(merge_operator_ != nullptr);
615 616
        merge_context_.PushOperandBack(
            iter_->value(), iter_->IsValuePinned() /* operand_pinned */);
S
Stanislau Hlebik 已提交
617 618 619 620 621
        break;
      default:
        assert(false);
    }

622
    PERF_COUNTER_ADD(internal_key_skipped_count, 1);
623
    assert(user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()));
S
Stanislau Hlebik 已提交
624 625 626 627 628 629 630
    iter_->Prev();
    ++num_skipped;
    FindParseableKey(&ikey, kReverse);
  }

  switch (last_key_entry_type) {
    case kTypeDeletion:
A
Andres Noetzli 已提交
631
    case kTypeSingleDeletion:
S
Stanislau Hlebik 已提交
632 633 634
      valid_ = false;
      return false;
    case kTypeMerge:
635
      current_entry_is_merged_ = true;
S
Stanislau Hlebik 已提交
636
      if (last_not_merge_type == kTypeDeletion) {
637 638
        MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(),
                                    nullptr, merge_context_.GetOperands(),
639 640
                                    &saved_value_, logger_, statistics_, env_,
                                    &pinned_value_);
641
      } else {
S
Stanislau Hlebik 已提交
642
        assert(last_not_merge_type == kTypeValue);
643 644 645
        MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(),
                                    &pinned_value_,
                                    merge_context_.GetOperands(), &saved_value_,
646
                                    logger_, statistics_, env_, &pinned_value_);
647
      }
S
Stanislau Hlebik 已提交
648 649 650 651 652 653 654
      break;
    case kTypeValue:
      // do nothing - we've already has value in saved_value_
      break;
    default:
      assert(false);
      break;
J
jorlow@chromium.org 已提交
655
  }
S
Stanislau Hlebik 已提交
656 657 658
  valid_ = true;
  return true;
}
J
jorlow@chromium.org 已提交
659

S
Stanislau Hlebik 已提交
660 661 662
// This function is used in FindValueForCurrentKey.
// We use Seek() function instead of Prev() to find necessary value
bool DBIter::FindValueForCurrentKeyUsingSeek() {
663 664 665
  // FindValueForCurrentKey will enable pinning before calling
  // FindValueForCurrentKeyUsingSeek()
  assert(pinned_iters_mgr_.PinningEnabled());
S
Stanislau Hlebik 已提交
666 667 668 669 670 671 672 673 674 675
  std::string last_key;
  AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetKey(), sequence_,
                                                 kValueTypeForSeek));
  iter_->Seek(last_key);
  RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);

  // assume there is at least one parseable key for this user key
  ParsedInternalKey ikey;
  FindParseableKey(&ikey, kForward);

A
Andres Noetzli 已提交
676 677
  if (ikey.type == kTypeValue || ikey.type == kTypeDeletion ||
      ikey.type == kTypeSingleDeletion) {
S
Stanislau Hlebik 已提交
678
    if (ikey.type == kTypeValue) {
679
      assert(iter_->IsValuePinned());
680
      pinned_value_ = iter_->value();
S
Stanislau Hlebik 已提交
681 682 683
      valid_ = true;
      return true;
    }
J
jorlow@chromium.org 已提交
684
    valid_ = false;
S
Stanislau Hlebik 已提交
685 686 687 688 689
    return false;
  }

  // kTypeMerge. We need to collect all kTypeMerge values and save them
  // in operands
690
  current_entry_is_merged_ = true;
691
  merge_context_.Clear();
S
Stanislau Hlebik 已提交
692
  while (iter_->Valid() &&
693
         user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()) &&
S
Stanislau Hlebik 已提交
694
         ikey.type == kTypeMerge) {
695 696
    merge_context_.PushOperand(iter_->value(),
                               iter_->IsValuePinned() /* operand_pinned */);
S
Stanislau Hlebik 已提交
697 698 699 700 701
    iter_->Next();
    FindParseableKey(&ikey, kForward);
  }

  if (!iter_->Valid() ||
702
      !user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()) ||
A
Andres Noetzli 已提交
703
      ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion) {
704 705
    MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), nullptr,
                                merge_context_.GetOperands(), &saved_value_,
706
                                logger_, statistics_, env_, &pinned_value_);
S
Stanislau Hlebik 已提交
707 708
    // Make iter_ valid and point to saved_key_
    if (!iter_->Valid() ||
709
        !user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
S
Stanislau Hlebik 已提交
710 711 712
      iter_->Seek(last_key);
      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
    }
J
jorlow@chromium.org 已提交
713
    valid_ = true;
S
Stanislau Hlebik 已提交
714 715 716
    return true;
  }

I
Igor Canadi 已提交
717
  const Slice& val = iter_->value();
718 719
  MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), &val,
                              merge_context_.GetOperands(), &saved_value_,
720
                              logger_, statistics_, env_, &pinned_value_);
S
Stanislau Hlebik 已提交
721 722 723 724 725 726 727 728 729 730 731 732 733 734 735
  valid_ = true;
  return true;
}

// Used in Next to change directions
// Go to next user key
// Don't use Seek(),
// because next user key will be very close
void DBIter::FindNextUserKey() {
  if (!iter_->Valid()) {
    return;
  }
  ParsedInternalKey ikey;
  FindParseableKey(&ikey, kForward);
  while (iter_->Valid() &&
736
         !user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
S
Stanislau Hlebik 已提交
737 738 739 740 741 742 743 744 745 746 747 748 749
    iter_->Next();
    FindParseableKey(&ikey, kForward);
  }
}

// Go to previous user_key
void DBIter::FindPrevUserKey() {
  if (!iter_->Valid()) {
    return;
  }
  size_t num_skipped = 0;
  ParsedInternalKey ikey;
  FindParseableKey(&ikey, kReverse);
750 751 752 753 754 755 756 757 758 759 760 761 762 763 764
  int cmp;
  while (iter_->Valid() && ((cmp = user_comparator_->Compare(
                                 ikey.user_key, saved_key_.GetKey())) == 0 ||
                            (cmp > 0 && ikey.sequence > sequence_))) {
    if (cmp == 0) {
      if (num_skipped >= max_skip_) {
        num_skipped = 0;
        IterKey last_key;
        last_key.SetInternalKey(ParsedInternalKey(
            saved_key_.GetKey(), kMaxSequenceNumber, kValueTypeForSeek));
        iter_->Seek(last_key.GetKey());
        RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
      } else {
        ++num_skipped;
      }
S
Stanislau Hlebik 已提交
765 766 767 768 769 770 771 772 773 774 775 776 777 778
    }
    iter_->Prev();
    FindParseableKey(&ikey, kReverse);
  }
}

// Skip all unparseable keys
void DBIter::FindParseableKey(ParsedInternalKey* ikey, Direction direction) {
  while (iter_->Valid() && !ParseKey(ikey)) {
    if (direction == kReverse) {
      iter_->Prev();
    } else {
      iter_->Next();
    }
J
jorlow@chromium.org 已提交
779 780
  }
}
J
jorlow@chromium.org 已提交
781

J
jorlow@chromium.org 已提交
782
void DBIter::Seek(const Slice& target) {
L
Lei Jin 已提交
783
  StopWatch sw(env_, statistics_, DB_SEEK);
784
  ReleaseTempPinnedData();
785 786 787
  saved_key_.Clear();
  // now savved_key is used to store internal key.
  saved_key_.SetInternalKey(target, sequence_);
788 789 790 791 792 793

  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    iter_->Seek(saved_key_.GetKey());
  }

M
Manuel Ung 已提交
794
  RecordTick(statistics_, NUMBER_DB_SEEK);
J
jorlow@chromium.org 已提交
795
  if (iter_->Valid()) {
796 797 798
    if (prefix_extractor_ && prefix_same_as_start_) {
      prefix_start_key_ = prefix_extractor_->Transform(target);
    }
799 800
    direction_ = kForward;
    ClearSavedValue();
801 802 803 804
    FindNextUserEntry(false /* not skipping */, prefix_same_as_start_);
    if (!valid_) {
      prefix_start_key_.clear();
    }
M
Manuel Ung 已提交
805 806 807 808 809 810
    if (statistics_ != nullptr) {
      if (valid_) {
        RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
        RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
      }
    }
J
jorlow@chromium.org 已提交
811 812 813
  } else {
    valid_ = false;
  }
814
  if (valid_ && prefix_extractor_ && prefix_same_as_start_) {
815 816
    prefix_start_buf_.SetKey(prefix_start_key_);
    prefix_start_key_ = prefix_start_buf_.GetKey();
817
  }
J
jorlow@chromium.org 已提交
818
}
J
jorlow@chromium.org 已提交
819

J
jorlow@chromium.org 已提交
820
void DBIter::SeekToFirst() {
S
Stanislau Hlebik 已提交
821
  // Don't use iter_::Seek() if we set a prefix extractor
822
  // because prefix seek will be used.
823
  if (prefix_extractor_ != nullptr) {
S
Stanislau Hlebik 已提交
824 825
    max_skip_ = std::numeric_limits<uint64_t>::max();
  }
J
jorlow@chromium.org 已提交
826
  direction_ = kForward;
827
  ReleaseTempPinnedData();
J
jorlow@chromium.org 已提交
828
  ClearSavedValue();
829 830 831 832 833 834

  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    iter_->SeekToFirst();
  }

M
Manuel Ung 已提交
835
  RecordTick(statistics_, NUMBER_DB_SEEK);
J
jorlow@chromium.org 已提交
836
  if (iter_->Valid()) {
837
    FindNextUserEntry(false /* not skipping */, false /* no prefix check */);
M
Manuel Ung 已提交
838 839 840 841 842 843
    if (statistics_ != nullptr) {
      if (valid_) {
        RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
        RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
      }
    }
J
jorlow@chromium.org 已提交
844 845
  } else {
    valid_ = false;
J
jorlow@chromium.org 已提交
846
  }
847
  if (valid_ && prefix_extractor_ && prefix_same_as_start_) {
848 849
    prefix_start_buf_.SetKey(prefix_extractor_->Transform(saved_key_.GetKey()));
    prefix_start_key_ = prefix_start_buf_.GetKey();
850
  }
J
jorlow@chromium.org 已提交
851 852
}

J
jorlow@chromium.org 已提交
853
void DBIter::SeekToLast() {
S
Stanislau Hlebik 已提交
854
  // Don't use iter_::Seek() if we set a prefix extractor
855
  // because prefix seek will be used.
856
  if (prefix_extractor_ != nullptr) {
S
Stanislau Hlebik 已提交
857 858
    max_skip_ = std::numeric_limits<uint64_t>::max();
  }
J
jorlow@chromium.org 已提交
859
  direction_ = kReverse;
860
  ReleaseTempPinnedData();
J
jorlow@chromium.org 已提交
861
  ClearSavedValue();
862 863 864 865 866

  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    iter_->SeekToLast();
  }
867 868 869 870
  // When the iterate_upper_bound is set to a value,
  // it will seek to the last key before the
  // ReadOptions.iterate_upper_bound
  if (iter_->Valid() && iterate_upper_bound_ != nullptr) {
871
    saved_key_.SetKey(*iterate_upper_bound_, false /* copy */);
872 873 874 875 876 877
    std::string last_key;
    AppendInternalKey(&last_key,
                      ParsedInternalKey(saved_key_.GetKey(), kMaxSequenceNumber,
                                        kValueTypeForSeek));

    iter_->Seek(last_key);
S
Stanislau Hlebik 已提交
878

879 880 881 882 883 884 885 886 887 888
    if (!iter_->Valid()) {
      iter_->SeekToLast();
    } else {
      iter_->Prev();
      if (!iter_->Valid()) {
        valid_ = false;
        return;
      }
    }
  }
S
Stanislau Hlebik 已提交
889
  PrevInternal();
M
Manuel Ung 已提交
890 891 892 893 894 895 896
  if (statistics_ != nullptr) {
    RecordTick(statistics_, NUMBER_DB_SEEK);
    if (valid_) {
      RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
      RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
    }
  }
897
  if (valid_ && prefix_extractor_ && prefix_same_as_start_) {
898 899
    prefix_start_buf_.SetKey(prefix_extractor_->Transform(saved_key_.GetKey()));
    prefix_start_key_ = prefix_start_buf_.GetKey();
900
  }
J
jorlow@chromium.org 已提交
901 902
}

903
Iterator* NewDBIterator(Env* env, const ImmutableCFOptions& ioptions,
904
                        const Comparator* user_key_comparator,
S
sdong 已提交
905
                        InternalIterator* internal_iter,
906
                        const SequenceNumber& sequence,
907
                        uint64_t max_sequential_skip_in_iterations,
908
                        uint64_t version_number,
909
                        const Slice* iterate_upper_bound,
910 911 912
                        bool prefix_same_as_start, bool pin_data) {
  DBIter* db_iter =
      new DBIter(env, ioptions, user_key_comparator, internal_iter, sequence,
913
                 false, max_sequential_skip_in_iterations, version_number,
914
                 iterate_upper_bound, prefix_same_as_start, pin_data);
915
  return db_iter;
916 917
}

I
Igor Canadi 已提交
918
ArenaWrappedDBIter::~ArenaWrappedDBIter() { db_iter_->~DBIter(); }
919 920 921

void ArenaWrappedDBIter::SetDBIter(DBIter* iter) { db_iter_ = iter; }

S
sdong 已提交
922
void ArenaWrappedDBIter::SetIterUnderDBIter(InternalIterator* iter) {
923 924 925 926 927 928 929 930 931 932 933 934 935 936
  static_cast<DBIter*>(db_iter_)->SetIter(iter);
}

inline bool ArenaWrappedDBIter::Valid() const { return db_iter_->Valid(); }
inline void ArenaWrappedDBIter::SeekToFirst() { db_iter_->SeekToFirst(); }
inline void ArenaWrappedDBIter::SeekToLast() { db_iter_->SeekToLast(); }
inline void ArenaWrappedDBIter::Seek(const Slice& target) {
  db_iter_->Seek(target);
}
inline void ArenaWrappedDBIter::Next() { db_iter_->Next(); }
inline void ArenaWrappedDBIter::Prev() { db_iter_->Prev(); }
inline Slice ArenaWrappedDBIter::key() const { return db_iter_->key(); }
inline Slice ArenaWrappedDBIter::value() const { return db_iter_->value(); }
inline Status ArenaWrappedDBIter::status() const { return db_iter_->status(); }
937 938 939 940
inline Status ArenaWrappedDBIter::GetProperty(std::string prop_name,
                                              std::string* prop) {
  return db_iter_->GetProperty(prop_name, prop);
}
941 942 943 944
void ArenaWrappedDBIter::RegisterCleanup(CleanupFunction function, void* arg1,
                                         void* arg2) {
  db_iter_->RegisterCleanup(function, arg1, arg2);
}
J
jorlow@chromium.org 已提交
945

946
ArenaWrappedDBIter* NewArenaWrappedDbIterator(
947
    Env* env, const ImmutableCFOptions& ioptions,
948
    const Comparator* user_key_comparator, const SequenceNumber& sequence,
949
    uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
950 951
    const Slice* iterate_upper_bound, bool prefix_same_as_start,
    bool pin_data) {
952 953 954
  ArenaWrappedDBIter* iter = new ArenaWrappedDBIter();
  Arena* arena = iter->GetArena();
  auto mem = arena->AllocateAligned(sizeof(DBIter));
955 956
  DBIter* db_iter =
      new (mem) DBIter(env, ioptions, user_key_comparator, nullptr, sequence,
957
                       true, max_sequential_skip_in_iterations, version_number,
958
                       iterate_upper_bound, prefix_same_as_start, pin_data);
959

960
  iter->SetDBIter(db_iter);
961

962
  return iter;
J
jorlow@chromium.org 已提交
963 964
}

965
}  // namespace rocksdb