//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/db_iter.h"
11

12
#include <iostream>
S
Stanislau Hlebik 已提交
13
#include <limits>
14
#include <string>
J
jorlow@chromium.org 已提交
15 16

#include "db/dbformat.h"
17
#include "db/merge_context.h"
18
#include "db/merge_helper.h"
19
#include "db/pinned_iterators_manager.h"
20
#include "file/filename.h"
21
#include "logging/logging.h"
22
#include "memory/arena.h"
23
#include "monitoring/perf_context_imp.h"
24 25 26
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/merge_operator.h"
27
#include "rocksdb/options.h"
28
#include "rocksdb/system_clock.h"
S
sdong 已提交
29
#include "table/internal_iterator.h"
30
#include "table/iterator_wrapper.h"
31
#include "trace_replay/trace_replay.h"
J
jorlow@chromium.org 已提交
32
#include "util/mutexlock.h"
33
#include "util/string_util.h"
34
#include "util/user_comparator_wrapper.h"
J
jorlow@chromium.org 已提交
35

36
namespace ROCKSDB_NAMESPACE {
J
jorlow@chromium.org 已提交
37

38
// Constructs a DBIter that wraps the internal iterator `iter` and reads as of
// sequence number `s`.
//
// Per-iterator settings (bounds, pinning, skip limits, read tier, checksum
// verification, start seqnum, timestamp bounds) are copied out of
// `read_options`; the prefix extractor comes from `mutable_cf_options`; the
// logger, merge operator, statistics and internal comparator come from
// `cf_options`.
//
// The iterator starts out invalid and in the forward direction. If
// `read_options.pin_data` is set, pinning is started on the pinned-iterators
// manager right away. If `iter` is non-null, it is handed that manager.
//
// The timestamp size is derived from `read_options.timestamp` (0 when it is
// null) and is asserted to match the comparator's timestamp size.
DBIter::DBIter(Env* _env, const ReadOptions& read_options,
               const ImmutableCFOptions& cf_options,
               const MutableCFOptions& mutable_cf_options,
               const Comparator* cmp, InternalIterator* iter,
               const Version* version, SequenceNumber s, bool arena_mode,
               uint64_t max_sequential_skip_in_iterations,
               ReadCallback* read_callback, DBImpl* db_impl,
               ColumnFamilyData* cfd, bool expose_blob_index)
    : prefix_extractor_(mutable_cf_options.prefix_extractor.get()),
      env_(_env),
      clock_(_env->GetSystemClock()),
      logger_(cf_options.info_log),
      user_comparator_(cmp),
      merge_operator_(cf_options.merge_operator),
      iter_(iter),
      version_(version),
      read_callback_(read_callback),
      sequence_(s),
      statistics_(cf_options.statistics),
      max_skip_(max_sequential_skip_in_iterations),
      max_skippable_internal_keys_(read_options.max_skippable_internal_keys),
      num_internal_keys_skipped_(0),
      iterate_lower_bound_(read_options.iterate_lower_bound),
      iterate_upper_bound_(read_options.iterate_upper_bound),
      direction_(kForward),
      valid_(false),
      current_entry_is_merged_(false),
      is_key_seqnum_zero_(false),
      // prefix_same_as_start only makes sense when a prefix extractor exists.
      prefix_same_as_start_(mutable_cf_options.prefix_extractor
                                ? read_options.prefix_same_as_start
                                : false),
      pin_thru_lifetime_(read_options.pin_data),
      expect_total_order_inner_iter_(prefix_extractor_ == nullptr ||
                                     read_options.total_order_seek ||
                                     read_options.auto_prefix_mode),
      read_tier_(read_options.read_tier),
      verify_checksums_(read_options.verify_checksums),
      expose_blob_index_(expose_blob_index),
      is_blob_(false),
      arena_mode_(arena_mode),
      range_del_agg_(&cf_options.internal_comparator, s),
      db_impl_(db_impl),
      cfd_(cfd),
      start_seqnum_(read_options.iter_start_seqnum),
      timestamp_ub_(read_options.timestamp),
      timestamp_lb_(read_options.iter_start_ts),
      timestamp_size_(timestamp_ub_ ? timestamp_ub_->size() : 0) {
  RecordTick(statistics_, NO_ITERATOR_CREATED);
  if (pin_thru_lifetime_) {
    pinned_iters_mgr_.StartPinning();
  }
  if (iter_.iter()) {
    iter_.iter()->SetPinnedItersMgr(&pinned_iters_mgr_);
  }
  assert(timestamp_size_ == user_comparator_.timestamp_size());
}
94

95 96 97
// Looks up an iterator property by name and writes its value to `*prop`.
//
// Supported names:
//  - "rocksdb.iterator.super-version-number": forwarded to the inner
//    iterator, whose status is returned as-is.
//  - "rocksdb.iterator.is-key-pinned": "1" if data is pinned for the
//    iterator's lifetime and the saved key is pinned, "0" otherwise; when the
//    iterator is not valid, the text "Iterator is not valid." is written.
//  - "rocksdb.iterator.internal-key": the user key held in saved_key_.
//
// Returns InvalidArgument if `prop` is null or the name is not recognized,
// OK otherwise (except for the forwarded property, see above).
Status DBIter::GetProperty(std::string prop_name, std::string* prop) {
  if (prop == nullptr) {
    return Status::InvalidArgument("prop is nullptr");
  }
  if (prop_name == "rocksdb.iterator.super-version-number") {
    // First try to pass the value returned from inner iterator.
    return iter_.iter()->GetProperty(prop_name, prop);
  } else if (prop_name == "rocksdb.iterator.is-key-pinned") {
    if (valid_) {
      *prop = (pin_thru_lifetime_ && saved_key_.IsKeyPinned()) ? "1" : "0";
    } else {
      *prop = "Iterator is not valid.";
    }
    return Status::OK();
  } else if (prop_name == "rocksdb.iterator.internal-key") {
    *prop = saved_key_.GetUserKey().ToString();
    return Status::OK();
  }
  return Status::InvalidArgument("Unidentified property.");
}
115

116
// Parses the internal key at the current position of iter_ into `*ikey`.
//
// Returns true on success. On a parse failure the iterator is invalidated:
// status_ is set to a Corruption status carrying the parser's message,
// valid_ is cleared, the error is logged, and false is returned.
bool DBIter::ParseKey(ParsedInternalKey* ikey) {
  Status s =
      ParseInternalKey(iter_.key(), ikey, false /* log_err_key */);  // TODO
  if (!s.ok()) {
    status_ = Status::Corruption("In DBIter: ", s.getState());
    valid_ = false;
    ROCKS_LOG_ERROR(logger_, "In DBIter: %s", status_.getState());
    return false;
  } else {
    return true;
  }
}

J
jorlow@chromium.org 已提交
129 130
void DBIter::Next() {
  assert(valid_);
131
  assert(status_.ok());
J
jorlow@chromium.org 已提交
132

133
  PERF_CPU_TIMER_GUARD(iter_next_cpu_nanos, clock_);
134 135
  // Release temporarily pinned blocks from last operation
  ReleaseTempPinnedData();
136 137 138
  local_stats_.skip_count_ += num_internal_keys_skipped_;
  local_stats_.skip_count_--;
  num_internal_keys_skipped_ = 0;
139
  bool ok = true;
S
Stanislau Hlebik 已提交
140
  if (direction_ == kReverse) {
141
    is_key_seqnum_zero_ = false;
142 143 144
    if (!ReverseToForward()) {
      ok = false;
    }
145
  } else if (!current_entry_is_merged_) {
146 147 148 149 150
    // If the current value is not a merge, the iter position is the
    // current key, which is already returned. We can safely issue a
    // Next() without checking the current key.
    // If the current key is a merge, very likely iter already points
    // to the next internal position.
151 152
    assert(iter_.Valid());
    iter_.Next();
153
    PERF_COUNTER_ADD(internal_key_skipped_count, 1);
J
jorlow@chromium.org 已提交
154
  }
J
jorlow@chromium.org 已提交
155

156
  local_stats_.next_count_++;
157
  if (ok && iter_.Valid()) {
158 159
    if (prefix_same_as_start_) {
      assert(prefix_extractor_ != nullptr);
160 161 162 163
      const Slice prefix = prefix_.GetUserKey();
      FindNextUserEntry(true /* skipping the current user key */, &prefix);
    } else {
      FindNextUserEntry(true /* skipping the current user key */, nullptr);
164
    }
165
  } else {
166
    is_key_seqnum_zero_ = false;
167 168
    valid_ = false;
  }
169 170 171 172
  if (statistics_ != nullptr && valid_) {
    local_stats_.next_found_count_++;
    local_stats_.bytes_read_ += (key().size() + value().size());
  }
J
jorlow@chromium.org 已提交
173 174
}

L
Levi Tamasi 已提交
175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208
bool DBIter::SetBlobValueIfNeeded(const Slice& user_key,
                                  const Slice& blob_index) {
  assert(!is_blob_);

  if (expose_blob_index_) {  // Stacked BlobDB implementation
    is_blob_ = true;
    return true;
  }

  if (!version_) {
    status_ = Status::Corruption("Encountered unexpected blob index.");
    valid_ = false;
    return false;
  }

  // TODO: consider moving ReadOptions from ArenaWrappedDBIter to DBIter to
  // avoid having to copy options back and forth.
  ReadOptions read_options;
  read_options.read_tier = read_tier_;
  read_options.verify_checksums = verify_checksums_;

  const Status s =
      version_->GetBlob(read_options, user_key, blob_index, &blob_value_);

  if (!s.ok()) {
    status_ = s;
    valid_ = false;
    return false;
  }

  is_blob_ = true;
  return true;
}

209
// PRE: saved_key_ has the current user key if skipping_saved_key
210 211 212 213 214 215
// POST: saved_key_ should have the next user key if valid_,
//       if the current entry is a result of merge
//           current_entry_is_merged_ => true
//           saved_value_             => the merged value
//
// NOTE: In between, saved_key_ can point to a user key that has
216
//       a delete marker or a sequence number higher than sequence_
217
//       saved_key_ MUST have a proper user_key before calling this function
218
//
Y
Yanqin Jin 已提交
219
// The prefix parameter, if not null, indicates that we need to iterate
220 221 222
// within the prefix, and the iterator needs to be made invalid, if no
// more entry for the prefix can be found.
bool DBIter::FindNextUserEntry(bool skipping_saved_key, const Slice* prefix) {
223
  PERF_TIMER_GUARD(find_next_user_entry_time);
224
  return FindNextUserEntryInternal(skipping_saved_key, prefix);
225 226 227
}

// Actual implementation of DBIter::FindNextUserEntry()
228 229
bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
                                       const Slice* prefix) {
J
jorlow@chromium.org 已提交
230
  // Loop until we hit an acceptable entry to yield
231
  assert(iter_.Valid());
232
  assert(status_.ok());
J
jorlow@chromium.org 已提交
233
  assert(direction_ == kForward);
234
  current_entry_is_merged_ = false;
235 236 237

  // How many times in a row we have skipped an entry with user key less than
  // or equal to saved_key_. We could skip these entries either because
238
  // sequence numbers were too high or because skipping_saved_key = true.
239
  // What saved_key_ contains throughout this method:
Y
Yanqin Jin 已提交
240 241 242 243 244 245 246 247
  //  - if skipping_saved_key : saved_key_ contains the key that we need
  //                            to skip, and we haven't seen any keys greater
  //                            than that,
  //  - if num_skipped > 0    : saved_key_ contains the key that we have skipped
  //                            num_skipped times, and we haven't seen any keys
  //                            greater than that,
  //  - none of the above     : saved_key_ can contain anything, it doesn't
  //                            matter.
248
  uint64_t num_skipped = 0;
249 250 251 252 253
  // For write unprepared, the target sequence number in reseek could be larger
  // than the snapshot, and thus needs to be skipped again. This could result in
  // an infinite loop of reseeks. To avoid that, we limit the number of reseeks
  // to one.
  bool reseek_done = false;
254

Y
Yi Wu 已提交
255 256
  is_blob_ = false;

J
jorlow@chromium.org 已提交
257
  do {
258 259 260
    // Will update is_key_seqnum_zero_ as soon as we parsed the current key
    // but we need to save the previous value to be used in the loop.
    bool is_prev_key_seqnum_zero = is_key_seqnum_zero_;
261
    if (!ParseKey(&ikey_)) {
262
      is_key_seqnum_zero_ = false;
263
      return false;
264
    }
265 266
    Slice user_key_without_ts =
        StripTimestampFromUserKey(ikey_.user_key, timestamp_size_);
267

268 269
    is_key_seqnum_zero_ = (ikey_.sequence == 0);

270 271
    assert(iterate_upper_bound_ == nullptr ||
           iter_.UpperBoundCheckResult() != IterBoundCheck::kInbound ||
Y
Yanqin Jin 已提交
272
           user_comparator_.CompareWithoutTimestamp(
273
               user_key_without_ts, /*a_has_ts=*/false, *iterate_upper_bound_,
Y
Yanqin Jin 已提交
274
               /*b_has_ts=*/false) < 0);
275 276
    if (iterate_upper_bound_ != nullptr &&
        iter_.UpperBoundCheckResult() != IterBoundCheck::kInbound &&
Y
Yanqin Jin 已提交
277
        user_comparator_.CompareWithoutTimestamp(
278
            user_key_without_ts, /*a_has_ts=*/false, *iterate_upper_bound_,
Y
Yanqin Jin 已提交
279
            /*b_has_ts=*/false) >= 0) {
280 281
      break;
    }
282

283 284
    assert(prefix == nullptr || prefix_extractor_ != nullptr);
    if (prefix != nullptr &&
285 286
        prefix_extractor_->Transform(user_key_without_ts).compare(*prefix) !=
            0) {
287
      assert(prefix_same_as_start_);
288 289 290
      break;
    }

291
    if (TooManyInternalKeysSkipped()) {
292
      return false;
293 294
    }

Y
Yanqin Jin 已提交
295
    assert(ikey_.user_key.size() >= timestamp_size_);
296 297 298
    Slice ts = timestamp_size_ > 0 ? ExtractTimestampFromUserKey(
                                         ikey_.user_key, timestamp_size_)
                                   : Slice();
299 300
    bool more_recent = false;
    if (IsVisible(ikey_.sequence, ts, &more_recent)) {
301 302 303 304
      // If the previous entry is of seqnum 0, the current entry will not
      // possibly be skipped. This condition can potentially be relaxed to
      // prev_key.seq <= ikey_.sequence. We are cautious because it will be more
      // prone to bugs causing the same user key with the same sequence number.
305 306
      // Note that with current timestamp implementation, the same user key can
      // have different timestamps and zero sequence number on the bottommost
Y
Yanqin Jin 已提交
307
      // level. This may change in the future.
308 309
      if ((!is_prev_key_seqnum_zero || timestamp_size_ > 0) &&
          skipping_saved_key &&
310
          CompareKeyForSkip(ikey_.user_key, saved_key_.GetUserKey()) <= 0) {
311 312 313
        num_skipped++;  // skip this entry
        PERF_COUNTER_ADD(internal_key_skipped_count, 1);
      } else {
314
        assert(!skipping_saved_key ||
315
               CompareKeyForSkip(ikey_.user_key, saved_key_.GetUserKey()) > 0);
316 317 318 319 320
        if (!iter_.PrepareValue()) {
          assert(!iter_.status().ok());
          valid_ = false;
          return false;
        }
321
        num_skipped = 0;
322
        reseek_done = false;
323
        switch (ikey_.type) {
324
          case kTypeDeletion:
Y
Yanqin Jin 已提交
325
          case kTypeDeletionWithTimestamp:
326 327 328
          case kTypeSingleDeletion:
            // Arrange to skip all upcoming entries for this key since
            // they are hidden by this deletion.
329 330 331
            // if iterartor specified start_seqnum we
            // 1) return internal key, including the type
            // 2) return ikey only if ikey.seqnum >= start_seqnum_
332
            // note that if deletion seqnum is < start_seqnum_ we
333
            // just skip it like in normal iterator.
334 335 336 337 338 339 340 341 342 343 344 345 346 347
            if (start_seqnum_ > 0) {
              if (ikey_.sequence >= start_seqnum_) {
                saved_key_.SetInternalKey(ikey_);
                valid_ = true;
                return true;
              } else {
                saved_key_.SetUserKey(
                    ikey_.user_key,
                    !pin_thru_lifetime_ ||
                        !iter_.iter()->IsKeyPinned() /* copy */);
                skipping_saved_key = true;
                PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
              }
            } else if (timestamp_lb_) {
348
              saved_key_.SetInternalKey(ikey_);
349 350
              valid_ = true;
              return true;
351 352
            } else {
              saved_key_.SetUserKey(
353 354
                  ikey_.user_key, !pin_thru_lifetime_ ||
                                      !iter_.iter()->IsKeyPinned() /* copy */);
355
              skipping_saved_key = true;
356
              PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
357
            }
358 359
            break;
          case kTypeValue:
Y
Yi Wu 已提交
360
          case kTypeBlobIndex:
361 362 363
            if (start_seqnum_ > 0) {
              if (ikey_.sequence >= start_seqnum_) {
                saved_key_.SetInternalKey(ikey_);
L
Levi Tamasi 已提交
364 365 366 367 368 369 370

                if (ikey_.type == kTypeBlobIndex) {
                  if (!SetBlobValueIfNeeded(ikey_.user_key, iter_.value())) {
                    return false;
                  }
                }

Y
Yi Wu 已提交
371
                valid_ = true;
372
                return true;
373 374
              } else {
                // this key and all previous versions shouldn't be included,
375
                // skipping_saved_key
376 377 378 379
                saved_key_.SetUserKey(
                    ikey_.user_key,
                    !pin_thru_lifetime_ ||
                        !iter_.iter()->IsKeyPinned() /* copy */);
380
                skipping_saved_key = true;
Y
Yi Wu 已提交
381
              }
382 383
            } else if (timestamp_lb_) {
              saved_key_.SetInternalKey(ikey_);
L
Levi Tamasi 已提交
384 385 386 387 388 389 390

              if (ikey_.type == kTypeBlobIndex) {
                if (!SetBlobValueIfNeeded(ikey_.user_key, iter_.value())) {
                  return false;
                }
              }

391 392
              valid_ = true;
              return true;
393
            } else {
394
              saved_key_.SetUserKey(
395 396
                  ikey_.user_key, !pin_thru_lifetime_ ||
                                      !iter_.iter()->IsKeyPinned() /* copy */);
397
              if (range_del_agg_.ShouldDelete(
398
                      ikey_, RangeDelPositioningMode::kForwardTraversal)) {
399 400
                // Arrange to skip all upcoming entries for this key since
                // they are hidden by this deletion.
401
                skipping_saved_key = true;
402
                num_skipped = 0;
403
                reseek_done = false;
404
                PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
L
Levi Tamasi 已提交
405 406 407 408 409
              } else {
                if (ikey_.type == kTypeBlobIndex) {
                  if (!SetBlobValueIfNeeded(ikey_.user_key, iter_.value())) {
                    return false;
                  }
410
                }
411

412
                valid_ = true;
413
                return true;
414
              }
415 416 417
            }
            break;
          case kTypeMerge:
418
            saved_key_.SetUserKey(
419
                ikey_.user_key,
420
                !pin_thru_lifetime_ || !iter_.iter()->IsKeyPinned() /* copy */);
421
            if (range_del_agg_.ShouldDelete(
422
                    ikey_, RangeDelPositioningMode::kForwardTraversal)) {
423 424
              // Arrange to skip all upcoming entries for this key since
              // they are hidden by this deletion.
425
              skipping_saved_key = true;
426
              num_skipped = 0;
427
              reseek_done = false;
428 429 430 431 432 433
              PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
            } else {
              // By now, we are sure the current ikey is going to yield a
              // value
              current_entry_is_merged_ = true;
              valid_ = true;
434
              return MergeValuesNewToOld();  // Go to a different state machine
435 436 437
            }
            break;
          default:
438 439 440 441 442
            valid_ = false;
            status_ = Status::Corruption(
                "Unknown value type: " +
                std::to_string(static_cast<unsigned int>(ikey_.type)));
            return false;
443
        }
J
jorlow@chromium.org 已提交
444
      }
445
    } else {
446 447 448
      if (more_recent) {
        PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
      }
449

450 451 452
      // This key was inserted after our snapshot was taken or skipped by
      // timestamp range. If this happens too many times in a row for the same
      // user key, we want to seek to the target sequence number.
Y
Yanqin Jin 已提交
453 454
      int cmp = user_comparator_.CompareWithoutTimestamp(
          ikey_.user_key, saved_key_.GetUserKey());
Y
Yanqin Jin 已提交
455
      if (cmp == 0 || (skipping_saved_key && cmp < 0)) {
456 457
        num_skipped++;
      } else {
458
        saved_key_.SetUserKey(
459
            ikey_.user_key,
460
            !iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
461
        skipping_saved_key = false;
462
        num_skipped = 0;
463
        reseek_done = false;
464
      }
J
jorlow@chromium.org 已提交
465
    }
466 467 468

    // If we have sequentially iterated via numerous equal keys, then it's
    // better to seek so that we can avoid too many key comparisons.
469 470 471 472 473
    //
    // To avoid infinite loops, do not reseek if we have already attempted to
    // reseek previously.
    //
    // TODO(lth): If we reseek to sequence number greater than ikey_.sequence,
Y
Yanqin Jin 已提交
474
    // then it does not make sense to reseek as we would actually land further
475 476
    // away from the desired key. There is opportunity for optimization here.
    if (num_skipped > max_skip_ && !reseek_done) {
477
      is_key_seqnum_zero_ = false;
478
      num_skipped = 0;
479
      reseek_done = true;
480
      std::string last_key;
481
      if (skipping_saved_key) {
482 483 484
        // We're looking for the next user-key but all we see are the same
        // user-key with decreasing sequence numbers. Fast forward to
        // sequence number 0 and type deletion (the smallest type).
Y
Yanqin Jin 已提交
485 486 487 488 489
        if (timestamp_size_ == 0) {
          AppendInternalKey(
              &last_key,
              ParsedInternalKey(saved_key_.GetUserKey(), 0, kTypeDeletion));
        } else {
490
          const std::string kTsMin(timestamp_size_, static_cast<char>(0));
Y
Yanqin Jin 已提交
491 492 493
          AppendInternalKeyWithDifferentTimestamp(
              &last_key,
              ParsedInternalKey(saved_key_.GetUserKey(), 0, kTypeDeletion),
494
              kTsMin);
Y
Yanqin Jin 已提交
495
        }
496 497
        // Don't set skipping_saved_key = false because we may still see more
        // user-keys equal to saved_key_.
498 499 500 501 502 503
      } else {
        // We saw multiple entries with this user key and sequence numbers
        // higher than sequence_. Fast forward to sequence_.
        // Note that this only covers a case when a higher key was overwritten
        // many times since our snapshot was taken, not the case when a lot of
        // different keys were inserted after our snapshot was taken.
Y
Yanqin Jin 已提交
504 505 506 507 508 509 510 511 512 513 514
        if (timestamp_size_ == 0) {
          AppendInternalKey(
              &last_key, ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
                                           kValueTypeForSeek));
        } else {
          AppendInternalKeyWithDifferentTimestamp(
              &last_key,
              ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
                                kValueTypeForSeek),
              *timestamp_ub_);
        }
515
      }
516
      iter_.Seek(last_key);
517 518
      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
    } else {
519
      iter_.Next();
520
    }
521
  } while (iter_.Valid());
