//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/db_iter.h"
11

12
#include <iostream>
S
Stanislau Hlebik 已提交
13
#include <limits>
14
#include <string>
J
jorlow@chromium.org 已提交
15 16

#include "db/dbformat.h"
17
#include "db/merge_context.h"
18
#include "db/merge_helper.h"
19
#include "db/pinned_iterators_manager.h"
20
#include "file/filename.h"
21
#include "logging/logging.h"
22
#include "memory/arena.h"
23
#include "monitoring/perf_context_imp.h"
24 25 26
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/merge_operator.h"
27
#include "rocksdb/options.h"
28
#include "rocksdb/system_clock.h"
S
sdong 已提交
29
#include "table/internal_iterator.h"
30
#include "table/iterator_wrapper.h"
31
#include "trace_replay/trace_replay.h"
J
jorlow@chromium.org 已提交
32
#include "util/mutexlock.h"
33
#include "util/string_util.h"
34
#include "util/user_comparator_wrapper.h"
J
jorlow@chromium.org 已提交
35

36
namespace ROCKSDB_NAMESPACE {
J
jorlow@chromium.org 已提交
37

38
// Constructs a DBIter over the internal iterator `iter`.
//
// Members are initialized from `read_options`, `cf_options` and
// `mutable_cf_options`; `s` is stored as sequence_ and is also handed to the
// range deletion aggregator. The timestamp size is taken from
// read_options.timestamp when it is set, and is 0 otherwise.
//
// After member initialization the constructor:
//  - records a NO_ITERATOR_CREATED tick,
//  - starts pinning in pinned_iters_mgr_ when read_options.pin_data is set,
//  - registers pinned_iters_mgr_ with the inner iterator if one was supplied.
DBIter::DBIter(Env* _env, const ReadOptions& read_options,
               const ImmutableCFOptions& cf_options,
               const MutableCFOptions& mutable_cf_options,
               const Comparator* cmp, InternalIterator* iter,
               const Version* version, SequenceNumber s, bool arena_mode,
               uint64_t max_sequential_skip_in_iterations,
               ReadCallback* read_callback, DBImpl* db_impl,
               ColumnFamilyData* cfd, bool expose_blob_index)
    : prefix_extractor_(mutable_cf_options.prefix_extractor.get()),
      env_(_env),
      clock_(_env->GetSystemClock()),
      logger_(cf_options.info_log),
      user_comparator_(cmp),
      merge_operator_(cf_options.merge_operator),
      iter_(iter),
      version_(version),
      read_callback_(read_callback),
      sequence_(s),
      statistics_(cf_options.statistics),
      max_skip_(max_sequential_skip_in_iterations),
      max_skippable_internal_keys_(read_options.max_skippable_internal_keys),
      num_internal_keys_skipped_(0),
      iterate_lower_bound_(read_options.iterate_lower_bound),
      iterate_upper_bound_(read_options.iterate_upper_bound),
      direction_(kForward),
      valid_(false),
      current_entry_is_merged_(false),
      is_key_seqnum_zero_(false),
      // prefix_same_as_start is only honored when a prefix extractor exists.
      prefix_same_as_start_(mutable_cf_options.prefix_extractor
                                ? read_options.prefix_same_as_start
                                : false),
      pin_thru_lifetime_(read_options.pin_data),
      expect_total_order_inner_iter_(prefix_extractor_ == nullptr ||
                                     read_options.total_order_seek ||
                                     read_options.auto_prefix_mode),
      read_tier_(read_options.read_tier),
      verify_checksums_(read_options.verify_checksums),
      expose_blob_index_(expose_blob_index),
      is_blob_(false),
      arena_mode_(arena_mode),
      range_del_agg_(&cf_options.internal_comparator, s),
      db_impl_(db_impl),
      cfd_(cfd),
      start_seqnum_(read_options.iter_start_seqnum),
      timestamp_ub_(read_options.timestamp),
      timestamp_lb_(read_options.iter_start_ts),
      timestamp_size_(timestamp_ub_ ? timestamp_ub_->size() : 0) {
  RecordTick(statistics_, NO_ITERATOR_CREATED);
  if (pin_thru_lifetime_) {
    pinned_iters_mgr_.StartPinning();
  }
  // The inner iterator may be absent at construction time.
  if (iter_.iter()) {
    iter_.iter()->SetPinnedItersMgr(&pinned_iters_mgr_);
  }
  // The timestamp supplied through ReadOptions must match the size the user
  // comparator expects.
  assert(timestamp_size_ == user_comparator_.timestamp_size());
}
94

95 96 97
// Reports an iterator property by name into *prop.
//
// Recognized names:
//  - "rocksdb.iterator.super-version-number": forwarded to the inner
//    iterator; its status is returned unchanged.
//  - "rocksdb.iterator.is-key-pinned": "1" or "0" while the iterator is
//    valid, otherwise the text "Iterator is not valid.".
//  - "rocksdb.iterator.internal-key": the user key held in saved_key_.
//
// Returns InvalidArgument if `prop` is null or the name is not recognized.
Status DBIter::GetProperty(std::string prop_name, std::string* prop) {
  if (prop == nullptr) {
    return Status::InvalidArgument("prop is nullptr");
  }
  if (prop_name == "rocksdb.iterator.super-version-number") {
    // First try to pass the value returned from inner iterator.
    return iter_.iter()->GetProperty(prop_name, prop);
  } else if (prop_name == "rocksdb.iterator.is-key-pinned") {
    if (valid_) {
      *prop = (pin_thru_lifetime_ && saved_key_.IsKeyPinned()) ? "1" : "0";
    } else {
      *prop = "Iterator is not valid.";
    }
    return Status::OK();
  } else if (prop_name == "rocksdb.iterator.internal-key") {
    *prop = saved_key_.GetUserKey().ToString();
    return Status::OK();
  }
  return Status::InvalidArgument("Unidentified property.");
}
115

116
// Parses the inner iterator's current key into *ikey.
//
// Returns true on success. On failure, status_ is set to a Corruption status
// carrying the parse error, the iterator is marked invalid, the error is
// logged, and false is returned.
bool DBIter::ParseKey(ParsedInternalKey* ikey) {
  Status s =
      ParseInternalKey(iter_.key(), ikey, false /* log_err_key */);  // TODO
  if (!s.ok()) {
    status_ = Status::Corruption("In DBIter: ", s.getState());
    valid_ = false;
    ROCKS_LOG_ERROR(logger_, "In DBIter: %s", status_.getState());
    return false;
  } else {
    return true;
  }
}

J
jorlow@chromium.org 已提交
129 130
void DBIter::Next() {
  assert(valid_);
131
  assert(status_.ok());
J
jorlow@chromium.org 已提交
132

133
  PERF_CPU_TIMER_GUARD(iter_next_cpu_nanos, clock_);
134 135
  // Release temporarily pinned blocks from last operation
  ReleaseTempPinnedData();
136 137 138
  local_stats_.skip_count_ += num_internal_keys_skipped_;
  local_stats_.skip_count_--;
  num_internal_keys_skipped_ = 0;
139
  bool ok = true;
S
Stanislau Hlebik 已提交
140
  if (direction_ == kReverse) {
141
    is_key_seqnum_zero_ = false;
142 143 144
    if (!ReverseToForward()) {
      ok = false;
    }
145
  } else if (!current_entry_is_merged_) {
146 147 148 149 150
    // If the current value is not a merge, the iter position is the
    // current key, which is already returned. We can safely issue a
    // Next() without checking the current key.
    // If the current key is a merge, very likely iter already points
    // to the next internal position.
151 152
    assert(iter_.Valid());
    iter_.Next();
153
    PERF_COUNTER_ADD(internal_key_skipped_count, 1);
J
jorlow@chromium.org 已提交
154
  }
J
jorlow@chromium.org 已提交
155

156
  local_stats_.next_count_++;
157
  if (ok && iter_.Valid()) {
158 159
    if (prefix_same_as_start_) {
      assert(prefix_extractor_ != nullptr);
160 161 162 163
      const Slice prefix = prefix_.GetUserKey();
      FindNextUserEntry(true /* skipping the current user key */, &prefix);
    } else {
      FindNextUserEntry(true /* skipping the current user key */, nullptr);
164
    }
165
  } else {
166
    is_key_seqnum_zero_ = false;
167 168
    valid_ = false;
  }
169 170 171 172
  if (statistics_ != nullptr && valid_) {
    local_stats_.next_found_count_++;
    local_stats_.bytes_read_ += (key().size() + value().size());
  }
J
jorlow@chromium.org 已提交
173 174
}

L
Levi Tamasi 已提交
175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208
bool DBIter::SetBlobValueIfNeeded(const Slice& user_key,
                                  const Slice& blob_index) {
  assert(!is_blob_);

  if (expose_blob_index_) {  // Stacked BlobDB implementation
    is_blob_ = true;
    return true;
  }

  if (!version_) {
    status_ = Status::Corruption("Encountered unexpected blob index.");
    valid_ = false;
    return false;
  }

  // TODO: consider moving ReadOptions from ArenaWrappedDBIter to DBIter to
  // avoid having to copy options back and forth.
  ReadOptions read_options;
  read_options.read_tier = read_tier_;
  read_options.verify_checksums = verify_checksums_;

  const Status s =
      version_->GetBlob(read_options, user_key, blob_index, &blob_value_);

  if (!s.ok()) {
    status_ = s;
    valid_ = false;
    return false;
  }

  is_blob_ = true;
  return true;
}

209
// PRE: saved_key_ has the current user key if skipping_saved_key
210 211 212 213 214 215
// POST: saved_key_ should have the next user key if valid_,
//       if the current entry is a result of merge
//           current_entry_is_merged_ => true
//           saved_value_             => the merged value
//
// NOTE: In between, saved_key_ can point to a user key that has
216
//       a delete marker or a sequence number higher than sequence_
217
//       saved_key_ MUST have a proper user_key before calling this function
218
//
Y
Yanqin Jin 已提交
219
// The prefix parameter, if not null, indicates that we need to iterate
220 221 222
// within the prefix, and the iterator needs to be made invalid, if no
// more entry for the prefix can be found.
bool DBIter::FindNextUserEntry(bool skipping_saved_key, const Slice* prefix) {
223
  PERF_TIMER_GUARD(find_next_user_entry_time);
224
  return FindNextUserEntryInternal(skipping_saved_key, prefix);
225 226 227
}

// Actual implementation of DBIter::FindNextUserEntry()
228 229
bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
                                       const Slice* prefix) {
J
jorlow@chromium.org 已提交
230
  // Loop until we hit an acceptable entry to yield
231
  assert(iter_.Valid());
232
  assert(status_.ok());
J
jorlow@chromium.org 已提交
233
  assert(direction_ == kForward);
234
  current_entry_is_merged_ = false;
235 236 237

  // How many times in a row we have skipped an entry with user key less than
  // or equal to saved_key_. We could skip these entries either because
238
  // sequence numbers were too high or because skipping_saved_key = true.
239
  // What saved_key_ contains throughout this method:
Y
Yanqin Jin 已提交
240 241 242 243 244 245 246 247
  //  - if skipping_saved_key : saved_key_ contains the key that we need
  //                            to skip, and we haven't seen any keys greater
  //                            than that,
  //  - if num_skipped > 0    : saved_key_ contains the key that we have skipped
  //                            num_skipped times, and we haven't seen any keys
  //                            greater than that,
  //  - none of the above     : saved_key_ can contain anything, it doesn't
  //                            matter.
248
  uint64_t num_skipped = 0;
249 250 251 252 253
  // For write unprepared, the target sequence number in reseek could be larger
  // than the snapshot, and thus needs to be skipped again. This could result in
  // an infinite loop of reseeks. To avoid that, we limit the number of reseeks
  // to one.
  bool reseek_done = false;
254

Y
Yi Wu 已提交
255 256
  is_blob_ = false;

J
jorlow@chromium.org 已提交
257
  do {
258 259 260
    // Will update is_key_seqnum_zero_ as soon as we parsed the current key
    // but we need to save the previous value to be used in the loop.
    bool is_prev_key_seqnum_zero = is_key_seqnum_zero_;
261
    if (!ParseKey(&ikey_)) {
262
      is_key_seqnum_zero_ = false;
263
      return false;
264
    }
265 266
    Slice user_key_without_ts =
        StripTimestampFromUserKey(ikey_.user_key, timestamp_size_);
267

268 269
    is_key_seqnum_zero_ = (ikey_.sequence == 0);

270 271
    assert(iterate_upper_bound_ == nullptr ||
           iter_.UpperBoundCheckResult() != IterBoundCheck::kInbound ||
Y
Yanqin Jin 已提交
272
           user_comparator_.CompareWithoutTimestamp(
273
               user_key_without_ts, /*a_has_ts=*/false, *iterate_upper_bound_,
Y
Yanqin Jin 已提交
274
               /*b_has_ts=*/false) < 0);
275 276
    if (iterate_upper_bound_ != nullptr &&
        iter_.UpperBoundCheckResult() != IterBoundCheck::kInbound &&
Y
Yanqin Jin 已提交
277
        user_comparator_.CompareWithoutTimestamp(
278
            user_key_without_ts, /*a_has_ts=*/false, *iterate_upper_bound_,
Y
Yanqin Jin 已提交
279
            /*b_has_ts=*/false) >= 0) {
280 281
      break;
    }
282

283 284
    assert(prefix == nullptr || prefix_extractor_ != nullptr);
    if (prefix != nullptr &&
285 286
        prefix_extractor_->Transform(user_key_without_ts).compare(*prefix) !=
            0) {
287
      assert(prefix_same_as_start_);
288 289 290
      break;
    }

291
    if (TooManyInternalKeysSkipped()) {
292
      return false;
293 294
    }

Y
Yanqin Jin 已提交
295
    assert(ikey_.user_key.size() >= timestamp_size_);
296 297 298
    Slice ts = timestamp_size_ > 0 ? ExtractTimestampFromUserKey(
                                         ikey_.user_key, timestamp_size_)
                                   : Slice();
299 300
    bool more_recent = false;
    if (IsVisible(ikey_.sequence, ts, &more_recent)) {
301 302 303 304
      // If the previous entry is of seqnum 0, the current entry will not
      // possibly be skipped. This condition can potentially be relaxed to
      // prev_key.seq <= ikey_.sequence. We are cautious because it will be more
      // prone to bugs causing the same user key with the same sequence number.
305 306
      // Note that with current timestamp implementation, the same user key can
      // have different timestamps and zero sequence number on the bottommost
Y
Yanqin Jin 已提交
307
      // level. This may change in the future.
308 309
      if ((!is_prev_key_seqnum_zero || timestamp_size_ > 0) &&
          skipping_saved_key &&
310
          CompareKeyForSkip(ikey_.user_key, saved_key_.GetUserKey()) <= 0) {
311 312 313
        num_skipped++;  // skip this entry
        PERF_COUNTER_ADD(internal_key_skipped_count, 1);
      } else {
314
        assert(!skipping_saved_key ||
315
               CompareKeyForSkip(ikey_.user_key, saved_key_.GetUserKey()) > 0);
316 317 318 319 320
        if (!iter_.PrepareValue()) {
          assert(!iter_.status().ok());
          valid_ = false;
          return false;
        }
321
        num_skipped = 0;
322
        reseek_done = false;
323
        switch (ikey_.type) {
324
          case kTypeDeletion:
Y
Yanqin Jin 已提交
325
          case kTypeDeletionWithTimestamp:
326 327 328
          case kTypeSingleDeletion:
            // Arrange to skip all upcoming entries for this key since
            // they are hidden by this deletion.
329 330 331
            // if iterartor specified start_seqnum we
            // 1) return internal key, including the type
            // 2) return ikey only if ikey.seqnum >= start_seqnum_
332
            // note that if deletion seqnum is < start_seqnum_ we
333
            // just skip it like in normal iterator.
334 335 336 337 338 339 340 341 342 343 344 345 346 347
            if (start_seqnum_ > 0) {
              if (ikey_.sequence >= start_seqnum_) {
                saved_key_.SetInternalKey(ikey_);
                valid_ = true;
                return true;
              } else {
                saved_key_.SetUserKey(
                    ikey_.user_key,
                    !pin_thru_lifetime_ ||
                        !iter_.iter()->IsKeyPinned() /* copy */);
                skipping_saved_key = true;
                PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
              }
            } else if (timestamp_lb_) {
348
              saved_key_.SetInternalKey(ikey_);
349 350
              valid_ = true;
              return true;
351 352
            } else {
              saved_key_.SetUserKey(
353 354
                  ikey_.user_key, !pin_thru_lifetime_ ||
                                      !iter_.iter()->IsKeyPinned() /* copy */);
355
              skipping_saved_key = true;
356
              PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
357
            }
358 359
            break;
          case kTypeValue:
Y
Yi Wu 已提交
360
          case kTypeBlobIndex:
361 362 363
            if (start_seqnum_ > 0) {
              if (ikey_.sequence >= start_seqnum_) {
                saved_key_.SetInternalKey(ikey_);
L
Levi Tamasi 已提交
364 365 366 367 368 369 370

                if (ikey_.type == kTypeBlobIndex) {
                  if (!SetBlobValueIfNeeded(ikey_.user_key, iter_.value())) {
                    return false;
                  }
                }

Y
Yi Wu 已提交
371
                valid_ = true;
372
                return true;
373 374
              } else {
                // this key and all previous versions shouldn't be included,
375
                // skipping_saved_key
376 377 378 379
                saved_key_.SetUserKey(
                    ikey_.user_key,
                    !pin_thru_lifetime_ ||
                        !iter_.iter()->IsKeyPinned() /* copy */);
380
                skipping_saved_key = true;
Y
Yi Wu 已提交
381
              }
382 383
            } else if (timestamp_lb_) {
              saved_key_.SetInternalKey(ikey_);
L
Levi Tamasi 已提交
384 385 386 387 388 389 390

              if (ikey_.type == kTypeBlobIndex) {
                if (!SetBlobValueIfNeeded(ikey_.user_key, iter_.value())) {
                  return false;
                }
              }

391 392
              valid_ = true;
              return true;
393
            } else {
394
              saved_key_.SetUserKey(
395 396
                  ikey_.user_key, !pin_thru_lifetime_ ||
                                      !iter_.iter()->IsKeyPinned() /* copy */);
397
              if (range_del_agg_.ShouldDelete(
398
                      ikey_, RangeDelPositioningMode::kForwardTraversal)) {
399 400
                // Arrange to skip all upcoming entries for this key since
                // they are hidden by this deletion.
401
                skipping_saved_key = true;
402
                num_skipped = 0;
403
                reseek_done = false;
404
                PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
L
Levi Tamasi 已提交
405 406 407 408 409
              } else {
                if (ikey_.type == kTypeBlobIndex) {
                  if (!SetBlobValueIfNeeded(ikey_.user_key, iter_.value())) {
                    return false;
                  }
410
                }
411

412
                valid_ = true;
413
                return true;
414
              }
415 416 417
            }
            break;
          case kTypeMerge:
418
            saved_key_.SetUserKey(
419
                ikey_.user_key,
420
                !pin_thru_lifetime_ || !iter_.iter()->IsKeyPinned() /* copy */);
421
            if (range_del_agg_.ShouldDelete(
422
                    ikey_, RangeDelPositioningMode::kForwardTraversal)) {
423 424
              // Arrange to skip all upcoming entries for this key since
              // they are hidden by this deletion.
425
              skipping_saved_key = true;
426
              num_skipped = 0;
427
              reseek_done = false;
428 429 430 431 432 433
              PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
            } else {
              // By now, we are sure the current ikey is going to yield a
              // value
              current_entry_is_merged_ = true;
              valid_ = true;
434
              return MergeValuesNewToOld();  // Go to a different state machine
435 436 437
            }
            break;
          default:
438 439 440 441 442
            valid_ = false;
            status_ = Status::Corruption(
                "Unknown value type: " +
                std::to_string(static_cast<unsigned int>(ikey_.type)));
            return false;
443
        }
J
jorlow@chromium.org 已提交
444
      }
445
    } else {
446 447 448
      if (more_recent) {
        PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
      }
449

450 451 452
      // This key was inserted after our snapshot was taken or skipped by
      // timestamp range. If this happens too many times in a row for the same
      // user key, we want to seek to the target sequence number.
Y
Yanqin Jin 已提交
453 454
      int cmp = user_comparator_.CompareWithoutTimestamp(
          ikey_.user_key, saved_key_.GetUserKey());
Y
Yanqin Jin 已提交
455
      if (cmp == 0 || (skipping_saved_key && cmp < 0)) {
456 457
        num_skipped++;
      } else {
458
        saved_key_.SetUserKey(
459
            ikey_.user_key,
460
            !iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
461
        skipping_saved_key = false;
462
        num_skipped = 0;
463
        reseek_done = false;
464
      }
J
jorlow@chromium.org 已提交
465
    }
466 467 468

    // If we have sequentially iterated via numerous equal keys, then it's
    // better to seek so that we can avoid too many key comparisons.
469 470 471 472 473
    //
    // To avoid infinite loops, do not reseek if we have already attempted to
    // reseek previously.
    //
    // TODO(lth): If we reseek to sequence number greater than ikey_.sequence,
Y
Yanqin Jin 已提交
474
    // then it does not make sense to reseek as we would actually land further
475 476
    // away from the desired key. There is opportunity for optimization here.
    if (num_skipped > max_skip_ && !reseek_done) {
477
      is_key_seqnum_zero_ = false;
478
      num_skipped = 0;
479
      reseek_done = true;
480
      std::string last_key;
481
      if (skipping_saved_key) {
482 483 484
        // We're looking for the next user-key but all we see are the same
        // user-key with decreasing sequence numbers. Fast forward to
        // sequence number 0 and type deletion (the smallest type).
Y
Yanqin Jin 已提交
485 486 487 488 489
        if (timestamp_size_ == 0) {
          AppendInternalKey(
              &last_key,
              ParsedInternalKey(saved_key_.GetUserKey(), 0, kTypeDeletion));
        } else {
490
          const std::string kTsMin(timestamp_size_, static_cast<char>(0));
Y
Yanqin Jin 已提交
491 492 493
          AppendInternalKeyWithDifferentTimestamp(
              &last_key,
              ParsedInternalKey(saved_key_.GetUserKey(), 0, kTypeDeletion),
494
              kTsMin);
Y
Yanqin Jin 已提交
495
        }
496 497
        // Don't set skipping_saved_key = false because we may still see more
        // user-keys equal to saved_key_.
498 499 500 501 502 503
      } else {
        // We saw multiple entries with this user key and sequence numbers
        // higher than sequence_. Fast forward to sequence_.
        // Note that this only covers a case when a higher key was overwritten
        // many times since our snapshot was taken, not the case when a lot of
        // different keys were inserted after our snapshot was taken.
Y
Yanqin Jin 已提交
504 505 506 507 508 509 510 511 512 513 514
        if (timestamp_size_ == 0) {
          AppendInternalKey(
              &last_key, ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
                                           kValueTypeForSeek));
        } else {
          AppendInternalKeyWithDifferentTimestamp(
              &last_key,
              ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
                                kValueTypeForSeek),
              *timestamp_ub_);
        }
515
      }
516
      iter_.Seek(last_key);
517 518
      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
    } else {
519
      iter_.Next();
520
    }
521
  } while (iter_.Valid());
