// Copyright 2013 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "flutter/fml/concurrent_message_loop.h"

#include <algorithm>

#include "flutter/fml/thread.h"
#include "flutter/fml/trace_event.h"

namespace fml {

14 15 16 17 18 19 20 21
std::shared_ptr<ConcurrentMessageLoop> ConcurrentMessageLoop::Create(
    size_t worker_count) {
  return std::shared_ptr<ConcurrentMessageLoop>{
      new ConcurrentMessageLoop(worker_count)};
}

ConcurrentMessageLoop::ConcurrentMessageLoop(size_t worker_count)
    : worker_count_(std::max<size_t>(worker_count, 1ul)) {
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
  for (size_t i = 0; i < worker_count_; ++i) {
    workers_.emplace_back([i, this]() {
      fml::Thread::SetCurrentThreadName(
          std::string{"io.flutter.worker." + std::to_string(i + 1)});
      WorkerMain();
    });
  }
}

// Shuts the loop down and blocks until every worker has finished.
ConcurrentMessageLoop::~ConcurrentMessageLoop() {
  // Raise the shutdown flag first so that idle workers wake up and leave
  // WorkerMain(); otherwise the joins below would never return.
  Terminate();

  for (auto it = workers_.begin(); it != workers_.end(); ++it) {
    it->join();
  }
}

38 39
size_t ConcurrentMessageLoop::GetWorkerCount() const {
  return worker_count_;
40 41
}

42 43
std::shared_ptr<ConcurrentTaskRunner> ConcurrentMessageLoop::GetTaskRunner() {
  return std::make_shared<ConcurrentTaskRunner>(weak_from_this());
44 45
}

46
void ConcurrentMessageLoop::PostTask(const fml::closure& task) {
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
  if (!task) {
    return;
  }

  std::unique_lock lock(tasks_mutex_);

  // Don't just drop tasks on the floor in case of shutdown.
  if (shutdown_) {
    FML_DLOG(WARNING)
        << "Tried to post a task to shutdown concurrent message "
           "loop. The task will be executed on the callers thread.";
    lock.unlock();
    task();
    return;
  }

  tasks_.push(task);

  // Unlock the mutex before notifying the condition variable because that mutex
  // has to be acquired on the other thread anyway. Waiting in this scope till
  // it is acquired there is a pessimization.
  lock.unlock();

  tasks_condition_.notify_one();
71 72 73
}

void ConcurrentMessageLoop::WorkerMain() {
74 75 76 77 78 79 80 81 82
  while (true) {
    std::unique_lock lock(tasks_mutex_);
    tasks_condition_.wait(lock,
                          [&]() { return tasks_.size() > 0 || shutdown_; });

    if (tasks_.size() == 0) {
      // This can only be caused by shutdown.
      FML_DCHECK(shutdown_);
      break;
83
    }
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109

    auto task = tasks_.front();
    tasks_.pop();

    // Don't hold onto the mutex while the task is being executed as it could
    // itself try to post another tasks to this message loop.
    lock.unlock();

    TRACE_EVENT0("flutter", "ConcurrentWorkerWake");
    // Execute the one tasks we woke up for.
    task();
  }
}

// Marks the loop as shut down and wakes every worker waiting for tasks.
//
// The flag is written while holding |tasks_mutex_|, the same mutex the workers
// hold when they evaluate their wait condition.
void ConcurrentMessageLoop::Terminate() {
  std::lock_guard guard(tasks_mutex_);
  shutdown_ = true;
  // Wake all waiters, not just one: each of them must observe the flag.
  tasks_condition_.notify_all();
}

ConcurrentTaskRunner::ConcurrentTaskRunner(
    std::weak_ptr<ConcurrentMessageLoop> weak_loop)
    : weak_loop_(std::move(weak_loop)) {}

// Defaulted destructor: there is no explicit cleanup. The runner refers to its
// loop only through |weak_loop_|, a weak reference.
ConcurrentTaskRunner::~ConcurrentTaskRunner() = default;

110
void ConcurrentTaskRunner::PostTask(const fml::closure& task) {
111 112 113 114 115 116 117
  if (!task) {
    return;
  }

  if (auto loop = weak_loop_.lock()) {
    loop->PostTask(task);
    return;
118 119
  }

120 121 122 123
  FML_DLOG(WARNING)
      << "Tried to post to a concurrent message loop that has already died. "
         "Executing the task on the callers thread.";
  task();
124 125 126
}

}  // namespace fml