no_batched_ops_stress.cc 36.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#ifdef GFLAGS
#include "db_stress_tool/db_stress_common.h"
12
#include "rocksdb/utilities/transaction_db.h"
13
#include "utilities/fault_injection_fs.h"
14

15
namespace ROCKSDB_NAMESPACE {
16 17 18 19 20 21
class NonBatchedOpsStressTest : public StressTest {
 public:
  NonBatchedOpsStressTest() {}

  // Performs no cleanup of its own.
  virtual ~NonBatchedOpsStressTest() = default;

22
  // Checks every key in this thread's share of the key space, for each column
  // family in turn.
  //
  // [0, max_key) is divided evenly between threads; the last thread also takes
  // the remainder. Per column family one of four read paths (iterator, Get,
  // MultiGet, GetMergeOperands) is picked at random, and every result is
  // handed to VerifyOrSyncValue(). The scan stops early once the shared state
  // reports a verification failure.
  void VerifyDb(ThreadState* thread) const override {
    // This `ReadOptions` is for validation purposes. Ignore
    // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
    ReadOptions options(FLAGS_verify_checksum, true);
    std::string read_ts_buf;
    Slice read_ts;
    if (FLAGS_user_timestamp_size > 0) {
      read_ts_buf = GenerateTimestampForRead();
      read_ts = read_ts_buf;
      options.timestamp = &read_ts;
    }

    auto shared = thread->shared;
    const int64_t max_key = shared->GetMaxKey();
    const int64_t keys_per_thread = max_key / shared->GetNumThreads();
    const int64_t start = keys_per_thread * thread->tid;
    // A negative prefix size falls back to a one-byte prefix.
    const uint64_t prefix_len =
        (FLAGS_prefix_size < 0) ? 1 : static_cast<size_t>(FLAGS_prefix_size);
    // The last thread absorbs the keys left over by the integer division.
    const int64_t end = (thread->tid == shared->GetNumThreads() - 1)
                            ? max_key
                            : start + keys_per_thread;

    for (size_t cf = 0; cf < column_families_.size(); ++cf) {
      if (shared->HasVerificationFailedYet()) {
        break;
      }
      const int cf_idx = static_cast<int>(cf);

      if (thread->rand.OneIn(4)) {
        // 1/4 chance use iterator to verify this range
        std::string seek_key = Key(start);
        std::unique_ptr<Iterator> iter(
            db_->NewIterator(options, column_families_[cf]));
        iter->Seek(seek_key);
        Slice cur_prefix(seek_key.data(), prefix_len);

        for (int64_t i = start; i < end; i++) {
          if (shared->HasVerificationFailedYet()) {
            break;
          }
          std::string from_db;
          std::string keystr = Key(i);
          Slice k = keystr;
          Slice key_prefix(keystr.data(), prefix_len);
          // Reseek when the prefix changes
          if (prefix_len > 0 && cur_prefix.compare(key_prefix) != 0) {
            iter->Seek(k);
            seek_key = keystr;
            cur_prefix = Slice(seek_key.data(), prefix_len);
          }

          Status s = iter->status();
          if (!iter->Valid()) {
            // The iterator found no value for the key in question, so do not
            // move to the next item in the iterator
            s = Status::NotFound();
          } else {
            const int order = iter->key().compare(k);
            if (order > 0) {
              // The iterator is already past this key: it is absent.
              s = Status::NotFound(Slice());
            } else if (order == 0) {
              from_db = iter->value().ToString();
              iter->Next();
            } else {
              // The iterator is positioned before the key being checked.
              VerificationAbort(shared, "An out of range key was found",
                                cf_idx, i);
            }
          }

          VerifyOrSyncValue(cf_idx, i, options, shared, from_db, s, true);
          if (!from_db.empty()) {
            PrintKeyValue(cf_idx, static_cast<uint32_t>(i), from_db.data(),
                          from_db.length());
          }
        }
      } else if (thread->rand.OneIn(3)) {
        // 1/4 chance use Get to verify this range
        for (int64_t i = start; i < end; i++) {
          if (shared->HasVerificationFailedYet()) {
            break;
          }
          std::string from_db;
          std::string keystr = Key(i);
          Slice k = keystr;
          Status s = db_->Get(options, column_families_[cf], k, &from_db);

          VerifyOrSyncValue(cf_idx, i, options, shared, from_db, s, true);
          if (!from_db.empty()) {
            PrintKeyValue(cf_idx, static_cast<uint32_t>(i), from_db.data(),
                          from_db.length());
          }
        }
      } else if (thread->rand.OneIn(2)) {
        // 1/4 chance use MultiGet to verify this range
        for (int64_t i = start; i < end;) {
          if (shared->HasVerificationFailedYet()) {
            break;
          }
          // Keep the batch size to some reasonable value
          size_t batch_size = thread->rand.Uniform(128) + 1;
          batch_size = std::min<size_t>(batch_size, end - i);
          std::vector<std::string> keystrs(batch_size);
          std::vector<Slice> keys(batch_size);
          std::vector<PinnableSlice> values(batch_size);
          std::vector<Status> statuses(batch_size);
          for (size_t j = 0; j < batch_size; ++j) {
            keystrs[j] = Key(i + j);
            keys[j] = Slice(keystrs[j].data(), keystrs[j].length());
          }
          db_->MultiGet(options, column_families_[cf], batch_size, keys.data(),
                        values.data(), statuses.data());

          for (size_t j = 0; j < batch_size; ++j) {
            Status s = statuses[j];
            std::string from_db = values[j].ToString();

            VerifyOrSyncValue(cf_idx, i + j, options, shared, from_db, s,
                              true);
            if (!from_db.empty()) {
              PrintKeyValue(cf_idx, static_cast<uint32_t>(i + j),
                            from_db.data(), from_db.length());
            }
          }

          i += batch_size;
        }
      } else {
        // 1/4 chance use GetMergeOperand to verify this range
        // Start off with small size that will be increased later if necessary
        std::vector<PinnableSlice> operands(4);
        GetMergeOperandsOptions merge_operands_info;
        merge_operands_info.expected_max_number_of_operands =
            static_cast<int>(operands.size());
        for (int64_t i = start; i < end; i++) {
          if (shared->HasVerificationFailedYet()) {
            break;
          }
          std::string from_db;
          std::string keystr = Key(i);
          Slice k = keystr;
          int number_of_operands = 0;
          Status s = db_->GetMergeOperands(options, column_families_[cf], k,
                                           operands.data(),
                                           &merge_operands_info,
                                           &number_of_operands);
          if (s.IsIncomplete()) {
            // Need to resize the buffer as there are more merge operands on
            // this key than it can hold. Should only happen a few times when
            // we encounter a key that had more merge operands than any key
            // seen so far
            operands.resize(number_of_operands);
            merge_operands_info.expected_max_number_of_operands =
                static_cast<int>(number_of_operands);
            s = db_->GetMergeOperands(options, column_families_[cf], k,
                                      operands.data(), &merge_operands_info,
                                      &number_of_operands);
          }
          // Assumed here that GetMergeOperands always sets number_of_operand
          if (number_of_operands) {
            // Only the last operand is compared against the expected state.
            from_db = operands[number_of_operands - 1].ToString();
          }

          VerifyOrSyncValue(cf_idx, i, options, shared, from_db, s, true);
          if (!from_db.empty()) {
            PrintKeyValue(cf_idx, static_cast<uint32_t>(i), from_db.data(),
                          from_db.length());
          }
        }
      }
    }
  }

187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284
#ifndef ROCKSDB_LITE
  // Exercises read paths on the comparison DB (`cmp_db_`), if one is set.
  //
  // After catching `cmp_db_` up with the primary, this walks the whole
  // default column family once (folding keys and values into a CRC that is
  // not compared against anything here), then, for every handle in
  // `cmp_cfhs_`, randomly issues either a point Get or one of four short
  // iterator scans. Read statuses from those probes are not verified.
  void ContinuouslyVerifyDb(ThreadState* thread) const override {
    if (!cmp_db_) {
      return;
    }
    assert(!cmp_cfhs_.empty());
    Status s = cmp_db_->TryCatchUpWithPrimary();
    if (!s.ok()) {
      assert(false);
      exit(1);
    }

    // Folds every key and value the iterator yields into a CRC32C and returns
    // the iterator's final status.
    const auto checksum_column_family = [](Iterator* iter,
                                           uint32_t* checksum) -> Status {
      assert(nullptr != checksum);
      uint32_t ret = 0;
      for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
        ret = crc32c::Extend(ret, iter->key().data(), iter->key().size());
        ret = crc32c::Extend(ret, iter->value().data(), iter->value().size());
      }
      *checksum = ret;
      return iter->status();
    };

