未验证 提交 6f5347c5 编写于 作者: K Kaushik Iska 提交者: GitHub

MessageLoopTaskQueue schedules Wakes (#9316)

* Refactor to move Task Queue to its own class

- This is to help with sharing task queue among
  multiple message loops going forward.

- currently there is 1:1 mapping between task queue
  and message loop, we are still maintaining the semantics
  for this change.

* Add mutex include

* Most of the waking up changes minus test failures

* Refactor MessageLoopImpl to be Wakeable

- Makes testing easier by letting us putting a TestWakeable

- Also move the waking up logic to the task queue

* add tests

* Fix formatting and license
上级 b9c790e2
......@@ -224,6 +224,7 @@ FILE: ../../../flutter/fml/trace_event.h
FILE: ../../../flutter/fml/unique_fd.cc
FILE: ../../../flutter/fml/unique_fd.h
FILE: ../../../flutter/fml/unique_object.h
FILE: ../../../flutter/fml/wakeable.h
FILE: ../../../flutter/lib/io/dart_io.cc
FILE: ../../../flutter/lib/io/dart_io.h
FILE: ../../../flutter/lib/snapshot/libraries.json
......
......@@ -77,6 +77,7 @@ source_set("fml") {
"unique_fd.cc",
"unique_fd.h",
"unique_object.h",
"wakeable.h",
]
public_deps = []
......
......@@ -41,6 +41,7 @@ fml::RefPtr<MessageLoopImpl> MessageLoopImpl::Create() {
MessageLoopImpl::MessageLoopImpl() : terminated_(false) {
task_queue_ = std::make_unique<MessageLoopTaskQueue>();
task_queue_->SetWakeable(this);
}
MessageLoopImpl::~MessageLoopImpl() = default;
......@@ -53,8 +54,7 @@ void MessageLoopImpl::PostTask(fml::closure task, fml::TimePoint target_time) {
// |task| synchronously within this function.
return;
}
const auto wake_up = task_queue_->RegisterTask(task, target_time);
WakeUp(wake_up);
task_queue_->RegisterTask(task, target_time);
}
void MessageLoopImpl::AddTaskObserver(intptr_t key, fml::closure callback) {
......@@ -130,9 +130,7 @@ void MessageLoopImpl::FlushTasks(FlushType type) {
// gather invocations -> Swap -> execute invocations
// will lead us to run invocations on the wrong thread.
std::lock_guard<std::mutex> task_flush_lock(tasks_flushing_mutex_);
const auto wake_up = task_queue_->GetTasksToRunNow(type, invocations);
WakeUp(wake_up);
task_queue_->GetTasksToRunNow(type, invocations);
for (const auto& invocation : invocations) {
invocation();
......
......@@ -20,10 +20,12 @@
#include "flutter/fml/message_loop_task_queue.h"
#include "flutter/fml/synchronization/thread_annotations.h"
#include "flutter/fml/time/time_point.h"
#include "flutter/fml/wakeable.h"
namespace fml {
class MessageLoopImpl : public fml::RefCountedThreadSafe<MessageLoopImpl> {
class MessageLoopImpl : public Wakeable,
public fml::RefCountedThreadSafe<MessageLoopImpl> {
public:
static fml::RefPtr<MessageLoopImpl> Create();
......@@ -33,8 +35,6 @@ class MessageLoopImpl : public fml::RefCountedThreadSafe<MessageLoopImpl> {
virtual void Terminate() = 0;
virtual void WakeUp(fml::TimePoint time_point) = 0;
void PostTask(fml::closure task, fml::TimePoint target_time);
void AddTaskObserver(intptr_t key, fml::closure callback);
......
......@@ -5,6 +5,7 @@
#define FML_USED_ON_EMBEDDER
#include "flutter/fml/message_loop_task_queue.h"
#include "flutter/fml/message_loop_impl.h"
namespace fml {
......@@ -17,11 +18,11 @@ void MessageLoopTaskQueue::Dispose() {
delayed_tasks_ = {};
}
fml::TimePoint MessageLoopTaskQueue::RegisterTask(fml::closure task,
fml::TimePoint target_time) {
void MessageLoopTaskQueue::RegisterTask(fml::closure task,
fml::TimePoint target_time) {
std::lock_guard<std::mutex> lock(delayed_tasks_mutex_);
delayed_tasks_.push({++order_, std::move(task), target_time});
return delayed_tasks_.top().GetTargetTime();
WakeUp(delayed_tasks_.top().GetTargetTime());
}
bool MessageLoopTaskQueue::HasPendingTasks() {
......@@ -29,7 +30,7 @@ bool MessageLoopTaskQueue::HasPendingTasks() {
return !delayed_tasks_.empty();
}
fml::TimePoint MessageLoopTaskQueue::GetTasksToRunNow(
void MessageLoopTaskQueue::GetTasksToRunNow(
FlushType type,
std::vector<fml::closure>& invocations) {
std::lock_guard<std::mutex> lock(delayed_tasks_mutex_);
......@@ -48,9 +49,15 @@ fml::TimePoint MessageLoopTaskQueue::GetTasksToRunNow(
}
if (delayed_tasks_.empty()) {
return fml::TimePoint::Max();
WakeUp(fml::TimePoint::Max());
} else {
return delayed_tasks_.top().GetTargetTime();
WakeUp(delayed_tasks_.top().GetTargetTime());
}
}
void MessageLoopTaskQueue::WakeUp(fml::TimePoint time) {
if (wakeable_) {
wakeable_->WakeUp(time);
}
}
......@@ -94,4 +101,8 @@ void MessageLoopTaskQueue::Swap(MessageLoopTaskQueue& other)
std::swap(delayed_tasks_, other.delayed_tasks_);
}
void MessageLoopTaskQueue::SetWakeable(fml::Wakeable* wakeable) {
wakeable_ = wakeable;
}
} // namespace fml
......@@ -14,6 +14,7 @@
#include "flutter/fml/macros.h"
#include "flutter/fml/memory/ref_counted.h"
#include "flutter/fml/synchronization/thread_annotations.h"
#include "flutter/fml/wakeable.h"
namespace fml {
......@@ -23,7 +24,8 @@ enum class FlushType {
};
// This class keeps track of all the tasks and observers that
// need to be run on it's MessageLoopImpl.
// need to be run on it's MessageLoopImpl. This also wakes up the
// loop at the required times.
class MessageLoopTaskQueue {
public:
// Lifecycle.
......@@ -36,13 +38,11 @@ class MessageLoopTaskQueue {
// Tasks methods.
fml::TimePoint RegisterTask(fml::closure task, fml::TimePoint target_time);
void RegisterTask(fml::closure task, fml::TimePoint target_time);
bool HasPendingTasks();
// Returns the wake up time.
fml::TimePoint GetTasksToRunNow(FlushType type,
std::vector<fml::closure>& invocations);
void GetTasksToRunNow(FlushType type, std::vector<fml::closure>& invocations);
size_t GetNumPendingTasks();
......@@ -58,7 +58,13 @@ class MessageLoopTaskQueue {
void Swap(MessageLoopTaskQueue& other);
void SetWakeable(fml::Wakeable* wakeable);
private:
void WakeUp(fml::TimePoint time);
Wakeable* wakeable_ = NULL;
std::mutex observers_mutex_;
std::map<intptr_t, fml::closure> task_observers_
FML_GUARDED_BY(observers_mutex_);
......
......@@ -5,20 +5,37 @@
#define FML_USED_ON_EMBEDDER
#include "flutter/fml/message_loop_task_queue.h"
#include "flutter/fml/synchronization/count_down_latch.h"
#include "flutter/fml/synchronization/waitable_event.h"
#include "gtest/gtest.h"
class TestWakeable : public fml::Wakeable {
public:
using WakeUpCall = std::function<void(const fml::TimePoint)>;
TestWakeable(WakeUpCall call) : wake_up_call_(call) {}
void WakeUp(fml::TimePoint time_point) override { wake_up_call_(time_point); }
private:
WakeUpCall wake_up_call_;
};
TEST(MessageLoopTaskQueue, StartsWithNoPendingTasks) {
auto task_queue = std::make_unique<fml::MessageLoopTaskQueue>();
ASSERT_FALSE(task_queue->HasPendingTasks());
}
TEST(MessageLoopTaskQueue, RegisterOneTask) {
auto task_queue = std::make_unique<fml::MessageLoopTaskQueue>();
const auto time = fml::TimePoint::Max();
const auto wake_time = task_queue->RegisterTask([] {}, time);
auto task_queue = std::make_unique<fml::MessageLoopTaskQueue>();
task_queue->SetWakeable(new TestWakeable(
[&time](fml::TimePoint wake_time) { ASSERT_TRUE(wake_time == time); }));
task_queue->RegisterTask([] {}, time);
ASSERT_TRUE(task_queue->HasPendingTasks());
ASSERT_TRUE(task_queue->GetNumPendingTasks() == 1);
ASSERT_TRUE(wake_time == time);
}
TEST(MessageLoopTaskQueue, RegisterTwoTasksAndCount) {
......@@ -68,3 +85,51 @@ TEST(MessageLoopTaskQueue, AddRemoveNotifyObservers) {
task_queue->NotifyObservers();
ASSERT_TRUE(test_val == 0);
}
TEST(MessageLoopTaskQueue, WakeUpIndependentOfTime) {
auto task_queue = std::make_unique<fml::MessageLoopTaskQueue>();
int num_wakes = 0;
task_queue->SetWakeable(new TestWakeable(
[&num_wakes](fml::TimePoint wake_time) { ++num_wakes; }));
task_queue->RegisterTask([]() {}, fml::TimePoint::Now());
task_queue->RegisterTask([]() {}, fml::TimePoint::Max());
ASSERT_TRUE(num_wakes == 2);
}
TEST(MessageLoopTaskQueue, WakeUpWithMaxIfNoInvocations) {
auto task_queue = std::make_unique<fml::MessageLoopTaskQueue>();
fml::AutoResetWaitableEvent ev;
task_queue->SetWakeable(new TestWakeable([&ev](fml::TimePoint wake_time) {
ASSERT_TRUE(wake_time == fml::TimePoint::Max());
ev.Signal();
}));
std::vector<fml::closure> invocations;
task_queue->GetTasksToRunNow(fml::FlushType::kAll, invocations);
ev.Wait();
}
TEST(MessageLoopTaskQueue, WokenUpWithNewerTime) {
auto task_queue = std::make_unique<fml::MessageLoopTaskQueue>();
fml::CountDownLatch latch(2);
fml::TimePoint expected = fml::TimePoint::Max();
task_queue->SetWakeable(
new TestWakeable([&latch, &expected](fml::TimePoint wake_time) {
ASSERT_TRUE(wake_time == expected);
latch.CountDown();
}));
task_queue->RegisterTask([]() {}, fml::TimePoint::Max());
const auto now = fml::TimePoint::Now();
expected = now;
task_queue->RegisterTask([]() {}, now);
latch.Wait();
}
// Copyright 2013 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef FLUTTER_FML_WAKEABLE_H_
#define FLUTTER_FML_WAKEABLE_H_
#include "flutter/fml/time/time_point.h"
namespace fml {
class Wakeable {
public:
virtual ~Wakeable() {}
virtual void WakeUp(fml::TimePoint time_point) = 0;
};
} // namespace fml
#endif // FLUTTER_FML_WAKEABLE_H_
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册