db_iter.cc 16.8 KB
Newer Older
1 2 3 4 5
//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
J
jorlow@chromium.org 已提交
6 7 8 9 10
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/db_iter.h"
11
#include <stdexcept>
12
#include <deque>
J
jorlow@chromium.org 已提交
13 14 15

#include "db/filename.h"
#include "db/dbformat.h"
16 17 18 19
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "rocksdb/iterator.h"
#include "rocksdb/merge_operator.h"
J
jorlow@chromium.org 已提交
20
#include "port/port.h"
21
#include "util/arena.h"
J
jorlow@chromium.org 已提交
22 23
#include "util/logging.h"
#include "util/mutexlock.h"
24
#include "util/perf_context_imp.h"
J
jorlow@chromium.org 已提交
25

26
namespace rocksdb {
J
jorlow@chromium.org 已提交
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47

#if 0
static void DumpInternalIter(Iterator* iter) {
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    ParsedInternalKey k;
    if (!ParseInternalKey(iter->key(), &k)) {
      fprintf(stderr, "Corrupt '%s'\n", EscapeString(iter->key()).c_str());
    } else {
      fprintf(stderr, "@ '%s'\n", k.DebugString().c_str());
    }
  }
}
#endif

// Memtables and sstables that make the DB representation contain
// (userkey,seq,type) => uservalue entries.  DBIter
// combines multiple entries for the same userkey found in the DB
// representation into a single entry while accounting for sequence
// numbers, deletion markers, overwrites, etc.
class DBIter: public Iterator {
 public:
48
  // The following is grossly complicated. TODO: clean it up
J
jorlow@chromium.org 已提交
49 50 51 52 53 54 55 56 57 58
  // Which direction is the iterator currently moving?
  // (1) When moving forward, the internal iterator is positioned at
  //     the exact entry that yields this->key(), this->value()
  // (2) When moving backwards, the internal iterator is positioned
  //     just before all entries whose user key == this->key().
  enum Direction {
    kForward,
    kReverse
  };

59 60 61 62
  DBIter(Env* env, const Options& options, const Comparator* cmp,
         Iterator* iter, SequenceNumber s, bool arena_mode)
      : arena_mode_(arena_mode),
        env_(env),
I
Igor Canadi 已提交
63
        logger_(options.info_log.get()),
J
jorlow@chromium.org 已提交
64
        user_comparator_(cmp),
65
        user_merge_operator_(options.merge_operator.get()),
J
jorlow@chromium.org 已提交
66 67
        iter_(iter),
        sequence_(s),
J
jorlow@chromium.org 已提交
68
        direction_(kForward),
69
        valid_(false),
70
        current_entry_is_merged_(false),
71
        statistics_(options.statistics.get()) {
72
    RecordTick(statistics_, NO_ITERATORS, 1);
73
    max_skip_ = options.max_sequential_skip_in_iterations;
J
jorlow@chromium.org 已提交
74 75
  }
  virtual ~DBIter() {
76
    RecordTick(statistics_, NO_ITERATORS, -1);
77 78 79 80 81 82 83 84 85
    if (!arena_mode_) {
      delete iter_;
    } else {
      iter_->~Iterator();
    }
  }
  virtual void SetIter(Iterator* iter) {
    assert(iter_ == nullptr);
    iter_ = iter;
J
jorlow@chromium.org 已提交
86 87 88 89
  }
  virtual bool Valid() const { return valid_; }
  virtual Slice key() const {
    assert(valid_);
90
    return saved_key_.GetKey();
J
jorlow@chromium.org 已提交
91 92 93
  }
  virtual Slice value() const {
    assert(valid_);
94 95
    return (direction_ == kForward && !current_entry_is_merged_) ?
      iter_->value() : saved_value_;
J
jorlow@chromium.org 已提交
96 97 98 99 100 101 102 103 104
  }
  virtual Status status() const {
    if (status_.ok()) {
      return iter_->status();
    } else {
      return status_;
    }
  }

J
jorlow@chromium.org 已提交
105 106 107 108 109
  virtual void Next();
  virtual void Prev();
  virtual void Seek(const Slice& target);
  virtual void SeekToFirst();
  virtual void SeekToLast();
J
jorlow@chromium.org 已提交
110

J
jorlow@chromium.org 已提交
111
 private:
112 113
  inline void FindNextUserEntry(bool skipping);
  void FindNextUserEntryInternal(bool skipping);
J
jorlow@chromium.org 已提交
114 115
  void FindPrevUserEntry();
  bool ParseKey(ParsedInternalKey* key);
116
  void MergeValuesNewToOld();
J
jorlow@chromium.org 已提交
117 118 119 120 121 122 123 124 125 126

