db_iter.cc 50.5 KB
Newer Older
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
S
Siying Dong 已提交
2 3 4
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
5
//
J
jorlow@chromium.org 已提交
6 7 8 9 10
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/db_iter.h"
S
Stanislau Hlebik 已提交
11
#include <string>
12
#include <iostream>
S
Stanislau Hlebik 已提交
13
#include <limits>
J
jorlow@chromium.org 已提交
14 15

#include "db/dbformat.h"
16
#include "db/merge_context.h"
17
#include "db/merge_helper.h"
18
#include "db/pinned_iterators_manager.h"
19
#include "file/filename.h"
20
#include "logging/logging.h"
21
#include "memory/arena.h"
22
#include "monitoring/perf_context_imp.h"
23 24 25
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/merge_operator.h"
26
#include "rocksdb/options.h"
S
sdong 已提交
27
#include "table/internal_iterator.h"
28
#include "table/iterator_wrapper.h"
29
#include "trace_replay/trace_replay.h"
J
jorlow@chromium.org 已提交
30
#include "util/mutexlock.h"
31
#include "util/string_util.h"
32
#include "util/user_comparator_wrapper.h"
J
jorlow@chromium.org 已提交
33

34
namespace ROCKSDB_NAMESPACE {
J
jorlow@chromium.org 已提交
35

36
DBIter::DBIter(Env* _env, const ReadOptions& read_options,
37 38
               const ImmutableCFOptions& cf_options,
               const MutableCFOptions& mutable_cf_options,
L
Levi Tamasi 已提交
39 40 41
               const Comparator* cmp, InternalIterator* iter,
               const Version* version, SequenceNumber s, bool arena_mode,
               uint64_t max_sequential_skip_in_iterations,
42
               ReadCallback* read_callback, DBImpl* db_impl,
L
Levi Tamasi 已提交
43
               ColumnFamilyData* cfd, bool expose_blob_index)
44 45
    : prefix_extractor_(mutable_cf_options.prefix_extractor.get()),
      env_(_env),
46 47 48 49
      logger_(cf_options.info_log),
      user_comparator_(cmp),
      merge_operator_(cf_options.merge_operator),
      iter_(iter),
L
Levi Tamasi 已提交
50
      version_(version),
51 52 53
      read_callback_(read_callback),
      sequence_(s),
      statistics_(cf_options.statistics),
Y
Yanqin Jin 已提交
54 55
      max_skip_(max_sequential_skip_in_iterations),
      max_skippable_internal_keys_(read_options.max_skippable_internal_keys),
56 57 58 59 60 61 62
      num_internal_keys_skipped_(0),
      iterate_lower_bound_(read_options.iterate_lower_bound),
      iterate_upper_bound_(read_options.iterate_upper_bound),
      direction_(kForward),
      valid_(false),
      current_entry_is_merged_(false),
      is_key_seqnum_zero_(false),
63 64 65
      prefix_same_as_start_(mutable_cf_options.prefix_extractor
                                ? read_options.prefix_same_as_start
                                : false),
66
      pin_thru_lifetime_(read_options.pin_data),
S
sdong 已提交
67 68 69
      expect_total_order_inner_iter_(prefix_extractor_ == nullptr ||
                                     read_options.total_order_seek ||
                                     read_options.auto_prefix_mode),
L
Levi Tamasi 已提交
70 71 72
      read_tier_(read_options.read_tier),
      verify_checksums_(read_options.verify_checksums),
      expose_blob_index_(expose_blob_index),
73 74 75 76 77
      is_blob_(false),
      arena_mode_(arena_mode),
      range_del_agg_(&cf_options.internal_comparator, s),
      db_impl_(db_impl),
      cfd_(cfd),
Y
Yanqin Jin 已提交
78 79
      start_seqnum_(read_options.iter_start_seqnum),
      timestamp_ub_(read_options.timestamp),
80
      timestamp_lb_(read_options.iter_start_ts),
Y
Yanqin Jin 已提交
81
      timestamp_size_(timestamp_ub_ ? timestamp_ub_->size() : 0) {
82 83 84 85 86
  RecordTick(statistics_, NO_ITERATOR_CREATED);
  if (pin_thru_lifetime_) {
    pinned_iters_mgr_.StartPinning();
  }
  if (iter_.iter()) {
87
    iter_.iter()->SetPinnedItersMgr(&pinned_iters_mgr_);
J
jorlow@chromium.org 已提交
88
  }
Y
Yanqin Jin 已提交
89
  assert(timestamp_size_ == user_comparator_.timestamp_size());
90
}
91

92 93 94
Status DBIter::GetProperty(std::string prop_name, std::string* prop) {
  if (prop == nullptr) {
    return Status::InvalidArgument("prop is nullptr");
95
  }
96 97 98 99
  if (prop_name == "rocksdb.iterator.super-version-number") {
    // First try to pass the value returned from inner iterator.
    return iter_.iter()->GetProperty(prop_name, prop);
  } else if (prop_name == "rocksdb.iterator.is-key-pinned") {
100
    if (valid_) {
101 102 103
      *prop = (pin_thru_lifetime_ && saved_key_.IsKeyPinned()) ? "1" : "0";
    } else {
      *prop = "Iterator is not valid.";
104
    }
105 106 107 108
    return Status::OK();
  } else if (prop_name == "rocksdb.iterator.internal-key") {
    *prop = saved_key_.GetUserKey().ToString();
    return Status::OK();
109
  }
110 111
  return Status::InvalidArgument("Unidentified property.");
}
112

113
bool DBIter::ParseKey(ParsedInternalKey* ikey) {
114 115 116 117
  Status s =
      ParseInternalKey(iter_.key(), ikey, false /* log_err_key */);  // TODO
  if (!s.ok()) {
    status_ = Status::Corruption("In DBIter: ", s.getState());
118
    valid_ = false;
119
    ROCKS_LOG_ERROR(logger_, "In DBIter: %s", status_.getState());
J
jorlow@chromium.org 已提交
120 121 122 123 124 125
    return false;
  } else {
    return true;
  }
}

J
jorlow@chromium.org 已提交
126 127
void DBIter::Next() {
  assert(valid_);
128
  assert(status_.ok());
J
jorlow@chromium.org 已提交
129

130
  PERF_CPU_TIMER_GUARD(iter_next_cpu_nanos, env_);
131 132
  // Release temporarily pinned blocks from last operation
  ReleaseTempPinnedData();
133 134 135
  local_stats_.skip_count_ += num_internal_keys_skipped_;
  local_stats_.skip_count_--;
  num_internal_keys_skipped_ = 0;
136
  bool ok = true;
S
Stanislau Hlebik 已提交
137
  if (direction_ == kReverse) {
138
    is_key_seqnum_zero_ = false;
139 140 141
    if (!ReverseToForward()) {
      ok = false;
    }
142
  } else if (!current_entry_is_merged_) {
143 144 145 146 147
    // If the current value is not a merge, the iter position is the
    // current key, which is already returned. We can safely issue a
    // Next() without checking the current key.
    // If the current key is a merge, very likely iter already points
    // to the next internal position.
148 149
    assert(iter_.Valid());
    iter_.Next();
150
    PERF_COUNTER_ADD(internal_key_skipped_count, 1);
J
jorlow@chromium.org 已提交
151
  }
J
jorlow@chromium.org 已提交
152

153
  local_stats_.next_count_++;
154
  if (ok && iter_.Valid()) {
155 156
    if (prefix_same_as_start_) {
      assert(prefix_extractor_ != nullptr);
157 158 159 160
      const Slice prefix = prefix_.GetUserKey();
      FindNextUserEntry(true /* skipping the current user key */, &prefix);
    } else {
      FindNextUserEntry(true /* skipping the current user key */, nullptr);
161
    }
162
  } else {
163
    is_key_seqnum_zero_ = false;
164 165
    valid_ = false;
  }
166 167 168 169
  if (statistics_ != nullptr && valid_) {
    local_stats_.next_found_count_++;
    local_stats_.bytes_read_ += (key().size() + value().size());
  }
J
jorlow@chromium.org 已提交
170 171
}

L
Levi Tamasi 已提交
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205
bool DBIter::SetBlobValueIfNeeded(const Slice& user_key,
                                  const Slice& blob_index) {
  assert(!is_blob_);

  if (expose_blob_index_) {  // Stacked BlobDB implementation
    is_blob_ = true;
    return true;
  }

  if (!version_) {
    status_ = Status::Corruption("Encountered unexpected blob index.");
    valid_ = false;
    return false;
  }

  // TODO: consider moving ReadOptions from ArenaWrappedDBIter to DBIter to
  // avoid having to copy options back and forth.
  ReadOptions read_options;
  read_options.read_tier = read_tier_;
  read_options.verify_checksums = verify_checksums_;

  const Status s =
      version_->GetBlob(read_options, user_key, blob_index, &blob_value_);

  if (!s.ok()) {
    status_ = s;
    valid_ = false;
    return false;
  }

  is_blob_ = true;
  return true;
}

206
// PRE: saved_key_ has the current user key if skipping_saved_key
207 208 209 210 211 212
// POST: saved_key_ should have the next user key if valid_,
//       if the current entry is a result of merge
//           current_entry_is_merged_ => true
//           saved_value_             => the merged value
//
// NOTE: In between, saved_key_ can point to a user key that has
213
//       a delete marker or a sequence number higher than sequence_
214
//       saved_key_ MUST have a proper user_key before calling this function
215
//
Y
Yanqin Jin 已提交
216
// The prefix parameter, if not null, indicates that we need to iterate
217 218 219
// within the prefix, and the iterator needs to be made invalid, if no
// more entry for the prefix can be found.
bool DBIter::FindNextUserEntry(bool skipping_saved_key, const Slice* prefix) {
220
  PERF_TIMER_GUARD(find_next_user_entry_time);
221
  return FindNextUserEntryInternal(skipping_saved_key, prefix);
222 223 224
}

// Actual implementation of DBIter::FindNextUserEntry()
225 226
bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
                                       const Slice* prefix) {
J
jorlow@chromium.org 已提交
227
  // Loop until we hit an acceptable entry to yield
228
  assert(iter_.Valid());
229
  assert(status_.ok());
J
jorlow@chromium.org 已提交
230
  assert(direction_ == kForward);
231
  current_entry_is_merged_ = false;
232 233 234

  // How many times in a row we have skipped an entry with user key less than
  // or equal to saved_key_. We could skip these entries either because
235
  // sequence numbers were too high or because skipping_saved_key = true.
236
  // What saved_key_ contains throughout this method:
Y
Yanqin Jin 已提交
237 238 239 240 241 242 243 244
  //  - if skipping_saved_key : saved_key_ contains the key that we need
  //                            to skip, and we haven't seen any keys greater
  //                            than that,
  //  - if num_skipped > 0    : saved_key_ contains the key that we have skipped
  //                            num_skipped times, and we haven't seen any keys
  //                            greater than that,
  //  - none of the above     : saved_key_ can contain anything, it doesn't
  //                            matter.
245
  uint64_t num_skipped = 0;
246 247 248 249 250
  // For write unprepared, the target sequence number in reseek could be larger
  // than the snapshot, and thus needs to be skipped again. This could result in
  // an infinite loop of reseeks. To avoid that, we limit the number of reseeks
  // to one.
  bool reseek_done = false;
251

Y
Yi Wu 已提交
252 253
  is_blob_ = false;

J
jorlow@chromium.org 已提交
254
  do {
255 256 257
    // Will update is_key_seqnum_zero_ as soon as we parsed the current key
    // but we need to save the previous value to be used in the loop.
    bool is_prev_key_seqnum_zero = is_key_seqnum_zero_;
258
    if (!ParseKey(&ikey_)) {
259
      is_key_seqnum_zero_ = false;
260
      return false;
261
    }
262 263
    Slice user_key_without_ts =
        StripTimestampFromUserKey(ikey_.user_key, timestamp_size_);
264

265 266
    is_key_seqnum_zero_ = (ikey_.sequence == 0);

267 268
    assert(iterate_upper_bound_ == nullptr ||
           iter_.UpperBoundCheckResult() != IterBoundCheck::kInbound ||
Y
Yanqin Jin 已提交
269
           user_comparator_.CompareWithoutTimestamp(
270
               user_key_without_ts, /*a_has_ts=*/false, *iterate_upper_bound_,
Y
Yanqin Jin 已提交
271
               /*b_has_ts=*/false) < 0);
272 273
    if (iterate_upper_bound_ != nullptr &&
        iter_.UpperBoundCheckResult() != IterBoundCheck::kInbound &&
Y
Yanqin Jin 已提交
274
        user_comparator_.CompareWithoutTimestamp(
275
            user_key_without_ts, /*a_has_ts=*/false, *iterate_upper_bound_,
Y
Yanqin Jin 已提交
276
            /*b_has_ts=*/false) >= 0) {
277 278
      break;
    }
279

280 281
    assert(prefix == nullptr || prefix_extractor_ != nullptr);
    if (prefix != nullptr &&
282 283
        prefix_extractor_->Transform(user_key_without_ts).compare(*prefix) !=
            0) {
284
      assert(prefix_same_as_start_);
285 286 287
      break;
    }

288
    if (TooManyInternalKeysSkipped()) {
289
      return false;
290 291
    }

Y
Yanqin Jin 已提交
292
    assert(ikey_.user_key.size() >= timestamp_size_);
293 294 295
    Slice ts = timestamp_size_ > 0 ? ExtractTimestampFromUserKey(
                                         ikey_.user_key, timestamp_size_)
                                   : Slice();
296 297
    bool more_recent = false;
    if (IsVisible(ikey_.sequence, ts, &more_recent)) {
298 299 300 301
      // If the previous entry is of seqnum 0, the current entry will not
      // possibly be skipped. This condition can potentially be relaxed to
      // prev_key.seq <= ikey_.sequence. We are cautious because it will be more
      // prone to bugs causing the same user key with the same sequence number.
302 303
      // Note that with current timestamp implementation, the same user key can
      // have different timestamps and zero sequence number on the bottommost
Y
Yanqin Jin 已提交
304
      // level. This may change in the future.
305 306
      if ((!is_prev_key_seqnum_zero || timestamp_size_ > 0) &&
          skipping_saved_key &&
307
          CompareKeyForSkip(ikey_.user_key, saved_key_.GetUserKey()) <= 0) {
308 309 310
        num_skipped++;  // skip this entry
        PERF_COUNTER_ADD(internal_key_skipped_count, 1);
      } else {
311
        assert(!skipping_saved_key ||
312
               CompareKeyForSkip(ikey_.user_key, saved_key_.GetUserKey()) > 0);
313 314 315 316 317
        if (!iter_.PrepareValue()) {
          assert(!iter_.status().ok());
          valid_ = false;
          return false;
        }
318
        num_skipped = 0;
319
        reseek_done = false;
320
        switch (ikey_.type) {
321
          case kTypeDeletion:
Y
Yanqin Jin 已提交
322
          case kTypeDeletionWithTimestamp:
323 324 325
          case kTypeSingleDeletion:
            // Arrange to skip all upcoming entries for this key since
            // they are hidden by this deletion.
326 327 328
            // if iterartor specified start_seqnum we
            // 1) return internal key, including the type
            // 2) return ikey only if ikey.seqnum >= start_seqnum_
329
            // note that if deletion seqnum is < start_seqnum_ we
330
            // just skip it like in normal iterator.
331 332 333 334 335 336 337 338 339 340 341 342 343 344
            if (start_seqnum_ > 0) {
              if (ikey_.sequence >= start_seqnum_) {
                saved_key_.SetInternalKey(ikey_);
                valid_ = true;
                return true;
              } else {
                saved_key_.SetUserKey(
                    ikey_.user_key,
                    !pin_thru_lifetime_ ||
                        !iter_.iter()->IsKeyPinned() /* copy */);
                skipping_saved_key = true;
                PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
              }
            } else if (timestamp_lb_) {
345
              saved_key_.SetInternalKey(ikey_);
346 347
              valid_ = true;
              return true;
348 349
            } else {
              saved_key_.SetUserKey(
350 351
                  ikey_.user_key, !pin_thru_lifetime_ ||
                                      !iter_.iter()->IsKeyPinned() /* copy */);
352
              skipping_saved_key = true;
353
              PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
354
            }
355 356
            break;
          case kTypeValue:
Y
Yi Wu 已提交
357
          case kTypeBlobIndex:
358 359 360
            if (start_seqnum_ > 0) {
              if (ikey_.sequence >= start_seqnum_) {
                saved_key_.SetInternalKey(ikey_);
L
Levi Tamasi 已提交
361 362 363 364 365 366 367

                if (ikey_.type == kTypeBlobIndex) {
                  if (!SetBlobValueIfNeeded(ikey_.user_key, iter_.value())) {
                    return false;
                  }
                }

Y
Yi Wu 已提交
368
                valid_ = true;
369
                return true;
370 371
              } else {
                // this key and all previous versions shouldn't be included,
372
                // skipping_saved_key
373 374 375 376
                saved_key_.SetUserKey(
                    ikey_.user_key,
                    !pin_thru_lifetime_ ||
                        !iter_.iter()->IsKeyPinned() /* copy */);
377
                skipping_saved_key = true;
Y
Yi Wu 已提交
378
              }
379 380
            } else if (timestamp_lb_) {
              saved_key_.SetInternalKey(ikey_);
L
Levi Tamasi 已提交
381 382 383 384 385 386 387

              if (ikey_.type == kTypeBlobIndex) {
                if (!SetBlobValueIfNeeded(ikey_.user_key, iter_.value())) {
                  return false;
                }
              }

388 389
              valid_ = true;
              return true;
390
            } else {
391
              saved_key_.SetUserKey(
392 393
                  ikey_.user_key, !pin_thru_lifetime_ ||
                                      !iter_.iter()->IsKeyPinned() /* copy */);
394
              if (range_del_agg_.ShouldDelete(
395
                      ikey_, RangeDelPositioningMode::kForwardTraversal)) {
396 397
                // Arrange to skip all upcoming entries for this key since
                // they are hidden by this deletion.
398
                skipping_saved_key = true;
399
                num_skipped = 0;
400
                reseek_done = false;
401
                PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
L
Levi Tamasi 已提交
402 403 404 405 406
              } else {
                if (ikey_.type == kTypeBlobIndex) {
                  if (!SetBlobValueIfNeeded(ikey_.user_key, iter_.value())) {
                    return false;
                  }
407
                }
408

409
                valid_ = true;
410
                return true;
411
              }
412 413 414
            }
            break;
          case kTypeMerge:
415
            saved_key_.SetUserKey(
416
                ikey_.user_key,
417
                !pin_thru_lifetime_ || !iter_.iter()->IsKeyPinned() /* copy */);
418
            if (range_del_agg_.ShouldDelete(
419
                    ikey_, RangeDelPositioningMode::kForwardTraversal)) {
420 421
              // Arrange to skip all upcoming entries for this key since
              // they are hidden by this deletion.
422
              skipping_saved_key = true;
423
              num_skipped = 0;
424
              reseek_done = false;
425 426 427 428 429 430
              PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
            } else {
              // By now, we are sure the current ikey is going to yield a
              // value
              current_entry_is_merged_ = true;
              valid_ = true;
431
              return MergeValuesNewToOld();  // Go to a different state machine
432 433 434
            }
            break;
          default:
435 436 437 438 439
            valid_ = false;
            status_ = Status::Corruption(
                "Unknown value type: " +
                std::to_string(static_cast<unsigned int>(ikey_.type)));
            return false;
440
        }
J
jorlow@chromium.org 已提交
441
      }
442
    } else {
443 444 445
      if (more_recent) {
        PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
      }
446

447 448 449
      // This key was inserted after our snapshot was taken or skipped by
      // timestamp range. If this happens too many times in a row for the same
      // user key, we want to seek to the target sequence number.
Y
Yanqin Jin 已提交
450 451
      int cmp = user_comparator_.CompareWithoutTimestamp(
          ikey_.user_key, saved_key_.GetUserKey());
Y
Yanqin Jin 已提交
452
      if (cmp == 0 || (skipping_saved_key && cmp < 0)) {
453 454
        num_skipped++;
      } else {
455
        saved_key_.SetUserKey(
456
            ikey_.user_key,
457
            !iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
458
        skipping_saved_key = false;
459
        num_skipped = 0;
460
        reseek_done = false;
461
      }
J
jorlow@chromium.org 已提交
462
    }
463 464 465

    // If we have sequentially iterated via numerous equal keys, then it's
    // better to seek so that we can avoid too many key comparisons.
466 467 468 469 470
    //
    // To avoid infinite loops, do not reseek if we have already attempted to
    // reseek previously.
    //
    // TODO(lth): If we reseek to sequence number greater than ikey_.sequence,
Y
Yanqin Jin 已提交
471
    // then it does not make sense to reseek as we would actually land further
472 473
    // away from the desired key. There is opportunity for optimization here.
    if (num_skipped > max_skip_ && !reseek_done) {
474
      is_key_seqnum_zero_ = false;
475
      num_skipped = 0;
476
      reseek_done = true;
477
      std::string last_key;
478
      if (skipping_saved_key) {
479 480 481
        // We're looking for the next user-key but all we see are the same
        // user-key with decreasing sequence numbers. Fast forward to
        // sequence number 0 and type deletion (the smallest type).
Y
Yanqin Jin 已提交
482 483 484 485 486
        if (timestamp_size_ == 0) {
          AppendInternalKey(
              &last_key,
              ParsedInternalKey(saved_key_.GetUserKey(), 0, kTypeDeletion));
        } else {
487
          const std::string kTsMin(timestamp_size_, static_cast<char>(0));
Y
Yanqin Jin 已提交
488 489 490
          AppendInternalKeyWithDifferentTimestamp(
              &last_key,
              ParsedInternalKey(saved_key_.GetUserKey(), 0, kTypeDeletion),
491
              kTsMin);
Y
Yanqin Jin 已提交
492
        }
493 494
        // Don't set skipping_saved_key = false because we may still see more
        // user-keys equal to saved_key_.
495 496 497 498 499 500
      } else {
        // We saw multiple entries with this user key and sequence numbers
        // higher than sequence_. Fast forward to sequence_.
        // Note that this only covers a case when a higher key was overwritten
        // many times since our snapshot was taken, not the case when a lot of
        // different keys were inserted after our snapshot was taken.
Y
Yanqin Jin 已提交
501 502 503 504 505 506 507 508 509 510 511
        if (timestamp_size_ == 0) {
          AppendInternalKey(
              &last_key, ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
                                           kValueTypeForSeek));
        } else {
          AppendInternalKeyWithDifferentTimestamp(
              &last_key,
              ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
                                kValueTypeForSeek),
              *timestamp_ub_);
        }
512
      }
513
      iter_.Seek(last_key);
514 515
      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
    } else {
516
      iter_.Next();
517
    }
518
  } while (iter_.Valid());