522

J
jorlow@chromium.org 已提交
523
  valid_ = false;
524
  return iter_.status().ok();
J
jorlow@chromium.org 已提交
525 526
}

527 528
// Merge values of the same user key starting from the current iter_ position
// Scan from the newer entries to older entries.
529
// PRE: iter_.key() points to the first merge type entry
530
//      saved_key_ stores the user key
531
//      iter_.PrepareValue() has been called
532 533
// POST: saved_value_ has the merged value for the user key
//       iter_ points to the next entry (or invalid)
534
bool DBIter::MergeValuesNewToOld() {
535
  if (!merge_operator_) {
536
    ROCKS_LOG_ERROR(logger_, "Options::merge_operator is null.");
537
    status_ = Status::InvalidArgument("merge_operator_ must be set.");
538
    valid_ = false;
539
    return false;
D
Deon Nicholas 已提交
540
  }
541

542 543
  // Temporarily pin the blocks that hold merge operands
  TempPinData();
544
  merge_context_.Clear();
545
  // Start the merge process by pushing the first operand
546 547
  merge_context_.PushOperand(
      iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
548
  TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:PushedFirstOperand");
549 550

  ParsedInternalKey ikey;
551
  for (iter_.Next(); iter_.Valid(); iter_.Next()) {
552
    TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:SteppedToNextOperand");
553
    if (!ParseKey(&ikey)) {
554
      return false;
555 556
    }

557
    if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
558 559
      // hit the next user key, stop right here
      break;
560 561
    }
    if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type ||
562
               range_del_agg_.ShouldDelete(
563
                   ikey, RangeDelPositioningMode::kForwardTraversal)) {
564 565
      // hit a delete with the same user key, stop right here
      // iter_ is positioned after delete
566
      iter_.Next();
567
      break;
568 569 570 571 572 573 574
    }
    if (!iter_.PrepareValue()) {
      valid_ = false;
      return false;
    }

    if (kTypeValue == ikey.type) {
575 576
      // hit a put, merge the put value with operands and store the
      // final result in saved_value_. We are done!
577
      const Slice val = iter_.value();
578
      Status s = MergeHelper::TimedFullMerge(
579
          merge_operator_, ikey.user_key, &val, merge_context_.GetOperands(),
580
          &saved_value_, logger_, statistics_, clock_, &pinned_value_, true);
581
      if (!s.ok()) {
Y
Yi Wu 已提交
582
        valid_ = false;
583
        status_ = s;
584
        return false;
585
      }
586
      // iter_ is positioned after put
587 588
      iter_.Next();
      if (!iter_.status().ok()) {
589 590 591 592
        valid_ = false;
        return false;
      }
      return true;
A
Andres Noetzli 已提交
593
    } else if (kTypeMerge == ikey.type) {
594 595
      // hit a merge, add the value as an operand and run associative merge.
      // when complete, add result to operands and continue.
596 597
      merge_context_.PushOperand(
          iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
598
      PERF_COUNTER_ADD(internal_merge_count, 1);
Y
Yi Wu 已提交
599
    } else if (kTypeBlobIndex == ikey.type) {
L
Levi Tamasi 已提交
600
      status_ = Status::NotSupported("BlobDB does not support merge operator.");
Y
Yi Wu 已提交
601
      valid_ = false;
602
      return false;
A
Andres Noetzli 已提交
603
    } else {
604 605 606 607 608
      valid_ = false;
      status_ = Status::Corruption(
          "Unrecognized value type: " +
          std::to_string(static_cast<unsigned int>(ikey.type)));
      return false;
609 610 611
    }
  }

612
  if (!iter_.status().ok()) {
613 614 615 616
    valid_ = false;
    return false;
  }

617 618 619 620
  // we either exhausted all internal keys under this user key, or hit
  // a deletion marker.
  // feed null as the existing value to the merge operator, such that
  // client can differentiate this scenario and do things accordingly.
621 622
  Status s = MergeHelper::TimedFullMerge(
      merge_operator_, saved_key_.GetUserKey(), nullptr,
623
      merge_context_.GetOperands(), &saved_value_, logger_, statistics_, clock_,
624
      &pinned_value_, true);
625
  if (!s.ok()) {
Y
Yi Wu 已提交
626
    valid_ = false;
627
    status_ = s;
628
    return false;
629
  }
630 631 632

  assert(status_.ok());
  return true;
633 634
}

