//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.

#include "db/db_impl.h"
#include "db/dbformat.h"
#include "db/filename.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "memtable/hash_linklist_rep.h"
#include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/options.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/table.h"
#include "rocksdb/table_properties.h"
#include "table/block_based_table_factory.h"
#include "table/plain_table_factory.h"
#include "util/hash.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/rate_limiter.h"
#include "util/statistics.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "util/testharness.h"
#include "util/testutil.h"
#include "utilities/merge_operators.h"

#ifndef ROCKSDB_LITE

namespace rocksdb {

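// Test fixture that creates a scratch DB under test::TmpDir() and provides
// helpers for creating, reopening, and flushing column families.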
class EventListenerTest : public testing::Test {
 public:
  EventListenerTest() {
    dbname_ = test::TmpDir() + "/listener_test";
    EXPECT_OK(DestroyDB(dbname_, Options()));
    db_ = nullptr;
    Reopen();
  }

  ~EventListenerTest() {
    Close();
    Options options;
    options.db_paths.emplace_back(dbname_, 0);
    options.db_paths.emplace_back(dbname_ + "_2", 0);
    options.db_paths.emplace_back(dbname_ + "_3", 0);
    options.db_paths.emplace_back(dbname_ + "_4", 0);
    EXPECT_OK(DestroyDB(dbname_, options));
  }

  void CreateColumnFamilies(const std::vector<std::string>& cfs,
                            const ColumnFamilyOptions* options = nullptr) {
    ColumnFamilyOptions cf_opts;
    cf_opts = ColumnFamilyOptions(Options());
    size_t cfi = handles_.size();
    handles_.resize(cfi + cfs.size());
    for (auto cf : cfs) {
      ASSERT_OK(db_->CreateColumnFamily(cf_opts, cf, &handles_[cfi++]));
    }
  }

  void Close() {
    for (auto h : handles_) {
      delete h;
    }
    handles_.clear();
    delete db_;
    db_ = nullptr;
  }

  void ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
                                const Options* options = nullptr) {
    ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
  }

  Status TryReopenWithColumnFamilies(const std::vector<std::string>& cfs,
                                     const Options* options = nullptr) {
    Close();
    Options opts = (options == nullptr) ? Options() : *options;
    std::vector<const Options*> v_opts(cfs.size(), &opts);
    return TryReopenWithColumnFamilies(cfs, v_opts);
  }

  Status TryReopenWithColumnFamilies(
      const std::vector<std::string>& cfs,
      const std::vector<const Options*>& options) {
    Close();
    EXPECT_EQ(cfs.size(), options.size());
    std::vector<ColumnFamilyDescriptor> column_families;
    for (size_t i = 0; i < cfs.size(); ++i) {
      column_families.push_back(ColumnFamilyDescriptor(cfs[i], *options[i]));
    }
    DBOptions db_opts = DBOptions(*options[0]);
    return DB::Open(db_opts, dbname_, column_families, &handles_, &db_);
  }

  Status TryReopen(Options* options = nullptr) {
    Close();
    Options opts;
    if (options != nullptr) {
      opts = *options;
    } else {
      opts.create_if_missing = true;
    }

    return DB::Open(opts, dbname_, &db_);
  }

  void Reopen(Options* options = nullptr) {
    ASSERT_OK(TryReopen(options));
  }

  void CreateAndReopenWithCF(const std::vector<std::string>& cfs,
                             const Options* options = nullptr) {
    CreateColumnFamilies(cfs, options);
    std::vector<std::string> cfs_plus_default = cfs;
    cfs_plus_default.insert(cfs_plus_default.begin(), kDefaultColumnFamilyName);
    ReopenWithColumnFamilies(cfs_plus_default, options);
  }

  DBImpl* dbfull() {
    return reinterpret_cast<DBImpl*>(db_);
  }

  Status Put(int cf, const Slice& k, const Slice& v,
             WriteOptions wo = WriteOptions()) {
    return db_->Put(wo, handles_[cf], k, v);
  }

  Status Flush(size_t cf = 0) {
    FlushOptions opt = FlushOptions();
    opt.wait = true;
    if (cf == 0) {
      return db_->Flush(opt);
    } else {
      return db_->Flush(opt, handles_[cf]);
    }
  }

  const size_t k110KB = 110 << 10;

  DB* db_;
  std::string dbname_;
  std::vector<ColumnFamilyHandle*> handles_;
};

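// Table properties collector that attaches the user property {"0" -> "1"} to
// every table file, so the listeners below can verify that table properties
// are propagated to flush and compaction callbacks.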
struct TestPropertiesCollector : public rocksdb::TablePropertiesCollector {
  virtual rocksdb::Status AddUserKey(const rocksdb::Slice& key,
                                     const rocksdb::Slice& value,
                                     rocksdb::EntryType type,
                                     rocksdb::SequenceNumber seq,
                                     uint64_t file_size) override {
161 162
    return Status::OK();
  }
  virtual rocksdb::Status Finish(
      rocksdb::UserCollectedProperties* properties) override {
    properties->insert({"0", "1"});
    return Status::OK();
  }

  virtual const char* Name() const override {
    return "TestTablePropertiesCollector";
  }

  rocksdb::UserCollectedProperties GetReadableProperties() const override {
    rocksdb::UserCollectedProperties ret;
    ret["2"] = "3";
    return ret;
  }
};

class TestPropertiesCollectorFactory : public TablePropertiesCollectorFactory {
 public:
  virtual TablePropertiesCollector* CreateTablePropertiesCollector(
      TablePropertiesCollectorFactory::Context context) override {
    return new TestPropertiesCollector;
  }
  const char* Name() const override { return "TestTablePropertiesCollector"; }
};

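// Listener that records the DB of every completed compaction and sanity-checks
// the contents of the CompactionJobInfo it receives.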
class TestCompactionListener : public EventListener {
 public:
  void OnCompactionCompleted(DB *db, const CompactionJobInfo& ci) override {
    std::lock_guard<std::mutex> lock(mutex_);
    compacted_dbs_.push_back(db);
    ASSERT_GT(ci.input_files.size(), 0U);
    ASSERT_GT(ci.output_files.size(), 0U);
    ASSERT_EQ(db->GetEnv()->GetThreadID(), ci.thread_id);
    ASSERT_GT(ci.thread_id, 0U);

    for (auto fl : {ci.input_files, ci.output_files}) {
      for (auto fn : fl) {
        auto it = ci.table_properties.find(fn);
        ASSERT_NE(it, ci.table_properties.end());
        auto tp = it->second;
        ASSERT_TRUE(tp != nullptr);
        ASSERT_EQ(tp->user_collected_properties.find("0")->second, "1");
      }
    }
  }

  std::vector<DB*> compacted_dbs_;
  std::mutex mutex_;
};

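// Compacts each column family once and verifies that OnCompactionCompleted()
// was invoked once per compaction on the expected DB.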
TEST_F(EventListenerTest, OnSingleDBCompactionTest) {
  const int kTestKeySize = 16;
  const int kTestValueSize = 984;
  const int kEntrySize = kTestKeySize + kTestValueSize;
  const int kEntriesPerBuffer = 100;
  const int kNumL0Files = 4;

  Options options;
  options.create_if_missing = true;
  options.write_buffer_size = kEntrySize * kEntriesPerBuffer;
  options.compaction_style = kCompactionStyleLevel;
  options.target_file_size_base = options.write_buffer_size;
  options.max_bytes_for_level_base = options.target_file_size_base * 2;
  options.max_bytes_for_level_multiplier = 2;
  options.compression = kNoCompression;
#if ROCKSDB_USING_THREAD_STATUS
  options.enable_thread_tracking = true;
#endif  // ROCKSDB_USING_THREAD_STATUS
  options.level0_file_num_compaction_trigger = kNumL0Files;
  options.table_properties_collector_factories.push_back(
      std::make_shared<TestPropertiesCollectorFactory>());

  TestCompactionListener* listener = new TestCompactionListener();
  options.listeners.emplace_back(listener);
  std::vector<std::string> cf_names = {
      "pikachu", "ilya", "muromec", "dobrynia",
      "nikitich", "alyosha", "popovich"};
  CreateAndReopenWithCF(cf_names, &options);
  ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
  ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
  ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
  ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
  ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
  ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
  ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
  for (size_t i = 1; i < 8; ++i) {
    ASSERT_OK(Flush(i));
    const Slice kStart = "a";
    const Slice kEnd = "z";
    ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[i],
                                     &kStart, &kEnd));
    dbfull()->TEST_WaitForFlushMemTable();
    dbfull()->TEST_WaitForCompact();
  }

  ASSERT_EQ(listener->compacted_dbs_.size(), cf_names.size());
  for (size_t i = 0; i < cf_names.size(); ++i) {
    ASSERT_EQ(listener->compacted_dbs_[i], db_);
  }
}

// This simple Listener can only handle one flush at a time.
class TestFlushListener : public EventListener {
 public:
  explicit TestFlushListener(Env* env)
      : slowdown_count(0), stop_count(0), db_closed(), env_(env) {
    db_closed = false;
  }
  void OnTableFileCreated(
      const TableFileCreationInfo& info) override {
    // Remember the info so OnFlushCompleted() can check it against the
    // FlushJobInfo later.
    prev_fc_info_ = info;
    ASSERT_GT(info.db_name.size(), 0U);
    ASSERT_GT(info.cf_name.size(), 0U);
    ASSERT_GT(info.file_path.size(), 0U);
    ASSERT_GT(info.job_id, 0);
    ASSERT_GT(info.table_properties.data_size, 0U);
    ASSERT_GT(info.table_properties.raw_key_size, 0U);
    ASSERT_GT(info.table_properties.raw_value_size, 0U);
    ASSERT_GT(info.table_properties.num_data_blocks, 0U);
    ASSERT_GT(info.table_properties.num_entries, 0U);

#if ROCKSDB_USING_THREAD_STATUS
    // Verify the id of the current thread that created this table
    // file matches the id of any active flush or compaction thread.
    uint64_t thread_id = env_->GetThreadID();
    std::vector<ThreadStatus> thread_list;
    ASSERT_OK(env_->GetThreadList(&thread_list));
    bool found_match = false;
    for (auto thread_status : thread_list) {
      if (thread_status.operation_type == ThreadStatus::OP_FLUSH ||
          thread_status.operation_type == ThreadStatus::OP_COMPACTION) {
        if (thread_id == thread_status.thread_id) {
          found_match = true;
          break;
        }
      }
    }
    ASSERT_TRUE(found_match);
#endif  // ROCKSDB_USING_THREAD_STATUS
  }

  void OnFlushCompleted(
      DB* db, const FlushJobInfo& info) override {
    flushed_dbs_.push_back(db);
    flushed_column_family_names_.push_back(info.cf_name);
    if (info.triggered_writes_slowdown) {
      slowdown_count++;
    }
    if (info.triggered_writes_stop) {
      stop_count++;
    }
    // Verify that the previously created file matches the flushed file.
    ASSERT_EQ(prev_fc_info_.db_name, db->GetName());
    ASSERT_EQ(prev_fc_info_.cf_name, info.cf_name);
    ASSERT_EQ(prev_fc_info_.job_id, info.job_id);
    ASSERT_EQ(prev_fc_info_.file_path, info.file_path);
    ASSERT_EQ(db->GetEnv()->GetThreadID(), info.thread_id);
    ASSERT_GT(info.thread_id, 0U);
    ASSERT_EQ(info.table_properties.user_collected_properties.find("0")->second,
              "1");
  }

  std::vector<std::string> flushed_column_family_names_;
  std::vector<DB*> flushed_dbs_;
  int slowdown_count;
  int stop_count;
  bool db_closing;
  std::atomic_bool db_closed;
  TableFileCreationInfo prev_fc_info_;

 protected:
  Env* env_;
};

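// Flushes each column family in turn and verifies that OnFlushCompleted()
// reports the correct DB and column family, in flush order.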
TEST_F(EventListenerTest, OnSingleDBFlushTest) {
  Options options;
  options.write_buffer_size = k110KB;
#if ROCKSDB_USING_THREAD_STATUS
  options.enable_thread_tracking = true;
#endif  // ROCKSDB_USING_THREAD_STATUS
  TestFlushListener* listener = new TestFlushListener(options.env);
  options.listeners.emplace_back(listener);
  std::vector<std::string> cf_names = {
      "pikachu", "ilya", "muromec", "dobrynia",
      "nikitich", "alyosha", "popovich"};
  options.table_properties_collector_factories.push_back(
      std::make_shared<TestPropertiesCollectorFactory>());
  CreateAndReopenWithCF(cf_names, &options);

  ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
  ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
  ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
  ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
  ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
  ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
  ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
  for (size_t i = 1; i < 8; ++i) {
    ASSERT_OK(Flush(i));
    dbfull()->TEST_WaitForFlushMemTable();
    ASSERT_EQ(listener->flushed_dbs_.size(), i);
    ASSERT_EQ(listener->flushed_column_family_names_.size(), i);
  }

  // make sure call-back functions are called in the right order
  for (size_t i = 0; i < cf_names.size(); ++i) {
    ASSERT_EQ(listener->flushed_dbs_[i], db_);
    ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]);
  }
}

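// Same as OnSingleDBFlushTest, but flushes the column families without
// explicitly waiting on the memtable flush between iterations.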
TEST_F(EventListenerTest, MultiCF) {
  Options options;
  options.write_buffer_size = k110KB;
#if ROCKSDB_USING_THREAD_STATUS
  options.enable_thread_tracking = true;
#endif  // ROCKSDB_USING_THREAD_STATUS
  TestFlushListener* listener = new TestFlushListener(options.env);
  options.listeners.emplace_back(listener);
  options.table_properties_collector_factories.push_back(
      std::make_shared<TestPropertiesCollectorFactory>());
  std::vector<std::string> cf_names = {
      "pikachu", "ilya", "muromec", "dobrynia",
      "nikitich", "alyosha", "popovich"};
  CreateAndReopenWithCF(cf_names, &options);

  ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
  ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
  ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
  ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
  ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
  ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
  ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
  for (size_t i = 1; i < 8; ++i) {
    ASSERT_OK(Flush(i));
    ASSERT_EQ(listener->flushed_dbs_.size(), i);
    ASSERT_EQ(listener->flushed_column_family_names_.size(), i);
  }

  // make sure call-back functions are called in the right order
  for (size_t i = 0; i < cf_names.size(); i++) {
    ASSERT_EQ(listener->flushed_dbs_[i], db_);
    ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]);
  }
}

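// Registers the same ten listeners on five DBs and verifies that every
// listener observes every flush in (column family, DB) order.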
TEST_F(EventListenerTest, MultiDBMultiListeners) {
  Options options;
#if ROCKSDB_USING_THREAD_STATUS
  options.enable_thread_tracking = true;
#endif  // ROCKSDB_USING_THREAD_STATUS
  options.table_properties_collector_factories.push_back(
      std::make_shared<TestPropertiesCollectorFactory>());
  std::vector<TestFlushListener*> listeners;
  const int kNumDBs = 5;
  const int kNumListeners = 10;
  for (int i = 0; i < kNumListeners; ++i) {
    listeners.emplace_back(new TestFlushListener(options.env));
  }

  std::vector<std::string> cf_names = {
      "pikachu", "ilya", "muromec", "dobrynia",
      "nikitich", "alyosha", "popovich"};

  options.create_if_missing = true;
  for (int i = 0; i < kNumListeners; ++i) {
    options.listeners.emplace_back(listeners[i]);
  }
  DBOptions db_opts(options);
  ColumnFamilyOptions cf_opts(options);

  std::vector<DB*> dbs;
  std::vector<std::vector<ColumnFamilyHandle *>> vec_handles;

  for (int d = 0; d < kNumDBs; ++d) {
    ASSERT_OK(DestroyDB(dbname_ + ToString(d), options));
    DB* db;
    std::vector<ColumnFamilyHandle*> handles;
    ASSERT_OK(DB::Open(options, dbname_ + ToString(d), &db));
    for (size_t c = 0; c < cf_names.size(); ++c) {
      ColumnFamilyHandle* handle;
      db->CreateColumnFamily(cf_opts, cf_names[c], &handle);
      handles.push_back(handle);
    }

    vec_handles.push_back(std::move(handles));
    dbs.push_back(db);
  }

  for (int d = 0; d < kNumDBs; ++d) {
    for (size_t c = 0; c < cf_names.size(); ++c) {
      ASSERT_OK(dbs[d]->Put(WriteOptions(), vec_handles[d][c],
                cf_names[c], cf_names[c]));
    }
  }

  for (size_t c = 0; c < cf_names.size(); ++c) {
    for (int d = 0; d < kNumDBs; ++d) {
      ASSERT_OK(dbs[d]->Flush(FlushOptions(), vec_handles[d][c]));
      reinterpret_cast<DBImpl*>(dbs[d])->TEST_WaitForFlushMemTable();
    }
  }

  for (auto* listener : listeners) {
    int pos = 0;
    for (size_t c = 0; c < cf_names.size(); ++c) {
      for (int d = 0; d < kNumDBs; ++d) {
        ASSERT_EQ(listener->flushed_dbs_[pos], dbs[d]);
        ASSERT_EQ(listener->flushed_column_family_names_[pos], cf_names[c]);
        pos++;
      }
    }
  }


  for (auto handles : vec_handles) {
    for (auto h : handles) {
      delete h;
    }
    handles.clear();
  }
  vec_handles.clear();

  for (auto db : dbs) {
    delete db;
  }
}

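// With background compaction disabled, flushes small files until L0 grows
// large enough to trigger write slowdowns, then verifies that the listener
// counted the expected number of slowdown notifications.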
TEST_F(EventListenerTest, DisableBGCompaction) {
  Options options;
#if ROCKSDB_USING_THREAD_STATUS
  options.enable_thread_tracking = true;
#endif  // ROCKSDB_USING_THREAD_STATUS
  TestFlushListener* listener = new TestFlushListener(options.env);
  const int kCompactionTrigger = 1;
  const int kSlowdownTrigger = 5;
  const int kStopTrigger = 100;
  options.level0_file_num_compaction_trigger = kCompactionTrigger;
  options.level0_slowdown_writes_trigger = kSlowdownTrigger;
  options.level0_stop_writes_trigger = kStopTrigger;
  options.max_write_buffer_number = 10;
  options.listeners.emplace_back(listener);
  // BG compaction is disabled.  The number of L0 files will simply keep
  // increasing in this test.
  options.compaction_style = kCompactionStyleNone;
  options.compression = kNoCompression;
  options.write_buffer_size = 100000;  // Small write buffer
511 512
  options.table_properties_collector_factories.push_back(
      std::make_shared<TestPropertiesCollectorFactory>());
513 514 515 516

  CreateAndReopenWithCF({"pikachu"}, &options);
  ColumnFamilyMetaData cf_meta;
  db_->GetColumnFamilyMetaData(handles_[1], &cf_meta);
517

518
  // keep writing until writes are forced to stop.
519 520 521 522
  for (int i = 0; static_cast<int>(cf_meta.file_count) < kSlowdownTrigger * 10;
       ++i) {
    Put(1, ToString(i), std::string(10000, 'x'), WriteOptions());
    db_->Flush(FlushOptions(), handles_[1]);
523 524
    db_->GetColumnFamilyMetaData(handles_[1], &cf_meta);
  }
525
  ASSERT_GE(listener->slowdown_count, kSlowdownTrigger * 9);
526 527 528 529 530 531 532
}

}  // namespace rocksdb

#endif  // ROCKSDB_LITE

int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}