519

J
jorlow@chromium.org 已提交
520
  valid_ = false;
521
  return iter_.status().ok();
J
jorlow@chromium.org 已提交
522 523
}

524 525
// Merge values of the same user key starting from the current iter_ position
// Scan from the newer entries to older entries.
526
// PRE: iter_.key() points to the first merge type entry
527
//      saved_key_ stores the user key
528
//      iter_.PrepareValue() has been called
529 530
// POST: saved_value_ has the merged value for the user key
//       iter_ points to the next entry (or invalid)
531
bool DBIter::MergeValuesNewToOld() {
532
  if (!merge_operator_) {
533
    ROCKS_LOG_ERROR(logger_, "Options::merge_operator is null.");
534
    status_ = Status::InvalidArgument("merge_operator_ must be set.");
535
    valid_ = false;
536
    return false;
D
Deon Nicholas 已提交
537
  }
538

539 540
  // Temporarily pin the blocks that hold merge operands
  TempPinData();
541
  merge_context_.Clear();
542
  // Start the merge process by pushing the first operand
543 544
  merge_context_.PushOperand(
      iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
545
  TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:PushedFirstOperand");
546 547

  ParsedInternalKey ikey;
548
  Status s;
549
  for (iter_.Next(); iter_.Valid(); iter_.Next()) {
550
    TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:SteppedToNextOperand");
551
    if (!ParseKey(&ikey)) {
552
      return false;
553 554
    }

555
    if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
556 557
      // hit the next user key, stop right here
      break;
558 559
    }
    if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type ||
560
               range_del_agg_.ShouldDelete(
561
                   ikey, RangeDelPositioningMode::kForwardTraversal)) {
562 563
      // hit a delete with the same user key, stop right here
      // iter_ is positioned after delete
564
      iter_.Next();
565
      break;
566 567 568 569 570 571 572
    }
    if (!iter_.PrepareValue()) {
      valid_ = false;
      return false;
    }

    if (kTypeValue == ikey.type) {
573 574
      // hit a put, merge the put value with operands and store the
      // final result in saved_value_. We are done!
575
      const Slice val = iter_.value();
576 577
      s = MergeHelper::TimedFullMerge(
          merge_operator_, ikey.user_key, &val, merge_context_.GetOperands(),
578
          &saved_value_, logger_, statistics_, env_, &pinned_value_, true);
579
      if (!s.ok()) {
Y
Yi Wu 已提交
580
        valid_ = false;
581
        status_ = s;
582
        return false;
583
      }
584
      // iter_ is positioned after put
585 586
      iter_.Next();
      if (!iter_.status().ok()) {
587 588 589 590
        valid_ = false;
        return false;
      }
      return true;
A
Andres Noetzli 已提交
591
    } else if (kTypeMerge == ikey.type) {
592 593
      // hit a merge, add the value as an operand and run associative merge.
      // when complete, add result to operands and continue.
594 595
      merge_context_.PushOperand(
          iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
596
      PERF_COUNTER_ADD(internal_merge_count, 1);
Y
Yi Wu 已提交
597
    } else if (kTypeBlobIndex == ikey.type) {
L
Levi Tamasi 已提交
598
      status_ = Status::NotSupported("BlobDB does not support merge operator.");
Y
Yi Wu 已提交
599
      valid_ = false;
600
      return false;
A
Andres Noetzli 已提交
601
    } else {
602 603 604 605 606
      valid_ = false;
      status_ = Status::Corruption(
          "Unrecognized value type: " +
          std::to_string(static_cast<unsigned int>(ikey.type)));
      return false;
607 608 609
    }
  }

610
  if (!iter_.status().ok()) {
611 612 613 614
    valid_ = false;
    return false;
  }

615 616 617 618
  // we either exhausted all internal keys under this user key, or hit
  // a deletion marker.
  // feed null as the existing value to the merge operator, such that
  // client can differentiate this scenario and do things accordingly.
619 620 621
  s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetUserKey(),
                                  nullptr, merge_context_.GetOperands(),
                                  &saved_value_, logger_, statistics_, env_,
622
                                  &pinned_value_, true);