    auto* shared = thread->shared;
    assert(shared);
    const int64_t max_key = shared->GetMaxKey();
    ReadOptions read_opts(FLAGS_verify_checksum, true);
    std::string ts_str;
    Slice ts;
    if (FLAGS_user_timestamp_size > 0) {
      ts_str = GenerateTimestampForRead();
      ts = ts_str;
      read_opts.timestamp = &ts;
    }

    // NOTE(review): function-local static, seeded on first call only and
    // shared by every caller -- presumably this is only ever invoked from a
    // single thread; confirm before calling it concurrently.
    static Random64 rand64(shared->GetSeed());

    {
      uint32_t crc = 0;
      std::unique_ptr<Iterator> it(cmp_db_->NewIterator(read_opts));
      s = checksum_column_family(it.get(), &crc);
      if (!s.ok()) {
        fprintf(stderr, "Computing checksum of default cf: %s\n",
                s.ToString().c_str());
        assert(false);
      }
    }

    for (auto* handle : cmp_cfhs_) {
      if (thread->rand.OneInOpt(3)) {
        // Use Get()
        uint64_t key = rand64.Uniform(static_cast<uint64_t>(max_key));
        std::string key_str = Key(key);
        std::string value;
        std::string key_ts;
        s = cmp_db_->Get(read_opts, handle, key_str, &value,
                         FLAGS_user_timestamp_size > 0 ? &key_ts : nullptr);
        s.PermitUncheckedError();
      } else {
        // Use range scan
        const uint32_t rnd = (thread->rand.Next()) % 4;
        // Decide on total_order_seek *before* creating the iterator, using a
        // per-iterator copy of the options. Setting the flag on `read_opts`
        // after NewIterator() (as was done previously) cannot influence the
        // iterator that already exists, and silently changed the options
        // used for every later column family in this call.
        ReadOptions iter_opts = read_opts;
        if (rnd == 0 || rnd == 1) {
          iter_opts.total_order_seek = true;
        }
        std::unique_ptr<Iterator> iter(cmp_db_->NewIterator(iter_opts, handle));
        if (0 == rnd) {
          // SeekToFirst() + Next()*5
          iter->SeekToFirst();
          for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Next()) {
          }
        } else if (1 == rnd) {
          // SeekToLast() + Prev()*5
          iter->SeekToLast();
          for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Prev()) {
          }
        } else if (2 == rnd) {
          // Seek() +Next()*5
          uint64_t key = rand64.Uniform(static_cast<uint64_t>(max_key));
          std::string key_str = Key(key);
          iter->Seek(key_str);
          for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Next()) {
          }
        } else {
          // SeekForPrev() + Prev()*5
          uint64_t key = rand64.Uniform(static_cast<uint64_t>(max_key));
          std::string key_str = Key(key);
          iter->SeekForPrev(key_str);
          for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Prev()) {
          }
        }
      }
    }
  }
