diff --git a/synchronization/BUILD.gn b/synchronization/BUILD.gn index fa027d887c23950bdbda903c1d666a6064585357..a48195eef5edd2855ab0d845a90a93c1d040895b 100644 --- a/synchronization/BUILD.gn +++ b/synchronization/BUILD.gn @@ -4,11 +4,14 @@ source_set("synchronization") { sources = [ + "pipeline.cc", + "pipeline.h", "semaphore.cc", "semaphore.h", ] deps = [ + "//flutter/glue", "//lib/ftl", ] } diff --git a/synchronization/pipeline.cc b/synchronization/pipeline.cc new file mode 100644 index 0000000000000000000000000000000000000000..577e3b0b6e7a0678739635abf8f6c57f924c1e32 --- /dev/null +++ b/synchronization/pipeline.cc @@ -0,0 +1,11 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "flutter/synchronization/pipeline.h" + +namespace flutter { + +// + +} // namespace flutter diff --git a/synchronization/pipeline.h b/synchronization/pipeline.h new file mode 100644 index 0000000000000000000000000000000000000000..a05713b13b9cf0efc0c1fb162dc08aa6cc883b61 --- /dev/null +++ b/synchronization/pipeline.h @@ -0,0 +1,111 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef SYNCHRONIZATION_PIPELINE_H_ +#define SYNCHRONIZATION_PIPELINE_H_ + +#include "lib/ftl/macros.h" +#include "lib/ftl/synchronization/mutex.h" +#include "lib/ftl/functional/closure.h" +#include "flutter/synchronization/semaphore.h" +#include "flutter/synchronization/pipeline.h" +#include "flutter/glue/trace_event.h" + +#include +#include + +namespace flutter { + +enum class PipelineConsumeResult { + NoneAvailable, + Done, + MoreAvailable, +}; + +template +class Pipeline { + public: + using Resource = R; + using ResourcePtr = std::unique_ptr; + + explicit Pipeline(uint32_t depth) : empty_(depth), available_(0) {} + + ~Pipeline() {} + + bool IsValid() const { return empty_.IsValid() && available_.IsValid(); } + + using Producer = std::function; + + FTL_WARN_UNUSED_RESULT + bool Produce(Producer producer) { + if (producer == nullptr) { + return false; + } + + if (!empty_.TryWait()) { + return false; + } + + ResourcePtr resource; + + { + TRACE_EVENT0("flutter", "PipelineProduce"); + resource = producer(); + } + + { + ftl::MutexLocker lock(&queue_mutex_); + queue_.emplace(std::move(resource)); + } + + available_.Signal(); + + return true; + } + + using Consumer = std::function; + + FTL_WARN_UNUSED_RESULT + PipelineConsumeResult Consume(Consumer consumer) { + if (consumer == nullptr) { + return PipelineConsumeResult::NoneAvailable; + } + + if (!available_.TryWait()) { + return PipelineConsumeResult::NoneAvailable; + } + + ResourcePtr resource; + size_t items_count = 0; + + { + ftl::MutexLocker lock(&queue_mutex_); + resource = std::move(queue_.front()); + queue_.pop(); + items_count = queue_.size(); + } + + { + TRACE_EVENT0("flutter", "PipelineConsume"); + consumer(std::move(resource)); + } + + empty_.Signal(); + + return items_count > 0 ? PipelineConsumeResult::MoreAvailable + : PipelineConsumeResult::Done; + } + + private: + Semaphore empty_; + Semaphore available_; + ftl::Mutex queue_mutex_; + std::queue queue_; + + FTL_DISALLOW_COPY_AND_ASSIGN(Pipeline); +}; + +} // namespace flutter + +#endif // SYNCHRONIZATION_PIPELINE_H_