623
  if (!s.ok()) {
Y
Yi Wu 已提交
624
    valid_ = false;
625
    status_ = s;
626
    return false;
627
  }
628 629 630

  assert(status_.ok());
  return true;
631 632
}

J
jorlow@chromium.org 已提交
633
void DBIter::Prev() {
Y
Yanqin Jin 已提交
634 635 636 637 638 639 640
  if (timestamp_size_ > 0) {
    valid_ = false;
    status_ = Status::NotSupported(
        "SeekToLast/SeekForPrev/Prev currently not supported with timestamp.");
    return;
  }

J
jorlow@chromium.org 已提交
641
  assert(valid_);
642
  assert(status_.ok());
643 644

  PERF_CPU_TIMER_GUARD(iter_prev_cpu_nanos, env_);
645
  ReleaseTempPinnedData();
646
  ResetInternalKeysSkippedCounter();
647
  bool ok = true;
S
Stanislau Hlebik 已提交
648
  if (direction_ == kForward) {
649 650 651 652 653
    if (!ReverseToBackward()) {
      ok = false;
    }
  }
  if (ok) {
654 655 656 657 658 659
    Slice prefix;
    if (prefix_same_as_start_) {
      assert(prefix_extractor_ != nullptr);
      prefix = prefix_.GetUserKey();
    }
    PrevInternal(prefix_same_as_start_ ? &prefix : nullptr);
S
Stanislau Hlebik 已提交
660
  }
661

M
Manuel Ung 已提交
662
  if (statistics_ != nullptr) {
663
    local_stats_.prev_count_++;
M
Manuel Ung 已提交
664
    if (valid_) {
665 666
      local_stats_.prev_found_count_++;
      local_stats_.bytes_read_ += (key().size() + value().size());
M
Manuel Ung 已提交
667 668
    }
  }
S
Stanislau Hlebik 已提交
669
}
J
jorlow@chromium.org 已提交
670