#else
  // ROCKSDB_LITE variant: continuous verification is a no-op.
  void ContinuouslyVerifyDb(ThreadState* /*thread*/) const override {}
#endif  // ROCKSDB_LITE

285
  // Occasionally drops one non-default column family and recreates it under a
  // fresh name, clearing the shared expected state for that slot.
  //
  // Does nothing unless more than one column family is configured and the
  // `FLAGS_clear_column_family_one_in` roll succeeds. Any failure to drop or
  // create the column family terminates the process.
  void MaybeClearOneColumnFamily(ThreadState* thread) override {
    if (FLAGS_column_families <= 1) {
      return;
    }
    if (!thread->rand.OneInOpt(FLAGS_clear_column_family_one_in)) {
      return;
    }

    // drop column family and then create it again (can't drop default)
    const int cf = thread->rand.Next() % (FLAGS_column_families - 1) + 1;
    const std::string new_name =
        std::to_string(new_column_family_name_.fetch_add(1));
    {
      // Hold the shared mutex only while printing.
      MutexLock l(thread->shared->GetMutex());
      fprintf(stdout,
              "[CF %d] Dropping and recreating column family. new name: %s\n",
              cf, new_name.c_str());
    }

    thread->shared->LockColumnFamily(cf);

    Status s = db_->DropColumnFamily(column_families_[cf]);
    // The handle is deleted regardless of the drop result; a failed drop
    // terminates immediately below.
    delete column_families_[cf];
    if (!s.ok()) {
      fprintf(stderr, "dropping column family error: %s\n",
              s.ToString().c_str());
      std::terminate();
    }

    s = db_->CreateColumnFamily(ColumnFamilyOptions(options_), new_name,
                                &column_families_[cf]);
    column_family_names_[cf] = new_name;
    thread->shared->ClearColumnFamily(cf);
    if (!s.ok()) {
      fprintf(stderr, "creating column family error: %s\n",
              s.ToString().c_str());
      std::terminate();
    }

    thread->shared->UnlockColumnFamily(cf);
  }

321
  // Always answers true for this test.
  bool ShouldAcquireMutexOnKey() const override {
    return true;
  }
322

323 324
  // Always answers true for this test.
  bool IsStateTracked() const override {
    return true;
  }

325 326 327
  // Issues a single Get for rand_keys[0] in column family
  // rand_column_families[0] and records the outcome in the thread's stats.
  //
  // When `fault_fs_guard` is set, error injection is enabled around the read.
  // If an error was injected yet Get still returned OK (and the injected
  // error was not flagged as ignorable), the fault backtrace is printed and
  // the process terminates. Returns the status of the Get.
  Status TestGet(ThreadState* thread, const ReadOptions& read_opts,
                 const std::vector<int>& rand_column_families,
                 const std::vector<int64_t>& rand_keys) override {
    auto cfh = column_families_[rand_column_families[0]];
    const std::string key_buf = Key(rand_keys[0]);
    Slice key = key_buf;
    std::string from_db;
    int injected_errors = 0;

    if (fault_fs_guard) {
      fault_fs_guard->EnableErrorInjection();
      SharedState::ignore_read_error = false;
    }
    Status s = db_->Get(read_opts, cfh, key, &from_db);
    if (fault_fs_guard) {
      injected_errors = fault_fs_guard->GetAndResetErrorCount();
    }

    if (s.ok()) {
      if (fault_fs_guard && injected_errors &&
          !SharedState::ignore_read_error) {
        // Grab mutex so multiple thread don't try to print the
        // stack trace at the same time
        MutexLock l(thread->shared->GetMutex());
        fprintf(stderr, "Didn't get expected error from Get\n");
        fprintf(stderr, "Callstack that injected the fault\n");
        fault_fs_guard->PrintFaultBacktrace();
        std::terminate();
      }
      // found case
      thread->stats.AddGets(1, 1);
    } else if (s.IsNotFound()) {
      // not found case
      thread->stats.AddGets(1, 0);
    } else if (injected_errors != 0) {
      // The failure is accounted for by an injected fault.
      thread->stats.AddVerifiedErrors(1);
    } else {
      // errors case
      thread->stats.AddErrors(1);
    }

    if (fault_fs_guard) {
      fault_fs_guard->DisableErrorInjection();
    }
    return s;
  }