522

J
jorlow@chromium.org 已提交
523
  valid_ = false;
524
  return iter_.status().ok();
J
jorlow@chromium.org 已提交
525 526
}

527 528
// Merge values of the same user key starting from the current iter_ position
// Scan from the newer entries to older entries.
529
// PRE: iter_.key() points to the first merge type entry
530
//      saved_key_ stores the user key
531
//      iter_.PrepareValue() has been called
532 533
// POST: saved_value_ has the merged value for the user key
//       iter_ points to the next entry (or invalid)
534
bool DBIter::MergeValuesNewToOld() {
535
  if (!merge_operator_) {
536
    ROCKS_LOG_ERROR(logger_, "Options::merge_operator is null.");
537
    status_ = Status::InvalidArgument("merge_operator_ must be set.");
538
    valid_ = false;
539
    return false;
D
Deon Nicholas 已提交
540
  }
541

542 543
  // Temporarily pin the blocks that hold merge operands
  TempPinData();
544
  merge_context_.Clear();
545
  // Start the merge process by pushing the first operand
546 547
  merge_context_.PushOperand(
      iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
548
  TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:PushedFirstOperand");
549 550

  ParsedInternalKey ikey;
551
  for (iter_.Next(); iter_.Valid(); iter_.Next()) {
552
    TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:SteppedToNextOperand");
553
    if (!ParseKey(&ikey)) {
554
      return false;
555 556
    }

557
    if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
558 559
      // hit the next user key, stop right here
      break;
560 561
    }
    if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type ||
562
               range_del_agg_.ShouldDelete(
563
                   ikey, RangeDelPositioningMode::kForwardTraversal)) {
564 565
      // hit a delete with the same user key, stop right here
      // iter_ is positioned after delete
566
      iter_.Next();
567
      break;
568 569 570 571 572 573 574
    }
    if (!iter_.PrepareValue()) {
      valid_ = false;
      return false;
    }

    if (kTypeValue == ikey.type) {
575 576
      // hit a put, merge the put value with operands and store the
      // final result in saved_value_. We are done!
577
      const Slice val = iter_.value();
578
      Status s = MergeHelper::TimedFullMerge(
579
          merge_operator_, ikey.user_key, &val, merge_context_.GetOperands(),
580
          &saved_value_, logger_, statistics_, clock_, &pinned_value_, true);
581
      if (!s.ok()) {
Y
Yi Wu 已提交
582
        valid_ = false;
583
        status_ = s;
584
        return false;
585
      }
586
      // iter_ is positioned after put
587 588
      iter_.Next();
      if (!iter_.status().ok()) {
589 590 591 592
        valid_ = false;
        return false;
      }
      return true;
A
Andres Noetzli 已提交
593
    } else if (kTypeMerge == ikey.type) {
594 595
      // hit a merge, add the value as an operand and run associative merge.
      // when complete, add result to operands and continue.
596 597
      merge_context_.PushOperand(
          iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
598
      PERF_COUNTER_ADD(internal_merge_count, 1);
Y
Yi Wu 已提交
599
    } else if (kTypeBlobIndex == ikey.type) {
L
Levi Tamasi 已提交
600
      status_ = Status::NotSupported("BlobDB does not support merge operator.");
Y
Yi Wu 已提交
601
      valid_ = false;
602
      return false;
A
Andres Noetzli 已提交
603
    } else {
604 605 606 607 608
      valid_ = false;
      status_ = Status::Corruption(
          "Unrecognized value type: " +
          std::to_string(static_cast<unsigned int>(ikey.type)));
      return false;
609 610 611
    }
  }

612
  if (!iter_.status().ok()) {
613 614 615 616
    valid_ = false;
    return false;
  }

617 618 619 620
  // we either exhausted all internal keys under this user key, or hit
  // a deletion marker.
  // feed null as the existing value to the merge operator, such that
  // client can differentiate this scenario and do things accordingly.
621 622
  Status s = MergeHelper::TimedFullMerge(
      merge_operator_, saved_key_.GetUserKey(), nullptr,
623
      merge_context_.GetOperands(), &saved_value_, logger_, statistics_, clock_,
624
      &pinned_value_, true);
625
  if (!s.ok()) {
Y
Yi Wu 已提交
626
    valid_ = false;
627
    status_ = s;
628
    return false;
629
  }
630 631 632

  assert(status_.ok());
  return true;
633 634
}