671
bool DBIter::ReverseToForward() {
672
  assert(iter_.status().ok());
673 674 675 676

  // When moving backwards, iter_ is positioned on _previous_ key, which may
  // not exist or may have different prefix than the current key().
  // If that's the case, seek iter_ to current key.
S
sdong 已提交
677
  if (!expect_total_order_inner_iter() || !iter_.Valid()) {
678 679
    IterKey last_key;
    last_key.SetInternalKey(ParsedInternalKey(
680
        saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
681
    iter_.Seek(last_key.GetInternalKey());
682
  }
683

684
  direction_ = kForward;
685
  // Skip keys less than the current key() (a.k.a. saved_key_).
686
  while (iter_.Valid()) {
687 688 689 690
    ParsedInternalKey ikey;
    if (!ParseKey(&ikey)) {
      return false;
    }
691
    if (user_comparator_.Compare(ikey.user_key, saved_key_.GetUserKey()) >= 0) {
692 693
      return true;
    }
694
    iter_.Next();
695 696
  }

697
  if (!iter_.status().ok()) {
698 699
    valid_ = false;
    return false;
700
  }
701 702

  return true;
703 704
}

705 706
// Move iter_ to the key before saved_key_.
bool DBIter::ReverseToBackward() {
707
  assert(iter_.status().ok());
708 709 710 711 712

  // When current_entry_is_merged_ is true, iter_ may be positioned on the next
  // key, which may not exist or may have prefix different from current.
  // If that's the case, seek to saved_key_.
  if (current_entry_is_merged_ &&
S
sdong 已提交
713
      (!expect_total_order_inner_iter() || !iter_.Valid())) {
714
    IterKey last_key;
715 716 717 718 719
    // Using kMaxSequenceNumber and kValueTypeForSeek
    // (not kValueTypeForSeekForPrev) to seek to a key strictly smaller
    // than saved_key_.
    last_key.SetInternalKey(ParsedInternalKey(
        saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
S
sdong 已提交
720
    if (!expect_total_order_inner_iter()) {
721
      iter_.SeekForPrev(last_key.GetInternalKey());
722 723 724 725 726
    } else {
      // Some iterators may not support SeekForPrev(), so we avoid using it
      // when prefix seek mode is disabled. This is somewhat expensive
      // (an extra Prev(), as well as an extra change of direction of iter_),
      // so we may need to reconsider it later.
727 728 729
      iter_.Seek(last_key.GetInternalKey());
      if (!iter_.Valid() && iter_.status().ok()) {
        iter_.SeekToLast();
730
      }
731 732 733 734
    }
  }

  direction_ = kReverse;
735
  return FindUserKeyBeforeSavedKey();
736 737
}

738
void DBIter::PrevInternal(const Slice* prefix) {
739
  while (iter_.Valid()) {
740
    saved_key_.SetUserKey(
741 742
        ExtractUserKey(iter_.key()),
        !iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
743

744 745
    assert(prefix == nullptr || prefix_extractor_ != nullptr);
    if (prefix != nullptr &&
746 747 748
        prefix_extractor_
                ->Transform(StripTimestampFromUserKey(saved_key_.GetUserKey(),
                                                      timestamp_size_))
749 750
                .compare(*prefix) != 0) {
      assert(prefix_same_as_start_);
751 752 753 754 755
      // Current key does not have the same prefix as start
      valid_ = false;
      return;
    }

756 757 758 759
    assert(iterate_lower_bound_ == nullptr || iter_.MayBeOutOfLowerBound() ||
           user_comparator_.Compare(saved_key_.GetUserKey(),
                                    *iterate_lower_bound_) >= 0);
    if (iterate_lower_bound_ != nullptr && iter_.MayBeOutOfLowerBound() &&
760 761
        user_comparator_.Compare(saved_key_.GetUserKey(),
                                 *iterate_lower_bound_) < 0) {
762 763 764 765 766
      // We've iterated earlier than the user-specified lower bound.
      valid_ = false;
      return;
    }

767
    if (!FindValueForCurrentKey()) {  // assigns valid_
S
Stanislau Hlebik 已提交
768
      return;
J
jorlow@chromium.org 已提交
769
    }
770

771 772 773
    // Whether or not we found a value for current key, we need iter_ to end up
    // on a smaller key.
    if (!FindUserKeyBeforeSavedKey()) {
774 775 776
      return;
    }

777 778 779
    if (valid_) {
      // Found the value.
      return;
S
Stanislau Hlebik 已提交
780
    }
781 782 783

    if (TooManyInternalKeysSkipped(false)) {
      return;
S
Stanislau Hlebik 已提交
784 785
    }
  }
786

S
Stanislau Hlebik 已提交
787 788
  // We haven't found any key - iterator is not valid
  valid_ = false;
J
jorlow@chromium.org 已提交
789 790
}

791 792 793 794 795 796 797 798 799 800 801
// Used for backwards iteration.
// Looks at the entries with user key saved_key_ and finds the most up-to-date
// value for it, or executes a merge, or determines that the value was deleted.
// Sets valid_ to true if the value is found and is ready to be presented to
// the user through value().
// Sets valid_ to false if the value was deleted, and we should try another key.
// Returns false if an error occurred, and !status().ok() and !valid_.
//
// PRE: iter_ is positioned on the last entry with user key equal to saved_key_.
// POST: iter_ is positioned on one of the entries equal to saved_key_, or on
//       the entry just before them, or on the entry just after them.
S
Stanislau Hlebik 已提交
802
bool DBIter::FindValueForCurrentKey() {
803
  assert(iter_.Valid());
804
  merge_context_.Clear();
805
  current_entry_is_merged_ = false;
A
Andres Noetzli 已提交
806 807
  // last entry before merge (could be kTypeDeletion, kTypeSingleDeletion or
  // kTypeValue)
S
Stanislau Hlebik 已提交
808 809
  ValueType last_not_merge_type = kTypeDeletion;
  ValueType last_key_entry_type = kTypeDeletion;
J
jorlow@chromium.org 已提交
810

811 812 813
  // Temporarily pin blocks that hold (merge operands / the value)
  ReleaseTempPinnedData();
  TempPinData();
S
Stanislau Hlebik 已提交
814
  size_t num_skipped = 0;
815
  while (iter_.Valid()) {
816 817 818 819 820
    ParsedInternalKey ikey;
    if (!ParseKey(&ikey)) {
      return false;
    }

Y
Yanqin Jin 已提交
821 822 823 824 825 826 827
    assert(ikey.user_key.size() >= timestamp_size_);
    Slice ts;
    if (timestamp_size_ > 0) {
      ts = Slice(ikey.user_key.data() + ikey.user_key.size() - timestamp_size_,
                 timestamp_size_);
    }
    if (!IsVisible(ikey.sequence, ts) ||
828
        !user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
829 830
      break;
    }
831 832 833 834
    if (TooManyInternalKeysSkipped()) {
      return false;
    }

835 836 837
    // This user key has lots of entries.
    // We're going from old to new, and it's taking too long. Let's do a Seek()
    // and go from new to old. This helps when a key was overwritten many times.
838
    if (num_skipped >= max_skip_) {
S
Stanislau Hlebik 已提交
839 840 841
      return FindValueForCurrentKeyUsingSeek();
    }

842 843 844 845 846
    if (!iter_.PrepareValue()) {
      valid_ = false;
      return false;
    }

S
Stanislau Hlebik 已提交
847 848 849
    last_key_entry_type = ikey.type;
    switch (last_key_entry_type) {
      case kTypeValue:
Y
Yi Wu 已提交
850
      case kTypeBlobIndex:
851
        if (range_del_agg_.ShouldDelete(
852
                ikey, RangeDelPositioningMode::kBackwardTraversal)) {
A
Andrew Kryczka 已提交
853 854
          last_key_entry_type = kTypeRangeDeletion;
          PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
855
        } else if (iter_.iter()->IsValuePinned()) {
856
          pinned_value_ = iter_.value();
857 858 859 860 861
        } else {
          valid_ = false;
          status_ = Status::NotSupported(
              "Backward iteration not supported if underlying iterator's value "
              "cannot be pinned.");
A
Andrew Kryczka 已提交
862
        }
863
        merge_context_.Clear();
A
Andrew Kryczka 已提交
864
        last_not_merge_type = last_key_entry_type;
865 866 867
        if (!status_.ok()) {
          return false;
        }
S
Stanislau Hlebik 已提交
868 869
        break;
      case kTypeDeletion:
A
Andres Noetzli 已提交
870
      case kTypeSingleDeletion:
871
        merge_context_.Clear();
A
Andres Noetzli 已提交
872
        last_not_merge_type = last_key_entry_type;
873
        PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
S
Stanislau Hlebik 已提交
874 875
        break;
      case kTypeMerge:
876
        if (range_del_agg_.ShouldDelete(
877
                ikey, RangeDelPositioningMode::kBackwardTraversal)) {
A
Andrew Kryczka 已提交
878 879 880 881 882 883 884
          merge_context_.Clear();
          last_key_entry_type = kTypeRangeDeletion;
          last_not_merge_type = last_key_entry_type;
          PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
        } else {
          assert(merge_operator_ != nullptr);
          merge_context_.PushOperandBack(
885 886
              iter_.value(),
              iter_.iter()->IsValuePinned() /* operand_pinned */);
887
          PERF_COUNTER_ADD(internal_merge_count, 1);
A
Andrew Kryczka 已提交
888
        }
S
Stanislau Hlebik 已提交
889 890
        break;
      default:
891 892 893 894 895
        valid_ = false;
        status_ = Status::Corruption(
            "Unknown value type: " +
            std::to_string(static_cast<unsigned int>(last_key_entry_type)));
        return false;
S
Stanislau Hlebik 已提交
896 897
    }

898
    PERF_COUNTER_ADD(internal_key_skipped_count, 1);
899
    iter_.Prev();
S
Stanislau Hlebik 已提交
900
    ++num_skipped;
901 902
  }

903
  if (!iter_.status().ok()) {
904 905
    valid_ = false;
    return false;
S
Stanislau Hlebik 已提交
906 907
  }

908
  Status s;
S
sdong 已提交
909
  s.PermitUncheckedError();
Y
Yi Wu 已提交
910
  is_blob_ = false;
S
Stanislau Hlebik 已提交
911 912
  switch (last_key_entry_type) {
    case kTypeDeletion:
A
Andres Noetzli 已提交
913
    case kTypeSingleDeletion:
A
Andrew Kryczka 已提交
914
    case kTypeRangeDeletion:
S
Stanislau Hlebik 已提交
915
      valid_ = false;
916
      return true;
S
Stanislau Hlebik 已提交
917
    case kTypeMerge:
918
      current_entry_is_merged_ = true;
A
Aaron Gao 已提交
919
      if (last_not_merge_type == kTypeDeletion ||
A
Andrew Kryczka 已提交
920 921
          last_not_merge_type == kTypeSingleDeletion ||
          last_not_merge_type == kTypeRangeDeletion) {
922 923 924
        s = MergeHelper::TimedFullMerge(
            merge_operator_, saved_key_.GetUserKey(), nullptr,
            merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
925
            env_, &pinned_value_, true);
Y
Yi Wu 已提交
926
      } else if (last_not_merge_type == kTypeBlobIndex) {
L
Levi Tamasi 已提交
927 928
        status_ =
            Status::NotSupported("BlobDB does not support merge operator.");
Y
Yi Wu 已提交
929
        valid_ = false;
930
        return false;
931
      } else {
S
Stanislau Hlebik 已提交
932
        assert(last_not_merge_type == kTypeValue);
933
        s = MergeHelper::TimedFullMerge(
934
            merge_operator_, saved_key_.GetUserKey(), &pinned_value_,
935
            merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
936
            env_, &pinned_value_, true);
937
      }
S
Stanislau Hlebik 已提交
938 939
      break;
    case kTypeValue:
940
      // do nothing - we've already has value in pinned_value_
S
Stanislau Hlebik 已提交
941
      break;
Y
Yi Wu 已提交
942
    case kTypeBlobIndex:
L
Levi Tamasi 已提交
943
      if (!SetBlobValueIfNeeded(saved_key_.GetUserKey(), pinned_value_)) {
944
        return false;
Y
Yi Wu 已提交
945
      }
L
Levi Tamasi 已提交
946

Y
Yi Wu 已提交
947
      break;
S
Stanislau Hlebik 已提交
948
    default:
949 950 951 952 953
      valid_ = false;
      status_ = Status::Corruption(
          "Unknown value type: " +
          std::to_string(static_cast<unsigned int>(last_key_entry_type)));
      return false;
J
jorlow@chromium.org 已提交
954
  }
955
  if (!s.ok()) {
Y
Yi Wu 已提交
956
    valid_ = false;
957
    status_ = s;
958
    return false;
959
  }
960
  valid_ = true;
S
Stanislau Hlebik 已提交
961 962
  return true;
}
J
jorlow@chromium.org 已提交
963

S
Stanislau Hlebik 已提交
964 965
// This function is used in FindValueForCurrentKey.
// We use Seek() function instead of Prev() to find necessary value
966 967
// TODO: This is very similar to FindNextUserEntry() and MergeValuesNewToOld().
//       Would be nice to reuse some code.
S
Stanislau Hlebik 已提交
968
bool DBIter::FindValueForCurrentKeyUsingSeek() {
969 970 971
  // FindValueForCurrentKey will enable pinning before calling
  // FindValueForCurrentKeyUsingSeek()
  assert(pinned_iters_mgr_.PinningEnabled());
S
Stanislau Hlebik 已提交
972
  std::string last_key;
973 974
  AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetUserKey(),
                                                 sequence_, kValueTypeForSeek));
975
  iter_.Seek(last_key);
S
Stanislau Hlebik 已提交
976 977
  RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);

978 979
  // In case read_callback presents, the value we seek to may not be visible.
  // Find the next value that's visible.
S
Stanislau Hlebik 已提交
980
  ParsedInternalKey ikey;
981
  is_blob_ = false;
982
  while (true) {
983
    if (!iter_.Valid()) {
984
      valid_ = false;
985
      return iter_.status().ok();
986 987 988 989 990
    }

    if (!ParseKey(&ikey)) {
      return false;
    }
Y
Yanqin Jin 已提交
991 992 993 994 995 996 997
    assert(ikey.user_key.size() >= timestamp_size_);
    Slice ts;
    if (timestamp_size_ > 0) {
      ts = Slice(ikey.user_key.data() + ikey.user_key.size() - timestamp_size_,
                 timestamp_size_);
    }

998
    if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
999 1000 1001 1002 1003 1004 1005
      // No visible values for this key, even though FindValueForCurrentKey()
      // has seen some. This is possible if we're using a tailing iterator, and
      // the entries were discarded in a compaction.
      valid_ = false;
      return true;
    }

Y
Yanqin Jin 已提交
1006
    if (IsVisible(ikey.sequence, ts)) {
1007 1008
      break;
    }
1009

1010
    iter_.Next();
1011
  }
S
Stanislau Hlebik 已提交
1012

A
Andrew Kryczka 已提交
1013
  if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
1014
      range_del_agg_.ShouldDelete(
1015
          ikey, RangeDelPositioningMode::kBackwardTraversal)) {
J
jorlow@chromium.org 已提交
1016
    valid_ = false;
1017
    return true;
S
Stanislau Hlebik 已提交
1018
  }
1019 1020 1021 1022
  if (!iter_.PrepareValue()) {
    valid_ = false;
    return false;
  }
Y
Yi Wu 已提交
1023
  if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex) {
1024 1025
    assert(iter_.iter()->IsValuePinned());
    pinned_value_ = iter_.value();
L
Levi Tamasi 已提交
1026 1027 1028 1029 1030 1031
    if (ikey.type == kTypeBlobIndex) {
      if (!SetBlobValueIfNeeded(ikey.user_key, pinned_value_)) {
        return false;
      }
    }

A
Andrew Kryczka 已提交
1032 1033 1034
    valid_ = true;
    return true;
  }