J
jorlow@chromium.org 已提交
635
void DBIter::Prev() {
Y
Yanqin Jin 已提交
636 637 638 639 640 641 642
  if (timestamp_size_ > 0) {
    valid_ = false;
    status_ = Status::NotSupported(
        "SeekToLast/SeekForPrev/Prev currently not supported with timestamp.");
    return;
  }

J
jorlow@chromium.org 已提交
643
  assert(valid_);
644
  assert(status_.ok());
645

646
  PERF_CPU_TIMER_GUARD(iter_prev_cpu_nanos, clock_);
647
  ReleaseTempPinnedData();
648
  ResetInternalKeysSkippedCounter();
649
  bool ok = true;
S
Stanislau Hlebik 已提交
650
  if (direction_ == kForward) {
651 652 653 654 655
    if (!ReverseToBackward()) {
      ok = false;
    }
  }
  if (ok) {
656 657 658 659 660 661
    Slice prefix;
    if (prefix_same_as_start_) {
      assert(prefix_extractor_ != nullptr);
      prefix = prefix_.GetUserKey();
    }
    PrevInternal(prefix_same_as_start_ ? &prefix : nullptr);
S
Stanislau Hlebik 已提交
662
  }
663

M
Manuel Ung 已提交
664
  if (statistics_ != nullptr) {
665
    local_stats_.prev_count_++;
M
Manuel Ung 已提交
666
    if (valid_) {
667 668
      local_stats_.prev_found_count_++;
      local_stats_.bytes_read_ += (key().size() + value().size());
M
Manuel Ung 已提交
669 670
    }
  }
S
Stanislau Hlebik 已提交
671
}
J
jorlow@chromium.org 已提交
672