J
jorlow@chromium.org 已提交
635
void DBIter::Prev() {
Y
Yanqin Jin 已提交
636 637 638 639 640 641 642
  if (timestamp_size_ > 0) {
    valid_ = false;
    status_ = Status::NotSupported(
        "SeekToLast/SeekForPrev/Prev currently not supported with timestamp.");
    return;
  }

J
jorlow@chromium.org 已提交
643
  assert(valid_);
644
  assert(status_.ok());
645

646
  PERF_CPU_TIMER_GUARD(iter_prev_cpu_nanos, clock_);
647
  ReleaseTempPinnedData();
648
  ResetInternalKeysSkippedCounter();
649
  bool ok = true;
S
Stanislau Hlebik 已提交
650
  if (direction_ == kForward) {
651 652 653 654 655
    if (!ReverseToBackward()) {
      ok = false;
    }
  }
  if (ok) {
656 657 658 659 660 661
    Slice prefix;
    if (prefix_same_as_start_) {
      assert(prefix_extractor_ != nullptr);
      prefix = prefix_.GetUserKey();
    }
    PrevInternal(prefix_same_as_start_ ? &prefix : nullptr);
S
Stanislau Hlebik 已提交
662
  }
663

M
Manuel Ung 已提交
664
  if (statistics_ != nullptr) {
665
    local_stats_.prev_count_++;
M
Manuel Ung 已提交
666
    if (valid_) {
667 668
      local_stats_.prev_found_count_++;
      local_stats_.bytes_read_ += (key().size() + value().size());
M
Manuel Ung 已提交
669 670
    }
  }
S
Stanislau Hlebik 已提交
671
}
J
jorlow@chromium.org 已提交
672