S
Stanislau Hlebik 已提交
1035 1036 1037

  // kTypeMerge. We need to collect all kTypeMerge values and save them
  // in operands
1038
  assert(ikey.type == kTypeMerge);
1039
  current_entry_is_merged_ = true;
1040
  merge_context_.Clear();
1041 1042
  merge_context_.PushOperand(
      iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
1043
  while (true) {
1044
    iter_.Next();
S
Stanislau Hlebik 已提交
1045

1046 1047
    if (!iter_.Valid()) {
      if (!iter_.status().ok()) {
1048 1049 1050 1051
        valid_ = false;
        return false;
      }
      break;
S
Stanislau Hlebik 已提交
1052
    }
1053 1054 1055
    if (!ParseKey(&ikey)) {
      return false;
    }
1056
    if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
1057 1058 1059 1060
      break;
    }
    if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
        range_del_agg_.ShouldDelete(
1061
            ikey, RangeDelPositioningMode::kForwardTraversal)) {
1062
      break;
1063 1064 1065 1066 1067 1068 1069
    }
    if (!iter_.PrepareValue()) {
      valid_ = false;
      return false;
    }

    if (ikey.type == kTypeValue) {
1070
      const Slice val = iter_.value();
1071 1072 1073 1074 1075 1076 1077 1078 1079
      Status s = MergeHelper::TimedFullMerge(
          merge_operator_, saved_key_.GetUserKey(), &val,
          merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
          env_, &pinned_value_, true);
      if (!s.ok()) {
        valid_ = false;
        status_ = s;
        return false;
      }
Y
Yi Wu 已提交
1080
      valid_ = true;
1081 1082
      return true;
    } else if (ikey.type == kTypeMerge) {
1083 1084
      merge_context_.PushOperand(
          iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
1085 1086
      PERF_COUNTER_ADD(internal_merge_count, 1);
    } else if (ikey.type == kTypeBlobIndex) {
L
Levi Tamasi 已提交
1087
      status_ = Status::NotSupported("BlobDB does not support merge operator.");
Y
Yi Wu 已提交
1088
      valid_ = false;
1089 1090
      return false;
    } else {
1091 1092 1093 1094 1095
      valid_ = false;
      status_ = Status::Corruption(
          "Unknown value type: " +
          std::to_string(static_cast<unsigned int>(ikey.type)));
      return false;
1096
    }
S
Stanislau Hlebik 已提交
1097 1098
  }