  inline void ClearSavedValue() {
    if (saved_value_.capacity() > 1048576) {
      std::string empty;
      swap(empty, saved_value_);
    } else {
      saved_value_.clear();
    }
  }

127
  bool arena_mode_;
J
jorlow@chromium.org 已提交
128
  Env* const env_;
I
Igor Canadi 已提交
129
  Logger* logger_;
J
jorlow@chromium.org 已提交
130
  const Comparator* const user_comparator_;
131
  const MergeOperator* const user_merge_operator_;
132
  Iterator* iter_;
J
jorlow@chromium.org 已提交
133
  SequenceNumber const sequence_;
J
jorlow@chromium.org 已提交
134

J
jorlow@chromium.org 已提交
135
  Status status_;
136
  IterKey saved_key_;   // == current key when direction_==kReverse
J
jorlow@chromium.org 已提交
137 138
  std::string saved_value_;   // == current raw value when direction_==kReverse
  Direction direction_;
J
jorlow@chromium.org 已提交
139
  bool valid_;
140
  bool current_entry_is_merged_;
141
  Statistics* statistics_;
142
  uint64_t max_skip_;
J
jorlow@chromium.org 已提交
143 144 145 146 147 148 149 150 151

  // No copying allowed
  DBIter(const DBIter&);
  void operator=(const DBIter&);
};

inline bool DBIter::ParseKey(ParsedInternalKey* ikey) {
  if (!ParseInternalKey(iter_->key(), ikey)) {
    status_ = Status::Corruption("corrupted internal key in DBIter");
152 153
    Log(logger_, "corrupted internal key in DBIter: %s",
        iter_->key().ToString(true).c_str());
J
jorlow@chromium.org 已提交
154 155 156 157 158 159
    return false;
  } else {
    return true;
  }
}

J
jorlow@chromium.org 已提交
160 161 162 163 164 165 166 167 168 169 170
void DBIter::Next() {
  assert(valid_);

  if (direction_ == kReverse) {  // Switch directions?
    direction_ = kForward;
    // iter_ is pointing just before the entries for this->key(),
    // so advance into the range of entries for this->key() and then
    // use the normal skipping code below.
    if (!iter_->Valid()) {
      iter_->SeekToFirst();
    } else {
J
jorlow@chromium.org 已提交
171 172
      iter_->Next();
    }
J
jorlow@chromium.org 已提交
173 174
    if (!iter_->Valid()) {
      valid_ = false;
175
      saved_key_.Clear();
J
jorlow@chromium.org 已提交
176
      return;
J
jorlow@chromium.org 已提交
177 178
    }
  }
J
jorlow@chromium.org 已提交
179

180 181 182 183 184 185
  // If the current value is merged, we might already hit end of iter_
  if (!iter_->Valid()) {
    valid_ = false;
    return;
  }
  FindNextUserEntry(true /* skipping the current user key */);
J
jorlow@chromium.org 已提交
186 187
}

188 189 190 191 192 193 194 195 196