673
bool DBIter::ReverseToForward() {
674
  assert(iter_.status().ok());
675 676 677 678

  // When moving backwards, iter_ is positioned on _previous_ key, which may
  // not exist or may have different prefix than the current key().
  // If that's the case, seek iter_ to current key.
S
sdong 已提交
679
  if (!expect_total_order_inner_iter() || !iter_.Valid()) {
680 681
    IterKey last_key;
    last_key.SetInternalKey(ParsedInternalKey(
682
        saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
683
    iter_.Seek(last_key.GetInternalKey());
684
    RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
685
  }
686

687
  direction_ = kForward;
688
  // Skip keys less than the current key() (a.k.a. saved_key_).
689
  while (iter_.Valid()) {
690 691 692 693
    ParsedInternalKey ikey;
    if (!ParseKey(&ikey)) {
      return false;
    }
694
    if (user_comparator_.Compare(ikey.user_key, saved_key_.GetUserKey()) >= 0) {
695 696
      return true;
    }
697
    iter_.Next();
698 699
  }

700
  if (!iter_.status().ok()) {
701 702
    valid_ = false;
    return false;
703
  }
704 705

  return true;
706 707
}

708 709
// Move iter_ to the key before saved_key_.
bool DBIter::ReverseToBackward() {
710
  assert(iter_.status().ok());
711 712 713 714 715

  // When current_entry_is_merged_ is true, iter_ may be positioned on the next
  // key, which may not exist or may have prefix different from current.
  // If that's the case, seek to saved_key_.
  if (current_entry_is_merged_ &&
S
sdong 已提交
716
      (!expect_total_order_inner_iter() || !iter_.Valid())) {
717
    IterKey last_key;
718 719 720 721 722
    // Using kMaxSequenceNumber and kValueTypeForSeek
    // (not kValueTypeForSeekForPrev) to seek to a key strictly smaller
    // than saved_key_.
    last_key.SetInternalKey(ParsedInternalKey(
        saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
S
sdong 已提交
723
    if (!expect_total_order_inner_iter()) {
724
      iter_.SeekForPrev(last_key.GetInternalKey());
725 726 727 728 729
    } else {
      // Some iterators may not support SeekForPrev(), so we avoid using it
      // when prefix seek mode is disabled. This is somewhat expensive
      // (an extra Prev(), as well as an extra change of direction of iter_),
      // so we may need to reconsider it later.
730 731 732
      iter_.Seek(last_key.GetInternalKey());
      if (!iter_.Valid() && iter_.status().ok()) {
        iter_.SeekToLast();
733
      }
734
    }
735
    RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
736 737 738
  }

  direction_ = kReverse;
739
  return FindUserKeyBeforeSavedKey();
740 741
}

742
void DBIter::PrevInternal(const Slice* prefix) {
743
  while (iter_.Valid()) {
744
    saved_key_.SetUserKey(
745 746
        ExtractUserKey(iter_.key()),
        !iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
747

748 749
    assert(prefix == nullptr || prefix_extractor_ != nullptr);
    if (prefix != nullptr &&
750 751 752
        prefix_extractor_
                ->Transform(StripTimestampFromUserKey(saved_key_.GetUserKey(),
                                                      timestamp_size_))
753 754
                .compare(*prefix) != 0) {
      assert(prefix_same_as_start_);
755 756 757 758 759
      // Current key does not have the same prefix as start
      valid_ = false;
      return;
    }

760 761 762 763
    assert(iterate_lower_bound_ == nullptr || iter_.MayBeOutOfLowerBound() ||
           user_comparator_.Compare(saved_key_.GetUserKey(),
                                    *iterate_lower_bound_) >= 0);
    if (iterate_lower_bound_ != nullptr && iter_.MayBeOutOfLowerBound() &&
764 765
        user_comparator_.Compare(saved_key_.GetUserKey(),
                                 *iterate_lower_bound_) < 0) {
766 767 768 769 770
      // We've iterated earlier than the user-specified lower bound.
      valid_ = false;
      return;
    }

771
    if (!FindValueForCurrentKey()) {  // assigns valid_
S
Stanislau Hlebik 已提交
772
      return;
J
jorlow@chromium.org 已提交
773
    }
774

775 776 777
    // Whether or not we found a value for current key, we need iter_ to end up
    // on a smaller key.
    if (!FindUserKeyBeforeSavedKey()) {
778 779 780
      return;
    }

781 782 783
    if (valid_) {
      // Found the value.
      return;
S
Stanislau Hlebik 已提交
784
    }
785 786 787

    if (TooManyInternalKeysSkipped(false)) {
      return;
S
Stanislau Hlebik 已提交
788 789
    }
  }
790

S
Stanislau Hlebik 已提交
791 792
  // We haven't found any key - iterator is not valid
  valid_ = false;
J
jorlow@chromium.org 已提交
793 794
}

795 796 797 798 799 800 801 802 803 804 805
// Used for backwards iteration.
// Looks at the entries with user key saved_key_ and finds the most up-to-date
// value for it, or executes a merge, or determines that the value was deleted.
// Sets valid_ to true if the value is found and is ready to be presented to
// the user through value().
// Sets valid_ to false if the value was deleted, and we should try another key.
// Returns false if an error occurred, and !status().ok() and !valid_.
//
// PRE: iter_ is positioned on the last entry with user key equal to saved_key_.
// POST: iter_ is positioned on one of the entries equal to saved_key_, or on
//       the entry just before them, or on the entry just after them.
S
Stanislau Hlebik 已提交
806
bool DBIter::FindValueForCurrentKey() {
807
  assert(iter_.Valid());
808
  merge_context_.Clear();
809
  current_entry_is_merged_ = false;
A
Andres Noetzli 已提交
810 811
  // last entry before merge (could be kTypeDeletion, kTypeSingleDeletion or
  // kTypeValue)
S
Stanislau Hlebik 已提交
812 813
  ValueType last_not_merge_type = kTypeDeletion;
  ValueType last_key_entry_type = kTypeDeletion;
J
jorlow@chromium.org 已提交
814

815 816 817
  // Temporarily pin blocks that hold (merge operands / the value)
  ReleaseTempPinnedData();
  TempPinData();
S
Stanislau Hlebik 已提交
818
  size_t num_skipped = 0;
819
  while (iter_.Valid()) {
820 821 822 823 824
    ParsedInternalKey ikey;
    if (!ParseKey(&ikey)) {
      return false;
    }

Y
Yanqin Jin 已提交
825 826 827 828 829 830 831
    assert(ikey.user_key.size() >= timestamp_size_);
    Slice ts;
    if (timestamp_size_ > 0) {
      ts = Slice(ikey.user_key.data() + ikey.user_key.size() - timestamp_size_,
                 timestamp_size_);
    }
    if (!IsVisible(ikey.sequence, ts) ||
832
        !user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
833 834
      break;
    }
835 836 837 838
    if (TooManyInternalKeysSkipped()) {
      return false;
    }

839 840 841
    // This user key has lots of entries.
    // We're going from old to new, and it's taking too long. Let's do a Seek()
    // and go from new to old. This helps when a key was overwritten many times.
842
    if (num_skipped >= max_skip_) {
S
Stanislau Hlebik 已提交
843 844 845
      return FindValueForCurrentKeyUsingSeek();
    }

846 847 848 849 850
    if (!iter_.PrepareValue()) {
      valid_ = false;
      return false;
    }

S
Stanislau Hlebik 已提交
851 852 853
    last_key_entry_type = ikey.type;
    switch (last_key_entry_type) {
      case kTypeValue:
Y
Yi Wu 已提交
854
      case kTypeBlobIndex:
855
        if (range_del_agg_.ShouldDelete(
856
                ikey, RangeDelPositioningMode::kBackwardTraversal)) {
A
Andrew Kryczka 已提交
857 858
          last_key_entry_type = kTypeRangeDeletion;
          PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
859
        } else if (iter_.iter()->IsValuePinned()) {
860
          pinned_value_ = iter_.value();
861 862 863 864 865
        } else {
          valid_ = false;
          status_ = Status::NotSupported(
              "Backward iteration not supported if underlying iterator's value "
              "cannot be pinned.");
A
Andrew Kryczka 已提交
866
        }
867
        merge_context_.Clear();
A
Andrew Kryczka 已提交
868
        last_not_merge_type = last_key_entry_type;
869 870 871
        if (!status_.ok()) {
          return false;
        }
S
Stanislau Hlebik 已提交
872 873
        break;
      case kTypeDeletion:
A
Andres Noetzli 已提交
874
      case kTypeSingleDeletion:
875
        merge_context_.Clear();
A
Andres Noetzli 已提交
876
        last_not_merge_type = last_key_entry_type;
877
        PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
S
Stanislau Hlebik 已提交
878 879
        break;
      case kTypeMerge:
880
        if (range_del_agg_.ShouldDelete(
881
                ikey, RangeDelPositioningMode::kBackwardTraversal)) {
A
Andrew Kryczka 已提交
882 883 884 885 886 887 888
          merge_context_.Clear();
          last_key_entry_type = kTypeRangeDeletion;
          last_not_merge_type = last_key_entry_type;
          PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
        } else {
          assert(merge_operator_ != nullptr);
          merge_context_.PushOperandBack(
889 890
              iter_.value(),
              iter_.iter()->IsValuePinned() /* operand_pinned */);
891
          PERF_COUNTER_ADD(internal_merge_count, 1);
A
Andrew Kryczka 已提交
892
        }
S
Stanislau Hlebik 已提交
893 894
        break;
      default:
895 896 897 898 899
        valid_ = false;
        status_ = Status::Corruption(
            "Unknown value type: " +
            std::to_string(static_cast<unsigned int>(last_key_entry_type)));
        return false;
S
Stanislau Hlebik 已提交
900 901
    }

902
    PERF_COUNTER_ADD(internal_key_skipped_count, 1);
903
    iter_.Prev();
S
Stanislau Hlebik 已提交
904
    ++num_skipped;
905 906
  }

907
  if (!iter_.status().ok()) {
908 909
    valid_ = false;
    return false;
S
Stanislau Hlebik 已提交
910 911
  }

912
  Status s;
S
sdong 已提交
913
  s.PermitUncheckedError();
Y
Yi Wu 已提交
914
  is_blob_ = false;
S
Stanislau Hlebik 已提交
915 916
  switch (last_key_entry_type) {
    case kTypeDeletion:
A
Andres Noetzli 已提交
917
    case kTypeSingleDeletion:
A
Andrew Kryczka 已提交
918
    case kTypeRangeDeletion:
S
Stanislau Hlebik 已提交
919
      valid_ = false;
920
      return true;
S
Stanislau Hlebik 已提交
921
    case kTypeMerge:
922
      current_entry_is_merged_ = true;
A
Aaron Gao 已提交
923
      if (last_not_merge_type == kTypeDeletion ||
A
Andrew Kryczka 已提交
924 925
          last_not_merge_type == kTypeSingleDeletion ||
          last_not_merge_type == kTypeRangeDeletion) {
926 927 928
        s = MergeHelper::TimedFullMerge(
            merge_operator_, saved_key_.GetUserKey(), nullptr,
            merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
929
            clock_, &pinned_value_, true);
Y
Yi Wu 已提交
930
      } else if (last_not_merge_type == kTypeBlobIndex) {
L
Levi Tamasi 已提交
931 932
        status_ =
            Status::NotSupported("BlobDB does not support merge operator.");
Y
Yi Wu 已提交
933
        valid_ = false;
934
        return false;
935
      } else {
S
Stanislau Hlebik 已提交
936
        assert(last_not_merge_type == kTypeValue);
937
        s = MergeHelper::TimedFullMerge(
938
            merge_operator_, saved_key_.GetUserKey(), &pinned_value_,
939
            merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
940
            clock_, &pinned_value_, true);
941
      }
S
Stanislau Hlebik 已提交
942 943
      break;
    case kTypeValue:
944
      // do nothing - we've already has value in pinned_value_
S
Stanislau Hlebik 已提交
945
      break;
Y
Yi Wu 已提交
946
    case kTypeBlobIndex:
L
Levi Tamasi 已提交
947
      if (!SetBlobValueIfNeeded(saved_key_.GetUserKey(), pinned_value_)) {
948
        return false;
Y
Yi Wu 已提交
949
      }
L
Levi Tamasi 已提交
950

Y
Yi Wu 已提交
951
      break;
S
Stanislau Hlebik 已提交
952
    default:
953 954 955 956 957
      valid_ = false;
      status_ = Status::Corruption(
          "Unknown value type: " +
          std::to_string(static_cast<unsigned int>(last_key_entry_type)));
      return false;
J
jorlow@chromium.org 已提交
958
  }
959
  if (!s.ok()) {
Y
Yi Wu 已提交
960
    valid_ = false;
961
    status_ = s;
962
    return false;
963
  }
964
  valid_ = true;
S
Stanislau Hlebik 已提交
965 966
  return true;
}
J
jorlow@chromium.org 已提交
967

S
Stanislau Hlebik 已提交
968 969
// This function is used in FindValueForCurrentKey.
// We use Seek() function instead of Prev() to find necessary value
970 971
// TODO: This is very similar to FindNextUserEntry() and MergeValuesNewToOld().
//       Would be nice to reuse some code.
S
Stanislau Hlebik 已提交
972
bool DBIter::FindValueForCurrentKeyUsingSeek() {
973 974 975
  // FindValueForCurrentKey will enable pinning before calling
  // FindValueForCurrentKeyUsingSeek()
  assert(pinned_iters_mgr_.PinningEnabled());
S
Stanislau Hlebik 已提交
976
  std::string last_key;
977 978
  AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetUserKey(),
                                                 sequence_, kValueTypeForSeek));
979
  iter_.Seek(last_key);
S
Stanislau Hlebik 已提交
980 981
  RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);

982 983
  // In case read_callback presents, the value we seek to may not be visible.
  // Find the next value that's visible.
S
Stanislau Hlebik 已提交
984
  ParsedInternalKey ikey;
985
  is_blob_ = false;
986
  while (true) {
987
    if (!iter_.Valid()) {
988
      valid_ = false;
989
      return iter_.status().ok();
990 991 992 993 994
    }

    if (!ParseKey(&ikey)) {
      return false;
    }
Y
Yanqin Jin 已提交
995 996 997 998 999 1000 1001
    assert(ikey.user_key.size() >= timestamp_size_);
    Slice ts;
    if (timestamp_size_ > 0) {
      ts = Slice(ikey.user_key.data() + ikey.user_key.size() - timestamp_size_,
                 timestamp_size_);
    }

1002
    if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
1003 1004 1005 1006 1007 1008 1009
      // No visible values for this key, even though FindValueForCurrentKey()
      // has seen some. This is possible if we're using a tailing iterator, and
      // the entries were discarded in a compaction.
      valid_ = false;
      return true;
    }

Y
Yanqin Jin 已提交
1010
    if (IsVisible(ikey.sequence, ts)) {
1011 1012
      break;
    }
1013

1014
    iter_.Next();
1015
  }
S
Stanislau Hlebik 已提交
1016

A
Andrew Kryczka 已提交
1017
  if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
1018
      range_del_agg_.ShouldDelete(
1019
          ikey, RangeDelPositioningMode::kBackwardTraversal)) {
J
jorlow@chromium.org 已提交
1020
    valid_ = false;
1021
    return true;
S
Stanislau Hlebik 已提交
1022
  }
1023 1024 1025 1026
  if (!iter_.PrepareValue()) {
    valid_ = false;
    return false;
  }
Y
Yi Wu 已提交
1027
  if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex) {
1028 1029
    assert(iter_.iter()->IsValuePinned());
    pinned_value_ = iter_.value();
L
Levi Tamasi 已提交
1030 1031 1032 1033 1034 1035
    if (ikey.type == kTypeBlobIndex) {
      if (!SetBlobValueIfNeeded(ikey.user_key, pinned_value_)) {
        return false;
      }
    }

A
Andrew Kryczka 已提交
1036 1037 1038
    valid_ = true;
    return true;
  }