673
bool DBIter::ReverseToForward() {
674
  assert(iter_.status().ok());
675 676 677 678

  // When moving backwards, iter_ is positioned on _previous_ key, which may
  // not exist or may have different prefix than the current key().
  // If that's the case, seek iter_ to current key.
S
sdong 已提交
679
  if (!expect_total_order_inner_iter() || !iter_.Valid()) {
680 681
    IterKey last_key;
    last_key.SetInternalKey(ParsedInternalKey(
682
        saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
683
    iter_.Seek(last_key.GetInternalKey());
684
  }
685

686
  direction_ = kForward;
687
  // Skip keys less than the current key() (a.k.a. saved_key_).
688
  while (iter_.Valid()) {
689 690 691 692
    ParsedInternalKey ikey;
    if (!ParseKey(&ikey)) {
      return false;
    }
693
    if (user_comparator_.Compare(ikey.user_key, saved_key_.GetUserKey()) >= 0) {
694 695
      return true;
    }
696
    iter_.Next();
697 698
  }

699
  if (!iter_.status().ok()) {
700 701
    valid_ = false;
    return false;
702
  }
703 704

  return true;
705 706
}

707 708
// Move iter_ to the key before saved_key_.
bool DBIter::ReverseToBackward() {
709
  assert(iter_.status().ok());
710 711 712 713 714

  // When current_entry_is_merged_ is true, iter_ may be positioned on the next
  // key, which may not exist or may have prefix different from current.
  // If that's the case, seek to saved_key_.
  if (current_entry_is_merged_ &&
S
sdong 已提交
715
      (!expect_total_order_inner_iter() || !iter_.Valid())) {
716
    IterKey last_key;
717 718 719 720 721
    // Using kMaxSequenceNumber and kValueTypeForSeek
    // (not kValueTypeForSeekForPrev) to seek to a key strictly smaller
    // than saved_key_.
    last_key.SetInternalKey(ParsedInternalKey(
        saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
S
sdong 已提交
722
    if (!expect_total_order_inner_iter()) {
723
      iter_.SeekForPrev(last_key.GetInternalKey());
724 725 726 727 728
    } else {
      // Some iterators may not support SeekForPrev(), so we avoid using it
      // when prefix seek mode is disabled. This is somewhat expensive
      // (an extra Prev(), as well as an extra change of direction of iter_),
      // so we may need to reconsider it later.
729 730 731
      iter_.Seek(last_key.GetInternalKey());
      if (!iter_.Valid() && iter_.status().ok()) {
        iter_.SeekToLast();
732
      }
733 734 735 736
    }
  }

  direction_ = kReverse;
737
  return FindUserKeyBeforeSavedKey();
738 739
}

740
void DBIter::PrevInternal(const Slice* prefix) {
741
  while (iter_.Valid()) {
742
    saved_key_.SetUserKey(
743 744
        ExtractUserKey(iter_.key()),
        !iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
745

746 747
    assert(prefix == nullptr || prefix_extractor_ != nullptr);
    if (prefix != nullptr &&
748 749 750
        prefix_extractor_
                ->Transform(StripTimestampFromUserKey(saved_key_.GetUserKey(),
                                                      timestamp_size_))
751 752
                .compare(*prefix) != 0) {
      assert(prefix_same_as_start_);
753 754 755 756 757
      // Current key does not have the same prefix as start
      valid_ = false;
      return;
    }

758 759 760 761
    assert(iterate_lower_bound_ == nullptr || iter_.MayBeOutOfLowerBound() ||
           user_comparator_.Compare(saved_key_.GetUserKey(),
                                    *iterate_lower_bound_) >= 0);
    if (iterate_lower_bound_ != nullptr && iter_.MayBeOutOfLowerBound() &&
762 763
        user_comparator_.Compare(saved_key_.GetUserKey(),
                                 *iterate_lower_bound_) < 0) {
764 765 766 767 768
      // We've iterated earlier than the user-specified lower bound.
      valid_ = false;
      return;
    }

769
    if (!FindValueForCurrentKey()) {  // assigns valid_
S
Stanislau Hlebik 已提交
770
      return;
J
jorlow@chromium.org 已提交
771
    }
772

773 774 775
    // Whether or not we found a value for current key, we need iter_ to end up
    // on a smaller key.
    if (!FindUserKeyBeforeSavedKey()) {
776 777 778
      return;
    }

779 780 781
    if (valid_) {
      // Found the value.
      return;
S
Stanislau Hlebik 已提交
782
    }
783 784 785

    if (TooManyInternalKeysSkipped(false)) {
      return;
S
Stanislau Hlebik 已提交
786 787
    }
  }
788

S
Stanislau Hlebik 已提交
789 790
  // We haven't found any key - iterator is not valid
  valid_ = false;
J
jorlow@chromium.org 已提交
791 792
}

793 794 795 796 797 798 799 800 801 802 803
// Used for backwards iteration.
// Looks at the entries with user key saved_key_ and finds the most up-to-date
// value for it, or executes a merge, or determines that the value was deleted.
// Sets valid_ to true if the value is found and is ready to be presented to
// the user through value().
// Sets valid_ to false if the value was deleted, and we should try another key.
// Returns false if an error occurred, and !status().ok() and !valid_.
//
// PRE: iter_ is positioned on the last entry with user key equal to saved_key_.
// POST: iter_ is positioned on one of the entries equal to saved_key_, or on
//       the entry just before them, or on the entry just after them.
S
Stanislau Hlebik 已提交
804
bool DBIter::FindValueForCurrentKey() {
805
  assert(iter_.Valid());
806
  merge_context_.Clear();
807
  current_entry_is_merged_ = false;
A
Andres Noetzli 已提交
808 809
  // last entry before merge (could be kTypeDeletion, kTypeSingleDeletion or
  // kTypeValue)
S
Stanislau Hlebik 已提交
810 811
  ValueType last_not_merge_type = kTypeDeletion;
  ValueType last_key_entry_type = kTypeDeletion;
J
jorlow@chromium.org 已提交
812

813 814 815
  // Temporarily pin blocks that hold (merge operands / the value)
  ReleaseTempPinnedData();
  TempPinData();
S
Stanislau Hlebik 已提交
816
  size_t num_skipped = 0;
817
  while (iter_.Valid()) {
818 819 820 821 822
    ParsedInternalKey ikey;
    if (!ParseKey(&ikey)) {
      return false;
    }

Y
Yanqin Jin 已提交
823 824 825 826 827 828 829
    assert(ikey.user_key.size() >= timestamp_size_);
    Slice ts;
    if (timestamp_size_ > 0) {
      ts = Slice(ikey.user_key.data() + ikey.user_key.size() - timestamp_size_,
                 timestamp_size_);
    }
    if (!IsVisible(ikey.sequence, ts) ||
830
        !user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
831 832
      break;
    }
833 834 835 836
    if (TooManyInternalKeysSkipped()) {
      return false;
    }

837 838 839
    // This user key has lots of entries.
    // We're going from old to new, and it's taking too long. Let's do a Seek()
    // and go from new to old. This helps when a key was overwritten many times.
840
    if (num_skipped >= max_skip_) {
S
Stanislau Hlebik 已提交
841 842 843
      return FindValueForCurrentKeyUsingSeek();
    }

844 845 846 847 848
    if (!iter_.PrepareValue()) {
      valid_ = false;
      return false;
    }

S
Stanislau Hlebik 已提交
849 850 851
    last_key_entry_type = ikey.type;
    switch (last_key_entry_type) {
      case kTypeValue:
Y
Yi Wu 已提交
852
      case kTypeBlobIndex:
853
        if (range_del_agg_.ShouldDelete(
854
                ikey, RangeDelPositioningMode::kBackwardTraversal)) {
A
Andrew Kryczka 已提交
855 856
          last_key_entry_type = kTypeRangeDeletion;
          PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
857
        } else if (iter_.iter()->IsValuePinned()) {
858
          pinned_value_ = iter_.value();
859 860 861 862 863
        } else {
          valid_ = false;
          status_ = Status::NotSupported(
              "Backward iteration not supported if underlying iterator's value "
              "cannot be pinned.");
A
Andrew Kryczka 已提交
864
        }
865
        merge_context_.Clear();
A
Andrew Kryczka 已提交
866
        last_not_merge_type = last_key_entry_type;
867 868 869
        if (!status_.ok()) {
          return false;
        }
S
Stanislau Hlebik 已提交
870 871
        break;
      case kTypeDeletion:
A
Andres Noetzli 已提交
872
      case kTypeSingleDeletion:
873
        merge_context_.Clear();
A
Andres Noetzli 已提交
874
        last_not_merge_type = last_key_entry_type;
875
        PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
S
Stanislau Hlebik 已提交
876 877
        break;
      case kTypeMerge:
878
        if (range_del_agg_.ShouldDelete(
879
                ikey, RangeDelPositioningMode::kBackwardTraversal)) {
A
Andrew Kryczka 已提交
880 881 882 883 884 885 886
          merge_context_.Clear();
          last_key_entry_type = kTypeRangeDeletion;
          last_not_merge_type = last_key_entry_type;
          PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
        } else {
          assert(merge_operator_ != nullptr);
          merge_context_.PushOperandBack(
887 888
              iter_.value(),
              iter_.iter()->IsValuePinned() /* operand_pinned */);
889
          PERF_COUNTER_ADD(internal_merge_count, 1);
A
Andrew Kryczka 已提交
890
        }
S
Stanislau Hlebik 已提交
891 892
        break;
      default:
893 894 895 896 897
        valid_ = false;
        status_ = Status::Corruption(
            "Unknown value type: " +
            std::to_string(static_cast<unsigned int>(last_key_entry_type)));
        return false;
S
Stanislau Hlebik 已提交
898 899
    }

900
    PERF_COUNTER_ADD(internal_key_skipped_count, 1);
901
    iter_.Prev();
S
Stanislau Hlebik 已提交
902
    ++num_skipped;
903 904
  }

905
  if (!iter_.status().ok()) {
906 907
    valid_ = false;
    return false;
S
Stanislau Hlebik 已提交
908 909
  }

910
  Status s;
S
sdong 已提交
911
  s.PermitUncheckedError();
Y
Yi Wu 已提交
912
  is_blob_ = false;
S
Stanislau Hlebik 已提交
913 914
  switch (last_key_entry_type) {
    case kTypeDeletion:
A
Andres Noetzli 已提交
915
    case kTypeSingleDeletion:
A
Andrew Kryczka 已提交
916
    case kTypeRangeDeletion:
S
Stanislau Hlebik 已提交
917
      valid_ = false;
918
      return true;
S
Stanislau Hlebik 已提交
919
    case kTypeMerge:
920
      current_entry_is_merged_ = true;
A
Aaron Gao 已提交
921
      if (last_not_merge_type == kTypeDeletion ||
A
Andrew Kryczka 已提交
922 923
          last_not_merge_type == kTypeSingleDeletion ||
          last_not_merge_type == kTypeRangeDeletion) {
924 925 926
        s = MergeHelper::TimedFullMerge(
            merge_operator_, saved_key_.GetUserKey(), nullptr,
            merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
927
            clock_, &pinned_value_, true);
Y
Yi Wu 已提交
928
      } else if (last_not_merge_type == kTypeBlobIndex) {
L
Levi Tamasi 已提交
929 930
        status_ =
            Status::NotSupported("BlobDB does not support merge operator.");
Y
Yi Wu 已提交
931
        valid_ = false;
932
        return false;
933
      } else {
S
Stanislau Hlebik 已提交
934
        assert(last_not_merge_type == kTypeValue);
935
        s = MergeHelper::TimedFullMerge(
936
            merge_operator_, saved_key_.GetUserKey(), &pinned_value_,
937
            merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
938
            clock_, &pinned_value_, true);
939
      }
S
Stanislau Hlebik 已提交
940 941
      break;
    case kTypeValue:
942
      // do nothing - we've already has value in pinned_value_
S
Stanislau Hlebik 已提交
943
      break;
Y
Yi Wu 已提交
944
    case kTypeBlobIndex:
L
Levi Tamasi 已提交
945
      if (!SetBlobValueIfNeeded(saved_key_.GetUserKey(), pinned_value_)) {
946
        return false;
Y
Yi Wu 已提交
947
      }
L
Levi Tamasi 已提交
948

Y
Yi Wu 已提交
949
      break;
S
Stanislau Hlebik 已提交
950
    default:
951 952 953 954 955
      valid_ = false;
      status_ = Status::Corruption(
          "Unknown value type: " +
          std::to_string(static_cast<unsigned int>(last_key_entry_type)));
      return false;
J
jorlow@chromium.org 已提交
956
  }
957
  if (!s.ok()) {
Y
Yi Wu 已提交
958
    valid_ = false;
959
    status_ = s;
960
    return false;
961
  }
962
  valid_ = true;
S
Stanislau Hlebik 已提交
963 964
  return true;
}
J
jorlow@chromium.org 已提交
965

S
Stanislau Hlebik 已提交
966 967
// This function is used in FindValueForCurrentKey.
// We use Seek() function instead of Prev() to find necessary value
968 969
// TODO: This is very similar to FindNextUserEntry() and MergeValuesNewToOld().
//       Would be nice to reuse some code.
S
Stanislau Hlebik 已提交
970
bool DBIter::FindValueForCurrentKeyUsingSeek() {
971 972 973
  // FindValueForCurrentKey will enable pinning before calling
  // FindValueForCurrentKeyUsingSeek()
  assert(pinned_iters_mgr_.PinningEnabled());
S
Stanislau Hlebik 已提交
974
  std::string last_key;
975 976
  AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetUserKey(),
                                                 sequence_, kValueTypeForSeek));
977
  iter_.Seek(last_key);
S
Stanislau Hlebik 已提交
978 979
  RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);

980 981
  // In case read_callback presents, the value we seek to may not be visible.
  // Find the next value that's visible.
S
Stanislau Hlebik 已提交
982
  ParsedInternalKey ikey;
983
  is_blob_ = false;
984
  while (true) {
985
    if (!iter_.Valid()) {
986
      valid_ = false;
987
      return iter_.status().ok();
988 989 990 991 992
    }

    if (!ParseKey(&ikey)) {
      return false;
    }
Y
Yanqin Jin 已提交
993 994 995 996 997 998 999
    assert(ikey.user_key.size() >= timestamp_size_);
    Slice ts;
    if (timestamp_size_ > 0) {
      ts = Slice(ikey.user_key.data() + ikey.user_key.size() - timestamp_size_,
                 timestamp_size_);
    }

1000
    if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
1001 1002 1003 1004 1005 1006 1007
      // No visible values for this key, even though FindValueForCurrentKey()
      // has seen some. This is possible if we're using a tailing iterator, and
      // the entries were discarded in a compaction.
      valid_ = false;
      return true;
    }

Y
Yanqin Jin 已提交
1008
    if (IsVisible(ikey.sequence, ts)) {
1009 1010
      break;
    }
1011

1012
    iter_.Next();
1013
  }
S
Stanislau Hlebik 已提交
1014

A
Andrew Kryczka 已提交
1015
  if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
1016
      range_del_agg_.ShouldDelete(
1017
          ikey, RangeDelPositioningMode::kBackwardTraversal)) {
J
jorlow@chromium.org 已提交
1018
    valid_ = false;
1019
    return true;
S
Stanislau Hlebik 已提交
1020
  }
1021 1022 1023 1024
  if (!iter_.PrepareValue()) {
    valid_ = false;
    return false;
  }
Y
Yi Wu 已提交
1025
  if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex) {
1026 1027
    assert(iter_.iter()->IsValuePinned());
    pinned_value_ = iter_.value();
L
Levi Tamasi 已提交
1028 1029 1030 1031 1032 1033
    if (ikey.type == kTypeBlobIndex) {
      if (!SetBlobValueIfNeeded(ikey.user_key, pinned_value_)) {
        return false;
      }
    }

A
Andrew Kryczka 已提交
1034 1035 1036
    valid_ = true;
    return true;
  }
