提交 14c57164 编写于 作者: Z ZhaoMing 提交者: guokuankuan

Fix FlushLargest write stop

上级 8bae0769
......@@ -455,7 +455,7 @@ ColumnFamilyData::ColumnFamilyData(
log_number_(0),
flush_reason_(FlushReason::kOthers),
column_family_set_(column_family_set),
queued_for_flush_(false),
queued_for_flush_(0),
queued_for_compaction_(false),
queued_for_garbage_collection_(false),
prev_compaction_needed_bytes_(0),
......@@ -533,7 +533,7 @@ ColumnFamilyData::~ColumnFamilyData() {
// It would be wrong if this ColumnFamilyData is in flush_queue_ or
// compaction_queue_ and we destroyed it
assert(!queued_for_flush_);
assert(queued_for_flush_ == 0);
assert(!queued_for_compaction_);
assert(!queued_for_garbage_collection_);
......
......@@ -360,12 +360,16 @@ class ColumnFamilyData {
void ResetThreadLocalSuperVersions();
// Protected by DB mutex
void set_queued_for_flush(bool value) { queued_for_flush_ = value; }
void inc_queued_for_flush() { ++queued_for_flush_; }
void dec_queued_for_flush() {
--queued_for_flush_;
assert(queued_for_flush_ >= 0);
}
void set_queued_for_compaction(bool value) { queued_for_compaction_ = value; }
void set_queued_for_garbage_collection(bool value) {
queued_for_garbage_collection_ = value;
}
bool queued_for_flush() { return queued_for_flush_; }
bool queued_for_flush() { return queued_for_flush_ > 0; }
bool queued_for_compaction() { return queued_for_compaction_; }
bool queued_for_garbage_collection() {
return queued_for_garbage_collection_;
......@@ -477,7 +481,7 @@ class ColumnFamilyData {
std::unique_ptr<WriteControllerToken> write_controller_token_;
// If true --> this ColumnFamily is currently present in DBImpl::flush_queue_
bool queued_for_flush_;
int queued_for_flush_;
// If true --> this ColumnFamily is currently present in
// DBImpl::compaction_queue_
......
......@@ -583,6 +583,7 @@ Status DBImpl::CloseHelper() {
const FlushRequest& flush_req = PopFirstFromFlushQueue();
for (const auto& iter : flush_req) {
ColumnFamilyData* cfd = iter.first;
cfd->dec_queued_for_flush();
if (cfd->Unref()) {
delete cfd;
}
......
......@@ -2009,6 +2009,7 @@ void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req,
ColumnFamilyData* cfd = iter.first;
cfd->Ref();
cfd->SetFlushReason(flush_reason);
cfd->inc_queued_for_flush();
}
unscheduled_flushes_ += static_cast<int>(flush_req.size());
flush_queue_.push_back(flush_req);
......@@ -2132,6 +2133,7 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
if (cfd->IsDropped() || !cfd->imm()->IsFlushPending()) {
// can't flush this CF, try next one
if (cfd->Unref()) {
cfd->dec_queued_for_flush();
delete cfd;
}
continue;
......@@ -2166,6 +2168,7 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
*reason = bg_flush_args[0].cfd_->GetFlushReason();
for (auto& arg : bg_flush_args) {
ColumnFamilyData* cfd = arg.cfd_;
cfd->dec_queued_for_flush();
if (cfd->Unref()) {
delete cfd;
arg.cfd_ = nullptr;
......
......@@ -1180,14 +1180,14 @@ Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
if (!cfd->mem()->IsEmpty()) {
// We only consider active mem table, hoping immutable memtable is
// already in the process of flushing.
if (immutable_db_options_.write_buffer_flush_pri == kFlushOldest) {
if (flush_pri == kFlushOldest) {
uint64_t seq = cfd->mem()->GetCreationSeq();
if (cfd_picked == nullptr || seq < seq_num_for_cf_picked) {
cfd_picked = cfd;
seq_num_for_cf_picked = seq;
}
} else {
assert(immutable_db_options_.write_buffer_flush_pri == kFlushLargest);
} else if (!cfd->queued_for_flush()) {
assert(flush_pri == kFlushLargest);
size_t cfd_size = cfd->mem()->ApproximateMemoryUsage();
if (cfd_picked == nullptr || cfd_size > largest_cfd_size) {
cfd_picked = cfd;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册