1099 1100 1101 1102 1103
  Status s = MergeHelper::TimedFullMerge(
      merge_operator_, saved_key_.GetUserKey(), nullptr,
      merge_context_.GetOperands(), &saved_value_, logger_, statistics_, env_,
      &pinned_value_, true);
  if (!s.ok()) {
Y
Yi Wu 已提交
1104
    valid_ = false;
1105
    status_ = s;
1106
    return false;
1107
  }
S
Stanislau Hlebik 已提交
1108

1109 1110 1111
  // Make sure we leave iter_ in a good state. If it's valid and we don't care
  // about prefixes, that's already good enough. Otherwise it needs to be
  // seeked to the current key.
S
sdong 已提交
1112 1113
  if (!expect_total_order_inner_iter() || !iter_.Valid()) {
    if (!expect_total_order_inner_iter()) {
1114
      iter_.SeekForPrev(last_key);
1115
    } else {
1116 1117 1118
      iter_.Seek(last_key);
      if (!iter_.Valid() && iter_.status().ok()) {
        iter_.SeekToLast();
1119 1120 1121
      }
    }
    RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
S
Stanislau Hlebik 已提交
1122
  }
1123 1124 1125

  valid_ = true;
  return true;
S
Stanislau Hlebik 已提交
1126 1127
}

