未验证 提交 5c70356a 编写于 作者: K Kaushik Iska 提交者: GitHub

Simplify task queues locking mechanism (#16477)

We now have one mutex guarding all accesses to
the underlying task heaps. This simplifies the more granular
but bug prone mechanism of having striped locks.

This also re-enables GPUThreadMerger tests that are currently
disabled due to their flaky nature. The scenario that gets fixed by this
change is as follows:

1. Thread-1: We lock `queue_meta_mutex_` and grab locks on `queue_1` and release the meta mutex.
2. Thread-1: We add an Observer on `queues` object.
3. Thread-2: We lock `queue_meta_mutex_` and grab locks on `queue_2`.
4. Thread-2: We try to dispose all the pending tasks on `queue_2` which calls `erase` on `queues`.

The above situation is not thread safe without having 1 lock.

Note: This increases the contention on one lock and could potentially be bad for perf. We are
explicitly making this trade-off towards reducing the complexity.

Fixes: https://github.com/flutter/flutter/issues/49007
上级 c9322145
......@@ -14,8 +14,7 @@
#include "flutter/fml/task_runner.h"
#include "gtest/gtest.h"
// TODO(49007): Flaky. Investigate, fix and re-enable.
TEST(GpuThreadMerger, DISABLED_RemainMergedTillLeaseExpires) {
TEST(GpuThreadMerger, RemainMergedTillLeaseExpires) {
fml::MessageLoop* loop1 = nullptr;
fml::AutoResetWaitableEvent latch1;
fml::AutoResetWaitableEvent term1;
......@@ -62,8 +61,7 @@ TEST(GpuThreadMerger, DISABLED_RemainMergedTillLeaseExpires) {
thread2.join();
}
// TODO(49007): Flaky. Investigate, fix and re-enable.
TEST(GpuThreadMerger, DISABLED_IsNotOnRasterizingThread) {
TEST(GpuThreadMerger, IsNotOnRasterizingThread) {
fml::MessageLoop* loop1 = nullptr;
fml::AutoResetWaitableEvent latch1;
std::thread thread1([&loop1, &latch1]() {
......@@ -148,8 +146,7 @@ TEST(GpuThreadMerger, DISABLED_IsNotOnRasterizingThread) {
thread2.join();
}
// TODO(49007): Flaky. Investigate, fix and re-enable.
TEST(GpuThreadMerger, DISABLED_LeaseExtension) {
TEST(GpuThreadMerger, LeaseExtension) {
fml::MessageLoop* loop1 = nullptr;
fml::AutoResetWaitableEvent latch1;
fml::AutoResetWaitableEvent term1;
......
......@@ -8,6 +8,8 @@
#include "flutter/fml/make_copyable.h"
#include "flutter/fml/message_loop_impl.h"
#include <iostream>
namespace fml {
std::mutex MessageLoopTaskQueues::creation_mutex_;
......@@ -32,44 +34,36 @@ fml::RefPtr<MessageLoopTaskQueues> MessageLoopTaskQueues::GetInstance() {
}
TaskQueueId MessageLoopTaskQueues::CreateTaskQueue() {
fml::UniqueLock lock(*queue_meta_mutex_);
std::lock_guard guard(queue_mutex_);
TaskQueueId loop_id = TaskQueueId(task_queue_id_counter_);
++task_queue_id_counter_;
queue_entries_[loop_id] = std::make_unique<TaskQueueEntry>();
queue_locks_[loop_id] = std::make_unique<std::mutex>();
return loop_id;
}
MessageLoopTaskQueues::MessageLoopTaskQueues()
: queue_meta_mutex_(fml::SharedMutex::Create()),
task_queue_id_counter_(0),
order_(0) {}
: task_queue_id_counter_(0), order_(0) {}
MessageLoopTaskQueues::~MessageLoopTaskQueues() = default;
void MessageLoopTaskQueues::Dispose(TaskQueueId queue_id) {
std::scoped_lock queue_lock(GetMutex(queue_id));
std::lock_guard guard(queue_mutex_);
const auto& queue_entry = queue_entries_.at(queue_id);
FML_DCHECK(queue_entry->subsumed_by == _kUnmerged);
TaskQueueId subsumed = queue_entry->owner_of;
queue_entries_.erase(queue_id);
if (subsumed != _kUnmerged) {
std::scoped_lock subsumed_lock(*queue_locks_.at(subsumed));
queue_entries_.erase(subsumed);
}
}
void MessageLoopTaskQueues::DisposeTasks(TaskQueueId queue_id) {
std::scoped_lock queue_lock(GetMutex(queue_id));
std::lock_guard guard(queue_mutex_);
const auto& queue_entry = queue_entries_.at(queue_id);
FML_DCHECK(queue_entry->subsumed_by == _kUnmerged);
TaskQueueId subsumed = queue_entry->owner_of;
queue_entry->delayed_tasks = {};
if (subsumed != _kUnmerged) {
std::scoped_lock subsumed_lock(*queue_locks_.at(subsumed));
queue_entries_.at(subsumed)->delayed_tasks = {};
}
}
......@@ -77,10 +71,9 @@ void MessageLoopTaskQueues::DisposeTasks(TaskQueueId queue_id) {
void MessageLoopTaskQueues::RegisterTask(TaskQueueId queue_id,
const fml::closure& task,
fml::TimePoint target_time) {
std::scoped_lock queue_lock(GetMutex(queue_id));
std::lock_guard guard(queue_mutex_);
size_t order = order_++;
const auto& queue_entry = queue_entries_[queue_id];
const auto& queue_entry = queue_entries_.at(queue_id);
queue_entry->delayed_tasks.push({order, task, target_time});
TaskQueueId loop_to_wake = queue_id;
if (queue_entry->subsumed_by != _kUnmerged) {
......@@ -91,8 +84,7 @@ void MessageLoopTaskQueues::RegisterTask(TaskQueueId queue_id,
}
bool MessageLoopTaskQueues::HasPendingTasks(TaskQueueId queue_id) const {
std::scoped_lock queue_lock(GetMutex(queue_id));
std::lock_guard guard(queue_mutex_);
return HasPendingTasksUnlocked(queue_id);
}
......@@ -100,8 +92,7 @@ void MessageLoopTaskQueues::GetTasksToRunNow(
TaskQueueId queue_id,
FlushType type,
std::vector<fml::closure>& invocations) {
std::scoped_lock queue_lock(GetMutex(queue_id));
std::lock_guard guard(queue_mutex_);
if (!HasPendingTasksUnlocked(queue_id)) {
return;
}
......@@ -115,7 +106,7 @@ void MessageLoopTaskQueues::GetTasksToRunNow(
break;
}
invocations.emplace_back(std::move(top.GetTask()));
queue_entries_[top_queue]->delayed_tasks.pop();
queue_entries_.at(top_queue)->delayed_tasks.pop();
if (type == FlushType::kSingle) {
break;
}
......@@ -136,8 +127,7 @@ void MessageLoopTaskQueues::WakeUpUnlocked(TaskQueueId queue_id,
}
size_t MessageLoopTaskQueues::GetNumPendingTasks(TaskQueueId queue_id) const {
std::scoped_lock queue_lock(GetMutex(queue_id));
std::lock_guard guard(queue_mutex_);
const auto& queue_entry = queue_entries_.at(queue_id);
if (queue_entry->subsumed_by != _kUnmerged) {
return 0;
......@@ -148,7 +138,6 @@ size_t MessageLoopTaskQueues::GetNumPendingTasks(TaskQueueId queue_id) const {
TaskQueueId subsumed = queue_entry->owner_of;
if (subsumed != _kUnmerged) {
std::scoped_lock subsumed_lock(*queue_locks_.at(subsumed));
const auto& subsumed_entry = queue_entries_.at(subsumed);
total_tasks += subsumed_entry->delayed_tasks.size();
}
......@@ -158,22 +147,20 @@ size_t MessageLoopTaskQueues::GetNumPendingTasks(TaskQueueId queue_id) const {
void MessageLoopTaskQueues::AddTaskObserver(TaskQueueId queue_id,
intptr_t key,
const fml::closure& callback) {
std::scoped_lock queue_lock(GetMutex(queue_id));
std::lock_guard guard(queue_mutex_);
FML_DCHECK(callback != nullptr) << "Observer callback must be non-null.";
queue_entries_[queue_id]->task_observers[key] = std::move(callback);
queue_entries_.at(queue_id)->task_observers[key] = std::move(callback);
}
void MessageLoopTaskQueues::RemoveTaskObserver(TaskQueueId queue_id,
intptr_t key) {
std::scoped_lock queue_lock(GetMutex(queue_id));
queue_entries_[queue_id]->task_observers.erase(key);
std::lock_guard guard(queue_mutex_);
queue_entries_.at(queue_id)->task_observers.erase(key);
}
std::vector<fml::closure> MessageLoopTaskQueues::GetObserversToNotify(
TaskQueueId queue_id) const {
std::scoped_lock queue_lock(GetMutex(queue_id));
std::lock_guard guard(queue_mutex_);
std::vector<fml::closure> observers;
if (queue_entries_.at(queue_id)->subsumed_by != _kUnmerged) {
......@@ -186,7 +173,6 @@ std::vector<fml::closure> MessageLoopTaskQueues::GetObserversToNotify(
TaskQueueId subsumed = queue_entries_.at(queue_id)->owner_of;
if (subsumed != _kUnmerged) {
std::scoped_lock subsumed_lock(*queue_locks_.at(subsumed));
for (const auto& observer : queue_entries_.at(subsumed)->task_observers) {
observers.push_back(observer.second);
}
......@@ -197,9 +183,8 @@ std::vector<fml::closure> MessageLoopTaskQueues::GetObserversToNotify(
void MessageLoopTaskQueues::SetWakeable(TaskQueueId queue_id,
fml::Wakeable* wakeable) {
std::scoped_lock queue_lock(GetMutex(queue_id));
FML_CHECK(!queue_entries_[queue_id]->wakeable)
std::lock_guard guard(queue_mutex_);
FML_CHECK(!queue_entries_.at(queue_id)->wakeable)
<< "Wakeable can only be set once.";
queue_entries_.at(queue_id)->wakeable = wakeable;
}
......@@ -208,12 +193,7 @@ bool MessageLoopTaskQueues::Merge(TaskQueueId owner, TaskQueueId subsumed) {
if (owner == subsumed) {
return true;
}
std::mutex& owner_mutex = GetMutex(owner);
std::mutex& subsumed_mutex = GetMutex(subsumed);
std::scoped_lock lock(owner_mutex, subsumed_mutex);
std::lock_guard guard(queue_mutex_);
auto& owner_entry = queue_entries_.at(owner);
auto& subsumed_entry = queue_entries_.at(subsumed);
......@@ -242,15 +222,14 @@ bool MessageLoopTaskQueues::Merge(TaskQueueId owner, TaskQueueId subsumed) {
}
bool MessageLoopTaskQueues::Unmerge(TaskQueueId owner) {
std::scoped_lock owner_lock(GetMutex(owner));
auto& owner_entry = queue_entries_[owner];
std::lock_guard guard(queue_mutex_);
const auto& owner_entry = queue_entries_.at(owner);
const TaskQueueId subsumed = owner_entry->owner_of;
if (subsumed == _kUnmerged) {
return false;
}
queue_entries_[subsumed]->subsumed_by = _kUnmerged;
queue_entries_.at(subsumed)->subsumed_by = _kUnmerged;
owner_entry->owner_of = _kUnmerged;
if (HasPendingTasksUnlocked(owner)) {
......@@ -266,17 +245,10 @@ bool MessageLoopTaskQueues::Unmerge(TaskQueueId owner) {
bool MessageLoopTaskQueues::Owns(TaskQueueId owner,
TaskQueueId subsumed) const {
std::scoped_lock owner_lock(GetMutex(owner));
std::lock_guard guard(queue_mutex_);
return subsumed == queue_entries_.at(owner)->owner_of || owner == subsumed;
}
std::mutex& MessageLoopTaskQueues::GetMutex(TaskQueueId queue_id) const {
fml::SharedLock queue_reader(*queue_meta_mutex_);
FML_DCHECK(queue_locks_.count(queue_id) && queue_entries_.count(queue_id))
<< "Trying to acquire a lock on an invalid queue_id: " << queue_id;
return *queue_locks_.at(queue_id);
}
// Subsumed queues will never have pending tasks.
// Owning queues will consider both their and their subsumed tasks.
bool MessageLoopTaskQueues::HasPendingTasksUnlocked(
......
......@@ -127,16 +127,12 @@ class MessageLoopTaskQueues
private:
class MergedQueuesRunner;
using Mutexes = std::vector<std::unique_ptr<std::mutex>>;
MessageLoopTaskQueues();
~MessageLoopTaskQueues();
void WakeUpUnlocked(TaskQueueId queue_id, fml::TimePoint time) const;
std::mutex& GetMutex(TaskQueueId queue_id) const;
bool HasPendingTasksUnlocked(TaskQueueId queue_id) const;
const DelayedTask& PeekNextTaskUnlocked(TaskQueueId queue_id,
......@@ -147,9 +143,8 @@ class MessageLoopTaskQueues
static std::mutex creation_mutex_;
static fml::RefPtr<MessageLoopTaskQueues> instance_;
std::unique_ptr<fml::SharedMutex> queue_meta_mutex_;
mutable std::mutex queue_mutex_;
std::map<TaskQueueId, std::unique_ptr<TaskQueueEntry>> queue_entries_;
std::map<TaskQueueId, std::unique_ptr<std::mutex>> queue_locks_;
size_t task_queue_id_counter_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册