373
  std::vector<Status> TestMultiGet(
374 375
      ThreadState* thread, const ReadOptions& read_opts,
      const std::vector<int>& rand_column_families,
376
      const std::vector<int64_t>& rand_keys) override {
377 378 379 380 381 382 383 384
    size_t num_keys = rand_keys.size();
    std::vector<std::string> key_str;
    std::vector<Slice> keys;
    key_str.reserve(num_keys);
    keys.reserve(num_keys);
    std::vector<PinnableSlice> values(num_keys);
    std::vector<Status> statuses(num_keys);
    ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
A
anand76 已提交
385
    int error_count = 0;
386 387 388 389 390 391 392 393
    // Do a consistency check between Get and MultiGet. Don't do it too
    // often as it will slow db_stress down
    bool do_consistency_check = thread->rand.OneIn(4);

    ReadOptions readoptionscopy = read_opts;
    if (do_consistency_check) {
      readoptionscopy.snapshot = db_->GetSnapshot();
    }
394

395 396 397
    // To appease clang analyzer
    const bool use_txn = FLAGS_use_txn;

398 399 400 401 402
    // Create a transaction in order to write some data. The purpose is to
    // exercise WriteBatchWithIndex::MultiGetFromBatchAndDB. The transaction
    // will be rolled back once MultiGet returns.
#ifndef ROCKSDB_LITE
    Transaction* txn = nullptr;
403
    if (use_txn) {
404
      WriteOptions wo;
405 406 407
      if (FLAGS_rate_limit_auto_wal_flush) {
        wo.rate_limiter_priority = Env::IO_USER;
      }
408 409 410 411 412
      Status s = NewTxn(wo, &txn);
      if (!s.ok()) {
        fprintf(stderr, "NewTxn: %s\n", s.ToString().c_str());
        std::terminate();
      }
413 414
    }
#endif
415 416 417
    for (size_t i = 0; i < num_keys; ++i) {
      key_str.emplace_back(Key(rand_keys[i]));
      keys.emplace_back(key_str.back());
418
#ifndef ROCKSDB_LITE
419
      if (use_txn) {
420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453
        // With a 1 in 10 probability, insert the just added key in the batch
        // into the transaction. This will create an overlap with the MultiGet
        // keys and exercise some corner cases in the code
        if (thread->rand.OneIn(10)) {
          int op = thread->rand.Uniform(2);
          Status s;
          switch (op) {
            case 0:
            case 1: {
              uint32_t value_base =
                  thread->rand.Next() % thread->shared->UNKNOWN_SENTINEL;
              char value[100];
              size_t sz = GenerateValue(value_base, value, sizeof(value));
              Slice v(value, sz);
              if (op == 0) {
                s = txn->Put(cfh, keys.back(), v);
              } else {
                s = txn->Merge(cfh, keys.back(), v);
              }
              break;
            }
            case 2:
              s = txn->Delete(cfh, keys.back());
              break;
            default:
              assert(false);
          }
          if (!s.ok()) {
            fprintf(stderr, "Transaction put: %s\n", s.ToString().c_str());
            std::terminate();
          }
        }
      }
#endif
454
    }
455

456
    if (!use_txn) {
A
anand76 已提交
457 458
      if (fault_fs_guard) {
        fault_fs_guard->EnableErrorInjection();
459
        SharedState::ignore_read_error = false;
A
anand76 已提交
460
      }
461
      db_->MultiGet(readoptionscopy, cfh, num_keys, keys.data(), values.data(),
462
                    statuses.data());
A
anand76 已提交
463 464 465
      if (fault_fs_guard) {
        error_count = fault_fs_guard->GetAndResetErrorCount();
      }
466 467
    } else {
#ifndef ROCKSDB_LITE
468
      txn->MultiGet(readoptionscopy, cfh, num_keys, keys.data(), values.data(),
469 470 471 472
                    statuses.data());
#endif
    }

473
    if (fault_fs_guard && error_count && !SharedState::ignore_read_error) {
474 475 476 477
      int stat_nok = 0;
      for (const auto& s : statuses) {
        if (!s.ok() && !s.IsNotFound()) {
          stat_nok++;
A
anand76 已提交
478
        }
479 480 481 482 483 484
      }

      if (stat_nok < error_count) {
        // Grab mutex so multiple thread don't try to print the
        // stack trace at the same time
        MutexLock l(thread->shared->GetMutex());
485 486 487
        fprintf(stderr, "Didn't get expected error from MultiGet. \n");
        fprintf(stderr, "num_keys %zu Expected %d errors, seen %d\n", num_keys,
                error_count, stat_nok);
488 489 490 491 492
        fprintf(stderr, "Callstack that injected the fault\n");
        fault_fs_guard->PrintFaultBacktrace();
        std::terminate();
      }
    }
493 494 495
    if (fault_fs_guard) {
      fault_fs_guard->DisableErrorInjection();
    }
496

497 498 499 500 501 502 503 504 505
    for (size_t i = 0; i < statuses.size(); ++i) {
      Status s = statuses[i];
      bool is_consistent = true;
      // Only do the consistency check if no error was injected and MultiGet
      // didn't return an unexpected error
      if (do_consistency_check && !error_count && (s.ok() || s.IsNotFound())) {
        Status tmp_s;
        std::string value;

506 507 508 509 510 511 512
        if (use_txn) {
#ifndef ROCKSDB_LITE
          tmp_s = txn->Get(readoptionscopy, cfh, keys[i], &value);
#endif  // ROCKSDB_LITE
        } else {
          tmp_s = db_->Get(readoptionscopy, cfh, keys[i], &value);
        }
513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542
        if (!tmp_s.ok() && !tmp_s.IsNotFound()) {
          fprintf(stderr, "Get error: %s\n", s.ToString().c_str());
          is_consistent = false;
        } else if (!s.ok() && tmp_s.ok()) {
          fprintf(stderr, "MultiGet returned different results with key %s\n",
                  keys[i].ToString(true).c_str());
          fprintf(stderr, "Get returned ok, MultiGet returned not found\n");
          is_consistent = false;
        } else if (s.ok() && tmp_s.IsNotFound()) {
          fprintf(stderr, "MultiGet returned different results with key %s\n",
                  keys[i].ToString(true).c_str());
          fprintf(stderr, "MultiGet returned ok, Get returned not found\n");
          is_consistent = false;
        } else if (s.ok() && value != values[i].ToString()) {
          fprintf(stderr, "MultiGet returned different results with key %s\n",
                  keys[i].ToString(true).c_str());
          fprintf(stderr, "MultiGet returned value %s\n",
                  values[i].ToString(true).c_str());
          fprintf(stderr, "Get returned value %s\n", value.c_str());
          is_consistent = false;
        }
      }

      if (!is_consistent) {
        fprintf(stderr, "TestMultiGet error: is_consistent is false\n");
        thread->stats.AddErrors(1);
        // Fail fast to preserve the DB state
        thread->shared->SetVerificationFailure();
        break;
      } else if (s.ok()) {
543 544
        // found case
        thread->stats.AddGets(1, 1);
545 546 547
      } else if (s.IsNotFound()) {
        // not found case
        thread->stats.AddGets(1, 0);
548 549 550
      } else if (s.IsMergeInProgress() && use_txn) {
        // With txn this is sometimes expected.
        thread->stats.AddGets(1, 1);
551
      } else {
A
anand76 已提交
552 553 554 555 556 557 558
        if (error_count == 0) {
          // errors case
          fprintf(stderr, "MultiGet error: %s\n", s.ToString().c_str());
          thread->stats.AddErrors(1);
        } else {
          thread->stats.AddVerifiedErrors(1);
        }
559 560
      }
    }
561 562 563

    if (readoptionscopy.snapshot) {
      db_->ReleaseSnapshot(readoptionscopy.snapshot);
A
anand76 已提交
564
    }
565 566 567 568 569
    if (use_txn) {
#ifndef ROCKSDB_LITE
      RollbackTxn(txn);
#endif
    }
570 571 572
    return statuses;
  }