1128 1129 1130 1131
// Move backwards until the key smaller than saved_key_.
// Changes valid_ only if return value is false.
bool DBIter::FindUserKeyBeforeSavedKey() {
  assert(status_.ok());
S
Stanislau Hlebik 已提交
1132
  size_t num_skipped = 0;
1133
  while (iter_.Valid()) {
1134 1135 1136
    ParsedInternalKey ikey;
    if (!ParseKey(&ikey)) {
      return false;
1137 1138
    }

1139
    if (user_comparator_.Compare(ikey.user_key, saved_key_.GetUserKey()) < 0) {
1140 1141 1142 1143 1144
      return true;
    }

    if (TooManyInternalKeysSkipped()) {
      return false;
S
Stanislau Hlebik 已提交
1145
    }
1146

S
Siying Dong 已提交
1147
    assert(ikey.sequence != kMaxSequenceNumber);
Y
Yanqin Jin 已提交
1148 1149 1150 1151 1152 1153 1154
    assert(ikey.user_key.size() >= timestamp_size_);
    Slice ts;
    if (timestamp_size_ > 0) {
      ts = Slice(ikey.user_key.data() + ikey.user_key.size() - timestamp_size_,
                 timestamp_size_);
    }
    if (!IsVisible(ikey.sequence, ts)) {
1155 1156 1157 1158
      PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
    } else {
      PERF_COUNTER_ADD(internal_key_skipped_count, 1);
    }
1159

1160
    if (num_skipped >= max_skip_) {
1161 1162 1163 1164 1165 1166
      num_skipped = 0;
      IterKey last_key;
      last_key.SetInternalKey(ParsedInternalKey(
          saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
      // It would be more efficient to use SeekForPrev() here, but some
      // iterators may not support it.
1167
      iter_.Seek(last_key.GetInternalKey());
1168
      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
1169
      if (!iter_.Valid()) {
1170 1171 1172 1173 1174 1175
        break;
      }
    } else {
      ++num_skipped;
    }

1176
    iter_.Prev();
S
Stanislau Hlebik 已提交
1177
  }
1178

1179
  if (!iter_.status().ok()) {
1180 1181 1182 1183 1184
    valid_ = false;
    return false;
  }

  return true;
S
Stanislau Hlebik 已提交
1185 1186
}

1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198
bool DBIter::TooManyInternalKeysSkipped(bool increment) {
  if ((max_skippable_internal_keys_ > 0) &&
      (num_internal_keys_skipped_ > max_skippable_internal_keys_)) {
    valid_ = false;
    status_ = Status::Incomplete("Too many internal keys skipped.");
    return true;
  } else if (increment) {
    num_internal_keys_skipped_++;
  }
  return false;
}

1199 1200
bool DBIter::IsVisible(SequenceNumber sequence, const Slice& ts,
                       bool* more_recent) {
Y
Yanqin Jin 已提交
1201
  // Remember that comparator orders preceding timestamp as larger.
1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214
  // TODO(yanqin): support timestamp in read_callback_.
  bool visible_by_seq = (read_callback_ == nullptr)
                            ? sequence <= sequence_
                            : read_callback_->IsVisible(sequence);

  bool visible_by_ts =
      (timestamp_ub_ == nullptr ||
       user_comparator_.CompareTimestamp(ts, *timestamp_ub_) <= 0) &&
      (timestamp_lb_ == nullptr ||
       user_comparator_.CompareTimestamp(ts, *timestamp_lb_) >= 0);

  if (more_recent) {
    *more_recent = !visible_by_seq;
1215
  }
1216
  return visible_by_seq && visible_by_ts;
1217
}
1218

1219
void DBIter::SetSavedKeyToSeekTarget(const Slice& target) {
1220
  is_key_seqnum_zero_ = false;
1221
  SequenceNumber seq = sequence_;
1222
  saved_key_.Clear();
Y
Yanqin Jin 已提交
1223
  saved_key_.SetInternalKey(target, seq, kValueTypeForSeek, timestamp_ub_);
1224

Z
zhangjinpeng1987 已提交
1225
  if (iterate_lower_bound_ != nullptr &&
Y
Yanqin Jin 已提交
1226 1227 1228
      user_comparator_.CompareWithoutTimestamp(
          saved_key_.GetUserKey(), /*a_has_ts=*/true, *iterate_lower_bound_,
          /*b_has_ts=*/false) < 0) {
1229
    // Seek key is smaller than the lower bound.
Z
zhangjinpeng1987 已提交
1230
    saved_key_.Clear();
Y
Yanqin Jin 已提交
1231 1232
    saved_key_.SetInternalKey(*iterate_lower_bound_, seq, kValueTypeForSeek,
                              timestamp_ub_);
Z
zhangjinpeng1987 已提交
1233
  }
1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256
}

void DBIter::SetSavedKeyToSeekForPrevTarget(const Slice& target) {
  is_key_seqnum_zero_ = false;
  saved_key_.Clear();
  // now saved_key is used to store internal key.
  saved_key_.SetInternalKey(target, 0 /* sequence_number */,
                            kValueTypeForSeekForPrev);

  if (iterate_upper_bound_ != nullptr &&
      user_comparator_.Compare(saved_key_.GetUserKey(),
                               *iterate_upper_bound_) >= 0) {
    saved_key_.Clear();
    saved_key_.SetInternalKey(*iterate_upper_bound_, kMaxSequenceNumber);
  }
}

void DBIter::Seek(const Slice& target) {
  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
  StopWatch sw(env_, statistics_, DB_SEEK);

#ifndef ROCKSDB_LITE
  if (db_impl_ != nullptr && cfd_ != nullptr) {
1257 1258
    // TODO: What do we do if this returns an error?
    db_impl_->TraceIteratorSeek(cfd_->GetID(), target).PermitUncheckedError();
1259 1260 1261 1262 1263 1264
  }
#endif  // ROCKSDB_LITE

  status_ = Status::OK();
  ReleaseTempPinnedData();
  ResetInternalKeysSkippedCounter();
Z
zhangjinpeng1987 已提交
1265

1266
  // Seek the inner iterator based on the target key.
1267 1268
  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
1269 1270

    SetSavedKeyToSeekTarget(target);
1271
    iter_.Seek(saved_key_.GetInternalKey());
1272

1273
    range_del_agg_.InvalidateRangeDelMapPositions();
1274
    RecordTick(statistics_, NUMBER_DB_SEEK);
1275
  }
1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288
  if (!iter_.Valid()) {
    valid_ = false;
    return;
  }
  direction_ = kForward;

  // Now the inner iterator is placed to the target position. From there,
  // we need to find out the next key that is visible to the user.
  ClearSavedValue();
  if (prefix_same_as_start_) {
    // The case where the iterator needs to be invalidated if it has exausted
    // keys within the same prefix of the seek key.
    assert(prefix_extractor_ != nullptr);
Y
Yanqin Jin 已提交
1289
    Slice target_prefix = prefix_extractor_->Transform(target);
1290 1291 1292
    FindNextUserEntry(false /* not skipping saved_key */,
                      &target_prefix /* prefix */);
    if (valid_) {
Y
Yanqin Jin 已提交
1293
      // Remember the prefix of the seek key for the future Next() call to
1294 1295
      // check.
      prefix_.SetUserKey(target_prefix);
M
Manuel Ung 已提交
1296
    }
J
jorlow@chromium.org 已提交
1297
  } else {
1298 1299 1300 1301
    FindNextUserEntry(false /* not skipping saved_key */, nullptr);
  }
  if (!valid_) {
    return;
J
jorlow@chromium.org 已提交
1302
  }
1303

1304 1305 1306 1307 1308
  // Updating stats and perf context counters.
  if (statistics_ != nullptr) {
    // Decrement since we don't want to count this key as skipped
    RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
    RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
1309
  }
1310
  PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
J
jorlow@chromium.org 已提交
1311
}
J
jorlow@chromium.org 已提交
1312

A
Aaron Gao 已提交
1313
void DBIter::SeekForPrev(const Slice& target) {
1314
  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
A
Aaron Gao 已提交
1315
  StopWatch sw(env_, statistics_, DB_SEEK);
1316 1317 1318

#ifndef ROCKSDB_LITE
  if (db_impl_ != nullptr && cfd_ != nullptr) {
1319 1320 1321
    // TODO: What do we do if this returns an error?
    db_impl_->TraceIteratorSeekForPrev(cfd_->GetID(), target)
        .PermitUncheckedError();
1322 1323 1324
  }
#endif  // ROCKSDB_LITE

Y
Yanqin Jin 已提交
1325 1326 1327 1328 1329 1330 1331
  if (timestamp_size_ > 0) {
    valid_ = false;
    status_ = Status::NotSupported(
        "SeekToLast/SeekForPrev/Prev currently not supported with timestamp.");
    return;
  }

1332
  status_ = Status::OK();
A
Aaron Gao 已提交
1333
  ReleaseTempPinnedData();
1334
  ResetInternalKeysSkippedCounter();
Z
zhangjinpeng1987 已提交
1335

1336
  // Seek the inner iterator based on the target key.
A
Aaron Gao 已提交
1337 1338
  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
1339
    SetSavedKeyToSeekForPrevTarget(target);
1340
    iter_.SeekForPrev(saved_key_.GetInternalKey());
1341
    range_del_agg_.InvalidateRangeDelMapPositions();
1342
    RecordTick(statistics_, NUMBER_DB_SEEK);
A
Aaron Gao 已提交
1343
  }
1344 1345 1346
  if (!iter_.Valid()) {
    valid_ = false;
    return;
1347
  }
1348
  direction_ = kReverse;
1349

1350 1351 1352 1353 1354 1355 1356 1357
  // Now the inner iterator is placed to the target position. From there,
  // we need to find out the first key that is visible to the user in the
  // backward direction.
  ClearSavedValue();
  if (prefix_same_as_start_) {
    // The case where the iterator needs to be invalidated if it has exausted
    // keys within the same prefix of the seek key.
    assert(prefix_extractor_ != nullptr);
Y
Yanqin Jin 已提交
1358
    Slice target_prefix = prefix_extractor_->Transform(target);
1359 1360 1361 1362 1363
    PrevInternal(&target_prefix);
    if (valid_) {
      // Remember the prefix of the seek key for the future Prev() call to
      // check.
      prefix_.SetUserKey(target_prefix);
A
Aaron Gao 已提交
1364 1365
    }
  } else {
1366
    PrevInternal(nullptr);
A
Aaron Gao 已提交
1367
  }
1368 1369 1370 1371 1372 1373

  // Report stats and perf context.
  if (statistics_ != nullptr && valid_) {
    RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
    RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
    PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
A
Aaron Gao 已提交
1374 1375 1376
  }
}