// PRE: saved_key_ has the current user key if skipping
// POST: saved_key_ should have the next user key if valid_,
//       if the current entry is a result of merge
//           current_entry_is_merged_ => true
//           saved_value_             => the merged value
//
// NOTE: In between, saved_key_ can point to a user key that has
//       a delete marker
197
inline void DBIter::FindNextUserEntry(bool skipping) {
L
Lei Jin 已提交
198
  PERF_TIMER_AUTO(find_next_user_entry_time);
199
  FindNextUserEntryInternal(skipping);
L
Lei Jin 已提交
200
  PERF_TIMER_STOP(find_next_user_entry_time);
201 202 203 204
}

// Actual implementation of DBIter::FindNextUserEntry()
void DBIter::FindNextUserEntryInternal(bool skipping) {
J
jorlow@chromium.org 已提交
205 206 207
  // Loop until we hit an acceptable entry to yield
  assert(iter_->Valid());
  assert(direction_ == kForward);
208
  current_entry_is_merged_ = false;
209
  uint64_t num_skipped = 0;
J
jorlow@chromium.org 已提交
210
  do {
J
jorlow@chromium.org 已提交
211
    ParsedInternalKey ikey;
J
jorlow@chromium.org 已提交
212
    if (ParseKey(&ikey) && ikey.sequence <= sequence_) {
213
      if (skipping &&
214
          user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0) {
215
        num_skipped++; // skip this entry
L
Lei Jin 已提交
216
        PERF_COUNTER_ADD(internal_key_skipped_count, 1);
217 218 219 220 221 222
      } else {
        skipping = false;
        switch (ikey.type) {
          case kTypeDeletion:
            // Arrange to skip all upcoming entries for this key since
            // they are hidden by this deletion.
L
Lei Jin 已提交
223
            saved_key_.SetKey(ikey.user_key);
224
            skipping = true;
225
            num_skipped = 0;
L
Lei Jin 已提交
226
            PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
227 228
            break;
          case kTypeValue:
J
jorlow@chromium.org 已提交
229
            valid_ = true;
L
Lei Jin 已提交
230
            saved_key_.SetKey(ikey.user_key);
J
jorlow@chromium.org 已提交
231
            return;
232 233
          case kTypeMerge:
            // By now, we are sure the current ikey is going to yield a value
L
Lei Jin 已提交
234
            saved_key_.SetKey(ikey.user_key);
235 236
            current_entry_is_merged_ = true;
            valid_ = true;
D
Deon Nicholas 已提交
237
            MergeValuesNewToOld();  // Go to a different state machine
238
            return;
239
          default:
J
Jim Paton 已提交
240 241
            assert(false);
            break;
242
        }
J
jorlow@chromium.org 已提交
243
      }
J
jorlow@chromium.org 已提交
244
    }
245 246 247 248 249 250 251
    // If we have sequentially iterated via numerous keys and still not
    // found the next user-key, then it is better to seek so that we can
    // avoid too many key comparisons. We seek to the last occurence of
    // our current key by looking for sequence number 0.
    if (skipping && num_skipped > max_skip_) {
      num_skipped = 0;
      std::string last_key;
252 253
      AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetKey(), 0,
                                                     kValueTypeForSeek));
254 255 256 257 258
      iter_->Seek(last_key);
      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
    } else {
      iter_->Next();
    }
J
jorlow@chromium.org 已提交
259 260
  } while (iter_->Valid());
  valid_ = false;
J
jorlow@chromium.org 已提交
261 262
}