S
Stanislau Hlebik 已提交
1037 1038 1039

  // kTypeMerge. We need to collect all kTypeMerge values and save them
  // in operands
1040
  assert(ikey.type == kTypeMerge);
1041
  current_entry_is_merged_ = true;
1042
  merge_context_.Clear();
1043 1044
  merge_context_.PushOperand(
      iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
1045
  while (true) {
1046
    iter_.Next();
S
Stanislau Hlebik 已提交
1047

1048 1049
    if (!iter_.Valid()) {
      if (!iter_.status().ok()) {
1050 1051 1052 1053
        valid_ = false;
        return false;
      }
      break;
S
Stanislau Hlebik 已提交
1054
    }
1055 1056 1057
    if (!ParseKey(&ikey)) {
      return false;
    }
1058
    if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
1059 1060 1061 1062
      break;
    }
    if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
        range_del_agg_.ShouldDelete(
1063
            ikey, RangeDelPositioningMode::kForwardTraversal)) {
1064
      break;
1065 1066 1067 1068 1069 1070 1071
    }
    if (!iter_.PrepareValue()) {
      valid_ = false;
      return false;
    }

    if (ikey.type == kTypeValue) {
1072
      const Slice val = iter_.value();
1073 1074 1075
      Status s = MergeHelper::TimedFullMerge(
          merge_operator_, saved_key_.GetUserKey(), &val,
          merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
1076
          clock_, &pinned_value_, true);
1077 1078 1079 1080 1081
      if (!s.ok()) {
        valid_ = false;
        status_ = s;
        return false;
      }
Y
Yi Wu 已提交
1082
      valid_ = true;
1083 1084
      return true;
    } else if (ikey.type == kTypeMerge) {
1085 1086
      merge_context_.PushOperand(
          iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
1087 1088
      PERF_COUNTER_ADD(internal_merge_count, 1);
    } else if (ikey.type == kTypeBlobIndex) {
L
Levi Tamasi 已提交
1089
      status_ = Status::NotSupported("BlobDB does not support merge operator.");
Y
Yi Wu 已提交
1090
      valid_ = false;
1091 1092
      return false;
    } else {
1093 1094 1095 1096 1097
      valid_ = false;
      status_ = Status::Corruption(
          "Unknown value type: " +
          std::to_string(static_cast<unsigned int>(ikey.type)));
      return false;
1098
    }
S
Stanislau Hlebik 已提交
1099 1100
  }

