From 2d3e1c58c568cd418788cdd9fd57057204d39a1b Mon Sep 17 00:00:00 2001 From: Chris Yang Date: Thu, 27 Aug 2020 14:18:02 -0700 Subject: [PATCH] |MessageLoopImpl::FlushTasks| runs one task at a time (#20771) --- fml/message_loop_impl.cc | 16 ++- fml/message_loop_task_queues.cc | 32 ++--- fml/message_loop_task_queues.h | 6 +- fml/message_loop_task_queues_benchmark.cc | 15 ++- ...oop_task_queues_merge_unmerge_unittests.cc | 69 ++++++++-- fml/message_loop_task_queues_unittests.cc | 17 ++- fml/raster_thread_merger_unittests.cc | 119 ++++++++++++++++++ shell/common/rasterizer.cc | 12 +- shell/common/rasterizer.h | 7 +- 9 files changed, 233 insertions(+), 60 deletions(-) diff --git a/fml/message_loop_impl.cc b/fml/message_loop_impl.cc index d4c9331e35..a206b11a74 100644 --- a/fml/message_loop_impl.cc +++ b/fml/message_loop_impl.cc @@ -119,18 +119,24 @@ void MessageLoopImpl::DoTerminate() { void MessageLoopImpl::FlushTasks(FlushType type) { TRACE_EVENT0("fml", "MessageLoop::FlushTasks"); - std::vector invocations; - task_queue_->GetTasksToRunNow(queue_id_, type, invocations); - - for (const auto& invocation : invocations) { + const auto now = fml::TimePoint::Now(); + fml::closure invocation; + do { + invocation = task_queue_->GetNextTaskToRun(queue_id_, now); + if (!invocation) { + break; + } invocation(); std::vector observers = task_queue_->GetObserversToNotify(queue_id_); for (const auto& observer : observers) { observer(); } - } + if (type == FlushType::kSingle) { + break; + } + } while (invocation); } void MessageLoopImpl::RunExpiredTasksNow() { diff --git a/fml/message_loop_task_queues.cc b/fml/message_loop_task_queues.cc index 2c75156975..604e68acc5 100644 --- a/fml/message_loop_task_queues.cc +++ b/fml/message_loop_task_queues.cc @@ -87,35 +87,27 @@ bool MessageLoopTaskQueues::HasPendingTasks(TaskQueueId queue_id) const { return HasPendingTasksUnlocked(queue_id); } -void MessageLoopTaskQueues::GetTasksToRunNow( - TaskQueueId queue_id, - FlushType type, - std::vector& invocations) { +fml::closure MessageLoopTaskQueues::GetNextTaskToRun(TaskQueueId queue_id, + fml::TimePoint from_time) { std::lock_guard guard(queue_mutex_); if (!HasPendingTasksUnlocked(queue_id)) { - return; - } - - const auto now = fml::TimePoint::Now(); - - while (HasPendingTasksUnlocked(queue_id)) { - TaskQueueId top_queue = _kUnmerged; - const auto& top = PeekNextTaskUnlocked(queue_id, top_queue); - if (top.GetTargetTime() > now) { - break; - } - invocations.emplace_back(top.GetTask()); - queue_entries_.at(top_queue)->delayed_tasks.pop(); - if (type == FlushType::kSingle) { - break; - } + return nullptr; } + TaskQueueId top_queue = _kUnmerged; + const auto& top = PeekNextTaskUnlocked(queue_id, top_queue); if (!HasPendingTasksUnlocked(queue_id)) { WakeUpUnlocked(queue_id, fml::TimePoint::Max()); } else { WakeUpUnlocked(queue_id, GetNextWakeTimeUnlocked(queue_id)); } + + if (top.GetTargetTime() > from_time) { + return nullptr; + } + fml::closure invocation = top.GetTask(); + queue_entries_.at(top_queue)->delayed_tasks.pop(); + return invocation; } void MessageLoopTaskQueues::WakeUpUnlocked(TaskQueueId queue_id, diff --git a/fml/message_loop_task_queues.h b/fml/message_loop_task_queues.h index 3d2cfa0c17..eb056d6935 100644 --- a/fml/message_loop_task_queues.h +++ b/fml/message_loop_task_queues.h @@ -83,9 +83,7 @@ class MessageLoopTaskQueues bool HasPendingTasks(TaskQueueId queue_id) const; - void GetTasksToRunNow(TaskQueueId queue_id, - FlushType type, - std::vector& invocations); + fml::closure GetNextTaskToRun(TaskQueueId queue_id, fml::TimePoint from_time); size_t GetNumPendingTasks(TaskQueueId queue_id) const; @@ -112,7 +110,7 @@ class MessageLoopTaskQueues // 3. Each task queue can only be merged and subsumed once. // // Methods currently aware of the merged state of the queues: - // HasPendingTasks, GetTasksToRunNow, GetNumPendingTasks + // HasPendingTasks, GetNextTaskToRun, GetNumPendingTasks // This method returns false if either the owner or subsumed has already been // merged with something else. diff --git a/fml/message_loop_task_queues_benchmark.cc b/fml/message_loop_task_queues_benchmark.cc index 065a7c3cc1..165550213d 100644 --- a/fml/message_loop_task_queues_benchmark.cc +++ b/fml/message_loop_task_queues_benchmark.cc @@ -39,10 +39,17 @@ static void BM_RegisterAndGetTasks(benchmark::State& state) { // NOLINT } tasks_registered.CountDown(); tasks_registered.Wait(); - std::vector invocations; - task_queue->GetTasksToRunNow(TaskQueueId(task_runner_id), - fml::FlushType::kAll, invocations); - assert(invocations.size() == num_tasks_per_queue); + const auto now = fml::TimePoint::Now(); + int num_invocations = 0; + for (;;) { + fml::closure invocation = + task_queue->GetNextTaskToRun(TaskQueueId(task_runner_id), now); + if (!invocation) { + break; + } + num_invocations++; + } + assert(num_invocations == num_tasks_per_queue); tasks_done.CountDown(); }); } diff --git a/fml/message_loop_task_queues_merge_unmerge_unittests.cc b/fml/message_loop_task_queues_merge_unmerge_unittests.cc index 808183ee0c..0c928eba41 100644 --- a/fml/message_loop_task_queues_merge_unmerge_unittests.cc +++ b/fml/message_loop_task_queues_merge_unmerge_unittests.cc @@ -11,6 +11,9 @@ #include "flutter/fml/synchronization/waitable_event.h" #include "gtest/gtest.h" +namespace fml { +namespace testing { + class TestWakeable : public fml::Wakeable { public: using WakeUpCall = std::function; @@ -23,6 +26,25 @@ class TestWakeable : public fml::Wakeable { WakeUpCall wake_up_call_; }; +static int CountRemainingTasks(fml::RefPtr task_queue, + const TaskQueueId& queue_id, + bool run_invocation = false) { + const auto now = fml::TimePoint::Now(); + int count = 0; + fml::closure invocation; + do { + invocation = task_queue->GetNextTaskToRun(queue_id, now); + if (!invocation) { + break; + } + count++; + if (run_invocation) { + invocation(); + } + } while (invocation); + return count; +} + TEST(MessageLoopTaskQueueMergeUnmerge, AfterMergePrimaryTasksServicedOnPrimary) { auto task_queue = fml::MessageLoopTaskQueues::GetInstance(); @@ -127,8 +149,7 @@ TEST(MessageLoopTaskQueueMergeUnmerge, MergeInvokesBothWakeables) { task_queue->Merge(queue_id_1, queue_id_2); - std::vector invocations; - task_queue->GetTasksToRunNow(queue_id_1, fml::FlushType::kAll, invocations); + CountRemainingTasks(task_queue, queue_id_1); latch.Wait(); } @@ -157,12 +178,12 @@ TEST(MessageLoopTaskQueueMergeUnmerge, task_queue->Merge(queue_id_1, queue_id_2); task_queue->Unmerge(queue_id_1); - std::vector invocations; + CountRemainingTasks(task_queue, queue_id_1); - task_queue->GetTasksToRunNow(queue_id_1, fml::FlushType::kAll, invocations); latch_1.Wait(); - task_queue->GetTasksToRunNow(queue_id_2, fml::FlushType::kAll, invocations); + CountRemainingTasks(task_queue, queue_id_2); + latch_2.Wait(); } @@ -183,10 +204,8 @@ TEST(MessageLoopTaskQueueMergeUnmerge, GetTasksToRunNowBlocksMerge) { wake_up_end.Wait(); })); - std::thread tasks_to_run_now_thread([&]() { - std::vector invocations; - task_queue->GetTasksToRunNow(queue_id_1, fml::FlushType::kAll, invocations); - }); + std::thread tasks_to_run_now_thread( + [&]() { CountRemainingTasks(task_queue, queue_id_1); }); wake_up_start.Wait(); bool merge_done = false; @@ -208,3 +227,35 @@ TEST(MessageLoopTaskQueueMergeUnmerge, GetTasksToRunNowBlocksMerge) { tasks_to_run_now_thread.join(); merge_thread.join(); } + +TEST(MessageLoopTaskQueueMergeUnmerge, + FollowingTasksSwitchQueueIfFirstTaskMergesThreads) { + auto task_queue = fml::MessageLoopTaskQueues::GetInstance(); + + auto queue_id_1 = task_queue->CreateTaskQueue(); + auto queue_id_2 = task_queue->CreateTaskQueue(); + + fml::CountDownLatch latch(2); + + task_queue->SetWakeable( + queue_id_1, + new TestWakeable([&](fml::TimePoint wake_time) { latch.CountDown(); })); + task_queue->SetWakeable( + queue_id_2, + new TestWakeable([&](fml::TimePoint wake_time) { latch.CountDown(); })); + + task_queue->RegisterTask( + queue_id_2, [&]() { task_queue->Merge(queue_id_1, queue_id_2); }, + fml::TimePoint::Now()); + + task_queue->RegisterTask( + queue_id_2, []() {}, fml::TimePoint::Now()); + + ASSERT_EQ(CountRemainingTasks(task_queue, queue_id_2, true), 1); + ASSERT_EQ(CountRemainingTasks(task_queue, queue_id_1, true), 1); + + latch.Wait(); +} + +} // namespace testing +} // namespace fml diff --git a/fml/message_loop_task_queues_unittests.cc b/fml/message_loop_task_queues_unittests.cc index 3bda17e2cb..3678bf2ff7 100644 --- a/fml/message_loop_task_queues_unittests.cc +++ b/fml/message_loop_task_queues_unittests.cc @@ -11,6 +11,9 @@ #include "flutter/fml/synchronization/waitable_event.h" #include "gtest/gtest.h" +namespace fml { +namespace testing { + class TestWakeable : public fml::Wakeable { public: using WakeUpCall = std::function; @@ -69,12 +72,13 @@ TEST(MessageLoopTaskQueue, PreserveTaskOrdering) { task_queue->RegisterTask( queue_id, [&test_val]() { test_val = 2; }, fml::TimePoint::Now()); - std::vector invocations; - task_queue->GetTasksToRunNow(queue_id, fml::FlushType::kAll, invocations); - + const auto now = fml::TimePoint::Now(); int expected_value = 1; - - for (auto& invocation : invocations) { + for (;;) { + fml::closure invocation = task_queue->GetNextTaskToRun(queue_id, now); + if (!invocation) { + break; + } invocation(); ASSERT_TRUE(test_val == expected_value); expected_value++; @@ -277,3 +281,6 @@ TEST(MessageLoopTaskQueue, RegisterTaskWakesUpOwnerQueue) { ASSERT_EQ(time1, wakes[1]); ASSERT_EQ(time1, wakes[2]); } + +} // namespace testing +} // namespace fml diff --git a/fml/raster_thread_merger_unittests.cc b/fml/raster_thread_merger_unittests.cc index f3df1c1bb2..15e95831a1 100644 --- a/fml/raster_thread_merger_unittests.cc +++ b/fml/raster_thread_merger_unittests.cc @@ -14,6 +14,9 @@ #include "flutter/fml/task_runner.h" #include "gtest/gtest.h" +namespace fml { +namespace testing { + TEST(RasterThreadMerger, RemainMergedTillLeaseExpires) { fml::MessageLoop* loop1 = nullptr; fml::AutoResetWaitableEvent latch1; @@ -301,3 +304,119 @@ TEST(RasterThreadMerger, HandleTaskQueuesAreTheSame) { term1.Signal(); thread1.join(); } + +TEST(RasterThreadMerger, RunExpiredTasksWhileFirstTaskMergesThreads) { + fml::MessageLoop* loop_platform = nullptr; + fml::AutoResetWaitableEvent latch1; + std::thread thread_platform([&loop_platform, &latch1]() { + fml::MessageLoop::EnsureInitializedForCurrentThread(); + loop_platform = &fml::MessageLoop::GetCurrent(); + loop_platform->GetTaskRunner()->PostTask([&]() { latch1.Signal(); }); + loop_platform->Run(); + }); + + fml::MessageLoop* loop_raster = nullptr; + fml::AutoResetWaitableEvent latch2; + std::thread thread_raster([&loop_raster, &loop_platform, &latch1, &latch2]() { + latch1.Wait(); + + fml::MessageLoop::EnsureInitializedForCurrentThread(); + loop_raster = &fml::MessageLoop::GetCurrent(); + fml::TaskQueueId qid_platform = + loop_platform->GetTaskRunner()->GetTaskQueueId(); + fml::TaskQueueId qid_raster = + loop_raster->GetTaskRunner()->GetTaskQueueId(); + fml::CountDownLatch post_merge(2); + const auto raster_thread_merger_ = + fml::MakeRefCounted(qid_platform, qid_raster); + loop_raster->GetTaskRunner()->PostTask([&]() { + ASSERT_TRUE(raster_thread_merger_->IsOnRasterizingThread()); + ASSERT_FALSE(raster_thread_merger_->IsOnPlatformThread()); + ASSERT_EQ(fml::MessageLoop::GetCurrentTaskQueueId(), qid_raster); + raster_thread_merger_->MergeWithLease(1); + post_merge.CountDown(); + }); + + loop_raster->GetTaskRunner()->PostTask([&]() { + ASSERT_TRUE(raster_thread_merger_->IsOnRasterizingThread()); + ASSERT_TRUE(raster_thread_merger_->IsOnPlatformThread()); + ASSERT_EQ(fml::MessageLoop::GetCurrentTaskQueueId(), qid_platform); + raster_thread_merger_->DecrementLease(); + post_merge.CountDown(); + }); + + loop_raster->RunExpiredTasksNow(); + post_merge.Wait(); + latch2.Signal(); + }); + + latch2.Wait(); + loop_platform->GetTaskRunner()->PostTask( + [&]() { loop_platform->Terminate(); }); + + thread_platform.join(); + thread_raster.join(); +} + +TEST(RasterThreadMerger, RunExpiredTasksWhileFirstTaskUnMergesThreads) { + fml::MessageLoop* loop_platform = nullptr; + fml::AutoResetWaitableEvent latch1; + std::thread thread_platform([&loop_platform, &latch1]() { + fml::MessageLoop::EnsureInitializedForCurrentThread(); + loop_platform = &fml::MessageLoop::GetCurrent(); + loop_platform->GetTaskRunner()->PostTask([&]() { latch1.Signal(); }); + loop_platform->Run(); + }); + + fml::MessageLoop* loop_raster = nullptr; + fml::AutoResetWaitableEvent latch2; + std::thread thread_raster([&loop_raster, &loop_platform, &latch1, &latch2]() { + latch1.Wait(); + + fml::MessageLoop::EnsureInitializedForCurrentThread(); + loop_raster = &fml::MessageLoop::GetCurrent(); + fml::TaskQueueId qid_platform = + loop_platform->GetTaskRunner()->GetTaskQueueId(); + fml::TaskQueueId qid_raster = + loop_raster->GetTaskRunner()->GetTaskQueueId(); + fml::CountDownLatch post_merge(2); + + const auto raster_thread_merger_ = + fml::MakeRefCounted(qid_platform, qid_raster); + loop_raster->GetTaskRunner()->PostTask([&]() { + raster_thread_merger_->MergeWithLease(1); + post_merge.CountDown(); + }); + + loop_raster->RunExpiredTasksNow(); + + loop_raster->GetTaskRunner()->PostTask([&]() { + ASSERT_TRUE(raster_thread_merger_->IsOnRasterizingThread()); + ASSERT_TRUE(raster_thread_merger_->IsOnPlatformThread()); + ASSERT_EQ(fml::MessageLoop::GetCurrentTaskQueueId(), qid_platform); + raster_thread_merger_->DecrementLease(); + post_merge.CountDown(); + }); + + loop_raster->GetTaskRunner()->PostTask([&]() { + ASSERT_TRUE(raster_thread_merger_->IsOnRasterizingThread()); + ASSERT_FALSE(raster_thread_merger_->IsOnPlatformThread()); + ASSERT_EQ(fml::MessageLoop::GetCurrentTaskQueueId(), qid_platform); + post_merge.CountDown(); + }); + + loop_raster->RunExpiredTasksNow(); + post_merge.Wait(); + latch2.Signal(); + }); + + latch2.Wait(); + loop_platform->GetTaskRunner()->PostTask( + [&]() { loop_platform->Terminate(); }); + + thread_platform.join(); + thread_raster.join(); +} + +} // namespace testing +} // namespace fml diff --git a/shell/common/rasterizer.cc b/shell/common/rasterizer.cc index 8546f6bcde..0cc894d7d4 100644 --- a/shell/common/rasterizer.cc +++ b/shell/common/rasterizer.cc @@ -96,6 +96,7 @@ void Rasterizer::Teardown() { compositor_context_->OnGrContextDestroyed(); surface_.reset(); last_layer_tree_.reset(); + if (raster_thread_merger_.get() != nullptr && raster_thread_merger_.get()->IsMerged()) { raster_thread_merger_->UnMergeNow(); @@ -667,22 +668,19 @@ std::optional Rasterizer::GetResourceCacheMaxBytes() const { return std::nullopt; } -bool Rasterizer::EnsureThreadsAreMerged() { +void Rasterizer::EnsureThreadsAreMerged() { if (surface_ == nullptr || raster_thread_merger_.get() == nullptr) { - return false; + return; } + const size_t ThreadMergeLeaseTermDefault = 10; fml::TaskRunner::RunNowOrPostTask( delegate_.GetTaskRunners().GetRasterTaskRunner(), [weak_this = weak_factory_.GetWeakPtr(), thread_merger = raster_thread_merger_]() { - if (weak_this->surface_ == nullptr) { - return; - } - thread_merger->MergeWithLease(10); + thread_merger->MergeWithLease(ThreadMergeLeaseTermDefault); }); raster_thread_merger_->WaitUntilMerged(); FML_DCHECK(raster_thread_merger_->IsMerged()); - return true; } Rasterizer::Screenshot::Screenshot() {} diff --git a/shell/common/rasterizer.h b/shell/common/rasterizer.h index c584114e79..e2b0caaa84 100644 --- a/shell/common/rasterizer.h +++ b/shell/common/rasterizer.h @@ -400,12 +400,7 @@ class Rasterizer final : public SnapshotDelegate { /// blocking the current thread until the 2 task runners are /// merged. /// - /// @return `true` if raster and platform task runners are the same. - /// `true` if/when raster and platform task runners are merged. - /// `false` if the surface or the |RasterThreadMerger| has not - /// been initialized. - /// - bool EnsureThreadsAreMerged(); + void EnsureThreadsAreMerged(); private: Delegate& delegate_; -- GitLab