573 574 575
  // Scans all keys sharing the first FLAGS_prefix_size bytes of
  // Key(rand_keys[0]) in column family rand_column_families[0].
  //
  // Half of the time (when a next prefix exists) the scan is additionally
  // bounded by setting iterate_upper_bound to that next prefix. On success
  // the number of keys seen is recorded in the thread's stats; otherwise the
  // error is logged and counted. Returns the iterator's final status.
  Status TestPrefixScan(ThreadState* thread, const ReadOptions& read_opts,
                        const std::vector<int>& rand_column_families,
                        const std::vector<int64_t>& rand_keys) override {
    auto cfh = column_families_[rand_column_families[0]];
    std::string key_str = Key(rand_keys[0]);
    Slice key = key_str;
    Slice prefix = Slice(key.data(), FLAGS_prefix_size);

    // `upper_bound` and `ub_slice` are declared before the iterator so they
    // outlive it; `ro_copy` may point at `ub_slice`.
    std::string upper_bound;
    Slice ub_slice;
    ReadOptions ro_copy = read_opts;
    // Get the next prefix first and then see if we want to set upper bound.
    // We'll use the next prefix in an assertion later on
    if (GetNextPrefix(prefix, &upper_bound) && thread->rand.OneIn(2)) {
      // For half of the time, set the upper bound to the next prefix
      ub_slice = Slice(upper_bound);
      ro_copy.iterate_upper_bound = &ub_slice;
    }

    // Owned by a unique_ptr so the iterator is released on every exit path.
    std::unique_ptr<Iterator> iter(db_->NewIterator(ro_copy, cfh));
    unsigned long count = 0;
    for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix);
         iter->Next()) {
      ++count;
    }

    assert(count <= GetPrefixKeyCount(prefix.ToString(), upper_bound));

    Status s = iter->status();
    if (s.ok()) {
      thread->stats.AddPrefixes(1, count);
    } else {
      fprintf(stderr, "TestPrefixScan error: %s\n", s.ToString().c_str());
      thread->stats.AddErrors(1);
    }
    return s;
  }

612 613 614 615 616
  // Writes a freshly generated value to one key via Put or Merge (optionally
  // inside a transaction) and mirrors the write in the shared expected state.
  //
  // Starts from rand_keys[0] / rand_column_families[0]. While the chosen key
  // disallows overwrites and either merges are in use or the key already
  // exists, a new key and column family are drawn and `lock` is re-pointed at
  // the mutex for the new pair. With FLAGS_verify_before_write the current
  // value is verified first; a failed verification returns early with the
  // status of that read. A write error terminates the process unless
  // FLAGS_injest_error_severity >= 2 and the error is at least fatal
  // severity, in which case the DB is flagged as stopped. Returns the write
  // status.
  Status TestPut(ThreadState* thread, WriteOptions& write_opts,
                 const ReadOptions& read_opts,
                 const std::vector<int>& rand_column_families,
                 const std::vector<int64_t>& rand_keys, char (&value)[100],
                 std::unique_ptr<MutexLock>& lock) override {
    auto shared = thread->shared;
    const int64_t max_key = shared->GetMaxKey();
    int64_t target_key = rand_keys[0];
    int target_cf = rand_column_families[0];
    std::string ts_buf;
    Slice write_ts;

    while (!shared->AllowsOverwrite(target_key) &&
           (FLAGS_use_merge || shared->Exists(target_cf, target_key))) {
      // Drop the lock on the rejected key before taking one for the new pick.
      lock.reset();
      target_key = thread->rand.Next() % max_key;
      target_cf = thread->rand.Next() % FLAGS_column_families;
      lock.reset(
          new MutexLock(shared->GetMutexForKey(target_cf, target_key)));
      if (FLAGS_user_timestamp_size > 0) {
        ts_buf = NowNanosStr();
        write_ts = ts_buf;
      }
    }
    // No timestamp was taken in the loop above: take one now if needed.
    if (write_ts.size() == 0 && FLAGS_user_timestamp_size) {
      ts_buf = NowNanosStr();
      write_ts = ts_buf;
    }

    const std::string key_buf = Key(target_key);
    Slice key = key_buf;
    ColumnFamilyHandle* cfh = column_families_[target_cf];

    if (FLAGS_verify_before_write) {
      const std::string verify_key_buf = Key(target_key);
      Slice verify_key = verify_key_buf;
      std::string from_db;
      Status read_status = db_->Get(read_opts, cfh, verify_key, &from_db);
      if (!VerifyOrSyncValue(target_cf, target_key, read_opts, shared,
                             from_db, read_status, true)) {
        return read_status;
      }
    }

    const uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL;
    const size_t sz = GenerateValue(value_base, value, sizeof(value));
    Slice v(value, sz);

    // Mark the write as pending in the expected state before touching the DB,
    // and as complete afterwards.
    shared->Put(target_cf, target_key, value_base, true /* pending */);
    Status s;
    if (!FLAGS_use_txn) {
      if (FLAGS_use_merge) {
        s = db_->Merge(write_opts, cfh, key, v);
      } else if (FLAGS_user_timestamp_size == 0) {
        s = db_->Put(write_opts, cfh, key, v);
      } else {
        s = db_->Put(write_opts, cfh, key, write_ts, v);
      }
    } else {
#ifndef ROCKSDB_LITE
      Transaction* txn;
      s = NewTxn(write_opts, &txn);
      if (s.ok()) {
        if (FLAGS_use_merge) {
          s = txn->Merge(cfh, key, v);
        } else {
          s = txn->Put(cfh, key, v);
        }
        if (s.ok()) {
          s = CommitTxn(txn, thread);
        }
      }
#endif
    }
    shared->Put(target_cf, target_key, value_base, false /* pending */);

    if (!s.ok()) {
      // Only errors of at least fatal severity are survivable, and only when
      // error injection at that severity was requested.
      const bool tolerated = FLAGS_injest_error_severity >= 2 &&
                             s.severity() >= Status::Severity::kFatalError;
      if (!tolerated) {
        fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str());
        std::terminate();
      }
      if (!is_db_stopped_) {
        is_db_stopped_ = true;
      }
    }

    thread->stats.AddBytesForWrites(1, sz);
    PrintKeyValue(target_cf, static_cast<uint32_t>(target_key), value, sz);
    return s;
  }