1101 1102
  Status s = MergeHelper::TimedFullMerge(
      merge_operator_, saved_key_.GetUserKey(), nullptr,
1103
      merge_context_.GetOperands(), &saved_value_, logger_, statistics_, clock_,
1104 1105
      &pinned_value_, true);
  if (!s.ok()) {
Y
Yi Wu 已提交
1106
    valid_ = false;
1107
    status_ = s;
1108
    return false;
1109
  }
S
Stanislau Hlebik 已提交
1110

1111 1112 1113
  // Make sure we leave iter_ in a good state. If it's valid and we don't care
  // about prefixes, that's already good enough. Otherwise it needs to be
  // seeked to the current key.
S
sdong 已提交
1114 1115
  if (!expect_total_order_inner_iter() || !iter_.Valid()) {
    if (!expect_total_order_inner_iter()) {
1116
      iter_.SeekForPrev(last_key);
1117
    } else {
1118 1119 1120
      iter_.Seek(last_key);
      if (!iter_.Valid() && iter_.status().ok()) {
        iter_.SeekToLast();
1121 1122 1123
      }
    }
    RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
S
Stanislau Hlebik 已提交
1124
  }
1125 1126 1127

  valid_ = true;
  return true;
S
Stanislau Hlebik 已提交
1128 1129
}