263 264 265 266 267 268 269
// Merge values of the same user key starting from the current iter_ position
// Scan from the newer entries to older entries.
// PRE: iter_->key() points to the first merge type entry
//      saved_key_ stores the user key
// POST: saved_value_ has the merged value for the user key
//       iter_ points to the next entry (or invalid)
void DBIter::MergeValuesNewToOld() {
D
Deon Nicholas 已提交
270 271 272 273 274
  if (!user_merge_operator_) {
    Log(logger_, "Options::merge_operator is null.");
    throw std::logic_error("DBIter::MergeValuesNewToOld() with"
                           " Options::merge_operator null");
  }
275

276 277 278
  // Start the merge process by pushing the first operand
  std::deque<std::string> operands;
  operands.push_front(iter_->value().ToString());
279

280
  std::string merge_result;   // Temporary string to hold merge result later
281 282 283 284 285 286 287
  ParsedInternalKey ikey;
  for (iter_->Next(); iter_->Valid(); iter_->Next()) {
    if (!ParseKey(&ikey)) {
      // skip corrupted key
      continue;
    }

288
    if (user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) != 0) {
289 290 291 292 293 294 295 296 297 298 299 300
      // hit the next user key, stop right here
      break;
    }

    if (kTypeDeletion == ikey.type) {
      // hit a delete with the same user key, stop right here
      // iter_ is positioned after delete
      iter_->Next();
      break;
    }

    if (kTypeValue == ikey.type) {
301 302 303
      // hit a put, merge the put value with operands and store the
      // final result in saved_value_. We are done!
      // ignore corruption if there is any.
304
      const Slice value = iter_->value();
D
Deon Nicholas 已提交
305
      user_merge_operator_->FullMerge(ikey.user_key, &value, operands,
I
Igor Canadi 已提交
306
                                      &saved_value_, logger_);
307 308 309 310 311 312
      // iter_ is positioned after put
      iter_->Next();
      return;
    }

    if (kTypeMerge == ikey.type) {
313 314 315 316
      // hit a merge, add the value as an operand and run associative merge.
      // when complete, add result to operands and continue.
      const Slice& value = iter_->value();
      operands.push_front(value.ToString());
317 318 319 320 321
    }
  }

  // we either exhausted all internal keys under this user key, or hit
  // a deletion marker.
322
  // feed null as the existing value to the merge operator, such that
323
  // client can differentiate this scenario and do things accordingly.
324
  user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr, operands,
I
Igor Canadi 已提交
325
                                  &saved_value_, logger_);
326 327
}

J
jorlow@chromium.org 已提交
328 329
void DBIter::Prev() {
  assert(valid_);
J
jorlow@chromium.org 已提交
330

331
  // Throw an exception now if merge_operator is provided
332
  // TODO: support backward iteration
333 334 335 336 337 338
  if (user_merge_operator_) {
    Log(logger_, "Prev not supported yet if merge_operator is provided");
    throw std::logic_error("DBIter::Prev backward iteration not supported"
                           " if merge_operator is provided");
  }

J
jorlow@chromium.org 已提交
339 340 341 342
  if (direction_ == kForward) {  // Switch directions?
    // iter_ is pointing at the current entry.  Scan backwards until
    // the key changes so we can use the normal reverse scanning code.
    assert(iter_->Valid());  // Otherwise valid_ would have been false
L
Lei Jin 已提交
343
    saved_key_.SetKey(ExtractUserKey(iter_->key()));
J
jorlow@chromium.org 已提交
344 345
    while (true) {
      iter_->Prev();
J
jorlow@chromium.org 已提交
346
      if (!iter_->Valid()) {
J
jorlow@chromium.org 已提交
347
        valid_ = false;
348
        saved_key_.Clear();
J
jorlow@chromium.org 已提交
349
        ClearSavedValue();
J
jorlow@chromium.org 已提交
350 351
        return;
      }
J
jorlow@chromium.org 已提交
352
      if (user_comparator_->Compare(ExtractUserKey(iter_->key()),
353
                                    saved_key_.GetKey()) < 0) {
J
jorlow@chromium.org 已提交
354 355
        break;
      }
J
jorlow@chromium.org 已提交
356
    }
J
jorlow@chromium.org 已提交
357
    direction_ = kReverse;
J
jorlow@chromium.org 已提交
358
  }
J
jorlow@chromium.org 已提交
359 360

  FindPrevUserEntry();
J
jorlow@chromium.org 已提交
361 362
}

