未验证 提交 2d3e1c58 编写于 作者: C Chris Yang 提交者: GitHub

|MessageLoopImpl::FlushTasks| runs one task at a time (#20771)

上级 fc4da62b
......@@ -119,18 +119,24 @@ void MessageLoopImpl::DoTerminate() {
void MessageLoopImpl::FlushTasks(FlushType type) {
TRACE_EVENT0("fml", "MessageLoop::FlushTasks");
std::vector<fml::closure> invocations;
task_queue_->GetTasksToRunNow(queue_id_, type, invocations);
for (const auto& invocation : invocations) {
const auto now = fml::TimePoint::Now();
fml::closure invocation;
do {
invocation = task_queue_->GetNextTaskToRun(queue_id_, now);
if (!invocation) {
break;
}
invocation();
std::vector<fml::closure> observers =
task_queue_->GetObserversToNotify(queue_id_);
for (const auto& observer : observers) {
observer();
}
}
if (type == FlushType::kSingle) {
break;
}
} while (invocation);
}
void MessageLoopImpl::RunExpiredTasksNow() {
......
......@@ -87,35 +87,27 @@ bool MessageLoopTaskQueues::HasPendingTasks(TaskQueueId queue_id) const {
return HasPendingTasksUnlocked(queue_id);
}
void MessageLoopTaskQueues::GetTasksToRunNow(
TaskQueueId queue_id,
FlushType type,
std::vector<fml::closure>& invocations) {
fml::closure MessageLoopTaskQueues::GetNextTaskToRun(TaskQueueId queue_id,
fml::TimePoint from_time) {
std::lock_guard guard(queue_mutex_);
if (!HasPendingTasksUnlocked(queue_id)) {
return;
}
const auto now = fml::TimePoint::Now();
while (HasPendingTasksUnlocked(queue_id)) {
TaskQueueId top_queue = _kUnmerged;
const auto& top = PeekNextTaskUnlocked(queue_id, top_queue);
if (top.GetTargetTime() > now) {
break;
}
invocations.emplace_back(top.GetTask());
queue_entries_.at(top_queue)->delayed_tasks.pop();
if (type == FlushType::kSingle) {
break;
}
return nullptr;
}
TaskQueueId top_queue = _kUnmerged;
const auto& top = PeekNextTaskUnlocked(queue_id, top_queue);
if (!HasPendingTasksUnlocked(queue_id)) {
WakeUpUnlocked(queue_id, fml::TimePoint::Max());
} else {
WakeUpUnlocked(queue_id, GetNextWakeTimeUnlocked(queue_id));
}
if (top.GetTargetTime() > from_time) {
return nullptr;
}
fml::closure invocation = top.GetTask();
queue_entries_.at(top_queue)->delayed_tasks.pop();
return invocation;
}
void MessageLoopTaskQueues::WakeUpUnlocked(TaskQueueId queue_id,
......
......@@ -83,9 +83,7 @@ class MessageLoopTaskQueues
bool HasPendingTasks(TaskQueueId queue_id) const;
void GetTasksToRunNow(TaskQueueId queue_id,
FlushType type,
std::vector<fml::closure>& invocations);
fml::closure GetNextTaskToRun(TaskQueueId queue_id, fml::TimePoint from_time);
size_t GetNumPendingTasks(TaskQueueId queue_id) const;
......@@ -112,7 +110,7 @@ class MessageLoopTaskQueues
// 3. Each task queue can only be merged and subsumed once.
//
// Methods currently aware of the merged state of the queues:
// HasPendingTasks, GetTasksToRunNow, GetNumPendingTasks
// HasPendingTasks, GetNextTaskToRun, GetNumPendingTasks
// This method returns false if either the owner or subsumed has already been
// merged with something else.
......
......@@ -39,10 +39,17 @@ static void BM_RegisterAndGetTasks(benchmark::State& state) { // NOLINT
}
tasks_registered.CountDown();
tasks_registered.Wait();
std::vector<fml::closure> invocations;
task_queue->GetTasksToRunNow(TaskQueueId(task_runner_id),
fml::FlushType::kAll, invocations);
assert(invocations.size() == num_tasks_per_queue);
const auto now = fml::TimePoint::Now();
int num_invocations = 0;
for (;;) {
fml::closure invocation =
task_queue->GetNextTaskToRun(TaskQueueId(task_runner_id), now);
if (!invocation) {
break;
}
num_invocations++;
}
assert(num_invocations == num_tasks_per_queue);
tasks_done.CountDown();
});
}
......
......@@ -11,6 +11,9 @@
#include "flutter/fml/synchronization/waitable_event.h"
#include "gtest/gtest.h"
namespace fml {
namespace testing {
class TestWakeable : public fml::Wakeable {
public:
using WakeUpCall = std::function<void(const fml::TimePoint)>;
......@@ -23,6 +26,25 @@ class TestWakeable : public fml::Wakeable {
WakeUpCall wake_up_call_;
};
static int CountRemainingTasks(fml::RefPtr<MessageLoopTaskQueues> task_queue,
const TaskQueueId& queue_id,
bool run_invocation = false) {
const auto now = fml::TimePoint::Now();
int count = 0;
fml::closure invocation;
do {
invocation = task_queue->GetNextTaskToRun(queue_id, now);
if (!invocation) {
break;
}
count++;
if (run_invocation) {
invocation();
}
} while (invocation);
return count;
}
TEST(MessageLoopTaskQueueMergeUnmerge,
AfterMergePrimaryTasksServicedOnPrimary) {
auto task_queue = fml::MessageLoopTaskQueues::GetInstance();
......@@ -127,8 +149,7 @@ TEST(MessageLoopTaskQueueMergeUnmerge, MergeInvokesBothWakeables) {
task_queue->Merge(queue_id_1, queue_id_2);
std::vector<fml::closure> invocations;
task_queue->GetTasksToRunNow(queue_id_1, fml::FlushType::kAll, invocations);
CountRemainingTasks(task_queue, queue_id_1);
latch.Wait();
}
......@@ -157,12 +178,12 @@ TEST(MessageLoopTaskQueueMergeUnmerge,
task_queue->Merge(queue_id_1, queue_id_2);
task_queue->Unmerge(queue_id_1);
std::vector<fml::closure> invocations;
CountRemainingTasks(task_queue, queue_id_1);
task_queue->GetTasksToRunNow(queue_id_1, fml::FlushType::kAll, invocations);
latch_1.Wait();
task_queue->GetTasksToRunNow(queue_id_2, fml::FlushType::kAll, invocations);
CountRemainingTasks(task_queue, queue_id_2);
latch_2.Wait();
}
......@@ -183,10 +204,8 @@ TEST(MessageLoopTaskQueueMergeUnmerge, GetTasksToRunNowBlocksMerge) {
wake_up_end.Wait();
}));
std::thread tasks_to_run_now_thread([&]() {
std::vector<fml::closure> invocations;
task_queue->GetTasksToRunNow(queue_id_1, fml::FlushType::kAll, invocations);
});
std::thread tasks_to_run_now_thread(
[&]() { CountRemainingTasks(task_queue, queue_id_1); });
wake_up_start.Wait();
bool merge_done = false;
......@@ -208,3 +227,35 @@ TEST(MessageLoopTaskQueueMergeUnmerge, GetTasksToRunNowBlocksMerge) {
tasks_to_run_now_thread.join();
merge_thread.join();
}
TEST(MessageLoopTaskQueueMergeUnmerge,
FollowingTasksSwitchQueueIfFirstTaskMergesThreads) {
auto task_queue = fml::MessageLoopTaskQueues::GetInstance();
auto queue_id_1 = task_queue->CreateTaskQueue();
auto queue_id_2 = task_queue->CreateTaskQueue();
fml::CountDownLatch latch(2);
task_queue->SetWakeable(
queue_id_1,
new TestWakeable([&](fml::TimePoint wake_time) { latch.CountDown(); }));
task_queue->SetWakeable(
queue_id_2,
new TestWakeable([&](fml::TimePoint wake_time) { latch.CountDown(); }));
task_queue->RegisterTask(
queue_id_2, [&]() { task_queue->Merge(queue_id_1, queue_id_2); },
fml::TimePoint::Now());
task_queue->RegisterTask(
queue_id_2, []() {}, fml::TimePoint::Now());
ASSERT_EQ(CountRemainingTasks(task_queue, queue_id_2, true), 1);
ASSERT_EQ(CountRemainingTasks(task_queue, queue_id_1, true), 1);
latch.Wait();
}
} // namespace testing
} // namespace fml
......@@ -11,6 +11,9 @@
#include "flutter/fml/synchronization/waitable_event.h"
#include "gtest/gtest.h"
namespace fml {
namespace testing {
class TestWakeable : public fml::Wakeable {
public:
using WakeUpCall = std::function<void(const fml::TimePoint)>;
......@@ -69,12 +72,13 @@ TEST(MessageLoopTaskQueue, PreserveTaskOrdering) {
task_queue->RegisterTask(
queue_id, [&test_val]() { test_val = 2; }, fml::TimePoint::Now());
std::vector<fml::closure> invocations;
task_queue->GetTasksToRunNow(queue_id, fml::FlushType::kAll, invocations);
const auto now = fml::TimePoint::Now();
int expected_value = 1;
for (auto& invocation : invocations) {
for (;;) {
fml::closure invocation = task_queue->GetNextTaskToRun(queue_id, now);
if (!invocation) {
break;
}
invocation();
ASSERT_TRUE(test_val == expected_value);
expected_value++;
......@@ -277,3 +281,6 @@ TEST(MessageLoopTaskQueue, RegisterTaskWakesUpOwnerQueue) {
ASSERT_EQ(time1, wakes[1]);
ASSERT_EQ(time1, wakes[2]);
}
} // namespace testing
} // namespace fml
......@@ -14,6 +14,9 @@
#include "flutter/fml/task_runner.h"
#include "gtest/gtest.h"
namespace fml {
namespace testing {
TEST(RasterThreadMerger, RemainMergedTillLeaseExpires) {
fml::MessageLoop* loop1 = nullptr;
fml::AutoResetWaitableEvent latch1;
......@@ -301,3 +304,119 @@ TEST(RasterThreadMerger, HandleTaskQueuesAreTheSame) {
term1.Signal();
thread1.join();
}
TEST(RasterThreadMerger, RunExpiredTasksWhileFirstTaskMergesThreads) {
fml::MessageLoop* loop_platform = nullptr;
fml::AutoResetWaitableEvent latch1;
std::thread thread_platform([&loop_platform, &latch1]() {
fml::MessageLoop::EnsureInitializedForCurrentThread();
loop_platform = &fml::MessageLoop::GetCurrent();
loop_platform->GetTaskRunner()->PostTask([&]() { latch1.Signal(); });
loop_platform->Run();
});
fml::MessageLoop* loop_raster = nullptr;
fml::AutoResetWaitableEvent latch2;
std::thread thread_raster([&loop_raster, &loop_platform, &latch1, &latch2]() {
latch1.Wait();
fml::MessageLoop::EnsureInitializedForCurrentThread();
loop_raster = &fml::MessageLoop::GetCurrent();
fml::TaskQueueId qid_platform =
loop_platform->GetTaskRunner()->GetTaskQueueId();
fml::TaskQueueId qid_raster =
loop_raster->GetTaskRunner()->GetTaskQueueId();
fml::CountDownLatch post_merge(2);
const auto raster_thread_merger_ =
fml::MakeRefCounted<fml::RasterThreadMerger>(qid_platform, qid_raster);
loop_raster->GetTaskRunner()->PostTask([&]() {
ASSERT_TRUE(raster_thread_merger_->IsOnRasterizingThread());
ASSERT_FALSE(raster_thread_merger_->IsOnPlatformThread());
ASSERT_EQ(fml::MessageLoop::GetCurrentTaskQueueId(), qid_raster);
raster_thread_merger_->MergeWithLease(1);
post_merge.CountDown();
});
loop_raster->GetTaskRunner()->PostTask([&]() {
ASSERT_TRUE(raster_thread_merger_->IsOnRasterizingThread());
ASSERT_TRUE(raster_thread_merger_->IsOnPlatformThread());
ASSERT_EQ(fml::MessageLoop::GetCurrentTaskQueueId(), qid_platform);
raster_thread_merger_->DecrementLease();
post_merge.CountDown();
});
loop_raster->RunExpiredTasksNow();
post_merge.Wait();
latch2.Signal();
});
latch2.Wait();
loop_platform->GetTaskRunner()->PostTask(
[&]() { loop_platform->Terminate(); });
thread_platform.join();
thread_raster.join();
}
TEST(RasterThreadMerger, RunExpiredTasksWhileFirstTaskUnMergesThreads) {
fml::MessageLoop* loop_platform = nullptr;
fml::AutoResetWaitableEvent latch1;
std::thread thread_platform([&loop_platform, &latch1]() {
fml::MessageLoop::EnsureInitializedForCurrentThread();
loop_platform = &fml::MessageLoop::GetCurrent();
loop_platform->GetTaskRunner()->PostTask([&]() { latch1.Signal(); });
loop_platform->Run();
});
fml::MessageLoop* loop_raster = nullptr;
fml::AutoResetWaitableEvent latch2;
std::thread thread_raster([&loop_raster, &loop_platform, &latch1, &latch2]() {
latch1.Wait();
fml::MessageLoop::EnsureInitializedForCurrentThread();
loop_raster = &fml::MessageLoop::GetCurrent();
fml::TaskQueueId qid_platform =
loop_platform->GetTaskRunner()->GetTaskQueueId();
fml::TaskQueueId qid_raster =
loop_raster->GetTaskRunner()->GetTaskQueueId();
fml::CountDownLatch post_merge(2);
const auto raster_thread_merger_ =
fml::MakeRefCounted<fml::RasterThreadMerger>(qid_platform, qid_raster);
loop_raster->GetTaskRunner()->PostTask([&]() {
raster_thread_merger_->MergeWithLease(1);
post_merge.CountDown();
});
loop_raster->RunExpiredTasksNow();
loop_raster->GetTaskRunner()->PostTask([&]() {
ASSERT_TRUE(raster_thread_merger_->IsOnRasterizingThread());
ASSERT_TRUE(raster_thread_merger_->IsOnPlatformThread());
ASSERT_EQ(fml::MessageLoop::GetCurrentTaskQueueId(), qid_platform);
raster_thread_merger_->DecrementLease();
post_merge.CountDown();
});
loop_raster->GetTaskRunner()->PostTask([&]() {
ASSERT_TRUE(raster_thread_merger_->IsOnRasterizingThread());
ASSERT_FALSE(raster_thread_merger_->IsOnPlatformThread());
ASSERT_EQ(fml::MessageLoop::GetCurrentTaskQueueId(), qid_platform);
post_merge.CountDown();
});
loop_raster->RunExpiredTasksNow();
post_merge.Wait();
latch2.Signal();
});
latch2.Wait();
loop_platform->GetTaskRunner()->PostTask(
[&]() { loop_platform->Terminate(); });
thread_platform.join();
thread_raster.join();
}
} // namespace testing
} // namespace fml
......@@ -96,6 +96,7 @@ void Rasterizer::Teardown() {
compositor_context_->OnGrContextDestroyed();
surface_.reset();
last_layer_tree_.reset();
if (raster_thread_merger_.get() != nullptr &&
raster_thread_merger_.get()->IsMerged()) {
raster_thread_merger_->UnMergeNow();
......@@ -667,22 +668,19 @@ std::optional<size_t> Rasterizer::GetResourceCacheMaxBytes() const {
return std::nullopt;
}
bool Rasterizer::EnsureThreadsAreMerged() {
void Rasterizer::EnsureThreadsAreMerged() {
if (surface_ == nullptr || raster_thread_merger_.get() == nullptr) {
return false;
return;
}
const size_t ThreadMergeLeaseTermDefault = 10;
fml::TaskRunner::RunNowOrPostTask(
delegate_.GetTaskRunners().GetRasterTaskRunner(),
[weak_this = weak_factory_.GetWeakPtr(),
thread_merger = raster_thread_merger_]() {
if (weak_this->surface_ == nullptr) {
return;
}
thread_merger->MergeWithLease(10);
thread_merger->MergeWithLease(ThreadMergeLeaseTermDefault);
});
raster_thread_merger_->WaitUntilMerged();
FML_DCHECK(raster_thread_merger_->IsMerged());
return true;
}
Rasterizer::Screenshot::Screenshot() {}
......
......@@ -400,12 +400,7 @@ class Rasterizer final : public SnapshotDelegate {
/// blocking the current thread until the 2 task runners are
/// merged.
///
/// @return `true` if raster and platform task runners are the same.
/// `true` if/when raster and platform task runners are merged.
/// `false` if the surface or the |RasterThreadMerger| has not
/// been initialized.
///
bool EnsureThreadsAreMerged();
void EnsureThreadsAreMerged();
private:
Delegate& delegate_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册