1130 1131 1132 1133
// Move backwards until the key smaller than saved_key_.
// Changes valid_ only if return value is false.
bool DBIter::FindUserKeyBeforeSavedKey() {
  assert(status_.ok());
S
Stanislau Hlebik 已提交
1134
  size_t num_skipped = 0;
1135
  while (iter_.Valid()) {
1136 1137 1138
    ParsedInternalKey ikey;
    if (!ParseKey(&ikey)) {
      return false;
1139 1140
    }

1141
    if (user_comparator_.Compare(ikey.user_key, saved_key_.GetUserKey()) < 0) {
1142 1143 1144 1145 1146
      return true;
    }

    if (TooManyInternalKeysSkipped()) {
      return false;
S
Stanislau Hlebik 已提交
1147
    }
1148

S
Siying Dong 已提交
1149
    assert(ikey.sequence != kMaxSequenceNumber);
Y
Yanqin Jin 已提交
1150 1151 1152 1153 1154 1155 1156
    assert(ikey.user_key.size() >= timestamp_size_);
    Slice ts;
    if (timestamp_size_ > 0) {
      ts = Slice(ikey.user_key.data() + ikey.user_key.size() - timestamp_size_,
                 timestamp_size_);
    }
    if (!IsVisible(ikey.sequence, ts)) {
1157 1158 1159 1160
      PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
    } else {
      PERF_COUNTER_ADD(internal_key_skipped_count, 1);
    }
1161

1162
    if (num_skipped >= max_skip_) {
1163 1164 1165 1166 1167 1168
      num_skipped = 0;
      IterKey last_key;
      last_key.SetInternalKey(ParsedInternalKey(
          saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
      // It would be more efficient to use SeekForPrev() here, but some
      // iterators may not support it.
1169
      iter_.Seek(last_key.GetInternalKey());
1170
      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
1171
      if (!iter_.Valid()) {
1172 1173 1174 1175 1176 1177
        break;
      }
    } else {
      ++num_skipped;
    }

1178
    iter_.Prev();
S
Stanislau Hlebik 已提交
1179
  }
1180

1181
  if (!iter_.status().ok()) {
1182 1183 1184 1185 1186
    valid_ = false;
    return false;
  }

  return true;
S
Stanislau Hlebik 已提交
1187 1188
}

1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200
// Checks the cap on internal keys skipped by the current operation.
//
// When max_skippable_internal_keys_ is positive and the running count
// num_internal_keys_skipped_ already exceeds it, invalidates the iterator,
// sets status_ to Incomplete and returns true. Otherwise returns false,
// bumping the running count first when `increment` is true.
bool DBIter::TooManyInternalKeysSkipped(bool increment) {
  const bool limit_enabled = max_skippable_internal_keys_ > 0;
  if (limit_enabled &&
      num_internal_keys_skipped_ > max_skippable_internal_keys_) {
    valid_ = false;
    status_ = Status::Incomplete("Too many internal keys skipped.");
    return true;
  }

  if (increment) {
    num_internal_keys_skipped_++;
  }
  return false;
}

1201 1202
// Returns true if an entry with the given sequence number and timestamp is
// visible to this iterator, i.e. it passes both the sequence-number check
// and the timestamp-range check.
//
// `more_recent`, when non-null, is set to true iff the entry failed the
// sequence-number check.
bool DBIter::IsVisible(SequenceNumber sequence, const Slice& ts,
                       bool* more_recent) {
  // Remember that comparator orders preceding timestamp as larger.
  // TODO(yanqin): support timestamp in read_callback_.
  bool visible_by_seq = (read_callback_ == nullptr)
                            ? sequence <= sequence_
                            : read_callback_->IsVisible(sequence);

  // A null bound means that side of the timestamp range is unconstrained.
  bool visible_by_ts =
      (timestamp_ub_ == nullptr ||
       user_comparator_.CompareTimestamp(ts, *timestamp_ub_) <= 0) &&
      (timestamp_lb_ == nullptr ||
       user_comparator_.CompareTimestamp(ts, *timestamp_lb_) >= 0);

  if (more_recent) {
    *more_recent = !visible_by_seq;
  }
  return visible_by_seq && visible_by_ts;
}
1220

1221
void DBIter::SetSavedKeyToSeekTarget(const Slice& target) {
1222
  is_key_seqnum_zero_ = false;
1223
  SequenceNumber seq = sequence_;
1224
  saved_key_.Clear();
Y
Yanqin Jin 已提交
1225
  saved_key_.SetInternalKey(target, seq, kValueTypeForSeek, timestamp_ub_);
1226

Z
zhangjinpeng1987 已提交
1227
  if (iterate_lower_bound_ != nullptr &&
Y
Yanqin Jin 已提交
1228 1229 1230
      user_comparator_.CompareWithoutTimestamp(
          saved_key_.GetUserKey(), /*a_has_ts=*/true, *iterate_lower_bound_,
          /*b_has_ts=*/false) < 0) {
1231
    // Seek key is smaller than the lower bound.
Z
zhangjinpeng1987 已提交
1232
    saved_key_.Clear();
Y
Yanqin Jin 已提交
1233 1234
    saved_key_.SetInternalKey(*iterate_lower_bound_, seq, kValueTypeForSeek,
                              timestamp_ub_);
Z
zhangjinpeng1987 已提交
1235
  }
1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253
}

// Stores in saved_key_ the internal key that SeekForPrev(target) should
// position the inner iterator at. When `target` is at or beyond
// iterate_upper_bound_, the upper bound (with kMaxSequenceNumber) is used
// instead.
void DBIter::SetSavedKeyToSeekForPrevTarget(const Slice& target) {
  is_key_seqnum_zero_ = false;

  // From here on saved_key_ holds an internal key.
  saved_key_.Clear();
  saved_key_.SetInternalKey(target, 0 /* sequence_number */,
                            kValueTypeForSeekForPrev);

  if (iterate_upper_bound_ == nullptr) {
    return;
  }
  if (user_comparator_.Compare(saved_key_.GetUserKey(),
                               *iterate_upper_bound_) < 0) {
    // Target is below the upper bound: keep it as is.
    return;
  }

  saved_key_.Clear();
  saved_key_.SetInternalKey(*iterate_upper_bound_, kMaxSequenceNumber);
}

void DBIter::Seek(const Slice& target) {
1254 1255
  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, clock_);
  StopWatch sw(clock_, statistics_, DB_SEEK);
1256 1257 1258

#ifndef ROCKSDB_LITE
  if (db_impl_ != nullptr && cfd_ != nullptr) {
1259 1260
    // TODO: What do we do if this returns an error?
    db_impl_->TraceIteratorSeek(cfd_->GetID(), target).PermitUncheckedError();
1261 1262 1263 1264 1265 1266
  }
#endif  // ROCKSDB_LITE

  status_ = Status::OK();
  ReleaseTempPinnedData();
  ResetInternalKeysSkippedCounter();
Z
zhangjinpeng1987 已提交
1267

1268
  // Seek the inner iterator based on the target key.
1269 1270
  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
1271 1272

    SetSavedKeyToSeekTarget(target);
1273
    iter_.Seek(saved_key_.GetInternalKey());
1274

1275
    range_del_agg_.InvalidateRangeDelMapPositions();
1276
    RecordTick(statistics_, NUMBER_DB_SEEK);
1277
  }
1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290
  if (!iter_.Valid()) {
    valid_ = false;
    return;
  }
  direction_ = kForward;

  // Now the inner iterator is placed to the target position. From there,
  // we need to find out the next key that is visible to the user.
  ClearSavedValue();
  if (prefix_same_as_start_) {
    // The case where the iterator needs to be invalidated if it has exausted
    // keys within the same prefix of the seek key.
    assert(prefix_extractor_ != nullptr);
Y
Yanqin Jin 已提交
1291
    Slice target_prefix = prefix_extractor_->Transform(target);
1292 1293 1294
    FindNextUserEntry(false /* not skipping saved_key */,
                      &target_prefix /* prefix */);
    if (valid_) {
Y
Yanqin Jin 已提交
1295
      // Remember the prefix of the seek key for the future Next() call to
1296 1297
      // check.
      prefix_.SetUserKey(target_prefix);
M
Manuel Ung 已提交
1298
    }
J
jorlow@chromium.org 已提交
1299
  } else {
1300 1301 1302 1303
    FindNextUserEntry(false /* not skipping saved_key */, nullptr);
  }
  if (!valid_) {
    return;
J
jorlow@chromium.org 已提交
1304
  }
1305

1306 1307 1308 1309 1310
  // Updating stats and perf context counters.
  if (statistics_ != nullptr) {
    // Decrement since we don't want to count this key as skipped
    RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
    RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
1311
  }
1312
  PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
J
jorlow@chromium.org 已提交
1313
}
J
jorlow@chromium.org 已提交
1314

A
Aaron Gao 已提交
1315
void DBIter::SeekForPrev(const Slice& target) {
1316 1317
  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, clock_);
  StopWatch sw(clock_, statistics_, DB_SEEK);