J
jorlow@chromium.org 已提交
363 364
void DBIter::FindPrevUserEntry() {
  assert(direction_ == kReverse);
365
  uint64_t num_skipped = 0;
J
jorlow@chromium.org 已提交
366

J
jorlow@chromium.org 已提交
367
  ValueType value_type = kTypeDeletion;
S
sdong 已提交
368
  bool saved_key_valid = true;
J
jorlow@chromium.org 已提交
369 370 371 372 373
  if (iter_->Valid()) {
    do {
      ParsedInternalKey ikey;
      if (ParseKey(&ikey) && ikey.sequence <= sequence_) {
        if ((value_type != kTypeDeletion) &&
374
            user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) < 0) {
J
jorlow@chromium.org 已提交
375 376 377 378 379
          // We encountered a non-deleted value in entries for previous keys,
          break;
        }
        value_type = ikey.type;
        if (value_type == kTypeDeletion) {
380
          saved_key_.Clear();
J
jorlow@chromium.org 已提交
381
          ClearSavedValue();
S
sdong 已提交
382
          saved_key_valid = false;
J
jorlow@chromium.org 已提交
383 384 385 386 387 388
        } else {
          Slice raw_value = iter_->value();
          if (saved_value_.capacity() > raw_value.size() + 1048576) {
            std::string empty;
            swap(empty, saved_value_);
          }
L
Lei Jin 已提交
389
          saved_key_.SetKey(ExtractUserKey(iter_->key()));
J
jorlow@chromium.org 已提交
390 391
          saved_value_.assign(raw_value.data(), raw_value.size());
        }
S
sdong 已提交
392 393 394 395
      } else {
        // In the case of ikey.sequence > sequence_, we might have already
        // iterated to a different user key.
        saved_key_valid = false;
J
jorlow@chromium.org 已提交
396
      }
397 398 399 400 401
      num_skipped++;
      // If we have sequentially iterated via numerous keys and still not
      // found the prev user-key, then it is better to seek so that we can
      // avoid too many key comparisons. We seek to the first occurence of
      // our current key by looking for max sequence number.
S
sdong 已提交
402
      if (saved_key_valid && num_skipped > max_skip_) {
403 404
        num_skipped = 0;
        std::string last_key;
405 406 407
        AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetKey(),
                                                       kMaxSequenceNumber,
                                                       kValueTypeForSeek));
408 409 410 411 412
        iter_->Seek(last_key);
        RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
      } else {
        iter_->Prev();
      }
J
jorlow@chromium.org 已提交
413 414
    } while (iter_->Valid());
  }
J
jorlow@chromium.org 已提交
415

J
jorlow@chromium.org 已提交
416 417 418
  if (value_type == kTypeDeletion) {
    // End
    valid_ = false;
419
    saved_key_.Clear();
J
jorlow@chromium.org 已提交
420 421 422 423 424 425
    ClearSavedValue();
    direction_ = kForward;
  } else {
    valid_ = true;
  }
}
J
jorlow@chromium.org 已提交
426

J
jorlow@chromium.org 已提交
427
void DBIter::Seek(const Slice& target) {
428 429 430
  saved_key_.Clear();
  // now savved_key is used to store internal key.
  saved_key_.SetInternalKey(target, sequence_);
L
Lei Jin 已提交
431
  PERF_TIMER_AUTO(seek_internal_seek_time);
432
  iter_->Seek(saved_key_.GetKey());
L
Lei Jin 已提交
433
  PERF_TIMER_STOP(seek_internal_seek_time);
J
jorlow@chromium.org 已提交
434
  if (iter_->Valid()) {
435 436
    direction_ = kForward;
    ClearSavedValue();
437
    FindNextUserEntry(false /*not skipping */);
J
jorlow@chromium.org 已提交
438 439 440 441
  } else {
    valid_ = false;
  }
}
J
jorlow@chromium.org 已提交
442

