Unverified commit 632a37b5, authored by Kaushik Iska, committed by GitHub

Make message loop task entry containers thread safe (#11367)

The core underlying issue is that a vector push_back can re-allocate the backing storage and cause us to segfault, since outstanding references into the vector are invalidated. I have switched the backing queues to a map per @jason-simmons's suggestion in flutter/flutter#38778.
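For illustration, a minimal standalone snippet (not engine code) of the hazard: a reference into a std::vector may dangle once push_back reallocates, whereas std::map insertion never invalidates references to existing elements.

#include <iostream>
#include <map>
#include <vector>

int main() {
  std::vector<int> vec = {1, 2, 3};
  int& first = vec[0];
  // push_back may exceed capacity and reallocate; `first` then dangles,
  // and using it is undefined behavior (the segfault described above).
  vec.push_back(4);
  (void)first;

  std::map<int, int> entries = {{0, 1}};
  int& stable = entries[0];
  // Map insertion never moves existing nodes, so `stable` stays valid.
  for (int i = 1; i < 100; ++i) {
    entries[i] = i;
  }
  std::cout << stable << std::endl;  // Safe: prints 1.
  return 0;
}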

I've also added a test to capture the aforementioned bug. I've run internal tests several times to validate that this is fixed.

The general threading rule for this class is that only the following operations take a write lock on the meta mutex:

1. Create
2. Dispose

The rest of the operations take a read lock on the meta mutex and acquire finer-grained locks for the duration of the operation. We cannot hold the read lock for the entire duration of NotifyObservers, for example, because an observer can in turn create other queues -- which we should not block.
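A minimal sketch of that scheme, assuming std::shared_mutex in place of fml::SharedMutex and with per-queue state reduced to a counter (names here are illustrative, not the engine's exact API):

#include <cstddef>
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>

class TaskQueues {
 public:
  // Create (and Dispose) take the write lock: they mutate the maps.
  std::size_t CreateTaskQueue() {
    std::unique_lock<std::shared_mutex> write_lock(meta_mutex_);
    std::size_t id = counter_++;
    queue_locks_[id] = std::make_unique<std::mutex>();
    pending_counts_[id] = 0;
    return id;
  }

  // Everything else holds only the per-queue mutex for the duration of
  // the operation.
  bool HasPendingTasks(std::size_t id) const {
    std::scoped_lock queue_lock(GetMutex(id));
    return pending_counts_.at(id) > 0;
  }

 private:
  // The read (shared) lock is held only for the map lookup, so an
  // observer that creates a new queue mid-notification is never blocked.
  std::mutex& GetMutex(std::size_t id) const {
    std::shared_lock<std::shared_mutex> read_lock(meta_mutex_);
    return *queue_locks_.at(id);
  }

  mutable std::shared_mutex meta_mutex_;
  std::map<std::size_t, std::unique_ptr<std::mutex>> queue_locks_;
  std::map<std::size_t, std::size_t> pending_counts_;
  std::size_t counter_ = 0;
};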

Additional changes:

1. Make as many methods as possible const; the unlocked methods are all const.
2. Migrate all the per-queue members into a struct, and hold the structs in a map (see the sketch after this list).
3. Get rid of the unused Swap functionality.
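The resulting shape, condensed from the diff below: all state for one queue lives in a single TaskQueueEntry, and the entries and their locks are keyed by TaskQueueId.

// This is keyed by the |TaskQueueId| and contains all the queue
// components that make up a single TaskQueue.
class TaskQueueEntry {
 public:
  using TaskObservers = std::map<intptr_t, fml::closure>;
  Wakeable* wakeable;
  TaskObservers task_observers;
  DelayedTaskQueue delayed_tasks;
  TaskQueueId owner_of;
  TaskQueueId subsumed_by;
};

// In MessageLoopTaskQueues; std::map insertion never invalidates
// references to existing entries.
std::map<TaskQueueId, std::unique_ptr<TaskQueueEntry>> queue_entries_;
std::map<TaskQueueId, std::unique_ptr<std::mutex>> queue_locks_;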
Parent 975a8aa5
......@@ -127,7 +127,6 @@ FILE: ../../../flutter/fml/memory/weak_ptr.h
FILE: ../../../flutter/fml/memory/weak_ptr_internal.cc
FILE: ../../../flutter/fml/memory/weak_ptr_internal.h
FILE: ../../../flutter/fml/memory/weak_ptr_unittest.cc
FILE: ../../../flutter/fml/merged_queues_runner.cc
FILE: ../../../flutter/fml/message.cc
FILE: ../../../flutter/fml/message.h
FILE: ../../../flutter/fml/message_loop.cc
......
......@@ -42,7 +42,6 @@ source_set("fml") {
"memory/weak_ptr.h",
"memory/weak_ptr_internal.cc",
"memory/weak_ptr_internal.h",
"merged_queues_runner.cc",
"message.cc",
"message.h",
"message_loop.cc",
......
// Copyright 2013 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#define FML_USED_ON_EMBEDDER
#include "flutter/fml/message_loop_task_queues.h"
namespace fml {
// RAII class for managing merged locks.
class MessageLoopTaskQueues::MergedQueuesRunner {
public:
// TODO (kaushikiska): refactor mutexes outside of MessageLoopTaskQueues
// for better DI.
MergedQueuesRunner(MessageLoopTaskQueues& task_queues,
TaskQueueId owner,
MutexType type = MutexType::kTasks)
: owner_(owner),
subsumed_(MessageLoopTaskQueues::_kUnmerged),
task_queues_(task_queues),
type_(type) {
task_queues_.GetMutex(owner, type).lock();
subsumed_ = task_queues_.owner_to_subsumed_[owner];
if (isMerged(subsumed_)) {
task_queues_.GetMutex(subsumed_, type).lock();
}
}
// First invokes on owner and then subsumed (if present).
void InvokeMerged(std::function<void(const TaskQueueId)> closure) {
closure(owner_);
if (isMerged(subsumed_)) {
closure(subsumed_);
}
}
~MergedQueuesRunner() {
if (isMerged(subsumed_)) {
task_queues_.GetMutex(subsumed_, type_).unlock();
}
task_queues_.GetMutex(owner_, type_).unlock();
}
private:
bool isMerged(TaskQueueId queue_id) {
return queue_id != MessageLoopTaskQueues::_kUnmerged;
}
const TaskQueueId owner_;
TaskQueueId subsumed_;
MessageLoopTaskQueues& task_queues_;
const MutexType type_;
FML_DISALLOW_COPY_ASSIGN_AND_MOVE(MergedQueuesRunner);
};
} // namespace fml
......@@ -73,12 +73,6 @@ void MessageLoop::RunExpiredTasksNow() {
loop_->RunExpiredTasksNow();
}
void MessageLoop::SwapTaskQueues(MessageLoop* other) {
FML_CHECK(loop_);
FML_CHECK(other->loop_);
loop_->SwapTaskQueues(other->loop_);
}
TaskQueueId MessageLoop::GetCurrentTaskQueueId() {
auto* loop = tls_message_loop.get();
FML_CHECK(loop != nullptr)
......
......@@ -34,8 +34,6 @@ class MessageLoop {
// instead of dedicating a thread to the message loop.
void RunExpiredTasksNow();
void SwapTaskQueues(MessageLoop* other);
static void EnsureInitializedForCurrentThread();
static bool IsInitializedForCurrentThread();
......
......@@ -46,7 +46,9 @@ MessageLoopImpl::MessageLoopImpl()
task_queue_->SetWakeable(queue_id_, this);
}
MessageLoopImpl::~MessageLoopImpl() = default;
MessageLoopImpl::~MessageLoopImpl() {
task_queue_->Dispose(queue_id_);
}
void MessageLoopImpl::PostTask(fml::closure task, fml::TimePoint target_time) {
FML_DCHECK(task != nullptr);
......@@ -101,7 +103,7 @@ void MessageLoopImpl::DoRun() {
// should be destructed on the message loop's thread. We have just returned
// from the implementations |Run| method which we know is on the correct
// thread. Drop all pending tasks on the floor.
task_queue_->Dispose(queue_id_);
task_queue_->DisposeTasks(queue_id_);
}
void MessageLoopImpl::DoTerminate() {
......@@ -109,38 +111,19 @@ void MessageLoopImpl::DoTerminate() {
Terminate();
}
// Thread safety analysis disabled as it does not account for deferred locks.
void MessageLoopImpl::SwapTaskQueues(const fml::RefPtr<MessageLoopImpl>& other)
FML_NO_THREAD_SAFETY_ANALYSIS {
if (terminated_ || other->terminated_) {
return;
}
// task_flushing locks
std::unique_lock<std::mutex> t1(tasks_flushing_mutex_, std::defer_lock);
std::unique_lock<std::mutex> t2(other->tasks_flushing_mutex_,
std::defer_lock);
std::lock(t1, t2);
task_queue_->Swap(queue_id_, other->queue_id_);
}
void MessageLoopImpl::FlushTasks(FlushType type) {
TRACE_EVENT0("fml", "MessageLoop::FlushTasks");
std::vector<fml::closure> invocations;
// We are grabbing this lock here as a proxy to indicate
// that we are running tasks and will invoke the
// "right" observers; we are trying to avoid the scenario
// where:
// gather invocations -> Swap -> execute invocations
// leads us to run invocations on the wrong thread.
std::scoped_lock task_flush_lock(tasks_flushing_mutex_);
task_queue_->GetTasksToRunNow(queue_id_, type, invocations);
for (const auto& invocation : invocations) {
invocation();
task_queue_->NotifyObservers(queue_id_);
std::vector<fml::closure> observers =
task_queue_->GetObserversToNotify(queue_id_);
for (const auto& observer : observers) {
observer();
}
}
}
......
......@@ -47,8 +47,6 @@ class MessageLoopImpl : public Wakeable,
virtual TaskQueueId GetTaskQueueId() const;
void SwapTaskQueues(const fml::RefPtr<MessageLoopImpl>& other);
protected:
// Exposed for the embedder shell which allows clients to poll for events
// instead of dedicating a thread to the message loop.
......@@ -65,8 +63,6 @@ class MessageLoopImpl : public Wakeable,
fml::RefPtr<MessageLoopTaskQueues> task_queue_;
TaskQueueId queue_id_;
std::mutex tasks_flushing_mutex_;
std::atomic_bool terminated_;
void FlushTasks(FlushType type);
......
......@@ -5,17 +5,24 @@
#define FML_USED_ON_EMBEDDER
#include "flutter/fml/message_loop_task_queues.h"
#include "flutter/fml/merged_queues_runner.cc"
#include "flutter/fml/make_copyable.h"
#include "flutter/fml/message_loop_impl.h"
namespace fml {
std::mutex MessageLoopTaskQueues::creation_mutex_;
const size_t TaskQueueId::kUnmerged = ULONG_MAX;
const TaskQueueId MessageLoopTaskQueues::_kUnmerged =
TaskQueueId(TaskQueueId::kUnmerged);
fml::RefPtr<MessageLoopTaskQueues> MessageLoopTaskQueues::instance_;
TaskQueueEntry::TaskQueueEntry()
: owner_of(_kUnmerged), subsumed_by(_kUnmerged) {
wakeable = NULL;
task_observers = TaskObservers();
delayed_tasks = DelayedTaskQueue();
}
fml::RefPtr<MessageLoopTaskQueues> MessageLoopTaskQueues::GetInstance() {
std::scoped_lock creation(creation_mutex_);
if (!instance_) {
......@@ -25,50 +32,67 @@ fml::RefPtr<MessageLoopTaskQueues> MessageLoopTaskQueues::GetInstance() {
}
TaskQueueId MessageLoopTaskQueues::CreateTaskQueue() {
std::scoped_lock creation(queue_meta_mutex_);
fml::UniqueLock lock(*queue_meta_mutex_);
TaskQueueId loop_id = TaskQueueId(task_queue_id_counter_);
++task_queue_id_counter_;
observers_mutexes_.push_back(std::make_unique<std::mutex>());
delayed_tasks_mutexes_.push_back(std::make_unique<std::mutex>());
wakeable_mutexes_.push_back(std::make_unique<std::mutex>());
task_observers_.push_back(TaskObservers());
delayed_tasks_.push_back(DelayedTaskQueue());
wakeables_.push_back(NULL);
owner_to_subsumed_.push_back(_kUnmerged);
subsumed_to_owner_.push_back(_kUnmerged);
queue_entries_[loop_id] = std::make_unique<TaskQueueEntry>();
queue_locks_[loop_id] = std::make_unique<std::mutex>();
return loop_id;
}
MessageLoopTaskQueues::MessageLoopTaskQueues()
: task_queue_id_counter_(0), order_(0) {}
: queue_meta_mutex_(fml::SharedMutex::Create()),
task_queue_id_counter_(0),
order_(0) {}
MessageLoopTaskQueues::~MessageLoopTaskQueues() = default;
void MessageLoopTaskQueues::Dispose(TaskQueueId queue_id) {
MergedQueuesRunner merged_tasks = MergedQueuesRunner(*this, queue_id);
merged_tasks.InvokeMerged(
[&](TaskQueueId queue_id) { delayed_tasks_[queue_id] = {}; });
std::scoped_lock queue_lock(GetMutex(queue_id));
const auto& queue_entry = queue_entries_.at(queue_id);
FML_DCHECK(queue_entry->subsumed_by == _kUnmerged);
TaskQueueId subsumed = queue_entry->owner_of;
queue_entries_.erase(queue_id);
if (subsumed != _kUnmerged) {
std::scoped_lock subsumed_lock(*queue_locks_.at(subsumed));
queue_entries_.erase(subsumed);
}
}
void MessageLoopTaskQueues::DisposeTasks(TaskQueueId queue_id) {
std::scoped_lock queue_lock(GetMutex(queue_id));
const auto& queue_entry = queue_entries_.at(queue_id);
FML_DCHECK(queue_entry->subsumed_by == _kUnmerged);
TaskQueueId subsumed = queue_entry->owner_of;
queue_entry->delayed_tasks = {};
if (subsumed != _kUnmerged) {
std::scoped_lock subsumed_lock(*queue_locks_.at(subsumed));
queue_entries_.at(subsumed)->delayed_tasks = {};
}
}
void MessageLoopTaskQueues::RegisterTask(TaskQueueId queue_id,
fml::closure task,
fml::TimePoint target_time) {
std::scoped_lock lock(GetMutex(queue_id, MutexType::kTasks));
std::scoped_lock queue_lock(GetMutex(queue_id));
size_t order = order_++;
delayed_tasks_[queue_id].push({order, std::move(task), target_time});
const auto& queue_entry = queue_entries_[queue_id];
queue_entry->delayed_tasks.push({order, std::move(task), target_time});
TaskQueueId loop_to_wake = queue_id;
if (subsumed_to_owner_[queue_id] != _kUnmerged) {
loop_to_wake = subsumed_to_owner_[queue_id];
if (queue_entry->subsumed_by != _kUnmerged) {
loop_to_wake = queue_entry->subsumed_by;
}
WakeUp(loop_to_wake, delayed_tasks_[queue_id].top().GetTargetTime());
WakeUpUnlocked(loop_to_wake,
queue_entry->delayed_tasks.top().GetTargetTime());
}
bool MessageLoopTaskQueues::HasPendingTasks(TaskQueueId queue_id) {
MergedQueuesRunner merged_tasks = MergedQueuesRunner(*this, queue_id);
bool MessageLoopTaskQueues::HasPendingTasks(TaskQueueId queue_id) const {
std::scoped_lock queue_lock(GetMutex(queue_id));
return HasPendingTasksUnlocked(queue_id);
}
......@@ -76,7 +100,7 @@ void MessageLoopTaskQueues::GetTasksToRunNow(
TaskQueueId queue_id,
FlushType type,
std::vector<fml::closure>& invocations) {
MergedQueuesRunner merged_tasks = MergedQueuesRunner(*this, queue_id);
std::scoped_lock queue_lock(GetMutex(queue_id));
if (!HasPendingTasksUnlocked(queue_id)) {
return;
......@@ -91,108 +115,115 @@ void MessageLoopTaskQueues::GetTasksToRunNow(
break;
}
invocations.emplace_back(std::move(top.GetTask()));
delayed_tasks_[top_queue].pop();
queue_entries_[top_queue]->delayed_tasks.pop();
if (type == FlushType::kSingle) {
break;
}
}
if (!HasPendingTasksUnlocked(queue_id)) {
WakeUp(queue_id, fml::TimePoint::Max());
WakeUpUnlocked(queue_id, fml::TimePoint::Max());
} else {
WakeUp(queue_id, GetNextWakeTimeUnlocked(queue_id));
WakeUpUnlocked(queue_id, GetNextWakeTimeUnlocked(queue_id));
}
}
void MessageLoopTaskQueues::WakeUp(TaskQueueId queue_id, fml::TimePoint time) {
std::scoped_lock lock(GetMutex(queue_id, MutexType::kWakeables));
if (wakeables_[queue_id]) {
wakeables_[queue_id]->WakeUp(time);
void MessageLoopTaskQueues::WakeUpUnlocked(TaskQueueId queue_id,
fml::TimePoint time) const {
if (queue_entries_.at(queue_id)->wakeable) {
queue_entries_.at(queue_id)->wakeable->WakeUp(time);
}
}
size_t MessageLoopTaskQueues::GetNumPendingTasks(TaskQueueId queue_id) {
MergedQueuesRunner merged_tasks = MergedQueuesRunner(*this, queue_id);
if (subsumed_to_owner_[queue_id] != _kUnmerged) {
size_t MessageLoopTaskQueues::GetNumPendingTasks(TaskQueueId queue_id) const {
std::scoped_lock queue_lock(GetMutex(queue_id));
const auto& queue_entry = queue_entries_.at(queue_id);
if (queue_entry->subsumed_by != _kUnmerged) {
return 0;
}
size_t total_tasks = 0;
merged_tasks.InvokeMerged(
[&](TaskQueueId queue) { total_tasks += delayed_tasks_[queue].size(); });
total_tasks += queue_entry->delayed_tasks.size();
TaskQueueId subsumed = queue_entry->owner_of;
if (subsumed != _kUnmerged) {
std::scoped_lock subsumed_lock(*queue_locks_.at(subsumed));
const auto& subsumed_entry = queue_entries_.at(subsumed);
total_tasks += subsumed_entry->delayed_tasks.size();
}
return total_tasks;
}
void MessageLoopTaskQueues::AddTaskObserver(TaskQueueId queue_id,
intptr_t key,
fml::closure callback) {
std::scoped_lock queue_lock(GetMutex(queue_id));
FML_DCHECK(callback != nullptr) << "Observer callback must be non-null.";
std::scoped_lock lock(GetMutex(queue_id, MutexType::kObservers));
task_observers_[queue_id][key] = std::move(callback);
queue_entries_[queue_id]->task_observers[key] = std::move(callback);
}
void MessageLoopTaskQueues::RemoveTaskObserver(TaskQueueId queue_id,
intptr_t key) {
std::scoped_lock lock(GetMutex(queue_id, MutexType::kObservers));
task_observers_[queue_id].erase(key);
}
std::scoped_lock queue_lock(GetMutex(queue_id));
void MessageLoopTaskQueues::NotifyObservers(TaskQueueId queue_id) {
MergedQueuesRunner merged_observers =
MergedQueuesRunner(*this, queue_id, MutexType::kObservers);
merged_observers.InvokeMerged([&](TaskQueueId queue) {
for (const auto& observer : task_observers_[queue]) {
observer.second();
}
});
queue_entries_[queue_id]->task_observers.erase(key);
}
// Thread safety analysis disabled as it does not account for deferred locks.
void MessageLoopTaskQueues::Swap(TaskQueueId primary, TaskQueueId secondary)
FML_NO_THREAD_SAFETY_ANALYSIS {
// task_observers locks
std::mutex& o1 = GetMutex(primary, MutexType::kObservers);
std::mutex& o2 = GetMutex(secondary, MutexType::kObservers);
std::vector<fml::closure> MessageLoopTaskQueues::GetObserversToNotify(
TaskQueueId queue_id) const {
std::scoped_lock queue_lock(GetMutex(queue_id));
std::vector<fml::closure> observers;
// delayed_tasks locks
std::mutex& t1 = GetMutex(primary, MutexType::kTasks);
std::mutex& t2 = GetMutex(secondary, MutexType::kTasks);
if (queue_entries_.at(queue_id)->subsumed_by != _kUnmerged) {
return observers;
}
for (const auto& observer : queue_entries_.at(queue_id)->task_observers) {
observers.push_back(observer.second);
}
std::scoped_lock lock(o1, o2, t1, t2);
TaskQueueId subsumed = queue_entries_.at(queue_id)->owner_of;
if (subsumed != _kUnmerged) {
std::scoped_lock subsumed_lock(*queue_locks_.at(subsumed));
for (const auto& observer : queue_entries_.at(subsumed)->task_observers) {
observers.push_back(observer.second);
}
}
std::swap(task_observers_[primary], task_observers_[secondary]);
std::swap(delayed_tasks_[primary], delayed_tasks_[secondary]);
return observers;
}
void MessageLoopTaskQueues::SetWakeable(TaskQueueId queue_id,
fml::Wakeable* wakeable) {
std::scoped_lock lock(GetMutex(queue_id, MutexType::kWakeables));
FML_CHECK(!wakeables_[queue_id]) << "Wakeable can only be set once.";
wakeables_[queue_id] = wakeable;
std::scoped_lock queue_lock(GetMutex(queue_id));
FML_CHECK(!queue_entries_[queue_id]->wakeable)
<< "Wakeable can only be set once.";
queue_entries_.at(queue_id)->wakeable = wakeable;
}
bool MessageLoopTaskQueues::Merge(TaskQueueId owner, TaskQueueId subsumed) {
// task_observers locks
std::mutex& o1 = GetMutex(owner, MutexType::kObservers);
std::mutex& o2 = GetMutex(subsumed, MutexType::kObservers);
// delayed_tasks locks
std::mutex& t1 = GetMutex(owner, MutexType::kTasks);
std::mutex& t2 = GetMutex(subsumed, MutexType::kTasks);
std::scoped_lock lock(o1, o2, t1, t2);
if (owner == subsumed) {
return true;
}
if (owner_to_subsumed_[owner] == subsumed) {
std::mutex& owner_mutex = GetMutex(owner);
std::mutex& subsumed_mutex = GetMutex(subsumed);
std::scoped_lock lock(owner_mutex, subsumed_mutex);
auto& owner_entry = queue_entries_.at(owner);
auto& subsumed_entry = queue_entries_.at(subsumed);
if (owner_entry->owner_of == subsumed) {
return true;
}
std::vector<TaskQueueId> owner_subsumed_keys = {
owner_to_subsumed_[owner], owner_to_subsumed_[subsumed],
subsumed_to_owner_[owner], subsumed_to_owner_[subsumed]};
owner_entry->owner_of, owner_entry->subsumed_by, subsumed_entry->owner_of,
subsumed_entry->subsumed_by};
for (auto key : owner_subsumed_keys) {
if (key != _kUnmerged) {
......@@ -200,87 +231,101 @@ bool MessageLoopTaskQueues::Merge(TaskQueueId owner, TaskQueueId subsumed) {
}
}
owner_to_subsumed_[owner] = subsumed;
subsumed_to_owner_[subsumed] = owner;
owner_entry->owner_of = subsumed;
subsumed_entry->subsumed_by = owner;
if (HasPendingTasksUnlocked(owner)) {
WakeUp(owner, GetNextWakeTimeUnlocked(owner));
WakeUpUnlocked(owner, GetNextWakeTimeUnlocked(owner));
}
return true;
}
bool MessageLoopTaskQueues::Unmerge(TaskQueueId owner) {
MergedQueuesRunner merged_observers =
MergedQueuesRunner(*this, owner, MutexType::kObservers);
MergedQueuesRunner merged_tasks =
MergedQueuesRunner(*this, owner, MutexType::kTasks);
std::scoped_lock owner_lock(GetMutex(owner));
const TaskQueueId subsumed = owner_to_subsumed_[owner];
auto& owner_entry = queue_entries_[owner];
const TaskQueueId subsumed = owner_entry->owner_of;
if (subsumed == _kUnmerged) {
return false;
}
subsumed_to_owner_[subsumed] = _kUnmerged;
owner_to_subsumed_[owner] = _kUnmerged;
queue_entries_[subsumed]->subsumed_by = _kUnmerged;
owner_entry->owner_of = _kUnmerged;
if (HasPendingTasksUnlocked(owner)) {
WakeUp(owner, GetNextWakeTimeUnlocked(owner));
WakeUpUnlocked(owner, GetNextWakeTimeUnlocked(owner));
}
if (HasPendingTasksUnlocked(subsumed)) {
WakeUp(subsumed, GetNextWakeTimeUnlocked(subsumed));
WakeUpUnlocked(subsumed, GetNextWakeTimeUnlocked(subsumed));
}
return true;
}
bool MessageLoopTaskQueues::Owns(TaskQueueId owner, TaskQueueId subsumed) {
MergedQueuesRunner merged_observers = MergedQueuesRunner(*this, owner);
return subsumed == owner_to_subsumed_[owner] || owner == subsumed;
bool MessageLoopTaskQueues::Owns(TaskQueueId owner,
TaskQueueId subsumed) const {
std::scoped_lock owner_lock(GetMutex(owner));
return subsumed == queue_entries_.at(owner)->owner_of || owner == subsumed;
}
std::mutex& MessageLoopTaskQueues::GetMutex(TaskQueueId queue_id) const {
fml::SharedLock queue_reader(*queue_meta_mutex_);
FML_DCHECK(queue_locks_.count(queue_id) && queue_entries_.count(queue_id))
<< "Trying to acquire a lock on an invalid queue_id: " << queue_id;
return *queue_locks_.at(queue_id);
}
// Subsumed queues will never have pending tasks.
// Owning queues will consider both their own tasks and their subsumed queue's tasks.
bool MessageLoopTaskQueues::HasPendingTasksUnlocked(TaskQueueId queue_id) {
if (subsumed_to_owner_[queue_id] != _kUnmerged) {
bool MessageLoopTaskQueues::HasPendingTasksUnlocked(
TaskQueueId queue_id) const {
const auto& entry = queue_entries_.at(queue_id);
bool is_subsumed = entry->subsumed_by != _kUnmerged;
if (is_subsumed) {
return false;
}
if (!delayed_tasks_[queue_id].empty()) {
if (!entry->delayed_tasks.empty()) {
return true;
}
const TaskQueueId subsumed = owner_to_subsumed_[queue_id];
const TaskQueueId subsumed = entry->owner_of;
if (subsumed == _kUnmerged) {
// This is not an owner and the queue is empty.
return false;
} else {
return !delayed_tasks_[subsumed].empty();
return !queue_entries_.at(subsumed)->delayed_tasks.empty();
}
}
fml::TimePoint MessageLoopTaskQueues::GetNextWakeTimeUnlocked(
TaskQueueId queue_id) {
TaskQueueId queue_id) const {
TaskQueueId tmp = _kUnmerged;
return PeekNextTaskUnlocked(queue_id, tmp).GetTargetTime();
}
const DelayedTask& MessageLoopTaskQueues::PeekNextTaskUnlocked(
TaskQueueId owner,
TaskQueueId& top_queue_id) {
TaskQueueId& top_queue_id) const {
FML_DCHECK(HasPendingTasksUnlocked(owner));
const TaskQueueId subsumed = owner_to_subsumed_[owner];
const auto& entry = queue_entries_.at(owner);
const TaskQueueId subsumed = entry->owner_of;
if (subsumed == _kUnmerged) {
top_queue_id = owner;
return delayed_tasks_[owner].top();
return entry->delayed_tasks.top();
}
const auto& owner_tasks = entry->delayed_tasks;
const auto& subsumed_tasks = queue_entries_.at(subsumed)->delayed_tasks;
// we are owning another task queue
const bool subsumed_has_task = !delayed_tasks_[subsumed].empty();
const bool owner_has_task = !delayed_tasks_[owner].empty();
const bool subsumed_has_task = !subsumed_tasks.empty();
const bool owner_has_task = !owner_tasks.empty();
if (owner_has_task && subsumed_has_task) {
const auto owner_task = delayed_tasks_[owner].top();
const auto subsumed_task = delayed_tasks_[subsumed].top();
const auto owner_task = owner_tasks.top();
const auto subsumed_task = subsumed_tasks.top();
if (owner_task > subsumed_task) {
top_queue_id = subsumed;
} else {
......@@ -291,19 +336,7 @@ const DelayedTask& MessageLoopTaskQueues::PeekNextTaskUnlocked(
} else {
top_queue_id = subsumed;
}
return delayed_tasks_[top_queue_id].top();
}
std::mutex& MessageLoopTaskQueues::GetMutex(TaskQueueId queue_id,
MutexType type) {
std::scoped_lock lock(queue_meta_mutex_);
if (type == MutexType::kTasks) {
return *delayed_tasks_mutexes_[queue_id];
} else if (type == MutexType::kObservers) {
return *observers_mutexes_[queue_id];
} else {
return *wakeable_mutexes_[queue_id];
}
return queue_entries_.at(top_queue_id)->delayed_tasks.top();
}
} // namespace fml
......@@ -13,6 +13,7 @@
#include "flutter/fml/delayed_task.h"
#include "flutter/fml/macros.h"
#include "flutter/fml/memory/ref_counted.h"
#include "flutter/fml/synchronization/shared_mutex.h"
#include "flutter/fml/synchronization/thread_annotations.h"
#include "flutter/fml/wakeable.h"
......@@ -30,6 +31,30 @@ class TaskQueueId {
size_t value_ = kUnmerged;
};
static const TaskQueueId _kUnmerged = TaskQueueId(TaskQueueId::kUnmerged);
// This is keyed by the |TaskQueueId| and contains all the queue
// components that make up a single TaskQueue.
class TaskQueueEntry {
public:
using TaskObservers = std::map<intptr_t, fml::closure>;
Wakeable* wakeable;
TaskObservers task_observers;
DelayedTaskQueue delayed_tasks;
// Note: Both of these can be _kUnmerged, which indicates that
// this queue has neither been merged nor subsumed. Otherwise,
// exactly one of them is _kUnmerged: if owner_of is _kUnmerged,
// this queue has been subsumed; if subsumed_by is _kUnmerged,
// this queue owns another queue.
TaskQueueId owner_of;
TaskQueueId subsumed_by;
TaskQueueEntry();
private:
FML_DISALLOW_COPY_ASSIGN_AND_MOVE(TaskQueueEntry);
};
enum class FlushType {
kSingle,
kAll,
......@@ -49,19 +74,21 @@ class MessageLoopTaskQueues
void Dispose(TaskQueueId queue_id);
void DisposeTasks(TaskQueueId queue_id);
// Tasks methods.
void RegisterTask(TaskQueueId queue_id,
fml::closure task,
fml::TimePoint target_time);
bool HasPendingTasks(TaskQueueId queue_id);
bool HasPendingTasks(TaskQueueId queue_id) const;
void GetTasksToRunNow(TaskQueueId queue_id,
FlushType type,
std::vector<fml::closure>& invocations);
size_t GetNumPendingTasks(TaskQueueId queue_id);
size_t GetNumPendingTasks(TaskQueueId queue_id) const;
// Observers methods.
......@@ -71,12 +98,10 @@ class MessageLoopTaskQueues
void RemoveTaskObserver(TaskQueueId queue_id, intptr_t key);
void NotifyObservers(TaskQueueId queue_id);
std::vector<fml::closure> GetObserversToNotify(TaskQueueId queue_id) const;
// Misc.
void Swap(TaskQueueId primary, TaskQueueId secondary);
void SetWakeable(TaskQueueId queue_id, fml::Wakeable* wakeable);
// Invariants for merge and un-merge
......@@ -98,56 +123,37 @@ class MessageLoopTaskQueues
bool Unmerge(TaskQueueId owner);
// Returns true if owner owns the subsumed task queue.
bool Owns(TaskQueueId owner, TaskQueueId subsumed);
bool Owns(TaskQueueId owner, TaskQueueId subsumed) const;
private:
class MergedQueuesRunner;
enum class MutexType {
kTasks,
kObservers,
kWakeables,
};
using Mutexes = std::vector<std::unique_ptr<std::mutex>>;
using TaskObservers = std::map<intptr_t, fml::closure>;
MessageLoopTaskQueues();
~MessageLoopTaskQueues();
void WakeUp(TaskQueueId queue_id, fml::TimePoint time);
void WakeUpUnlocked(TaskQueueId queue_id, fml::TimePoint time) const;
bool HasPendingTasksUnlocked(TaskQueueId queue_id);
std::mutex& GetMutex(TaskQueueId queue_id) const;
const DelayedTask& PeekNextTaskUnlocked(TaskQueueId queue_id,
TaskQueueId& top_queue_id);
bool HasPendingTasksUnlocked(TaskQueueId queue_id) const;
fml::TimePoint GetNextWakeTimeUnlocked(TaskQueueId queue_id);
const DelayedTask& PeekNextTaskUnlocked(TaskQueueId queue_id,
TaskQueueId& top_queue_id) const;
std::mutex& GetMutex(TaskQueueId queue_id, MutexType type);
fml::TimePoint GetNextWakeTimeUnlocked(TaskQueueId queue_id) const;
static std::mutex creation_mutex_;
static fml::RefPtr<MessageLoopTaskQueues> instance_
FML_GUARDED_BY(creation_mutex_);
std::mutex queue_meta_mutex_;
size_t task_queue_id_counter_ FML_GUARDED_BY(queue_meta_mutex_);
Mutexes observers_mutexes_ FML_GUARDED_BY(queue_meta_mutex_);
Mutexes delayed_tasks_mutexes_ FML_GUARDED_BY(queue_meta_mutex_);
Mutexes wakeable_mutexes_ FML_GUARDED_BY(queue_meta_mutex_);
// These are guarded by their corresponding `Mutexes`
std::vector<Wakeable*> wakeables_;
std::vector<TaskObservers> task_observers_;
std::vector<DelayedTaskQueue> delayed_tasks_;
std::unique_ptr<fml::SharedMutex> queue_meta_mutex_;
std::map<TaskQueueId, std::unique_ptr<TaskQueueEntry>> queue_entries_;
std::map<TaskQueueId, std::unique_ptr<std::mutex>> queue_locks_;
static const TaskQueueId _kUnmerged;
// These are guarded by delayed_tasks_mutexes_
std::vector<TaskQueueId> owner_to_subsumed_;
std::vector<TaskQueueId> subsumed_to_owner_;
size_t task_queue_id_counter_;
std::atomic_int order_;
......
......@@ -4,6 +4,8 @@
#define FML_USED_ON_EMBEDDER
#include <thread>
#include "flutter/fml/message_loop_task_queues.h"
#include "flutter/fml/synchronization/count_down_latch.h"
#include "flutter/fml/synchronization/waitable_event.h"
......@@ -79,6 +81,15 @@ TEST(MessageLoopTaskQueue, PreserveTaskOrdering) {
}
}
void TestNotifyObservers(fml::TaskQueueId queue_id) {
auto task_queue = fml::MessageLoopTaskQueues::GetInstance();
std::vector<fml::closure> observers =
task_queue->GetObserversToNotify(queue_id);
for (const auto& observer : observers) {
observer();
}
}
TEST(MessageLoopTaskQueue, AddRemoveNotifyObservers) {
auto task_queue = fml::MessageLoopTaskQueues::GetInstance();
auto queue_id = task_queue->CreateTaskQueue();
......@@ -87,12 +98,12 @@ TEST(MessageLoopTaskQueue, AddRemoveNotifyObservers) {
intptr_t key = 123;
task_queue->AddTaskObserver(queue_id, key, [&test_val]() { test_val = 1; });
task_queue->NotifyObservers(queue_id);
TestNotifyObservers(queue_id);
ASSERT_TRUE(test_val == 1);
test_val = 0;
task_queue->RemoveTaskObserver(queue_id, key);
task_queue->NotifyObservers(queue_id);
TestNotifyObservers(queue_id);
ASSERT_TRUE(test_val == 0);
}
......@@ -136,3 +147,83 @@ TEST(MessageLoopTaskQueue, WokenUpWithNewerTime) {
latch.Wait();
}
TEST(MessageLoopTaskQueue, NotifyObserversWhileCreatingQueues) {
auto task_queues = fml::MessageLoopTaskQueues::GetInstance();
fml::TaskQueueId queue_id = task_queues->CreateTaskQueue();
fml::AutoResetWaitableEvent first_observer_executing, before_second_observer;
task_queues->AddTaskObserver(queue_id, queue_id + 1, [&]() {
first_observer_executing.Signal();
before_second_observer.Wait();
});
for (int i = 0; i < 100; i++) {
task_queues->AddTaskObserver(queue_id, queue_id + i + 2, [] {});
}
std::thread notify_observers([&]() { TestNotifyObservers(queue_id); });
first_observer_executing.Wait();
for (int i = 0; i < 100; i++) {
task_queues->CreateTaskQueue();
}
before_second_observer.Signal();
notify_observers.join();
}
TEST(MessageLoopTaskQueue, ConcurrentQueueAndTaskCreatingCounts) {
auto task_queues = fml::MessageLoopTaskQueues::GetInstance();
const int base_queue_id = task_queues->CreateTaskQueue();
const int num_queues = 100;
std::atomic_bool created[num_queues * 3];
std::atomic_int num_tasks[num_queues * 3];
std::mutex task_count_mutex[num_queues * 3];
std::atomic_int done = 0;
for (int i = 0; i < num_queues * 3; i++) {
num_tasks[i] = 0;
created[i] = false;
}
auto creation_func = [&] {
for (int i = 0; i < num_queues; i++) {
fml::TaskQueueId queue_id = task_queues->CreateTaskQueue();
created[queue_id - base_queue_id] = true;
for (int cur_q = 1; cur_q < i; cur_q++) {
if (created[cur_q - base_queue_id]) {
std::scoped_lock counter(task_count_mutex[cur_q - base_queue_id]);
int cur_num_tasks = rand() % 10;
for (int k = 0; k < cur_num_tasks; k++) {
task_queues->RegisterTask(
fml::TaskQueueId(cur_q), [] {}, fml::TimePoint::Now());
}
num_tasks[cur_q - base_queue_id] += cur_num_tasks;
}
}
}
done++;
};
std::thread creation_1(creation_func);
std::thread creation_2(creation_func);
while (done < 2) {
for (int i = 0; i < num_queues * 3; i++) {
if (created[i]) {
std::scoped_lock counter(task_count_mutex[i]);
int num_pending = task_queues->GetNumPendingTasks(
fml::TaskQueueId(base_queue_id + i));
int num_added = num_tasks[i];
ASSERT_EQ(num_pending, num_added);
}
}
}
creation_1.join();
creation_2.join();
}
......@@ -308,139 +308,3 @@ TEST(MessageLoop, CanCreateConcurrentMessageLoop) {
latch.Wait();
ASSERT_GE(thread_ids.size(), 1u);
}
TEST(MessageLoop, CanSwapMessageLoopsAndPreserveThreadConfiguration) {
// synchronization notes:
// 1. term1 and term2 are to wait for Swap.
// 2. task_started_1 and task_started_2 are to wait for the task runners
// to signal that they are done.
// 3. loop_init_1 and loop_init_2 are to wait for the message loops to
// get initialized.
fml::MessageLoop* loop1 = nullptr;
fml::AutoResetWaitableEvent loop_init_1;
fml::AutoResetWaitableEvent task_started_1;
fml::AutoResetWaitableEvent term1;
std::thread thread1([&loop1, &loop_init_1, &term1, &task_started_1]() {
fml::MessageLoop::EnsureInitializedForCurrentThread();
loop1 = &fml::MessageLoop::GetCurrent();
// this task will be run on thread2 after Swap.
loop1->GetTaskRunner()->PostTask([&task_started_1]() {
task_started_1.Signal();
fml::MessageLoop::GetCurrent().Terminate();
});
loop_init_1.Signal();
term1.Wait();
loop1->Run();
});
loop_init_1.Wait();
fml::MessageLoop* loop2 = nullptr;
fml::AutoResetWaitableEvent loop_init_2;
fml::AutoResetWaitableEvent task_started_2;
fml::AutoResetWaitableEvent term2;
std::thread thread2(
[&loop2, &loop_init_2, &term2, &task_started_2, &loop1]() {
fml::MessageLoop::EnsureInitializedForCurrentThread();
loop2 = &fml::MessageLoop::GetCurrent();
// this task will be run on thread1 after Swap.
loop2->GetTaskRunner()->PostTask([&task_started_2, &loop1]() {
// ensure that we run the task on loop1 after the swap.
ASSERT_TRUE(loop1 == &fml::MessageLoop::GetCurrent());
task_started_2.Signal();
fml::MessageLoop::GetCurrent().Terminate();
});
loop_init_2.Signal();
term2.Wait();
loop2->Run();
});
loop_init_2.Wait();
// swap the loops.
loop1->SwapTaskQueues(loop2);
// thread_1 is waiting on the term1 latch.
term1.Signal();
task_started_2.Wait();
// thread_2 is waiting on the term2 latch.
term2.Signal();
task_started_1.Wait();
thread1.join();
thread2.join();
}
TEST(MessageLoop, TIME_SENSITIVE(DelayedTaskSwap)) {
// Task execution order:
// time (ms): 0 10 20 30 40
// thread 1: A1 A2 A3 A4 TERM
// thread 2: B1 B2 B3 TERM
// At time 15, we swap thread 1 and 2, and assert
// that tasks run on the right threads.
std::thread::id t1, t2;
fml::AutoResetWaitableEvent tid_1, tid_2;
fml::MessageLoop* loop1 = nullptr;
fml::MessageLoop* loop2 = nullptr;
std::thread thread_1([&loop1, &t1, &t2, &tid_1, &tid_2]() {
t1 = std::this_thread::get_id();
tid_1.Signal();
tid_2.Wait();
fml::MessageLoop::EnsureInitializedForCurrentThread();
loop1 = &fml::MessageLoop::GetCurrent();
for (int t = 0; t <= 4; t++) {
loop1->GetTaskRunner()->PostDelayedTask(
[t, &t1, &t2]() {
auto cur_tid = std::this_thread::get_id();
if (t <= 1) {
ASSERT_EQ(cur_tid, t1);
} else {
ASSERT_EQ(cur_tid, t2);
}
if (t == 4) {
fml::MessageLoop::GetCurrent().Terminate();
}
},
fml::TimeDelta::FromMilliseconds(t * 10));
}
loop1->Run();
});
std::thread thread_2([&loop2, &t1, &t2, &tid_1, &tid_2]() {
t2 = std::this_thread::get_id();
tid_2.Signal();
tid_1.Wait();
fml::MessageLoop::EnsureInitializedForCurrentThread();
loop2 = &fml::MessageLoop::GetCurrent();
for (int t = 1; t <= 4; t++) {
loop2->GetTaskRunner()->PostDelayedTask(
[t, &t1, &t2]() {
auto cur_tid = std::this_thread::get_id();
if (t <= 1) {
ASSERT_EQ(cur_tid, t2);
} else {
ASSERT_EQ(cur_tid, t1);
}
if (t == 4) {
fml::MessageLoop::GetCurrent().Terminate();
}
},
fml::TimeDelta::FromMilliseconds(t * 10));
}
loop2->Run();
});
// on the main thread we swap the task queues at 15 ms.
std::this_thread::sleep_for(std::chrono::milliseconds(15));
loop1->SwapTaskQueues(loop2);
thread_1.join();
thread_2.join();
}