S
Stanislau Hlebik 已提交
1039 1040 1041

  // kTypeMerge. We need to collect all kTypeMerge values and save them
  // in operands
1042
  assert(ikey.type == kTypeMerge);
1043
  current_entry_is_merged_ = true;
1044
  merge_context_.Clear();
1045 1046
  merge_context_.PushOperand(
      iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
1047
  while (true) {
1048
    iter_.Next();
S
Stanislau Hlebik 已提交
1049

1050 1051
    if (!iter_.Valid()) {
      if (!iter_.status().ok()) {
1052 1053 1054 1055
        valid_ = false;
        return false;
      }
      break;
S
Stanislau Hlebik 已提交
1056
    }
1057 1058 1059
    if (!ParseKey(&ikey)) {
      return false;
    }
1060
    if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
1061 1062 1063 1064
      break;
    }
    if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
        range_del_agg_.ShouldDelete(
1065
            ikey, RangeDelPositioningMode::kForwardTraversal)) {
1066
      break;
1067 1068 1069 1070 1071 1072 1073
    }
    if (!iter_.PrepareValue()) {
      valid_ = false;
      return false;
    }

    if (ikey.type == kTypeValue) {
1074
      const Slice val = iter_.value();
1075 1076 1077
      Status s = MergeHelper::TimedFullMerge(
          merge_operator_, saved_key_.GetUserKey(), &val,
          merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
1078
          clock_, &pinned_value_, true);
1079 1080 1081 1082 1083
      if (!s.ok()) {
        valid_ = false;
        status_ = s;
        return false;
      }
Y
Yi Wu 已提交
1084
      valid_ = true;
1085 1086
      return true;
    } else if (ikey.type == kTypeMerge) {
1087 1088
      merge_context_.PushOperand(
          iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
1089 1090
      PERF_COUNTER_ADD(internal_merge_count, 1);
    } else if (ikey.type == kTypeBlobIndex) {
L
Levi Tamasi 已提交
1091
      status_ = Status::NotSupported("BlobDB does not support merge operator.");
Y
Yi Wu 已提交
1092
      valid_ = false;
1093 1094
      return false;
    } else {
1095 1096 1097 1098 1099
      valid_ = false;
      status_ = Status::Corruption(
          "Unknown value type: " +
          std::to_string(static_cast<unsigned int>(ikey.type)));
      return false;
1100
    }
S
Stanislau Hlebik 已提交
1101 1102
  }