J
jorlow@chromium.org 已提交
443 444 445
void DBIter::SeekToFirst() {
  direction_ = kForward;
  ClearSavedValue();
L
Lei Jin 已提交
446
  PERF_TIMER_AUTO(seek_internal_seek_time);
J
jorlow@chromium.org 已提交
447
  iter_->SeekToFirst();
L
Lei Jin 已提交
448
  PERF_TIMER_STOP(seek_internal_seek_time);
J
jorlow@chromium.org 已提交
449
  if (iter_->Valid()) {
450
    FindNextUserEntry(false /* not skipping */);
J
jorlow@chromium.org 已提交
451 452
  } else {
    valid_ = false;
J
jorlow@chromium.org 已提交
453 454 455
  }
}

J
jorlow@chromium.org 已提交
456
void DBIter::SeekToLast() {
457
  // Throw an exception for now if merge_operator is provided
458 459 460 461 462 463 464
  // TODO: support backward iteration
  if (user_merge_operator_) {
    Log(logger_, "SeekToLast not supported yet if merge_operator is provided");
    throw std::logic_error("DBIter::SeekToLast: backward iteration not"
                           " supported if merge_operator is provided");
  }

J
jorlow@chromium.org 已提交
465 466
  direction_ = kReverse;
  ClearSavedValue();
L
Lei Jin 已提交
467
  PERF_TIMER_AUTO(seek_internal_seek_time);
J
jorlow@chromium.org 已提交
468
  iter_->SeekToLast();
L
Lei Jin 已提交
469
  PERF_TIMER_STOP(seek_internal_seek_time);
J
jorlow@chromium.org 已提交
470 471 472
  FindPrevUserEntry();
}

473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503
Iterator* NewDBIterator(Env* env, const Options& options,
                        const Comparator* user_key_comparator,
                        Iterator* internal_iter,
                        const SequenceNumber& sequence) {
  return new DBIter(env, options, user_key_comparator, internal_iter, sequence,
                    false);
}

ArenaWrappedDBIter::~ArenaWrappedDBIter() { db_iter_->~Iterator(); }

void ArenaWrappedDBIter::SetDBIter(DBIter* iter) { db_iter_ = iter; }

void ArenaWrappedDBIter::SetIterUnderDBIter(Iterator* iter) {
  static_cast<DBIter*>(db_iter_)->SetIter(iter);
}

inline bool ArenaWrappedDBIter::Valid() const { return db_iter_->Valid(); }
inline void ArenaWrappedDBIter::SeekToFirst() { db_iter_->SeekToFirst(); }
inline void ArenaWrappedDBIter::SeekToLast() { db_iter_->SeekToLast(); }
inline void ArenaWrappedDBIter::Seek(const Slice& target) {
  db_iter_->Seek(target);
}
inline void ArenaWrappedDBIter::Next() { db_iter_->Next(); }
inline void ArenaWrappedDBIter::Prev() { db_iter_->Prev(); }
inline Slice ArenaWrappedDBIter::key() const { return db_iter_->key(); }
inline Slice ArenaWrappedDBIter::value() const { return db_iter_->value(); }
inline Status ArenaWrappedDBIter::status() const { return db_iter_->status(); }
void ArenaWrappedDBIter::RegisterCleanup(CleanupFunction function, void* arg1,
                                         void* arg2) {
  db_iter_->RegisterCleanup(function, arg1, arg2);
}
J
jorlow@chromium.org 已提交
504

505 506
ArenaWrappedDBIter* NewArenaWrappedDbIterator(
    Env* env, const Options& options, const Comparator* user_key_comparator,
J
jorlow@chromium.org 已提交
507
    const SequenceNumber& sequence) {
508 509 510 511 512 513 514
  ArenaWrappedDBIter* iter = new ArenaWrappedDBIter();
  Arena* arena = iter->GetArena();
  auto mem = arena->AllocateAligned(sizeof(DBIter));
  DBIter* db_iter = new (mem)
      DBIter(env, options, user_key_comparator, nullptr, sequence, true);
  iter->SetDBIter(db_iter);
  return iter;
J
jorlow@chromium.org 已提交
515 516
}

517
}  // namespace rocksdb