//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <cinttypes>

#include "db/db_impl/db_impl.h"
#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "monitoring/perf_context_imp.h"
#include "options/options_helper.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"

namespace ROCKSDB_NAMESPACE {
// Convenience methods
Status DBImpl::Put(const WriteOptions& o, ColumnFamilyHandle* column_family,
                   const Slice& key, const Slice& val) {
  return DB::Put(o, column_family, key, val);
}

Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family,
                     const Slice& key, const Slice& val) {
  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  if (!cfh->cfd()->ioptions()->merge_operator) {
    return Status::NotSupported("Provide a merge_operator when opening DB");
  } else {
    return DB::Merge(o, column_family, key, val);
  }
}

Status DBImpl::Delete(const WriteOptions& write_options,
                      ColumnFamilyHandle* column_family, const Slice& key) {
  return DB::Delete(write_options, column_family, key);
}

Status DBImpl::SingleDelete(const WriteOptions& write_options,
                            ColumnFamilyHandle* column_family,
                            const Slice& key) {
  return DB::SingleDelete(write_options, column_family, key);
}

void DBImpl::SetRecoverableStatePreReleaseCallback(
    PreReleaseCallback* callback) {
  recoverable_state_pre_release_callback_.reset(callback);
}

Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
  return WriteImpl(write_options, my_batch, nullptr, nullptr);
}
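// A minimal usage sketch (illustrative only, assuming `db` is an
// already-opened DB instance): several updates are typically grouped into one
// WriteBatch and committed atomically with a single Write() call:
//
//   WriteBatch batch;
//   batch.Put("key1", "value1");
//   batch.Delete("key2");
//   Status s = db->Write(WriteOptions(), &batch);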

#ifndef ROCKSDB_LITE
Status DBImpl::WriteWithCallback(const WriteOptions& write_options,
                                 WriteBatch* my_batch,
                                 WriteCallback* callback) {
  return WriteImpl(write_options, my_batch, callback, nullptr);
}
#endif  // ROCKSDB_LITE

// The main write queue. This is the only write queue that updates LastSequence.
// When using one write queue, the same sequence also indicates the last
// published sequence.
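// When two_write_queues_ is enabled, WAL-only writes go through the second
// queue instead, handled by WriteImplWALOnly() further below.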
Status DBImpl::WriteImpl(const WriteOptions& write_options,
                         WriteBatch* my_batch, WriteCallback* callback,
                         uint64_t* log_used, uint64_t log_ref,
                         bool disable_memtable, uint64_t* seq_used,
                         size_t batch_cnt,
                         PreReleaseCallback* pre_release_callback) {
  assert(!seq_per_batch_ || batch_cnt != 0);
  if (my_batch == nullptr) {
    return Status::Corruption("Batch is nullptr!");
  }
  if (tracer_) {
    InstrumentedMutexLock lock(&trace_mutex_);
    if (tracer_) {
      // TODO: maybe handle the tracing status?
      tracer_->Write(my_batch).PermitUncheckedError();
    }
  }
  if (write_options.sync && write_options.disableWAL) {
    return Status::InvalidArgument("Sync writes has to enable WAL.");
  }
  if (two_write_queues_ && immutable_db_options_.enable_pipelined_write) {
    return Status::NotSupported(
        "pipelined_writes is not compatible with concurrent prepares");
  }
  if (seq_per_batch_ && immutable_db_options_.enable_pipelined_write) {
    // TODO(yiwu): update pipeline write with seq_per_batch and batch_cnt
    return Status::NotSupported(
        "pipelined_writes is not compatible with seq_per_batch");
  }
  if (immutable_db_options_.unordered_write &&
      immutable_db_options_.enable_pipelined_write) {
    return Status::NotSupported(
        "pipelined_writes is not compatible with unordered_write");
  }
  // Otherwise IsLatestPersistentState optimization does not make sense
  assert(!WriteBatchInternal::IsLatestPersistentState(my_batch) ||
         disable_memtable);

  if (write_options.low_pri) {
    Status s = ThrottleLowPriWritesIfNeeded(write_options, my_batch);
    if (!s.ok()) {
      return s;
    }
  }
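
  // From here on, the write is routed to one of several specialized paths: a
  // WAL-only write on the second queue (two_write_queues_ with
  // disable_memtable), the unordered_write path, the pipelined write path, or
  // the default batch-group path below.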

  if (two_write_queues_ && disable_memtable) {
    AssignOrder assign_order =
        seq_per_batch_ ? kDoAssignOrder : kDontAssignOrder;
    // Otherwise these are WAL-only Prepare batches under the WriteCommitted
    // policy, and they don't consume a sequence number.
    return WriteImplWALOnly(&nonmem_write_thread_, write_options, my_batch,
                            callback, log_used, log_ref, seq_used, batch_cnt,
                            pre_release_callback, assign_order,
                            kDontPublishLastSeq, disable_memtable);
  }

  if (immutable_db_options_.unordered_write) {
    const size_t sub_batch_cnt = batch_cnt != 0
                                     ? batch_cnt
                                     // every key is a sub-batch consuming a seq
                                     : WriteBatchInternal::Count(my_batch);
    uint64_t seq = 0;
    // Use a write thread to i) optimize for WAL write, ii) publish last
    // sequence in increasing order, iii) call pre_release_callback serially
    Status status = WriteImplWALOnly(
        &write_thread_, write_options, my_batch, callback, log_used, log_ref,
        &seq, sub_batch_cnt, pre_release_callback, kDoAssignOrder,
        kDoPublishLastSeq, disable_memtable);
    TEST_SYNC_POINT("DBImpl::WriteImpl:UnorderedWriteAfterWriteWAL");
    if (!status.ok()) {
      return status;
    }
    if (seq_used) {
      *seq_used = seq;
    }
    if (!disable_memtable) {
      TEST_SYNC_POINT("DBImpl::WriteImpl:BeforeUnorderedWriteMemtable");
      status = UnorderedWriteMemtable(write_options, my_batch, callback,
                                      log_ref, seq, sub_batch_cnt);
    }
    return status;
  }

  if (immutable_db_options_.enable_pipelined_write) {
    return PipelinedWriteImpl(write_options, my_batch, callback, log_used,
                              log_ref, disable_memtable, seq_used);
  }

  PERF_TIMER_GUARD(write_pre_and_post_process_time);
  WriteThread::Writer w(write_options, my_batch, callback, log_ref,
                        disable_memtable, batch_cnt, pre_release_callback);

  if (!write_options.disableWAL) {
    RecordTick(stats_, WRITE_WITH_WAL);
  }

  StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);

  write_thread_.JoinBatchGroup(&w);
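  // JoinBatchGroup blocks until this writer has either been chosen as the
  // batch group leader, handed off as a parallel memtable writer, or completed
  // entirely by another leader; the state checks below cover those outcomes.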
  if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
    // we are a non-leader in a parallel group

    if (w.ShouldWriteToMemtable()) {
      PERF_TIMER_STOP(write_pre_and_post_process_time);
      PERF_TIMER_GUARD(write_memtable_time);

      ColumnFamilyMemTablesImpl column_family_memtables(
          versions_->GetColumnFamilySet());
      w.status = WriteBatchInternal::InsertInto(
          &w, w.sequence, &column_family_memtables, &flush_scheduler_,
          &trim_history_scheduler_,
          write_options.ignore_missing_column_families, 0 /*log_number*/, this,
          true /*concurrent_memtable_writes*/, seq_per_batch_, w.batch_cnt,
          batch_per_txn_, write_options.memtable_insert_hint_per_batch);

      PERF_TIMER_START(write_pre_and_post_process_time);
    }

    if (write_thread_.CompleteParallelMemTableWriter(&w)) {
      // we're responsible for exiting the batch group
      // TODO(myabandeh): propagate status to write_group
      auto last_sequence = w.write_group->last_sequence;
      versions_->SetLastSequence(last_sequence);
      MemTableInsertStatusCheck(w.status);
      write_thread_.ExitAsBatchGroupFollower(&w);
    }
    assert(w.state == WriteThread::STATE_COMPLETED);
    // STATE_COMPLETED conditional below handles exit
  }
  if (w.state == WriteThread::STATE_COMPLETED) {
    if (log_used != nullptr) {
      *log_used = w.log_used;
    }
    if (seq_used != nullptr) {
      *seq_used = w.sequence;
    }
    // write is complete and leader has updated sequence
    return w.FinalStatus();
  }
  // else we are the leader of the write batch group
  assert(w.state == WriteThread::STATE_GROUP_LEADER);
  Status status;
  // Once it reaches this point, the current writer "w" will try to do its
  // write job.  It may also pick up some of the remaining writers in the
  // "writers_" queue when it finds it suitable, and finish them in the same
  // write batch.  This is how a write job could be done by another writer.
  WriteContext write_context;
  WriteThread::WriteGroup write_group;
  bool in_parallel_group = false;
  uint64_t last_sequence = kMaxSequenceNumber;

  mutex_.Lock();

  bool need_log_sync = write_options.sync;
  bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
  if (!two_write_queues_ || !disable_memtable) {
    // With concurrent writes we do the preprocessing only in the write thread
    // that also writes to the memtable, to avoid synchronization issues on
    // shared data structures with the other thread.

    // PreprocessWrite does its own perf timing.
    PERF_TIMER_STOP(write_pre_and_post_process_time);

    status = PreprocessWrite(write_options, &need_log_sync, &write_context);
    if (!two_write_queues_) {
      // Assign it after ::PreprocessWrite since the sequence might advance
      // inside it by WriteRecoverableState
      last_sequence = versions_->LastSequence();
    }

    PERF_TIMER_START(write_pre_and_post_process_time);
  }
  log::Writer* log_writer = logs_.back().writer;

  mutex_.Unlock();

  // Add to log and apply to memtable.  We can release the lock
  // during this phase since &w is currently responsible for logging
  // and protects against concurrent loggers and concurrent writes
  // into memtables

  TEST_SYNC_POINT("DBImpl::WriteImpl:BeforeLeaderEnters");
  last_batch_group_size_ =
      write_thread_.EnterAsBatchGroupLeader(&w, &write_group);

  IOStatus io_s;
  if (status.ok()) {
    // Rules for when we can update the memtable concurrently
    // 1. supported by memtable
    // 2. Puts are not okay if inplace_update_support
    // 3. Merges are not okay
    //
    // Rules 1..2 are enforced by checking the options
    // during startup (CheckConcurrentWritesSupported), so if
    // options.allow_concurrent_memtable_write is true then they can be
    // assumed to be true.  Rule 3 is checked for each batch.  We could
    // relax rule 2 if we could prevent write batches from referring
    // more than once to a particular key.
    bool parallel = immutable_db_options_.allow_concurrent_memtable_write &&
                    write_group.size > 1;
    size_t total_count = 0;
    size_t valid_batches = 0;
    size_t total_byte_size = 0;
    size_t pre_release_callback_cnt = 0;
    for (auto* writer : write_group) {
      if (writer->CheckCallback(this)) {
        valid_batches += writer->batch_cnt;
        if (writer->ShouldWriteToMemtable()) {
          total_count += WriteBatchInternal::Count(writer->batch);
          parallel = parallel && !writer->batch->HasMerge();
        }
        total_byte_size = WriteBatchInternal::AppendedByteSize(
            total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
        if (writer->pre_release_callback) {
          pre_release_callback_cnt++;
        }
      }
    }
    // Note about seq_per_batch_: either disableWAL is set for the entire write
    // group or not. In either case we inc seq for each write batch with no
    // failed callback. This means that there could be a batch with
    // disable_memtable in between; although we do not write this batch to
    // memtable it still consumes a seq. Otherwise, if !seq_per_batch_, we inc
    // the seq per valid written key to mem.
    size_t seq_inc = seq_per_batch_ ? valid_batches : total_count;
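    // For example, a group of three valid batches holding ten keys in total
    // advances the sequence by 3 when seq_per_batch_ is set, and by 10
    // otherwise.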

    const bool concurrent_update = two_write_queues_;
    // Update stats while we are an exclusive group leader, so we know
    // that nobody else can be writing to these particular stats.
    // We're optimistic, updating the stats before we successfully
    // commit.  That lets us release our leader status early.
    auto stats = default_cf_internal_stats_;
    stats->AddDBStats(InternalStats::kIntStatsNumKeysWritten, total_count,
                      concurrent_update);
    RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
    stats->AddDBStats(InternalStats::kIntStatsBytesWritten, total_byte_size,
                      concurrent_update);
    RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
    stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1,
                      concurrent_update);
    RecordTick(stats_, WRITE_DONE_BY_SELF);
    auto write_done_by_other = write_group.size - 1;
    if (write_done_by_other > 0) {
      stats->AddDBStats(InternalStats::kIntStatsWriteDoneByOther,
                        write_done_by_other, concurrent_update);
      RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other);
    }
    RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size);

    if (write_options.disableWAL) {
      has_unpersisted_data_.store(true, std::memory_order_relaxed);
    }

    PERF_TIMER_STOP(write_pre_and_post_process_time);

    if (!two_write_queues_) {
      if (status.ok() && !write_options.disableWAL) {
        PERF_TIMER_GUARD(write_wal_time);
        io_s = WriteToWAL(write_group, log_writer, log_used, need_log_sync,
                          need_log_dir_sync, last_sequence + 1);
      }
    } else {
      if (status.ok() && !write_options.disableWAL) {
        PERF_TIMER_GUARD(write_wal_time);
        // LastAllocatedSequence is increased inside WriteToWAL under
        // wal_write_mutex_ to ensure ordered events in WAL
        io_s = ConcurrentWriteToWAL(write_group, log_used, &last_sequence,
                                    seq_inc);
      } else {
        // Otherwise we inc seq number for memtable writes
        last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
      }
    }
    status = io_s;
    assert(last_sequence != kMaxSequenceNumber);
    const SequenceNumber current_sequence = last_sequence + 1;
    last_sequence += seq_inc;

    // PreReleaseCallback is called after WAL write and before memtable write
    if (status.ok()) {
      SequenceNumber next_sequence = current_sequence;
      size_t index = 0;
      // Note: the logic for advancing seq here must be consistent with the
      // logic in WriteBatchInternal::InsertInto(write_group...) as well as
      // with WriteBatchInternal::InsertInto(write_batch...) that is called on
      // the merged batch during recovery from the WAL.
      for (auto* writer : write_group) {
        if (writer->CallbackFailed()) {
          continue;
        }
        writer->sequence = next_sequence;
        if (writer->pre_release_callback) {
          Status ws = writer->pre_release_callback->Callback(
              writer->sequence, disable_memtable, writer->log_used, index++,
              pre_release_callback_cnt);
          if (!ws.ok()) {
            status = ws;
            break;
          }
        }
        if (seq_per_batch_) {
          assert(writer->batch_cnt);
          next_sequence += writer->batch_cnt;
        } else if (writer->ShouldWriteToMemtable()) {
          next_sequence += WriteBatchInternal::Count(writer->batch);
        }
      }
    }

    if (status.ok()) {
      PERF_TIMER_GUARD(write_memtable_time);

      if (!parallel) {
        // w.sequence will be set inside InsertInto
        w.status = WriteBatchInternal::InsertInto(
            write_group, current_sequence, column_family_memtables_.get(),
            &flush_scheduler_, &trim_history_scheduler_,
            write_options.ignore_missing_column_families,
            0 /*recovery_log_number*/, this, parallel, seq_per_batch_,
            batch_per_txn_);
      } else {
        write_group.last_sequence = last_sequence;
        write_thread_.LaunchParallelMemTableWriters(&write_group);
        in_parallel_group = true;

        // Each parallel follower is doing its own writes. The leader should
        // also do its own.
        if (w.ShouldWriteToMemtable()) {
          ColumnFamilyMemTablesImpl column_family_memtables(
              versions_->GetColumnFamilySet());
          assert(w.sequence == current_sequence);
          w.status = WriteBatchInternal::InsertInto(
              &w, w.sequence, &column_family_memtables, &flush_scheduler_,
              &trim_history_scheduler_,
              write_options.ignore_missing_column_families, 0 /*log_number*/,
              this, true /*concurrent_memtable_writes*/, seq_per_batch_,
              w.batch_cnt, batch_per_txn_,
              write_options.memtable_insert_hint_per_batch);
        }
      }
      if (seq_used != nullptr) {
        *seq_used = w.sequence;
      }
    }
  }
  PERF_TIMER_START(write_pre_and_post_process_time);

  if (!w.CallbackFailed()) {
    if (!io_s.ok()) {
      IOStatusCheck(io_s);
    } else {
      WriteStatusCheck(status);
    }
  }

  if (need_log_sync) {
    mutex_.Lock();
    if (status.ok()) {
      status = MarkLogsSynced(logfile_number_, need_log_dir_sync);
    } else {
      MarkLogsNotSynced(logfile_number_);
    }
    mutex_.Unlock();
    // Requesting sync with two_write_queues_ is expected to be very rare. We
    // hence provide a simple implementation that is not necessarily efficient.
    if (two_write_queues_) {
      if (manual_wal_flush_) {
        status = FlushWAL(true);
      } else {
        status = SyncWAL();
      }
    }
  }

  bool should_exit_batch_group = true;
  if (in_parallel_group) {
    // CompleteParallelWorker returns true if this thread should
    // handle exit, false means somebody else did
    should_exit_batch_group = write_thread_.CompleteParallelMemTableWriter(&w);
  }
  if (should_exit_batch_group) {
    if (status.ok()) {
      // Note: if we are to resume after non-OK statuses we need to revisit how
      // we react to non-OK statuses here.
      versions_->SetLastSequence(last_sequence);
    }
    MemTableInsertStatusCheck(w.status);
    write_thread_.ExitAsBatchGroupLeader(write_group, status);
  }

  if (status.ok()) {
    status = w.FinalStatus();
  }
  return status;
}

Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
                                  WriteBatch* my_batch, WriteCallback* callback,
                                  uint64_t* log_used, uint64_t log_ref,
                                  bool disable_memtable, uint64_t* seq_used) {
  PERF_TIMER_GUARD(write_pre_and_post_process_time);
  StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);

  WriteContext write_context;

  WriteThread::Writer w(write_options, my_batch, callback, log_ref,
                        disable_memtable);
  write_thread_.JoinBatchGroup(&w);
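  // In the pipelined write path, WAL writers and memtable writers form two
  // separate batch groups, so a later group can start appending to the WAL
  // while an earlier group is still inserting into the memtables.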
  TEST_SYNC_POINT("DBImplWrite::PipelinedWriteImpl:AfterJoinBatchGroup");
  if (w.state == WriteThread::STATE_GROUP_LEADER) {
    WriteThread::WriteGroup wal_write_group;
    if (w.callback && !w.callback->AllowWriteBatching()) {
      write_thread_.WaitForMemTableWriters();
    }
    mutex_.Lock();
    bool need_log_sync = !write_options.disableWAL && write_options.sync;
    bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
    // PreprocessWrite does its own perf timing.
    PERF_TIMER_STOP(write_pre_and_post_process_time);
    w.status = PreprocessWrite(write_options, &need_log_sync, &write_context);
    PERF_TIMER_START(write_pre_and_post_process_time);
    log::Writer* log_writer = logs_.back().writer;
    mutex_.Unlock();

    // This can set a non-OK status if a callback fails.
    last_batch_group_size_ =
        write_thread_.EnterAsBatchGroupLeader(&w, &wal_write_group);
    const SequenceNumber current_sequence =
        write_thread_.UpdateLastSequence(versions_->LastSequence()) + 1;
    size_t total_count = 0;
    size_t total_byte_size = 0;

    if (w.status.ok()) {
      SequenceNumber next_sequence = current_sequence;
      for (auto writer : wal_write_group) {
        if (writer->CheckCallback(this)) {
          if (writer->ShouldWriteToMemtable()) {
            writer->sequence = next_sequence;
            size_t count = WriteBatchInternal::Count(writer->batch);
            next_sequence += count;
            total_count += count;
          }
          total_byte_size = WriteBatchInternal::AppendedByteSize(
              total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
        }
      }
      if (w.disable_wal) {
        has_unpersisted_data_.store(true, std::memory_order_relaxed);
      }
      write_thread_.UpdateLastSequence(current_sequence + total_count - 1);
    }

    auto stats = default_cf_internal_stats_;
    stats->AddDBStats(InternalStats::kIntStatsNumKeysWritten, total_count);
    RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
    stats->AddDBStats(InternalStats::kIntStatsBytesWritten, total_byte_size);
    RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
    RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size);

    PERF_TIMER_STOP(write_pre_and_post_process_time);

    IOStatus io_s;
    io_s.PermitUncheckedError();  // Allow io_s to be uninitialized

    if (w.status.ok() && !write_options.disableWAL) {
      PERF_TIMER_GUARD(write_wal_time);
      stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1);
      RecordTick(stats_, WRITE_DONE_BY_SELF, 1);
      if (wal_write_group.size > 1) {
        stats->AddDBStats(InternalStats::kIntStatsWriteDoneByOther,
                          wal_write_group.size - 1);
        RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1);
      }
      io_s = WriteToWAL(wal_write_group, log_writer, log_used, need_log_sync,
                        need_log_dir_sync, current_sequence);
      w.status = io_s;
    }

    if (!w.CallbackFailed()) {
      if (!io_s.ok()) {
        IOStatusCheck(io_s);
      } else {
        WriteStatusCheck(w.status);
      }
    }

    if (need_log_sync) {
      mutex_.Lock();
      if (w.status.ok()) {
        w.status = MarkLogsSynced(logfile_number_, need_log_dir_sync);
      } else {
        MarkLogsNotSynced(logfile_number_);
      }
      mutex_.Unlock();
    }

    write_thread_.ExitAsBatchGroupLeader(wal_write_group, w.status);
  }

  // NOTE: the memtable_write_group is declared before the following
  // `if` statement because its lifetime needs to be longer
  // than the inner context of the `if`, as a reference to it
  // may be used further below within the outer write thread
  WriteThread::WriteGroup memtable_write_group;

  if (w.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) {
    PERF_TIMER_GUARD(write_memtable_time);
    assert(w.ShouldWriteToMemtable());
    write_thread_.EnterAsMemTableWriter(&w, &memtable_write_group);
    if (memtable_write_group.size > 1 &&
        immutable_db_options_.allow_concurrent_memtable_write) {
      write_thread_.LaunchParallelMemTableWriters(&memtable_write_group);
    } else {
      memtable_write_group.status = WriteBatchInternal::InsertInto(
          memtable_write_group, w.sequence, column_family_memtables_.get(),
          &flush_scheduler_, &trim_history_scheduler_,
          write_options.ignore_missing_column_families, 0 /*log_number*/, this,
          false /*concurrent_memtable_writes*/, seq_per_batch_, batch_per_txn_);
      versions_->SetLastSequence(memtable_write_group.last_sequence);
      write_thread_.ExitAsMemTableWriter(&w, memtable_write_group);
    }
  } else {
    // NOTE: the memtable_write_group is never really used,
    // so we need to set its status to pass ASSERT_STATUS_CHECKED
    memtable_write_group.status.PermitUncheckedError();
  }

  if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
    assert(w.ShouldWriteToMemtable());
    ColumnFamilyMemTablesImpl column_family_memtables(
        versions_->GetColumnFamilySet());
    w.status = WriteBatchInternal::InsertInto(
        &w, w.sequence, &column_family_memtables, &flush_scheduler_,
        &trim_history_scheduler_, write_options.ignore_missing_column_families,
        0 /*log_number*/, this, true /*concurrent_memtable_writes*/,
        false /*seq_per_batch*/, 0 /*batch_cnt*/, true /*batch_per_txn*/,
        write_options.memtable_insert_hint_per_batch);
    if (write_thread_.CompleteParallelMemTableWriter(&w)) {
      MemTableInsertStatusCheck(w.status);
      versions_->SetLastSequence(w.write_group->last_sequence);
      write_thread_.ExitAsMemTableWriter(&w, *w.write_group);
    }
  }
  if (seq_used != nullptr) {
    *seq_used = w.sequence;
  }

  assert(w.state == WriteThread::STATE_COMPLETED);
  return w.FinalStatus();
}

Status DBImpl::UnorderedWriteMemtable(const WriteOptions& write_options,
                                      WriteBatch* my_batch,
                                      WriteCallback* callback, uint64_t log_ref,
                                      SequenceNumber seq,
                                      const size_t sub_batch_cnt) {
  PERF_TIMER_GUARD(write_pre_and_post_process_time);
  StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);

  WriteThread::Writer w(write_options, my_batch, callback, log_ref,
                        false /*disable_memtable*/);

  if (w.CheckCallback(this) && w.ShouldWriteToMemtable()) {
    w.sequence = seq;
    size_t total_count = WriteBatchInternal::Count(my_batch);
    InternalStats* stats = default_cf_internal_stats_;
    stats->AddDBStats(InternalStats::kIntStatsNumKeysWritten, total_count);
    RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);

    ColumnFamilyMemTablesImpl column_family_memtables(
        versions_->GetColumnFamilySet());
    w.status = WriteBatchInternal::InsertInto(
        &w, w.sequence, &column_family_memtables, &flush_scheduler_,
        &trim_history_scheduler_, write_options.ignore_missing_column_families,
        0 /*log_number*/, this, true /*concurrent_memtable_writes*/,
        seq_per_batch_, sub_batch_cnt, true /*batch_per_txn*/,
        write_options.memtable_insert_hint_per_batch);
    if (write_options.disableWAL) {
      has_unpersisted_data_.store(true, std::memory_order_relaxed);
    }
  }

  size_t pending_cnt = pending_memtable_writes_.fetch_sub(1) - 1;
  if (pending_cnt == 0) {
    // switch_cv_ waits until pending_memtable_writes_ = 0. Locking its mutex
    // before the notify ensures that the cv is in the waiting state when it is
    // notified, thus not missing the update to pending_memtable_writes_ even
    // though it is not modified under the mutex.
    std::lock_guard<std::mutex> lck(switch_mutex_);
    switch_cv_.notify_all();
  }
  WriteStatusCheck(w.status);

  if (!w.FinalStatus().ok()) {
    return w.FinalStatus();
  }
  return Status::OK();
}

// The 2nd write queue. If enabled it will be used only for WAL-only writes.
// This is the only queue that updates LastPublishedSequence which is only
// applicable in a two-queue setting.
Status DBImpl::WriteImplWALOnly(
    WriteThread* write_thread, const WriteOptions& write_options,
    WriteBatch* my_batch, WriteCallback* callback, uint64_t* log_used,
    const uint64_t log_ref, uint64_t* seq_used, const size_t sub_batch_cnt,
    PreReleaseCallback* pre_release_callback, const AssignOrder assign_order,
    const PublishLastSeq publish_last_seq, const bool disable_memtable) {
  PERF_TIMER_GUARD(write_pre_and_post_process_time);
  WriteThread::Writer w(write_options, my_batch, callback, log_ref,
                        disable_memtable, sub_batch_cnt, pre_release_callback);
  RecordTick(stats_, WRITE_WITH_WAL);
  StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);

  write_thread->JoinBatchGroup(&w);
  assert(w.state != WriteThread::STATE_PARALLEL_MEMTABLE_WRITER);
  if (w.state == WriteThread::STATE_COMPLETED) {
    if (log_used != nullptr) {
      *log_used = w.log_used;
    }
    if (seq_used != nullptr) {
      *seq_used = w.sequence;
    }
    return w.FinalStatus();
  }
  // else we are the leader of the write batch group
  assert(w.state == WriteThread::STATE_GROUP_LEADER);

  if (publish_last_seq == kDoPublishLastSeq) {
    Status status;

    // Currently we only use kDoPublishLastSeq in unordered_write
    assert(immutable_db_options_.unordered_write);
    WriteContext write_context;
    if (error_handler_.IsDBStopped()) {
      status = error_handler_.GetBGError();
    }
    // TODO(myabandeh): Make preliminary checks thread-safe so we could do them
    // without paying the cost of obtaining the mutex.
    if (status.ok()) {
      InstrumentedMutexLock l(&mutex_);
      bool need_log_sync = false;
      status = PreprocessWrite(write_options, &need_log_sync, &write_context);
      WriteStatusCheckOnLocked(status);
    }
    if (!status.ok()) {
      WriteThread::WriteGroup write_group;
      write_thread->EnterAsBatchGroupLeader(&w, &write_group);
      write_thread->ExitAsBatchGroupLeader(write_group, status);
      return status;
    }
  }

  WriteThread::WriteGroup write_group;
  uint64_t last_sequence;
  write_thread->EnterAsBatchGroupLeader(&w, &write_group);
  // Note: no need to update last_batch_group_size_ here since the batch writes
  // to WAL only

  size_t pre_release_callback_cnt = 0;
  size_t total_byte_size = 0;
  for (auto* writer : write_group) {
    if (writer->CheckCallback(this)) {
      total_byte_size = WriteBatchInternal::AppendedByteSize(
          total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
      if (writer->pre_release_callback) {
        pre_release_callback_cnt++;
      }
    }
  }

  const bool concurrent_update = true;
  // Update stats while we are an exclusive group leader, so we know
  // that nobody else can be writing to these particular stats.
  // We're optimistic, updating the stats before we successfully
  // commit.  That lets us release our leader status early.
  auto stats = default_cf_internal_stats_;
  stats->AddDBStats(InternalStats::kIntStatsBytesWritten, total_byte_size,
                    concurrent_update);
  RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
  stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1,
                    concurrent_update);
  RecordTick(stats_, WRITE_DONE_BY_SELF);
  auto write_done_by_other = write_group.size - 1;
  if (write_done_by_other > 0) {
    stats->AddDBStats(InternalStats::kIntStatsWriteDoneByOther,
                      write_done_by_other, concurrent_update);
    RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other);
  }
  RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size);

  PERF_TIMER_STOP(write_pre_and_post_process_time);

  PERF_TIMER_GUARD(write_wal_time);
  // LastAllocatedSequence is increased inside WriteToWAL under
  // wal_write_mutex_ to ensure ordered events in WAL
  size_t seq_inc = 0 /* total_count */;
  if (assign_order == kDoAssignOrder) {
    size_t total_batch_cnt = 0;
    for (auto* writer : write_group) {
      assert(writer->batch_cnt || !seq_per_batch_);
      if (!writer->CallbackFailed()) {
        total_batch_cnt += writer->batch_cnt;
      }
    }
    seq_inc = total_batch_cnt;
  }
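  // With kDontAssignOrder (e.g. WAL-only Prepare batches under the
  // WriteCommitted policy) seq_inc stays 0, so these batches do not consume
  // sequence numbers.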
  Status status;
  IOStatus io_s;
  io_s.PermitUncheckedError();  // Allow io_s to be uninitialized
  if (!write_options.disableWAL) {
    io_s = ConcurrentWriteToWAL(write_group, log_used, &last_sequence, seq_inc);
    status = io_s;
  } else {
    // Otherwise we inc seq number to do solely the seq allocation
    last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
  }

  size_t memtable_write_cnt = 0;
  auto curr_seq = last_sequence + 1;
  for (auto* writer : write_group) {
    if (writer->CallbackFailed()) {
      continue;
    }
    writer->sequence = curr_seq;
    if (assign_order == kDoAssignOrder) {
      assert(writer->batch_cnt || !seq_per_batch_);
      curr_seq += writer->batch_cnt;
    }
    if (!writer->disable_memtable) {
      memtable_write_cnt++;
    }
    // else seq advances only by memtable writes
  }
  if (status.ok() && write_options.sync) {
    assert(!write_options.disableWAL);
    // Requesting sync with two_write_queues_ is expected to be very rare. We
    // hence provide a simple implementation that is not necessarily efficient.
    if (manual_wal_flush_) {
      status = FlushWAL(true);
    } else {
      status = SyncWAL();
    }
  }
  PERF_TIMER_START(write_pre_and_post_process_time);

  if (!w.CallbackFailed()) {
    if (!io_s.ok()) {
      IOStatusCheck(io_s);
    } else {
      WriteStatusCheck(status);
    }
  }
  if (status.ok()) {
    size_t index = 0;
    for (auto* writer : write_group) {
      if (!writer->CallbackFailed() && writer->pre_release_callback) {
        assert(writer->sequence != kMaxSequenceNumber);
        Status ws = writer->pre_release_callback->Callback(
            writer->sequence, disable_memtable, writer->log_used, index++,
            pre_release_callback_cnt);
        if (!ws.ok()) {
          status = ws;
          break;
        }
      }
    }
  }
  if (publish_last_seq == kDoPublishLastSeq) {
    versions_->SetLastSequence(last_sequence + seq_inc);
    // Currently we only use kDoPublishLastSeq in unordered_write
    assert(immutable_db_options_.unordered_write);
  }
  if (immutable_db_options_.unordered_write && status.ok()) {
    pending_memtable_writes_ += memtable_write_cnt;
  }
  write_thread->ExitAsBatchGroupLeader(write_group, status);
  if (status.ok()) {
    status = w.FinalStatus();
  }
  if (seq_used != nullptr) {
    *seq_used = w.sequence;
  }
  return status;
}

void DBImpl::WriteStatusCheckOnLocked(const Status& status) {
  // Is setting bg_error_ enough here?  This will at least stop
  // compaction and fail any further writes.
  // Caller must hold mutex_.
  assert(!status.IsIOFenced() || !error_handler_.GetBGError().ok());
  mutex_.AssertHeld();
  if (immutable_db_options_.paranoid_checks && !status.ok() &&
      !status.IsBusy() && !status.IsIncomplete()) {
    // Maybe change the return status to void?
    error_handler_.SetBGError(status, BackgroundErrorReason::kWriteCallback);
  }
}

void DBImpl::WriteStatusCheck(const Status& status) {
  // Is setting bg_error_ enough here?  This will at least stop
  // compaction and fail any further writes.
  assert(!status.IsIOFenced() || !error_handler_.GetBGError().ok());
  if (immutable_db_options_.paranoid_checks && !status.ok() &&
      !status.IsBusy() && !status.IsIncomplete()) {
    mutex_.Lock();
    // Maybe change the return status to void?
    error_handler_.SetBGError(status, BackgroundErrorReason::kWriteCallback);
    mutex_.Unlock();
  }
}

void DBImpl::IOStatusCheck(const IOStatus& io_status) {
  // Is setting bg_error_ enough here?  This will at least stop
  // compaction and fail any further writes.
  if ((immutable_db_options_.paranoid_checks && !io_status.ok() &&
       !io_status.IsBusy() && !io_status.IsIncomplete()) ||
      io_status.IsIOFenced()) {
    mutex_.Lock();
    // Maybe change the return status to void?
    error_handler_.SetBGError(io_status, BackgroundErrorReason::kWriteCallback);
    mutex_.Unlock();
  }
}

void DBImpl::MemTableInsertStatusCheck(const Status& status) {
  // A non-OK status here indicates that the state implied by the
  // WAL has diverged from the in-memory state.  This could be
  // because of a corrupt write_batch (very bad), or because the
  // client specified an invalid column family and didn't specify
  // ignore_missing_column_families.
  if (!status.ok()) {
    mutex_.Lock();
    assert(!error_handler_.IsBGWorkStopped());
    // Maybe change the return status to void?
    error_handler_.SetBGError(status, BackgroundErrorReason::kMemTable)
        .PermitUncheckedError();
    mutex_.Unlock();
  }
}

Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
                               bool* need_log_sync,
                               WriteContext* write_context) {
  mutex_.AssertHeld();
  assert(write_context != nullptr && need_log_sync != nullptr);
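  // Before the actual WAL/memtable write, this may switch the WAL, flush or
  // trim memtables, schedule flushes, and delay the write when the write
  // controller asks for throttling; each of these steps is guarded below.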
  Status status;

  if (error_handler_.IsDBStopped()) {
    status = error_handler_.GetBGError();
  }

  PERF_TIMER_GUARD(write_scheduling_flushes_compactions_time);

  assert(!single_column_family_mode_ ||
         versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1);
  if (UNLIKELY(status.ok() && !single_column_family_mode_ &&
               total_log_size_ > GetMaxTotalWalSize())) {
    WaitForPendingWrites();
    status = SwitchWAL(write_context);
  }

  if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldFlush())) {
    // Before a new memtable is added in SwitchMemtable(),
    // write_buffer_manager_->ShouldFlush() will keep returning true. If another
    // thread is writing to another DB with the same write buffer, they may also
    // be flushed. We may end up with flushing much more DBs than needed. It's
    // suboptimal but still correct.
    WaitForPendingWrites();
    status = HandleWriteBufferFull(write_context);
  }

  if (UNLIKELY(status.ok() && !trim_history_scheduler_.Empty())) {
    status = TrimMemtableHistory(write_context);
  }

  if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) {
    WaitForPendingWrites();
    status = ScheduleFlushes(write_context);
  }

  PERF_TIMER_STOP(write_scheduling_flushes_compactions_time);
  PERF_TIMER_GUARD(write_pre_and_post_process_time);

  if (UNLIKELY(status.ok() && (write_controller_.IsStopped() ||
                               write_controller_.NeedsDelay()))) {
    PERF_TIMER_STOP(write_pre_and_post_process_time);
    PERF_TIMER_GUARD(write_delay_time);
    // We don't know the size of the current batch so we always use the size
    // of the previous one. It might create a fairness issue in that expiration
    // might happen for smaller writes but larger writes can go through.
    // Can optimize it if it is an issue.
    status = DelayWrite(last_batch_group_size_, write_options);
    PERF_TIMER_START(write_pre_and_post_process_time);
  }

  if (status.ok() && *need_log_sync) {
    // Wait until the parallel syncs are finished. Any sync process has to sync
    // the front log too, so it is enough to check the status of front().
    // We do a while loop since log_sync_cv_ is signalled when any sync is
    // finished.
    // Note: there does not seem to be a reason to wait for parallel sync at
    // this early step, but it is not important since parallel sync (SyncWAL)
    // and need_log_sync are usually not used together.
    while (logs_.front().getting_synced) {
      log_sync_cv_.Wait();
    }
    for (auto& log : logs_) {
      assert(!log.getting_synced);
976 977 978 979 980
      // This is just to prevent the logs from being synced by a parallel
      // SyncWAL call. We will do the actual syncing later, after we write to
      // the WAL.
      // Note: there does not seem to be a reason to set this early before we
      // actually write to the WAL
      log.getting_synced = true;
    }
  } else {
    *need_log_sync = false;
  }

  return status;
}

WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
                               WriteBatch* tmp_batch, size_t* write_with_wal,
                               WriteBatch** to_be_cached_state) {
  assert(write_with_wal != nullptr);
  assert(tmp_batch != nullptr);
  assert(*to_be_cached_state == nullptr);
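  // For example, if the group holds three writers and one of their callbacks
  // has failed, the remaining two batches are appended into *tmp_batch and
  // *write_with_wal is set to 2.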
  WriteBatch* merged_batch = nullptr;
  *write_with_wal = 0;
  auto* leader = write_group.leader;
  assert(!leader->disable_wal);  // Same holds for all in the batch group
  if (write_group.size == 1 && !leader->CallbackFailed() &&
      leader->batch->GetWalTerminationPoint().is_cleared()) {
    // We simply write the first WriteBatch to the WAL if the group only
    // contains one batch, that batch should be written to the WAL,
    // and the batch does not want to be truncated.
    merged_batch = leader->batch;
    if (WriteBatchInternal::IsLatestPersistentState(merged_batch)) {
      *to_be_cached_state = merged_batch;
    }
    *write_with_wal = 1;
  } else {
    // WAL needs all of the batches flattened into a single batch.
    // We could avoid copying here with an iov-like AddRecord
    // interface
    merged_batch = tmp_batch;
    for (auto writer : write_group) {
      if (!writer->CallbackFailed()) {
        Status s = WriteBatchInternal::Append(merged_batch, writer->batch,
                                              /*WAL_only*/ true);
        // Always returns Status::OK.
        assert(s.ok());
        if (WriteBatchInternal::IsLatestPersistentState(writer->batch)) {
          // We only need to cache the last of such write batch
          *to_be_cached_state = writer->batch;
        }
        (*write_with_wal)++;
      }
    }
  }
  return merged_batch;
}

// When two_write_queues_ is disabled, this function is called from the only
// write thread. Otherwise this must be called holding log_write_mutex_.
IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch,
                            log::Writer* log_writer, uint64_t* log_used,
                            uint64_t* log_size) {
  assert(log_size != nullptr);
  Slice log_entry = WriteBatchInternal::Contents(&merged_batch);
  *log_size = log_entry.size();
  // When two_write_queues_ is set, WriteToWAL has to be protected from
  // concurrent calls from the two queues anyway and log_write_mutex_ is
  // already held. Otherwise, if manual_wal_flush_ is enabled, we need to
  // protect log_writer->AddRecord from possible concurrent calls via FlushWAL
  // by the application.
  const bool needs_locking = manual_wal_flush_ && !two_write_queues_;
  // Due to performance concerns of missed branch prediction, penalize the new
  // manual_wal_flush_ feature (by UNLIKELY) instead of the more common case
  // when we do not need any locking.
  if (UNLIKELY(needs_locking)) {
    log_write_mutex_.Lock();
  }
  IOStatus io_s = log_writer->AddRecord(log_entry);

  if (UNLIKELY(needs_locking)) {
    log_write_mutex_.Unlock();
  }
  if (log_used != nullptr) {
    *log_used = logfile_number_;
  }
  total_log_size_ += log_entry.size();
  // TODO(myabandeh): it might be unsafe to access alive_log_files_.back() here
  // since alive_log_files_ might be modified concurrently
  alive_log_files_.back().AddSize(log_entry.size());
  log_empty_ = false;
  return io_s;
}

IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
                            log::Writer* log_writer, uint64_t* log_used,
                            bool need_log_sync, bool need_log_dir_sync,
                            SequenceNumber sequence) {
  IOStatus io_s;
  assert(!write_group.leader->disable_wal);
  // Same holds for all in the batch group
  size_t write_with_wal = 0;
  WriteBatch* to_be_cached_state = nullptr;
  WriteBatch* merged_batch = MergeBatch(write_group, &tmp_batch_,
                                        &write_with_wal, &to_be_cached_state);
  if (merged_batch == write_group.leader->batch) {
    write_group.leader->log_used = logfile_number_;
  } else if (write_with_wal > 1) {
    for (auto writer : write_group) {
      writer->log_used = logfile_number_;
    }
  }

  WriteBatchInternal::SetSequence(merged_batch, sequence);

  uint64_t log_size;
  io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
  if (to_be_cached_state) {
    cached_recoverable_state_ = *to_be_cached_state;
    cached_recoverable_state_empty_ = false;
  }

  if (io_s.ok() && need_log_sync) {
    StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS);
    // It's safe to access logs_ with unlocked mutex_ here because:
    //  - we've set getting_synced=true for all logs,
    //    so other threads won't pop from logs_ while we're here,
    //  - only writer thread can push to logs_, and we're in
    //    writer thread, so no one will push to logs_,
    //  - as long as other threads don't modify it, it's safe to read
    //    from std::deque from multiple threads concurrently.
    for (auto& log : logs_) {
      io_s = log.writer->file()->Sync(immutable_db_options_.use_fsync);
      if (!io_s.ok()) {
        break;
      }
    }

    if (io_s.ok() && need_log_dir_sync) {
      // We only sync WAL directory the first time WAL syncing is
      // requested, so that in case users never turn on WAL sync,
      // we can avoid the disk I/O in the write code path.
      io_s = directories_.GetWalDir()->Fsync(IOOptions(), nullptr);
    }
  }

  if (merged_batch == &tmp_batch_) {
    tmp_batch_.Clear();
  }
  if (io_s.ok()) {
    auto stats = default_cf_internal_stats_;
    if (need_log_sync) {
      stats->AddDBStats(InternalStats::kIntStatsWalFileSynced, 1);
      RecordTick(stats_, WAL_FILE_SYNCED);
    }
    stats->AddDBStats(InternalStats::kIntStatsWalFileBytes, log_size);
    RecordTick(stats_, WAL_FILE_BYTES, log_size);
    stats->AddDBStats(InternalStats::kIntStatsWriteWithWal, write_with_wal);
    RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
  }
  return io_s;
}

IOStatus DBImpl::ConcurrentWriteToWAL(
    const WriteThread::WriteGroup& write_group, uint64_t* log_used,
    SequenceNumber* last_sequence, size_t seq_inc) {
  IOStatus io_s;

  assert(!write_group.leader->disable_wal);
  // Same holds for all in the batch group
  WriteBatch tmp_batch;
  size_t write_with_wal = 0;
  WriteBatch* to_be_cached_state = nullptr;
  WriteBatch* merged_batch =
      MergeBatch(write_group, &tmp_batch, &write_with_wal, &to_be_cached_state);

  // We need to lock log_write_mutex_ since logs_ and alive_log_files_ might be
  // pushed back concurrently
  log_write_mutex_.Lock();
  if (merged_batch == write_group.leader->batch) {
    write_group.leader->log_used = logfile_number_;
  } else if (write_with_wal > 1) {
    for (auto writer : write_group) {
      writer->log_used = logfile_number_;
    }
  }
  *last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
  auto sequence = *last_sequence + 1;
  WriteBatchInternal::SetSequence(merged_batch, sequence);

  log::Writer* log_writer = logs_.back().writer;
  uint64_t log_size;
  io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
  if (to_be_cached_state) {
    cached_recoverable_state_ = *to_be_cached_state;
    cached_recoverable_state_empty_ = false;
  }
  log_write_mutex_.Unlock();

  if (io_s.ok()) {
    const bool concurrent = true;
    auto stats = default_cf_internal_stats_;
    stats->AddDBStats(InternalStats::kIntStatsWalFileBytes, log_size,
                      concurrent);
    RecordTick(stats_, WAL_FILE_BYTES, log_size);
    stats->AddDBStats(InternalStats::kIntStatsWriteWithWal, write_with_wal,
                      concurrent);
    RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
  }
  return io_s;
}

Status DBImpl::WriteRecoverableState() {
  mutex_.AssertHeld();
  if (!cached_recoverable_state_empty_) {
    bool dont_care_bool;
    SequenceNumber next_seq;
    if (two_write_queues_) {
      log_write_mutex_.Lock();
    }
    SequenceNumber seq;
    if (two_write_queues_) {
      seq = versions_->FetchAddLastAllocatedSequence(0);
    } else {
      seq = versions_->LastSequence();
    }
    WriteBatchInternal::SetSequence(&cached_recoverable_state_, seq + 1);
    auto status = WriteBatchInternal::InsertInto(
        &cached_recoverable_state_, column_family_memtables_.get(),
        &flush_scheduler_, &trim_history_scheduler_, true,
        0 /*recovery_log_number*/, this, false /* concurrent_memtable_writes */,
        &next_seq, &dont_care_bool, seq_per_batch_);
    auto last_seq = next_seq - 1;
    if (two_write_queues_) {
      versions_->FetchAddLastAllocatedSequence(last_seq - seq);
      versions_->SetLastPublishedSequence(last_seq);
    }
    versions_->SetLastSequence(last_seq);
    if (two_write_queues_) {
      log_write_mutex_.Unlock();
    }
    if (status.ok() && recoverable_state_pre_release_callback_) {
      const bool DISABLE_MEMTABLE = true;
      for (uint64_t sub_batch_seq = seq + 1;
           sub_batch_seq < next_seq && status.ok(); sub_batch_seq++) {
        uint64_t const no_log_num = 0;
        // Unlock it since the callback might end up locking mutex. e.g.,
        // AddCommitted -> AdvanceMaxEvictedSeq -> GetSnapshotListFromDB
        mutex_.Unlock();
        status = recoverable_state_pre_release_callback_->Callback(
            sub_batch_seq, !DISABLE_MEMTABLE, no_log_num, 0, 1);
        mutex_.Lock();
      }
    }
    if (status.ok()) {
      cached_recoverable_state_.Clear();
      cached_recoverable_state_empty_ = true;
    }
    return status;
  }
  return Status::OK();
}

void DBImpl::SelectColumnFamiliesForAtomicFlush(
    autovector<ColumnFamilyData*>* cfds) {
  for (ColumnFamilyData* cfd : *versions_->GetColumnFamilySet()) {
    if (cfd->IsDropped()) {
      continue;
    }
    if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
        !cached_recoverable_state_empty_.load()) {
      cfds->push_back(cfd);
    }
  }
}

// Assign sequence number for atomic flush.
void DBImpl::AssignAtomicFlushSeq(const autovector<ColumnFamilyData*>& cfds) {
  assert(immutable_db_options_.atomic_flush);
  auto seq = versions_->LastSequence();
  for (auto cfd : cfds) {
    cfd->imm()->AssignAtomicFlushSeq(seq);
  }
}

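// Switch away from the oldest alive WAL: flush the column families that still
// reference it (or all column families with data, in atomic-flush mode) so
// the WAL can eventually be released. Typically invoked when the total WAL
// size exceeds max_total_wal_size (see the log message below).
// REQUIRES: mutex_ is held.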
Status DBImpl::SwitchWAL(WriteContext* write_context) {
  mutex_.AssertHeld();
  assert(write_context != nullptr);
  Status status;

  if (alive_log_files_.begin()->getting_flushed) {
    return status;
  }

  auto oldest_alive_log = alive_log_files_.begin()->number;
  bool flush_wont_release_oldest_log = false;
  if (allow_2pc()) {
    auto oldest_log_with_uncommitted_prep =
        logs_with_prep_tracker_.FindMinLogContainingOutstandingPrep();

    assert(oldest_log_with_uncommitted_prep == 0 ||
           oldest_log_with_uncommitted_prep >= oldest_alive_log);
    if (oldest_log_with_uncommitted_prep > 0 &&
        oldest_log_with_uncommitted_prep == oldest_alive_log) {
      if (unable_to_release_oldest_log_) {
        // we already attempted to flush all column families dependent on
        // the oldest alive log but the log still contained uncommitted
        // transactions so there is still nothing that we can do.
        return status;
      } else {
        ROCKS_LOG_WARN(
            immutable_db_options_.info_log,
            "Unable to release oldest log due to uncommitted transaction");
        unable_to_release_oldest_log_ = true;
        flush_wont_release_oldest_log = true;
      }
    }
  }
  if (!flush_wont_release_oldest_log) {
    // we only mark this log as getting flushed if we have successfully
    // flushed all data in this log. If this log contains outstanding prepared
    // transactions then we cannot flush this log until those transactions are
    // committed.
    unable_to_release_oldest_log_ = false;
    alive_log_files_.begin()->getting_flushed = true;
  }

  ROCKS_LOG_INFO(
      immutable_db_options_.info_log,
      "Flushing all column families with data in WAL number %" PRIu64
      ". Total log size is %" PRIu64 " while max_total_wal_size is %" PRIu64,
      oldest_alive_log, total_log_size_.load(), GetMaxTotalWalSize());
  // no need to refcount because drop is happening in write thread, so can't
  // happen while we're in the write thread
  autovector<ColumnFamilyData*> cfds;
  if (immutable_db_options_.atomic_flush) {
    SelectColumnFamiliesForAtomicFlush(&cfds);
  } else {
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (cfd->IsDropped()) {
        continue;
      }
      if (cfd->OldestLogToKeep() <= oldest_alive_log) {
        cfds.push_back(cfd);
      }
    }
    MaybeFlushStatsCF(&cfds);
  }
  WriteThread::Writer nonmem_w;
  if (two_write_queues_) {
    nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
  }

  for (const auto cfd : cfds) {
    cfd->Ref();
    status = SwitchMemtable(cfd, write_context);
    cfd->UnrefAndTryDelete();
    if (!status.ok()) {
      break;
    }
  }
  if (two_write_queues_) {
    nonmem_write_thread_.ExitUnbatched(&nonmem_w);
  }

  if (status.ok()) {
    if (immutable_db_options_.atomic_flush) {
      AssignAtomicFlushSeq(cfds);
    }
    for (auto cfd : cfds) {
      cfd->imm()->FlushRequested();
      if (!immutable_db_options_.atomic_flush) {
        FlushRequest flush_req;
        GenerateFlushRequest({cfd}, &flush_req);
        SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager);
      }
    }
    if (immutable_db_options_.atomic_flush) {
      FlushRequest flush_req;
      GenerateFlushRequest(cfds, &flush_req);
      SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager);
    }
    MaybeScheduleFlushOrCompaction();
  }
  return status;
}

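// Called when the write buffer manager reports that the memory limit has been
// reached: switch and flush the column family whose active memtable was
// created earliest (i.e., holds the oldest data), or all column families with
// data when atomic flush is enabled, to free up memory.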
Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
  mutex_.AssertHeld();
  assert(write_context != nullptr);
  Status status;

  // Before a new memtable is added in SwitchMemtable(),
  // write_buffer_manager_->ShouldFlush() will keep returning true. If another
  // thread is writing to another DB with the same write buffer, they may also
  // be flushed. We may end up flushing many more DBs than needed. It's
  // suboptimal but still correct.
  ROCKS_LOG_INFO(
      immutable_db_options_.info_log,
      "Flushing column family with oldest memtable entry. Write buffer is "
      "using %" ROCKSDB_PRIszt " bytes out of a total of %" ROCKSDB_PRIszt ".",
      write_buffer_manager_->memory_usage(),
      write_buffer_manager_->buffer_size());
  // no need to refcount because drop is happening in write thread, so can't
  // happen while we're in the write thread
  autovector<ColumnFamilyData*> cfds;
  if (immutable_db_options_.atomic_flush) {
    SelectColumnFamiliesForAtomicFlush(&cfds);
  } else {
    ColumnFamilyData* cfd_picked = nullptr;
    SequenceNumber seq_num_for_cf_picked = kMaxSequenceNumber;

    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (cfd->IsDropped()) {
        continue;
      }
      if (!cfd->mem()->IsEmpty()) {
        // We only consider active mem table, hoping immutable memtable is
        // already in the process of flushing.
        uint64_t seq = cfd->mem()->GetCreationSeq();
        if (cfd_picked == nullptr || seq < seq_num_for_cf_picked) {
          cfd_picked = cfd;
          seq_num_for_cf_picked = seq;
        }
      }
    }
    if (cfd_picked != nullptr) {
      cfds.push_back(cfd_picked);
    }
    MaybeFlushStatsCF(&cfds);
  }

  WriteThread::Writer nonmem_w;
  if (two_write_queues_) {
    nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
  }
  for (const auto cfd : cfds) {
    if (cfd->mem()->IsEmpty()) {
      continue;
    }
    cfd->Ref();
    status = SwitchMemtable(cfd, write_context);
    cfd->UnrefAndTryDelete();
    if (!status.ok()) {
      break;
    }
  }
  if (two_write_queues_) {
    nonmem_write_thread_.ExitUnbatched(&nonmem_w);
  }

  if (status.ok()) {
    if (immutable_db_options_.atomic_flush) {
      AssignAtomicFlushSeq(cfds);
    }
    for (const auto cfd : cfds) {
      cfd->imm()->FlushRequested();
      if (!immutable_db_options_.atomic_flush) {
        FlushRequest flush_req;
        GenerateFlushRequest({cfd}, &flush_req);
        SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
      }
    }
    if (immutable_db_options_.atomic_flush) {
      FlushRequest flush_req;
      GenerateFlushRequest(cfds, &flush_req);
      SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
    }
    MaybeScheduleFlushOrCompaction();
  }
  return status;
}

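// When max_total_wal_size is not explicitly set (0), the WAL size limit
// defaults to four times max_total_in_memory_state_.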
uint64_t DBImpl::GetMaxTotalWalSize() const {
  mutex_.AssertHeld();
  return mutable_db_options_.max_total_wal_size == 0
             ? 4 * max_total_in_memory_state_
             : mutable_db_options_.max_total_wal_size;
}

// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::DelayWrite(uint64_t num_bytes,
                          const WriteOptions& write_options) {
  uint64_t time_delayed = 0;
  bool delayed = false;
  {
    StopWatch sw(env_, stats_, WRITE_STALL, &time_delayed);
    uint64_t delay = write_controller_.GetDelay(env_, num_bytes);
    if (delay > 0) {
      if (write_options.no_slowdown) {
        return Status::Incomplete("Write stall");
      }
      TEST_SYNC_POINT("DBImpl::DelayWrite:Sleep");

      // Notify write_thread_ about the stall so it can setup a barrier and
      // fail any pending writers with no_slowdown
      write_thread_.BeginWriteStall();
      TEST_SYNC_POINT("DBImpl::DelayWrite:BeginWriteStallDone");
      mutex_.Unlock();
      // We will delay the write until we have slept for `delay` microseconds
      // or we no longer need a delay
      const uint64_t kDelayInterval = 1000;
      uint64_t stall_end = sw.start_time() + delay;
      while (write_controller_.NeedsDelay()) {
        if (env_->NowMicros() >= stall_end) {
          // We already delayed this write `delay` microseconds
          break;
        }

        delayed = true;
        // Sleep for 0.001 seconds
        env_->SleepForMicroseconds(kDelayInterval);
      }
      mutex_.Lock();
      write_thread_.EndWriteStall();
    }

    // Don't wait if there's a background error, even if it's a soft error. We
    // might wait here indefinitely as the background compaction may never
    // finish successfully, resulting in the stall condition lasting
    // indefinitely
    while (error_handler_.GetBGError().ok() && write_controller_.IsStopped()) {
      if (write_options.no_slowdown) {
        return Status::Incomplete("Write stall");
      }
      delayed = true;

      // Notify write_thread_ about the stall so it can setup a barrier and
      // fail any pending writers with no_slowdown
      write_thread_.BeginWriteStall();
      TEST_SYNC_POINT("DBImpl::DelayWrite:Wait");
      bg_cv_.Wait();
      write_thread_.EndWriteStall();
    }
  }
  assert(!delayed || !write_options.no_slowdown);
  if (delayed) {
    default_cf_internal_stats_->AddDBStats(
        InternalStats::kIntStatsWriteStallMicros, time_delayed);
    RecordTick(stats_, STALL_MICROS, time_delayed);
  }

  // If DB is not in read-only mode and write_controller is not stopping
  // writes, we can ignore any background errors and allow the write to
  // proceed
  Status s;
  if (write_controller_.IsStopped()) {
    // If writes are still stopped, it means we bailed due to a background
    // error
    s = Status::Incomplete(error_handler_.GetBGError().ToString());
  }
  if (error_handler_.IsDBStopped()) {
    s = error_handler_.GetBGError();
  }
  return s;
}

Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,
                                            WriteBatch* my_batch) {
  assert(write_options.low_pri);
  // This is called outside the DB mutex. Although it is safe to make the call,
  // the consistency condition is not guaranteed to hold. It's OK to live with
  // it in this case.
  // If we need to speed compaction, it means the compaction is left behind
  // and we start to limit low pri writes to a limit.
  if (write_controller_.NeedSpeedupCompaction()) {
    if (allow_2pc() && (my_batch->HasCommit() || my_batch->HasRollback())) {
      // For 2PC, we only rate limit prepare, not commit.
      return Status::OK();
    }
    if (write_options.no_slowdown) {
      return Status::Incomplete("Low priority write stall");
    } else {
      assert(my_batch != nullptr);
      // Rate limit those writes. The reason that we don't completely wait
      // is that in case the write is heavy, low pri writes may never have
      // a chance to run. Now we guarantee we are still slowly making
      // progress.
      PERF_TIMER_GUARD(write_delay_time);
      write_controller_.low_pri_rate_limiter()->Request(
          my_batch->GetDataSize(), Env::IO_HIGH, nullptr /* stats */,
          RateLimiter::OpType::kWrite);
    }
  }
  return Status::OK();
}

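// If persistent stats are enabled and the stats column family's log number is
// smaller than every other column family's, add it to the set of column
// families to be flushed so that old WALs can be released.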
void DBImpl::MaybeFlushStatsCF(autovector<ColumnFamilyData*>* cfds) {
  assert(cfds != nullptr);
  if (!cfds->empty() && immutable_db_options_.persist_stats_to_disk) {
    ColumnFamilyData* cfd_stats =
        versions_->GetColumnFamilySet()->GetColumnFamily(
            kPersistentStatsColumnFamilyName);
    if (cfd_stats != nullptr && !cfd_stats->mem()->IsEmpty()) {
      for (ColumnFamilyData* cfd : *cfds) {
        if (cfd == cfd_stats) {
          // stats CF already included in cfds
          return;
        }
      }
      // force flush stats CF when its log number is less than all other CF's
      // log numbers
      bool force_flush_stats_cf = true;
      for (auto* loop_cfd : *versions_->GetColumnFamilySet()) {
        if (loop_cfd == cfd_stats) {
          continue;
        }
        if (loop_cfd->GetLogNumber() <= cfd_stats->GetLogNumber()) {
          force_flush_stats_cf = false;
        }
      }
      if (force_flush_stats_cf) {
        cfds->push_back(cfd_stats);
        ROCKS_LOG_INFO(immutable_db_options_.info_log,
                       "Force flushing stats CF with automated flush "
                       "to avoid holding old logs");
      }
    }
  }
}

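// For each column family scheduled by trim_history_scheduler_, free flushed
// memtables that are only kept around as history, and install a new
// SuperVersion for every column family that was actually trimmed.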
Status DBImpl::TrimMemtableHistory(WriteContext* context) {
  autovector<ColumnFamilyData*> cfds;
  ColumnFamilyData* tmp_cfd;
  while ((tmp_cfd = trim_history_scheduler_.TakeNextColumnFamily()) !=
         nullptr) {
    cfds.push_back(tmp_cfd);
  }
  for (auto& cfd : cfds) {
    autovector<MemTable*> to_delete;
    bool trimmed = cfd->imm()->TrimHistory(
        &to_delete, cfd->mem()->ApproximateMemoryUsage());
    if (!to_delete.empty()) {
      for (auto m : to_delete) {
        delete m;
      }
    }
    if (trimmed) {
      context->superversion_context.NewSuperVersion();
      assert(context->superversion_context.new_superversion.get() != nullptr);
      cfd->InstallSuperVersion(&context->superversion_context, &mutex_);
    }

    if (cfd->UnrefAndTryDelete()) {
      cfd = nullptr;
    }
  }
  return Status::OK();
}

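// Switch the memtables of all column families picked up by flush_scheduler_
// (or of every column family with unflushed data, in atomic-flush mode) and
// schedule the corresponding flush requests.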
Status DBImpl::ScheduleFlushes(WriteContext* context) {
  autovector<ColumnFamilyData*> cfds;
  if (immutable_db_options_.atomic_flush) {
    SelectColumnFamiliesForAtomicFlush(&cfds);
    for (auto cfd : cfds) {
      cfd->Ref();
    }
    flush_scheduler_.Clear();
  } else {
    ColumnFamilyData* tmp_cfd;
    while ((tmp_cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
      cfds.push_back(tmp_cfd);
    }
    MaybeFlushStatsCF(&cfds);
  }
  Status status;
  WriteThread::Writer nonmem_w;
  if (two_write_queues_) {
    nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
  }

  for (auto& cfd : cfds) {
    if (!cfd->mem()->IsEmpty()) {
      status = SwitchMemtable(cfd, context);
    }
    if (cfd->UnrefAndTryDelete()) {
      cfd = nullptr;
    }
    if (!status.ok()) {
      break;
    }
  }

  if (two_write_queues_) {
    nonmem_write_thread_.ExitUnbatched(&nonmem_w);
  }

  if (status.ok()) {
    if (immutable_db_options_.atomic_flush) {
      AssignAtomicFlushSeq(cfds);
      FlushRequest flush_req;
      GenerateFlushRequest(cfds, &flush_req);
      SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
    } else {
      for (auto* cfd : cfds) {
        FlushRequest flush_req;
        GenerateFlushRequest({cfd}, &flush_req);
        SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
      }
    }
    MaybeScheduleFlushOrCompaction();
  }
  return status;
}

#ifndef ROCKSDB_LITE
void DBImpl::NotifyOnMemTableSealed(ColumnFamilyData* /*cfd*/,
                                    const MemTableInfo& mem_table_info) {
  if (immutable_db_options_.listeners.size() == 0U) {
    return;
  }
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }

  for (auto listener : immutable_db_options_.listeners) {
    listener->OnMemTableSealed(mem_table_info);
  }
}
#endif  // ROCKSDB_LITE

// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
// REQUIRES: this thread is currently at the front of the 2nd writer queue if
// two_write_queues_ is true (This is to simplify the reasoning.)
Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
  mutex_.AssertHeld();
  WriteThread::Writer nonmem_w;
  std::unique_ptr<WritableFile> lfile;
  log::Writer* new_log = nullptr;
  MemTable* new_mem = nullptr;
  IOStatus io_s;

  // Recoverable state is persisted in WAL. After memtable switch, WAL might
  // be deleted, so we write the state to memtable to be persisted as well.
  Status s = WriteRecoverableState();
  if (!s.ok()) {
    return s;
  }

  // Attempt to switch to a new memtable and trigger flush of old.
  // Do this without holding the dbmutex lock.
  assert(versions_->prev_log_number() == 0);
  if (two_write_queues_) {
    log_write_mutex_.Lock();
  }
  bool creating_new_log = !log_empty_;
  if (two_write_queues_) {
    log_write_mutex_.Unlock();
  }
  uint64_t recycle_log_number = 0;
  if (creating_new_log && immutable_db_options_.recycle_log_file_num &&
      !log_recycle_files_.empty()) {
    recycle_log_number = log_recycle_files_.front();
  }
  uint64_t new_log_number =
      creating_new_log ? versions_->NewFileNumber() : logfile_number_;
  const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();

  // Set memtable_info for memtable sealed callback
#ifndef ROCKSDB_LITE
  MemTableInfo memtable_info;
  memtable_info.cf_name = cfd->GetName();
  memtable_info.first_seqno = cfd->mem()->GetFirstSequenceNumber();
  memtable_info.earliest_seqno = cfd->mem()->GetEarliestSequenceNumber();
  memtable_info.num_entries = cfd->mem()->num_entries();
  memtable_info.num_deletes = cfd->mem()->num_deletes();
#endif  // ROCKSDB_LITE
  // Log this later after lock release. It may be outdated, e.g., if background
  // flush happens before logging, but that should be ok.
  int num_imm_unflushed = cfd->imm()->NumNotFlushed();
  const auto preallocate_block_size =
      GetWalPreallocateBlockSize(mutable_cf_options.write_buffer_size);
  mutex_.Unlock();
  if (creating_new_log) {
    // TODO: Write buffer size passed in should be max of all CFs' instead
    // of mutable_cf_options.write_buffer_size.
    io_s = CreateWAL(new_log_number, recycle_log_number, preallocate_block_size,
                     &new_log);
    if (s.ok()) {
      s = io_s;
    }
  }
  if (s.ok()) {
    SequenceNumber seq = versions_->LastSequence();
    new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq);
    context->superversion_context.NewSuperVersion();
  }
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "[%s] New memtable created with log file: #%" PRIu64
                 ". Immutable memtables: %d.\n",
                 cfd->GetName().c_str(), new_log_number, num_imm_unflushed);
  mutex_.Lock();
  if (recycle_log_number != 0) {
    // Since renaming the file is done outside DB mutex, we need to ensure
    // concurrent full purges don't delete the file while we're recycling it.
    // To achieve that we hold the old log number in the recyclable list until
    // after it has been renamed.
    assert(log_recycle_files_.front() == recycle_log_number);
    log_recycle_files_.pop_front();
  }
  if (s.ok() && creating_new_log) {
    log_write_mutex_.Lock();
    assert(new_log != nullptr);
    if (!logs_.empty()) {
      // Always flush the buffer of the last log before switching to a new one
      log::Writer* cur_log_writer = logs_.back().writer;
      io_s = cur_log_writer->WriteBuffer();
      if (s.ok()) {
        s = io_s;
      }
      if (!s.ok()) {
        ROCKS_LOG_WARN(immutable_db_options_.info_log,
                       "[%s] Failed to switch from #%" PRIu64 " to #%" PRIu64
                       "  WAL file\n",
                       cfd->GetName().c_str(), cur_log_writer->get_log_number(),
                       new_log_number);
      }
    }
    if (s.ok()) {
      logfile_number_ = new_log_number;
      log_empty_ = true;
      log_dir_synced_ = false;
      logs_.emplace_back(logfile_number_, new_log);
      alive_log_files_.push_back(LogFileNumberSize(logfile_number_));
    }
    log_write_mutex_.Unlock();
  }

  if (!s.ok()) {
    // how do we fail if we're not creating new log?
    assert(creating_new_log);
    if (new_mem) {
      delete new_mem;
    }
    if (new_log) {
      delete new_log;
    }
    SuperVersion* new_superversion =
        context->superversion_context.new_superversion.release();
    if (new_superversion != nullptr) {
      delete new_superversion;
    }
    // We may have lost data from the WritableFileBuffer in-memory buffer for
    // the current log, so treat it as a fatal error and set bg_error
    if (!io_s.ok()) {
      error_handler_.SetBGError(io_s, BackgroundErrorReason::kMemTable);
    } else {
      error_handler_.SetBGError(s, BackgroundErrorReason::kMemTable);
    }
    // Read back bg_error in order to get the right severity
    s = error_handler_.GetBGError();
    return s;
  }

  bool empty_cf_updated = false;
  if (immutable_db_options_.track_and_verify_wals_in_manifest &&
      !immutable_db_options_.allow_2pc && creating_new_log) {
    // In non-2pc mode, WALs become obsolete if they do not contain unflushed
    // data. Updating the empty CF's log number might cause some WALs to become
    // obsolete. So we should track the WAL obsoletion event before actually
    // updating the empty CF's log number.
    uint64_t min_wal_number_to_keep =
        versions_->PreComputeMinLogNumberWithUnflushedData(logfile_number_);
    if (min_wal_number_to_keep >
        versions_->GetWalSet().GetMinWalNumberToKeep()) {
      // Get a snapshot of the empty column families.
      // LogAndApply may release and reacquire db
      // mutex, during that period, column family may become empty (e.g. its
      // flush succeeds), then it affects the computed min_log_number_to_keep,
      // so we take a snapshot for consistency of column family data
      // status. If a column family becomes non-empty afterwards, its active log
      // should still be the created new log, so the min_log_number_to_keep is
      // not affected.
      autovector<ColumnFamilyData*> empty_cfs;
      for (auto cf : *versions_->GetColumnFamilySet()) {
        if (cf->IsEmpty()) {
          empty_cfs.push_back(cf);
        }
      }

      VersionEdit wal_deletion;
      wal_deletion.DeleteWalsBefore(min_wal_number_to_keep);
      s = versions_->LogAndApplyToDefaultColumnFamily(&wal_deletion, &mutex_);
      if (!s.ok() && versions_->io_status().IsIOError()) {
        s = error_handler_.SetBGError(versions_->io_status(),
                                      BackgroundErrorReason::kManifestWrite);
      }
      if (!s.ok()) {
        return s;
      }

      for (auto cf : empty_cfs) {
        if (cf->IsEmpty()) {
          cf->SetLogNumber(logfile_number_);
          cf->mem()->SetCreationSeq(versions_->LastSequence());
        }  // cf may become non-empty.
      }
      empty_cf_updated = true;
    }
  }
  if (!empty_cf_updated) {
    for (auto cf : *versions_->GetColumnFamilySet()) {
      // all this is just optimization to delete logs that
      // are no longer needed -- if CF is empty, that means it
      // doesn't need that particular log to stay alive, so we just
      // advance the log number. no need to persist this in the manifest
      if (cf->IsEmpty()) {
        if (creating_new_log) {
          cf->SetLogNumber(logfile_number_);
        }
        cf->mem()->SetCreationSeq(versions_->LastSequence());
      }
    }
  }

  cfd->mem()->SetNextLogNumber(logfile_number_);
  cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_);
  new_mem->Ref();
  cfd->SetMemtable(new_mem);
  InstallSuperVersionAndScheduleWork(cfd, &context->superversion_context,
                                     mutable_cf_options);
#ifndef ROCKSDB_LITE
  mutex_.Unlock();
  // Notify client that memtable is sealed, now that we have successfully
  // installed a new memtable
  NotifyOnMemTableSealed(cfd, memtable_info);
  mutex_.Lock();
#endif  // ROCKSDB_LITE
  // It is possible that we got here without checking the value of io_s, but
  // that is okay. If we did, it most likely means that s was already an error.
  // In any case, ignore any unchecked error for io_s here.
  io_s.PermitUncheckedError();
  return s;
}

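// Preallocate roughly 1.1x the write buffer size for each WAL file, capped by
// max_total_wal_size, db_write_buffer_size, and the write buffer manager's
// buffer size when those limits are configured.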
size_t DBImpl::GetWalPreallocateBlockSize(uint64_t write_buffer_size) const {
  mutex_.AssertHeld();
  size_t bsize =
      static_cast<size_t>(write_buffer_size / 10 + write_buffer_size);
  // Some users might set very high write_buffer_size and rely on
  // max_total_wal_size or other parameters to control the WAL size.
  if (mutable_db_options_.max_total_wal_size > 0) {
    bsize = std::min<size_t>(
        bsize, static_cast<size_t>(mutable_db_options_.max_total_wal_size));
  }
  if (immutable_db_options_.db_write_buffer_size > 0) {
    bsize = std::min<size_t>(bsize, immutable_db_options_.db_write_buffer_size);
  }
  if (immutable_db_options_.write_buffer_manager &&
      immutable_db_options_.write_buffer_manager->enabled()) {
    bsize = std::min<size_t>(
        bsize, immutable_db_options_.write_buffer_manager->buffer_size());
  }

  return bsize;
}

// Default implementations of convenience methods that subclasses of DB
// can call if they wish
Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
               const Slice& key, const Slice& value) {
  if (nullptr == opt.timestamp) {
    // Pre-allocate size of write batch conservatively.
    // 8 bytes are taken by header, 4 bytes for count, 1 byte for type,
    // and we allocate 11 extra bytes for key length, as well as value length.
    WriteBatch batch(key.size() + value.size() + 24);
    Status s = batch.Put(column_family, key, value);
    if (!s.ok()) {
      return s;
    }
    return Write(opt, &batch);
  }
  const Slice* ts = opt.timestamp;
  assert(nullptr != ts);
  size_t ts_sz = ts->size();
  assert(column_family->GetComparator());
  assert(ts_sz == column_family->GetComparator()->timestamp_size());
  WriteBatch batch(key.size() + ts_sz + value.size() + 24, /*max_bytes=*/0,
                   ts_sz);
  Status s = batch.Put(column_family, key, value);
  if (!s.ok()) {
    return s;
  }
  s = batch.AssignTimestamp(*ts);
  if (!s.ok()) {
    return s;
  }
  return Write(opt, &batch);
}

Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,
                  const Slice& key) {
  if (nullptr == opt.timestamp) {
    WriteBatch batch;
    Status s = batch.Delete(column_family, key);
    if (!s.ok()) {
      return s;
    }
    return Write(opt, &batch);
  }
  const Slice* ts = opt.timestamp;
  assert(ts != nullptr);
  const size_t ts_sz = ts->size();
  constexpr size_t kKeyAndValueLenSize = 11;
  constexpr size_t kWriteBatchOverhead =
      WriteBatchInternal::kHeader + sizeof(ValueType) + kKeyAndValueLenSize;
  WriteBatch batch(key.size() + ts_sz + kWriteBatchOverhead, /*max_bytes=*/0,
                   ts_sz);
  Status s = batch.Delete(column_family, key);
  if (!s.ok()) {
    return s;
  }
  s = batch.AssignTimestamp(*ts);
  if (!s.ok()) {
    return s;
  }
  return Write(opt, &batch);
}

Status DB::SingleDelete(const WriteOptions& opt,
                        ColumnFamilyHandle* column_family, const Slice& key) {
  WriteBatch batch;
  Status s = batch.SingleDelete(column_family, key);
  if (!s.ok()) {
    return s;
  }
  return Write(opt, &batch);
}

Status DB::DeleteRange(const WriteOptions& opt,
                       ColumnFamilyHandle* column_family,
                       const Slice& begin_key, const Slice& end_key) {
  WriteBatch batch;
  Status s = batch.DeleteRange(column_family, begin_key, end_key);
  if (!s.ok()) {
    return s;
  }
  return Write(opt, &batch);
}

Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family,
                 const Slice& key, const Slice& value) {
  WriteBatch batch;
  Status s = batch.Merge(column_family, key, value);
  if (!s.ok()) {
    return s;
  }
  return Write(opt, &batch);
}
}  // namespace ROCKSDB_NAMESPACE