1103 1104
  Status s = MergeHelper::TimedFullMerge(
      merge_operator_, saved_key_.GetUserKey(), nullptr,
1105
      merge_context_.GetOperands(), &saved_value_, logger_, statistics_, clock_,
1106 1107
      &pinned_value_, true);
  if (!s.ok()) {
Y
Yi Wu 已提交
1108
    valid_ = false;
1109
    status_ = s;
1110
    return false;
1111
  }
S
Stanislau Hlebik 已提交
1112

1113 1114 1115
  // Make sure we leave iter_ in a good state. If it's valid and we don't care
  // about prefixes, that's already good enough. Otherwise it needs to be
  // seeked to the current key.
S
sdong 已提交
1116 1117
  if (!expect_total_order_inner_iter() || !iter_.Valid()) {
    if (!expect_total_order_inner_iter()) {
1118
      iter_.SeekForPrev(last_key);
1119
    } else {
1120 1121 1122
      iter_.Seek(last_key);
      if (!iter_.Valid() && iter_.status().ok()) {
        iter_.SeekToLast();
1123 1124 1125
      }
    }
    RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
S
Stanislau Hlebik 已提交
1126
  }
1127 1128 1129

  valid_ = true;
  return true;
S
Stanislau Hlebik 已提交
1130 1131
}