J
jorlow@chromium.org 已提交
1377
void DBIter::SeekToFirst() {
1378 1379 1380 1381
  if (iterate_lower_bound_ != nullptr) {
    Seek(*iterate_lower_bound_);
    return;
  }
1382
  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
1383 1384
  // Don't use iter_::Seek() if we set a prefix extractor
  // because prefix seek will be used.
S
sdong 已提交
1385
  if (!expect_total_order_inner_iter()) {
1386 1387 1388
    max_skip_ = std::numeric_limits<uint64_t>::max();
  }
  status_ = Status::OK();
J
jorlow@chromium.org 已提交
1389
  direction_ = kForward;
1390
  ReleaseTempPinnedData();
1391
  ResetInternalKeysSkippedCounter();
J
jorlow@chromium.org 已提交
1392
  ClearSavedValue();
1393
  is_key_seqnum_zero_ = false;
1394 1395 1396

  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
1397
    iter_.SeekToFirst();
1398
    range_del_agg_.InvalidateRangeDelMapPositions();
1399 1400
  }

M
Manuel Ung 已提交
1401
  RecordTick(statistics_, NUMBER_DB_SEEK);
1402
  if (iter_.Valid()) {
1403
    saved_key_.SetUserKey(
1404 1405
        ExtractUserKey(iter_.key()),
        !iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
1406 1407
    FindNextUserEntry(false /* not skipping saved_key */,
                      nullptr /* no prefix check */);
M
Manuel Ung 已提交
1408 1409 1410 1411
    if (statistics_ != nullptr) {
      if (valid_) {
        RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
        RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
1412
        PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
M
Manuel Ung 已提交
1413 1414
      }
    }
J
jorlow@chromium.org 已提交
1415 1416
  } else {
    valid_ = false;
J
jorlow@chromium.org 已提交
1417
  }
1418 1419
  if (valid_ && prefix_same_as_start_) {
    assert(prefix_extractor_ != nullptr);
1420 1421
    prefix_.SetUserKey(prefix_extractor_->Transform(
        StripTimestampFromUserKey(saved_key_.GetUserKey(), timestamp_size_)));
1422
  }
J
jorlow@chromium.org 已提交
1423 1424
}

J
jorlow@chromium.org 已提交
1425
void DBIter::SeekToLast() {
Y
Yanqin Jin 已提交
1426 1427 1428 1429 1430 1431 1432
  if (timestamp_size_ > 0) {
    valid_ = false;
    status_ = Status::NotSupported(
        "SeekToLast/SeekForPrev/Prev currently not supported with timestamp.");
    return;
  }

1433 1434 1435
  if (iterate_upper_bound_ != nullptr) {
    // Seek to last key strictly less than ReadOptions.iterate_upper_bound.
    SeekForPrev(*iterate_upper_bound_);
1436
    if (Valid() && user_comparator_.Equal(*iterate_upper_bound_, key())) {
1437
      ReleaseTempPinnedData();
1438
      PrevInternal(nullptr);
1439 1440 1441 1442
    }
    return;
  }

1443
  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
S
Stanislau Hlebik 已提交
1444
  // Don't use iter_::Seek() if we set a prefix extractor
1445
  // because prefix seek will be used.
S
sdong 已提交
1446
  if (!expect_total_order_inner_iter()) {
S
Stanislau Hlebik 已提交
1447 1448
    max_skip_ = std::numeric_limits<uint64_t>::max();
  }
1449
  status_ = Status::OK();
J
jorlow@chromium.org 已提交
1450
  direction_ = kReverse;
1451
  ReleaseTempPinnedData();
1452
  ResetInternalKeysSkippedCounter();
J
jorlow@chromium.org 已提交
1453
  ClearSavedValue();
1454
  is_key_seqnum_zero_ = false;
1455 1456 1457

  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
1458
    iter_.SeekToLast();
1459
    range_del_agg_.InvalidateRangeDelMapPositions();
1460
  }
1461
  PrevInternal(nullptr);
M
Manuel Ung 已提交
1462 1463 1464 1465 1466
  if (statistics_ != nullptr) {
    RecordTick(statistics_, NUMBER_DB_SEEK);
    if (valid_) {
      RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
      RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
1467
      PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
M
Manuel Ung 已提交
1468 1469
    }
  }
1470 1471
  if (valid_ && prefix_same_as_start_) {
    assert(prefix_extractor_ != nullptr);
1472 1473
    prefix_.SetUserKey(prefix_extractor_->Transform(
        StripTimestampFromUserKey(saved_key_.GetUserKey(), timestamp_size_)));
1474
  }
J
jorlow@chromium.org 已提交
1475 1476
}

1477 1478
Iterator* NewDBIterator(Env* env, const ReadOptions& read_options,
                        const ImmutableCFOptions& cf_options,
1479
                        const MutableCFOptions& mutable_cf_options,
1480
                        const Comparator* user_key_comparator,
L
Levi Tamasi 已提交
1481
                        InternalIterator* internal_iter, const Version* version,
1482
                        const SequenceNumber& sequence,
Y
Yi Wu 已提交
1483
                        uint64_t max_sequential_skip_in_iterations,
1484
                        ReadCallback* read_callback, DBImpl* db_impl,
L
Levi Tamasi 已提交
1485 1486 1487 1488 1489 1490
                        ColumnFamilyData* cfd, bool expose_blob_index) {
  DBIter* db_iter =
      new DBIter(env, read_options, cf_options, mutable_cf_options,
                 user_key_comparator, internal_iter, version, sequence, false,
                 max_sequential_skip_in_iterations, read_callback, db_impl, cfd,
                 expose_blob_index);
1491
  return db_iter;
1492 1493
}

1494
}  // namespace ROCKSDB_NAMESPACE