715 716 717
  // Deletes rand_keys[0] from column family rand_column_families[0] and
  // mirrors the deletion in the shared expected state.
  //
  // Keys that allow overwrites are removed with Delete; all others with
  // SingleDelete. Either goes through a transaction when FLAGS_use_txn is
  // set. A write error terminates the process unless
  // FLAGS_injest_error_severity >= 2 and the error is at least fatal
  // severity, in which case the DB is flagged as stopped. Returns the status
  // of the delete.
  Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
                    const std::vector<int>& rand_column_families,
                    const std::vector<int64_t>& rand_keys,
                    std::unique_ptr<MutexLock>& /* lock */) override {
    const int64_t target_key = rand_keys[0];
    const int target_cf = rand_column_families[0];
    auto shared = thread->shared;

    // OPERATION delete
    const std::string ts_buf = NowNanosStr();
    Slice write_ts = ts_buf;

    const std::string key_buf = Key(target_key);
    Slice key = key_buf;
    auto cfh = column_families_[target_cf];

    // Use delete if the key may be overwritten and a single deletion
    // otherwise.
    Status s;
    if (shared->AllowsOverwrite(target_key)) {
      shared->Delete(target_cf, target_key, true /* pending */);
      if (FLAGS_use_txn) {
#ifndef ROCKSDB_LITE
        Transaction* txn;
        s = NewTxn(write_opts, &txn);
        if (s.ok()) {
          s = txn->Delete(cfh, key);
          if (s.ok()) {
            s = CommitTxn(txn, thread);
          }
        }
#endif
      } else if (FLAGS_user_timestamp_size == 0) {
        s = db_->Delete(write_opts, cfh, key);
      } else {
        s = db_->Delete(write_opts, cfh, key, write_ts);
      }
      shared->Delete(target_cf, target_key, false /* pending */);
      thread->stats.AddDeletes(1);

      if (!s.ok()) {
        // Only errors of at least fatal severity are survivable, and only
        // when error injection at that severity was requested.
        const bool tolerated = FLAGS_injest_error_severity >= 2 &&
                               s.severity() >= Status::Severity::kFatalError;
        if (!tolerated) {
          fprintf(stderr, "delete error: %s\n", s.ToString().c_str());
          std::terminate();
        }
        if (!is_db_stopped_) {
          is_db_stopped_ = true;
        }
      }
    } else {
      shared->SingleDelete(target_cf, target_key, true /* pending */);
      if (FLAGS_use_txn) {
#ifndef ROCKSDB_LITE
        Transaction* txn;
        s = NewTxn(write_opts, &txn);
        if (s.ok()) {
          s = txn->SingleDelete(cfh, key);
          if (s.ok()) {
            s = CommitTxn(txn, thread);
          }
        }
#endif
      } else if (FLAGS_user_timestamp_size == 0) {
        s = db_->SingleDelete(write_opts, cfh, key);
      } else {
        s = db_->SingleDelete(write_opts, cfh, key, write_ts);
      }
      shared->SingleDelete(target_cf, target_key, false /* pending */);
      thread->stats.AddSingleDeletes(1);

      if (!s.ok()) {
        // Same tolerance rule as for the plain delete above.
        const bool tolerated = FLAGS_injest_error_severity >= 2 &&
                               s.severity() >= Status::Severity::kFatalError;
        if (!tolerated) {
          fprintf(stderr, "single delete error: %s\n", s.ToString().c_str());
          std::terminate();
        }
        if (!is_db_stopped_) {
          is_db_stopped_ = true;
        }
      }
    }
    return s;
  }