1132 1133 1134 1135
// Move backwards until the key smaller than saved_key_.
// Changes valid_ only if return value is false.
bool DBIter::FindUserKeyBeforeSavedKey() {
  assert(status_.ok());
S
Stanislau Hlebik 已提交
1136
  size_t num_skipped = 0;
1137
  while (iter_.Valid()) {
1138 1139 1140
    ParsedInternalKey ikey;
    if (!ParseKey(&ikey)) {
      return false;
1141 1142
    }

1143
    if (user_comparator_.Compare(ikey.user_key, saved_key_.GetUserKey()) < 0) {
1144 1145 1146 1147 1148
      return true;
    }

    if (TooManyInternalKeysSkipped()) {
      return false;
S
Stanislau Hlebik 已提交
1149
    }
1150

S
Siying Dong 已提交
1151
    assert(ikey.sequence != kMaxSequenceNumber);
Y
Yanqin Jin 已提交
1152 1153 1154 1155 1156 1157 1158
    assert(ikey.user_key.size() >= timestamp_size_);
    Slice ts;
    if (timestamp_size_ > 0) {
      ts = Slice(ikey.user_key.data() + ikey.user_key.size() - timestamp_size_,
                 timestamp_size_);
    }
    if (!IsVisible(ikey.sequence, ts)) {
1159 1160 1161 1162
      PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
    } else {
      PERF_COUNTER_ADD(internal_key_skipped_count, 1);
    }
1163

1164
    if (num_skipped >= max_skip_) {
1165 1166 1167 1168 1169 1170
      num_skipped = 0;
      IterKey last_key;
      last_key.SetInternalKey(ParsedInternalKey(
          saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
      // It would be more efficient to use SeekForPrev() here, but some
      // iterators may not support it.
1171
      iter_.Seek(last_key.GetInternalKey());
1172
      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
1173
      if (!iter_.Valid()) {
1174 1175 1176 1177 1178 1179
        break;
      }
    } else {
      ++num_skipped;
    }

1180
    iter_.Prev();
S
Stanislau Hlebik 已提交
1181
  }
1182

1183
  if (!iter_.status().ok()) {
1184 1185 1186 1187 1188
    valid_ = false;
    return false;
  }

  return true;
S
Stanislau Hlebik 已提交
1189 1190
}

1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202
// Checks the skipped-internal-keys budget.
// Returns true, and marks the iterator invalid with an Incomplete status, when
// max_skippable_internal_keys_ is non-zero and the number of skipped keys has
// exceeded it. Otherwise returns false and, if `increment` is set, counts one
// more skipped key.
bool DBIter::TooManyInternalKeysSkipped(bool increment) {
  const bool limit_enabled = max_skippable_internal_keys_ > 0;
  if (limit_enabled &&
      num_internal_keys_skipped_ > max_skippable_internal_keys_) {
    valid_ = false;
    status_ = Status::Incomplete("Too many internal keys skipped.");
    return true;
  }

  if (increment) {
    ++num_internal_keys_skipped_;
  }
  return false;
}

1203 1204
// Returns whether an entry with the given sequence number and timestamp is
// visible to this iterator.
//
// Visibility by sequence number is decided by read_callback_ when one is set,
// otherwise by comparing against sequence_. Visibility by timestamp requires
// `ts` to lie within [timestamp_lb_, timestamp_ub_]; an unset bound is not
// checked.
// If `more_recent` is non-null it receives true when the entry is not visible
// by sequence number.
bool DBIter::IsVisible(SequenceNumber sequence, const Slice& ts,
                       bool* more_recent) {
  // Remember that comparator orders preceding timestamp as larger.
  // TODO(yanqin): support timestamp in read_callback_.
  bool seq_visible;
  if (read_callback_ != nullptr) {
    seq_visible = read_callback_->IsVisible(sequence);
  } else {
    seq_visible = sequence <= sequence_;
  }

  bool ts_visible =
      timestamp_ub_ == nullptr ||
      user_comparator_.CompareTimestamp(ts, *timestamp_ub_) <= 0;
  // The lower bound is consulted only when the upper bound check passed.
  if (ts_visible && timestamp_lb_ != nullptr) {
    ts_visible = user_comparator_.CompareTimestamp(ts, *timestamp_lb_) >= 0;
  }

  if (more_recent != nullptr) {
    *more_recent = !seq_visible;
  }
  return seq_visible && ts_visible;
}
1222

1223
void DBIter::SetSavedKeyToSeekTarget(const Slice& target) {
1224
  is_key_seqnum_zero_ = false;
1225
  SequenceNumber seq = sequence_;
1226
  saved_key_.Clear();
Y
Yanqin Jin 已提交
1227
  saved_key_.SetInternalKey(target, seq, kValueTypeForSeek, timestamp_ub_);
1228

Z
zhangjinpeng1987 已提交
1229
  if (iterate_lower_bound_ != nullptr &&
Y
Yanqin Jin 已提交
1230 1231 1232
      user_comparator_.CompareWithoutTimestamp(
          saved_key_.GetUserKey(), /*a_has_ts=*/true, *iterate_lower_bound_,
          /*b_has_ts=*/false) < 0) {
1233
    // Seek key is smaller than the lower bound.
Z
zhangjinpeng1987 已提交
1234
    saved_key_.Clear();
Y
Yanqin Jin 已提交
1235 1236
    saved_key_.SetInternalKey(*iterate_lower_bound_, seq, kValueTypeForSeek,
                              timestamp_ub_);
Z
zhangjinpeng1987 已提交
1237
  }
1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255
}

void DBIter::SetSavedKeyToSeekForPrevTarget(const Slice& target) {
  is_key_seqnum_zero_ = false;
  saved_key_.Clear();
  // now saved_key is used to store internal key.
  saved_key_.SetInternalKey(target, 0 /* sequence_number */,
                            kValueTypeForSeekForPrev);

  if (iterate_upper_bound_ != nullptr &&
      user_comparator_.Compare(saved_key_.GetUserKey(),
                               *iterate_upper_bound_) >= 0) {
    saved_key_.Clear();
    saved_key_.SetInternalKey(*iterate_upper_bound_, kMaxSequenceNumber);
  }
}

void DBIter::Seek(const Slice& target) {
1256 1257
  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, clock_);
  StopWatch sw(clock_, statistics_, DB_SEEK);
1258 1259 1260

#ifndef ROCKSDB_LITE
  if (db_impl_ != nullptr && cfd_ != nullptr) {
1261
    // TODO: What do we do if this returns an error?
1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274
    Slice lower_bound, upper_bound;
    if (iterate_lower_bound_ != nullptr) {
      lower_bound = *iterate_lower_bound_;
    } else {
      lower_bound = Slice("");
    }
    if (iterate_upper_bound_ != nullptr) {
      upper_bound = *iterate_upper_bound_;
    } else {
      upper_bound = Slice("");
    }
    db_impl_->TraceIteratorSeek(cfd_->GetID(), target, lower_bound, upper_bound)
        .PermitUncheckedError();
1275 1276 1277 1278 1279 1280
  }
#endif  // ROCKSDB_LITE

  status_ = Status::OK();
  ReleaseTempPinnedData();
  ResetInternalKeysSkippedCounter();
Z
zhangjinpeng1987 已提交
1281

1282
  // Seek the inner iterator based on the target key.
1283 1284
  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
1285 1286

    SetSavedKeyToSeekTarget(target);
1287
    iter_.Seek(saved_key_.GetInternalKey());
1288

1289
    range_del_agg_.InvalidateRangeDelMapPositions();
1290
    RecordTick(statistics_, NUMBER_DB_SEEK);
1291
  }
1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304
  if (!iter_.Valid()) {
    valid_ = false;
    return;
  }
  direction_ = kForward;

  // Now the inner iterator is placed to the target position. From there,
  // we need to find out the next key that is visible to the user.
  ClearSavedValue();
  if (prefix_same_as_start_) {
    // The case where the iterator needs to be invalidated if it has exausted
    // keys within the same prefix of the seek key.
    assert(prefix_extractor_ != nullptr);
Y
Yanqin Jin 已提交
1305
    Slice target_prefix = prefix_extractor_->Transform(target);
1306 1307 1308
    FindNextUserEntry(false /* not skipping saved_key */,
                      &target_prefix /* prefix */);
    if (valid_) {
Y
Yanqin Jin 已提交
1309
      // Remember the prefix of the seek key for the future Next() call to
1310 1311
      // check.
      prefix_.SetUserKey(target_prefix);
M
Manuel Ung 已提交
1312
    }
J
jorlow@chromium.org 已提交
1313
  } else {
1314 1315 1316 1317
    FindNextUserEntry(false /* not skipping saved_key */, nullptr);
  }
  if (!valid_) {
    return;
J
jorlow@chromium.org 已提交
1318
  }
1319

1320 1321 1322 1323 1324
  // Updating stats and perf context counters.
  if (statistics_ != nullptr) {
    // Decrement since we don't want to count this key as skipped
    RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
    RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
1325
  }
1326
  PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
J
jorlow@chromium.org 已提交
1327
}
J
jorlow@chromium.org 已提交
1328

A
Aaron Gao 已提交
1329
void DBIter::SeekForPrev(const Slice& target) {
1330 1331
  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, clock_);
  StopWatch sw(clock_, statistics_, DB_SEEK);
1332 1333 1334