1318 1319 1320

#ifndef ROCKSDB_LITE
  if (db_impl_ != nullptr && cfd_ != nullptr) {
1321 1322 1323
    // TODO: What do we do if this returns an error?
    db_impl_->TraceIteratorSeekForPrev(cfd_->GetID(), target)
        .PermitUncheckedError();
1324 1325 1326
  }
#endif  // ROCKSDB_LITE

Y
Yanqin Jin 已提交
1327 1328 1329 1330 1331 1332 1333
  if (timestamp_size_ > 0) {
    valid_ = false;
    status_ = Status::NotSupported(
        "SeekToLast/SeekForPrev/Prev currently not supported with timestamp.");
    return;
  }

1334
  status_ = Status::OK();
A
Aaron Gao 已提交
1335
  ReleaseTempPinnedData();
1336
  ResetInternalKeysSkippedCounter();
Z
zhangjinpeng1987 已提交
1337

1338
  // Seek the inner iterator based on the target key.
A
Aaron Gao 已提交
1339 1340
  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
1341
    SetSavedKeyToSeekForPrevTarget(target);
1342
    iter_.SeekForPrev(saved_key_.GetInternalKey());
1343
    range_del_agg_.InvalidateRangeDelMapPositions();
1344
    RecordTick(statistics_, NUMBER_DB_SEEK);
A
Aaron Gao 已提交
1345
  }
1346 1347 1348
  if (!iter_.Valid()) {
    valid_ = false;
    return;
1349
  }
1350
  direction_ = kReverse;
1351

1352 1353 1354 1355 1356 1357 1358 1359
  // Now the inner iterator is placed to the target position. From there,
  // we need to find out the first key that is visible to the user in the
  // backward direction.
  ClearSavedValue();
  if (prefix_same_as_start_) {
    // The case where the iterator needs to be invalidated if it has exausted
    // keys within the same prefix of the seek key.
    assert(prefix_extractor_ != nullptr);
Y
Yanqin Jin 已提交
1360
    Slice target_prefix = prefix_extractor_->Transform(target);
1361 1362 1363 1364 1365
    PrevInternal(&target_prefix);
    if (valid_) {
      // Remember the prefix of the seek key for the future Prev() call to
      // check.
      prefix_.SetUserKey(target_prefix);
A
Aaron Gao 已提交
1366 1367
    }
  } else {
1368
    PrevInternal(nullptr);
A
Aaron Gao 已提交
1369
  }
1370 1371 1372 1373 1374 1375

  // Report stats and perf context.
  if (statistics_ != nullptr && valid_) {
    RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
    RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
    PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
A
Aaron Gao 已提交
1376 1377 1378
  }
}

J
jorlow@chromium.org 已提交
1379
void DBIter::SeekToFirst() {
1380 1381 1382 1383
  if (iterate_lower_bound_ != nullptr) {
    Seek(*iterate_lower_bound_);
    return;
  }
1384
  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, clock_);
1385 1386
  // Don't use iter_::Seek() if we set a prefix extractor
  // because prefix seek will be used.
S
sdong 已提交
1387
  if (!expect_total_order_inner_iter()) {
1388 1389 1390
    max_skip_ = std::numeric_limits<uint64_t>::max();
  }
  status_ = Status::OK();
J
jorlow@chromium.org 已提交
1391
  direction_ = kForward;
1392
  ReleaseTempPinnedData();
1393
  ResetInternalKeysSkippedCounter();
J
jorlow@chromium.org 已提交
1394
  ClearSavedValue();
1395
  is_key_seqnum_zero_ = false;
1396 1397 1398

  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
1399
    iter_.SeekToFirst();
1400
    range_del_agg_.InvalidateRangeDelMapPositions();
1401 1402
  }

M
Manuel Ung 已提交
1403
  RecordTick(statistics_, NUMBER_DB_SEEK);
1404
  if (iter_.Valid()) {
1405
    saved_key_.SetUserKey(
1406 1407
        ExtractUserKey(iter_.key()),
        !iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
1408 1409
    FindNextUserEntry(false /* not skipping saved_key */,
                      nullptr /* no prefix check */);
M
Manuel Ung 已提交
1410 1411 1412 1413
    if (statistics_ != nullptr) {
      if (valid_) {
        RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
        RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
1414
        PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
M
Manuel Ung 已提交
1415 1416
      }
    }
J
jorlow@chromium.org 已提交
1417 1418
  } else {
    valid_ = false;
J
jorlow@chromium.org 已提交
1419
  }
1420 1421
  if (valid_ && prefix_same_as_start_) {
    assert(prefix_extractor_ != nullptr);
1422 1423
    prefix_.SetUserKey(prefix_extractor_->Transform(
        StripTimestampFromUserKey(saved_key_.GetUserKey(), timestamp_size_)));
1424
  }
J
jorlow@chromium.org 已提交
1425 1426
}

J
jorlow@chromium.org 已提交
1427
void DBIter::SeekToLast() {
Y
Yanqin Jin 已提交
1428 1429 1430 1431 1432 1433 1434
  if (timestamp_size_ > 0) {
    valid_ = false;
    status_ = Status::NotSupported(
        "SeekToLast/SeekForPrev/Prev currently not supported with timestamp.");
    return;
  }

1435 1436 1437
  if (iterate_upper_bound_ != nullptr) {
    // Seek to last key strictly less than ReadOptions.iterate_upper_bound.
    SeekForPrev(*iterate_upper_bound_);
1438
    if (Valid() && user_comparator_.Equal(*iterate_upper_bound_, key())) {
1439
      ReleaseTempPinnedData();
1440
      PrevInternal(nullptr);
1441 1442 1443 1444
    }
    return;
  }

1445
  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, clock_);
S
Stanislau Hlebik 已提交
1446
  // Don't use iter_::Seek() if we set a prefix extractor
1447
  // because prefix seek will be used.
S
sdong 已提交
1448
  if (!expect_total_order_inner_iter()) {
S
Stanislau Hlebik 已提交
1449 1450
    max_skip_ = std::numeric_limits<uint64_t>::max();
  }
1451
  status_ = Status::OK();
J
jorlow@chromium.org 已提交
1452
  direction_ = kReverse;
1453
  ReleaseTempPinnedData();
1454
  ResetInternalKeysSkippedCounter();
J
jorlow@chromium.org 已提交
1455
  ClearSavedValue();
1456
  is_key_seqnum_zero_ = false;
1457 1458 1459

  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
1460
    iter_.SeekToLast();
1461
    range_del_agg_.InvalidateRangeDelMapPositions();
1462
  }
1463
  PrevInternal(nullptr);
M
Manuel Ung 已提交
1464 1465 1466 1467 1468
  if (statistics_ != nullptr) {
    RecordTick(statistics_, NUMBER_DB_SEEK);
    if (valid_) {
      RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
      RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
1469
      PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
M
Manuel Ung 已提交
1470 1471
    }
  }
1472 1473
  if (valid_ && prefix_same_as_start_) {
    assert(prefix_extractor_ != nullptr);
1474 1475
    prefix_.SetUserKey(prefix_extractor_->Transform(
        StripTimestampFromUserKey(saved_key_.GetUserKey(), timestamp_size_)));
1476
  }
J
jorlow@chromium.org 已提交
1477 1478
}

1479 1480
// Allocates a new DBIter with `new`, forwarding every argument to the DBIter
// constructor and passing false for its arena_mode parameter. Returns the
// new iterator as an Iterator*.
Iterator* NewDBIterator(Env* env, const ReadOptions& read_options,
                        const ImmutableCFOptions& cf_options,
                        const MutableCFOptions& mutable_cf_options,
                        const Comparator* user_key_comparator,
                        InternalIterator* internal_iter, const Version* version,
                        const SequenceNumber& sequence,
                        uint64_t max_sequential_skip_in_iterations,
                        ReadCallback* read_callback, DBImpl* db_impl,
                        ColumnFamilyData* cfd, bool expose_blob_index) {
  DBIter* db_iter =
      new DBIter(env, read_options, cf_options, mutable_cf_options,
                 user_key_comparator, internal_iter, version, sequence,
                 false /* arena_mode */, max_sequential_skip_in_iterations,
                 read_callback, db_impl, cfd, expose_blob_index);
  return db_iter;
}

1496
}  // namespace ROCKSDB_NAMESPACE