812 813 814 815
  // Issues a DeleteRange covering `FLAGS_range_deletion_width` consecutive
  // keys of column family `rand_column_families[0]`, starting at
  // `rand_keys[0]` (or at a freshly drawn key when the range would run past
  // the maximum key).
  //
  // `lock` is the caller's lock for `rand_keys[0]`; it is either adopted or
  // released here. Every additional key lock touched by the range is taken
  // for the duration of the call. The expected state is marked pending
  // before the write and finalized after it.
  //
  // Returns the status of the DeleteRange call. Any failure terminates the
  // process, except an error of at least fatal severity while
  // `FLAGS_injest_error_severity >= 2`, which marks the DB as stopped instead.
  Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts,
                         const std::vector<int>& rand_column_families,
                         const std::vector<int64_t>& rand_keys,
                         std::unique_ptr<MutexLock>& lock) override {
    // OPERATION delete range
    auto shared = thread->shared;
    const int cf_index = rand_column_families[0];
    const int64_t max_key = shared->GetMaxKey();
    int64_t first_key = rand_keys[0];

    // delete range does not respect disallowed overwrites. the keys for
    // which overwrites are disallowed are randomly distributed so it
    // could be expensive to find a range where each key allows
    // overwrites.
    std::vector<std::unique_ptr<MutexLock>> held_locks;
    if (first_key <= max_key - FLAGS_range_deletion_width) {
      // The whole range fits: reuse the lock the caller already holds.
      held_locks.emplace_back(std::move(lock));
    } else {
      // The range would overrun the key space: drop the caller's lock and
      // pick a new starting key that leaves room for the full width.
      lock.reset();
      first_key =
          thread->rand.Next() % (max_key - FLAGS_range_deletion_width + 1);
      held_locks.emplace_back(
          new MutexLock(shared->GetMutexForKey(cf_index, first_key)));
    }

    // A new lock is needed each time the range reaches a key whose low
    // `FLAGS_log2_keys_per_lock` bits are all zero.
    const int64_t lock_mask = (1 << FLAGS_log2_keys_per_lock) - 1;
    for (int offset = 1; offset < FLAGS_range_deletion_width; ++offset) {
      const int64_t covered_key = first_key + offset;
      if ((covered_key & lock_mask) != 0) {
        continue;
      }
      held_locks.emplace_back(
          new MutexLock(shared->GetMutexForKey(cf_index, covered_key)));
    }

    const int64_t limit_key = first_key + FLAGS_range_deletion_width;
    shared->DeleteRange(cf_index, first_key, limit_key, true /* pending */);

    auto cfh = column_families_[cf_index];
    const std::string begin_str = Key(first_key);
    const std::string end_str = Key(limit_key);
    const Slice begin_slice = begin_str;
    const Slice end_slice = end_str;
    Status s = db_->DeleteRange(write_opts, cfh, begin_slice, end_slice);
    if (!s.ok()) {
      const bool is_fatal = s.severity() >= Status::Severity::kFatalError;
      if (FLAGS_injest_error_severity < 2 || !is_fatal) {
        fprintf(stderr, "delete range error: %s\n", s.ToString().c_str());
        std::terminate();
      }
      // Tolerated fatal error: remember that the DB has stopped.
      if (!is_db_stopped_) {
        is_db_stopped_ = true;
      }
    }

    const int covered =
        shared->DeleteRange(cf_index, first_key, limit_key, false /* pending */);
    thread->stats.AddRangeDeletions(1);
    thread->stats.AddCoveredByRangeDeletions(covered);
    return s;
  }

#ifdef ROCKSDB_LITE
874
  void TestIngestExternalFile(
875 876 877
      ThreadState* /* thread */,
      const std::vector<int>& /* rand_column_families */,
      const std::vector<int64_t>& /* rand_keys */,
878
      std::unique_ptr<MutexLock>& /* lock */) override {
879 880 881 882 883 884 885
    assert(false);
    fprintf(stderr,
            "RocksDB lite does not support "
            "TestIngestExternalFile\n");
    std::terminate();
  }