#ifndef ROCKSDB_LITE
  if (db_impl_ != nullptr && cfd_ != nullptr) {
1335
    // TODO: What do we do if this returns an error?
1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349
    Slice lower_bound, upper_bound;
    if (iterate_lower_bound_ != nullptr) {
      lower_bound = *iterate_lower_bound_;
    } else {
      lower_bound = Slice("");
    }
    if (iterate_upper_bound_ != nullptr) {
      upper_bound = *iterate_upper_bound_;
    } else {
      upper_bound = Slice("");
    }
    db_impl_
        ->TraceIteratorSeekForPrev(cfd_->GetID(), target, lower_bound,
                                   upper_bound)
1350
        .PermitUncheckedError();
1351 1352 1353
  }
#endif  // ROCKSDB_LITE

Y
Yanqin Jin 已提交
1354 1355 1356 1357 1358 1359 1360
  if (timestamp_size_ > 0) {
    valid_ = false;
    status_ = Status::NotSupported(
        "SeekToLast/SeekForPrev/Prev currently not supported with timestamp.");
    return;
  }

1361
  status_ = Status::OK();
A
Aaron Gao 已提交
1362
  ReleaseTempPinnedData();
1363
  ResetInternalKeysSkippedCounter();
Z
zhangjinpeng1987 已提交
1364

1365
  // Seek the inner iterator based on the target key.
A
Aaron Gao 已提交
1366 1367
  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
1368
    SetSavedKeyToSeekForPrevTarget(target);
1369
    iter_.SeekForPrev(saved_key_.GetInternalKey());
1370
    range_del_agg_.InvalidateRangeDelMapPositions();
1371
    RecordTick(statistics_, NUMBER_DB_SEEK);
A
Aaron Gao 已提交
1372
  }
1373 1374 1375
  if (!iter_.Valid()) {
    valid_ = false;
    return;
1376
  }
1377
  direction_ = kReverse;
1378

1379 1380 1381 1382 1383 1384 1385 1386
  // Now the inner iterator is placed to the target position. From there,
  // we need to find out the first key that is visible to the user in the
  // backward direction.
  ClearSavedValue();
  if (prefix_same_as_start_) {
    // The case where the iterator needs to be invalidated if it has exausted
    // keys within the same prefix of the seek key.
    assert(prefix_extractor_ != nullptr);
Y
Yanqin Jin 已提交
1387
    Slice target_prefix = prefix_extractor_->Transform(target);
1388 1389 1390 1391 1392
    PrevInternal(&target_prefix);
    if (valid_) {
      // Remember the prefix of the seek key for the future Prev() call to
      // check.
      prefix_.SetUserKey(target_prefix);
A
Aaron Gao 已提交
1393 1394
    }
  } else {
1395
    PrevInternal(nullptr);
A
Aaron Gao 已提交
1396
  }
1397 1398 1399 1400 1401 1402

  // Report stats and perf context.
  if (statistics_ != nullptr && valid_) {
    RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
    RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
    PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
A
Aaron Gao 已提交
1403 1404 1405
  }
}

J
jorlow@chromium.org 已提交
1406
void DBIter::SeekToFirst() {
1407 1408 1409 1410
  if (iterate_lower_bound_ != nullptr) {
    Seek(*iterate_lower_bound_);
    return;
  }
1411
  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, clock_);
1412 1413
  // Don't use iter_::Seek() if we set a prefix extractor
  // because prefix seek will be used.
S
sdong 已提交
1414
  if (!expect_total_order_inner_iter()) {
1415 1416 1417
    max_skip_ = std::numeric_limits<uint64_t>::max();
  }
  status_ = Status::OK();
J
jorlow@chromium.org 已提交
1418
  direction_ = kForward;
1419
  ReleaseTempPinnedData();
1420
  ResetInternalKeysSkippedCounter();
J
jorlow@chromium.org 已提交
1421
  ClearSavedValue();
1422
  is_key_seqnum_zero_ = false;
1423 1424 1425

  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
1426
    iter_.SeekToFirst();
1427
    range_del_agg_.InvalidateRangeDelMapPositions();
1428 1429
  }

M
Manuel Ung 已提交
1430
  RecordTick(statistics_, NUMBER_DB_SEEK);
1431
  if (iter_.Valid()) {
1432
    saved_key_.SetUserKey(
1433 1434
        ExtractUserKey(iter_.key()),
        !iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
1435 1436
    FindNextUserEntry(false /* not skipping saved_key */,
                      nullptr /* no prefix check */);
M
Manuel Ung 已提交
1437 1438 1439 1440
    if (statistics_ != nullptr) {
      if (valid_) {
        RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
        RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
1441
        PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
M
Manuel Ung 已提交
1442 1443
      }
    }
J
jorlow@chromium.org 已提交
1444 1445
  } else {
    valid_ = false;
J
jorlow@chromium.org 已提交
1446
  }
1447 1448
  if (valid_ && prefix_same_as_start_) {
    assert(prefix_extractor_ != nullptr);
1449 1450
    prefix_.SetUserKey(prefix_extractor_->Transform(
        StripTimestampFromUserKey(saved_key_.GetUserKey(), timestamp_size_)));
1451
  }
J
jorlow@chromium.org 已提交
1452 1453
}

J
jorlow@chromium.org 已提交
1454
void DBIter::SeekToLast() {
Y
Yanqin Jin 已提交
1455 1456 1457 1458 1459 1460 1461
  if (timestamp_size_ > 0) {
    valid_ = false;
    status_ = Status::NotSupported(
        "SeekToLast/SeekForPrev/Prev currently not supported with timestamp.");
    return;
  }

1462 1463 1464
  if (iterate_upper_bound_ != nullptr) {
    // Seek to last key strictly less than ReadOptions.iterate_upper_bound.
    SeekForPrev(*iterate_upper_bound_);
1465
    if (Valid() && user_comparator_.Equal(*iterate_upper_bound_, key())) {
1466
      ReleaseTempPinnedData();
1467
      PrevInternal(nullptr);
1468 1469 1470 1471
    }
    return;
  }

1472
  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, clock_);
S
Stanislau Hlebik 已提交
1473
  // Don't use iter_::Seek() if we set a prefix extractor
1474
  // because prefix seek will be used.
S
sdong 已提交
1475
  if (!expect_total_order_inner_iter()) {
S
Stanislau Hlebik 已提交
1476 1477
    max_skip_ = std::numeric_limits<uint64_t>::max();
  }
1478
  status_ = Status::OK();
J
jorlow@chromium.org 已提交
1479
  direction_ = kReverse;
1480
  ReleaseTempPinnedData();
1481
  ResetInternalKeysSkippedCounter();
J
jorlow@chromium.org 已提交
1482
  ClearSavedValue();
1483
  is_key_seqnum_zero_ = false;
1484 1485 1486

  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
1487
    iter_.SeekToLast();
1488
    range_del_agg_.InvalidateRangeDelMapPositions();
1489
  }
1490
  PrevInternal(nullptr);
M
Manuel Ung 已提交
1491 1492 1493 1494 1495
  if (statistics_ != nullptr) {
    RecordTick(statistics_, NUMBER_DB_SEEK);
    if (valid_) {
      RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
      RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
1496
      PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
M
Manuel Ung 已提交
1497 1498
    }
  }
1499 1500
  if (valid_ && prefix_same_as_start_) {
    assert(prefix_extractor_ != nullptr);
1501 1502
    prefix_.SetUserKey(prefix_extractor_->Transform(
        StripTimestampFromUserKey(saved_key_.GetUserKey(), timestamp_size_)));
1503
  }
J
jorlow@chromium.org 已提交
1504 1505
}

1506 1507
// Factory for DBIter: allocates a new DBIter with `new`, forwarding every
// argument to its constructor and passing false for arena_mode, and returns it
// as an Iterator*.
Iterator* NewDBIterator(Env* env, const ReadOptions& read_options,
                        const ImmutableCFOptions& cf_options,
                        const MutableCFOptions& mutable_cf_options,
                        const Comparator* user_key_comparator,
                        InternalIterator* internal_iter, const Version* version,
                        const SequenceNumber& sequence,
                        uint64_t max_sequential_skip_in_iterations,
                        ReadCallback* read_callback, DBImpl* db_impl,
                        ColumnFamilyData* cfd, bool expose_blob_index) {
  return new DBIter(env, read_options, cf_options, mutable_cf_options,
                    user_key_comparator, internal_iter, version, sequence,
                    false /* arena_mode */, max_sequential_skip_in_iterations,
                    read_callback, db_impl, cfd, expose_blob_index);
}

1523
}  // namespace ROCKSDB_NAMESPACE