#else
886 887 888 889
  // Writes up to `FLAGS_ingest_external_file_width` keys, starting at
  // `rand_keys[0]` and bounded by the maximum key, into a per-thread SST
  // file and ingests that file into column family `rand_column_families[0]`.
  //
  // `lock` is the caller's lock for `rand_keys[0]`; ownership is taken here.
  // Further key locks are acquired as the scan crosses lock boundaries and
  // all of them are held until the function returns. Keys that do not allow
  // overwrites are skipped. For every staged key the expected value is
  // marked pending before the ingestion and finalized afterwards.
  //
  // Any failure while preparing, writing or ingesting the file terminates
  // the process. If no key could be staged, the function returns without
  // ingesting anything.
  void TestIngestExternalFile(ThreadState* thread,
                              const std::vector<int>& rand_column_families,
                              const std::vector<int64_t>& rand_keys,
                              std::unique_ptr<MutexLock>& lock) override {
    const std::string sst_filename =
        FLAGS_db + "/." + std::to_string(thread->tid) + ".sst";
    Status s;
    if (db_stress_env->FileExists(sst_filename).ok()) {
      // Maybe we terminated abnormally before, so cleanup to give this file
      // ingestion a clean slate
      s = db_stress_env->DeleteFile(sst_filename);
    }

    SstFileWriter sst_file_writer(EnvOptions(options_), options_);
    if (s.ok()) {
      s = sst_file_writer.Open(sst_filename);
    }
    int64_t key_base = rand_keys[0];
    int column_family = rand_column_families[0];
    std::vector<std::unique_ptr<MutexLock>> range_locks;
    range_locks.reserve(FLAGS_ingest_external_file_width);
    std::vector<int64_t> keys;
    keys.reserve(FLAGS_ingest_external_file_width);
    std::vector<uint32_t> values;
    values.reserve(FLAGS_ingest_external_file_width);
    SharedState* shared = thread->shared;

    assert(FLAGS_nooverwritepercent < 100);
    // Grab locks, set pending state on expected values, and add keys
    for (int64_t key = key_base;
         s.ok() && key < shared->GetMaxKey() &&
         static_cast<int32_t>(keys.size()) < FLAGS_ingest_external_file_width;
         ++key) {
      if (key == key_base) {
        // The first key is already locked by the caller; adopt that lock.
        range_locks.emplace_back(std::move(lock));
      } else if ((key & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) {
        // Crossed into the next lock's key range.
        range_locks.emplace_back(
            new MutexLock(shared->GetMutexForKey(column_family, key)));
      }
      if (!shared->AllowsOverwrite(key)) {
        // We could alternatively include `key` on the condition its current
        // value is `DELETION_SENTINEL`.
        continue;
      }
      keys.push_back(key);

      uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL;
      values.push_back(value_base);
      shared->Put(column_family, key, value_base, true /* pending */);

      char value[100];
      size_t value_len = GenerateValue(value_base, value, sizeof(value));
      auto key_str = Key(key);
      s = sst_file_writer.Put(Slice(key_str), Slice(value, value_len));
    }

    if (s.ok() && keys.empty()) {
      // Every candidate key disallowed overwrites, so nothing was written.
      // Finishing an SST file without entries fails, which would abort the
      // test below for a perfectly benign situation. No expected state was
      // marked pending, so there is nothing to finalize either.
      return;
    }

    if (s.ok()) {
      s = sst_file_writer.Finish();
    }
    if (s.ok()) {
      s = db_->IngestExternalFile(column_families_[column_family],
                                  {sst_filename}, IngestExternalFileOptions());
    }
    if (!s.ok()) {
      fprintf(stderr, "file ingestion error: %s\n", s.ToString().c_str());
      std::terminate();
    }
    // Ingestion succeeded: commit the pending expected values.
    for (size_t i = 0; i < keys.size(); ++i) {
      shared->Put(column_family, keys[i], values[i], false /* pending */);
    }
  }
#endif  // ROCKSDB_LITE

959 960 961
  bool VerifyOrSyncValue(int cf, int64_t key, const ReadOptions& /*opts*/,
                         SharedState* shared, const std::string& value_from_db,
                         const Status& s, bool strict = false) const {
962 963 964 965 966 967
    if (shared->HasVerificationFailedYet()) {
      return false;
    }
    // compare value_from_db with the value in the shared state
    uint32_t value_base = shared->Get(cf, key);
    if (value_base == SharedState::UNKNOWN_SENTINEL) {
968 969 970 971 972 973 974 975 976
      if (s.ok()) {
        // Value exists in db, update state to reflect that
        Slice slice(value_from_db);
        value_base = GetValueBase(slice);
        shared->Put(cf, key, value_base, false);
      } else if (s.IsNotFound()) {
        // Value doesn't exist in db, update state to reflect that
        shared->SingleDelete(cf, key, false);
      }
977 978 979 980 981 982 983
      return true;
    }
    if (value_base == SharedState::DELETION_SENTINEL && !strict) {
      return true;
    }

    if (s.ok()) {
984
      char value[kValueMaxLen];
985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006
      if (value_base == SharedState::DELETION_SENTINEL) {
        VerificationAbort(shared, "Unexpected value found", cf, key);
        return false;
      }
      size_t sz = GenerateValue(value_base, value, sizeof(value));
      if (value_from_db.length() != sz) {
        VerificationAbort(shared, "Length of value read is not equal", cf, key);
        return false;
      }
      if (memcmp(value_from_db.data(), value, sz) != 0) {
        VerificationAbort(shared, "Contents of value read don't match", cf,
                          key);
        return false;
      }
    } else {
      if (value_base != SharedState::DELETION_SENTINEL) {
        VerificationAbort(shared, "Value not found: " + s.ToString(), cf, key);
        return false;
      }
    }
    return true;
  }
1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021

#ifndef ROCKSDB_LITE
  // Installs a `rollback_deletion_type_callback` on `txn_db_opts`. The
  // callback parses the numeric key out of the user key and returns true for
  // keys that do not allow overwrites according to `shared`.
  // NOTE(review): presumably a true result selects SingleDelete during
  // rollback -- confirm against the TransactionDBOptions documentation.
  void PrepareTxnDbOptions(SharedState* shared,
                           TransactionDBOptions& txn_db_opts) override {
    auto key_disallows_overwrite = [shared](TransactionDB* /*db*/,
                                            ColumnFamilyHandle* /*cfh*/,
                                            const Slice& key) {
      assert(shared);
      uint64_t parsed_key = 0;
      const bool parsed = GetIntVal(key.ToString(), &parsed_key);
      assert(parsed);
      // Keeps release builds (where assert compiles away) warning-free.
      (void)parsed;
      const bool overwritable = shared->AllowsOverwrite(parsed_key);
      return !overwritable;
    };
    txn_db_opts.rollback_deletion_type_callback = key_disallows_overwrite;
  }
#endif  // ROCKSDB_LITE
1022 1023 1024 1025 1026 1027
};

// Factory for the non-batched-ops stress test. The returned object is
// heap-allocated; nothing in this function releases it.
StressTest* CreateNonBatchedOpsStressTest() {
  StressTest* const stress_test = new NonBatchedOpsStressTest();
  return stress_test;
}

1028
}  // namespace ROCKSDB_NAMESPACE
1029
#endif  // GFLAGS