提交 24002a5d 编写于 作者: A Alexander Alekhin

Merge pull request #19731 from rgarnov:rg/basic_frame_drop

// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2021 Intel Corporation
#ifndef OPENCV_GAPI_STREAMING_SYNC_HPP
#define OPENCV_GAPI_STREAMING_SYNC_HPP
namespace cv {
namespace gapi {
namespace streaming {
enum class sync_policy {
dont_sync,
drop
};
} // namespace streaming
} // namespace gapi
namespace detail {
template<> struct CompileArgTag<gapi::streaming::sync_policy> {
static const char* tag() { return "gapi.streaming.sync_policy"; }
};
} // namespace detail
} // namespace cv
#endif // OPENCV_GAPI_STREAMING_SYNC_HPP
......@@ -24,6 +24,9 @@
#include "executor/gstreamingexecutor.hpp"
#include <opencv2/gapi/streaming/meta.hpp>
#include <opencv2/gapi/streaming/sync.hpp>
namespace
{
using namespace cv::gimpl::stream;
......@@ -312,12 +315,8 @@ public:
cv::GRunArgs &out_results);
};
// This method handles a stop sign got from some input
// island. Reiterate through all _remaining valid_ queues (some of
// them can be set to nullptr already -- see handling in
// getInputVector) and rewind data to every Stop sign per queue.
void QueueReader::rewindToStop(std::vector<Q*> &in_queues,
const std::size_t this_id)
void rewindToStop(std::vector<Q*> &in_queues,
const std::size_t this_id)
{
for (auto &&qit : ade::util::indexed(in_queues))
{
......@@ -331,6 +330,16 @@ void QueueReader::rewindToStop(std::vector<Q*> &in_queues,
}
}
// This method handles a stop sign got from some input
// island. Reiterate through all _remaining valid_ queues (some of
// them can be set to nullptr already -- see handling in
// getInputVector) and rewind data to every Stop sign per queue.
void QueueReader::rewindToStop(std::vector<Q*> &in_queues,
const std::size_t this_id)
{
::rewindToStop(in_queues, this_id);
}
bool QueueReader::getInputVector(std::vector<Q*> &in_queues,
cv::GRunArgs &in_constants,
cv::GRunArgs &isl_inputs)
......@@ -496,7 +505,7 @@ void emitterActorThread(std::shared_ptr<cv::gimpl::GIslandEmitter> emitter,
return;
}
// Try to obrain next data chunk from the source
// Try to obtain next data chunk from the source
cv::GRunArg data;
if (emitter->pull(data))
{
......@@ -521,6 +530,87 @@ void emitterActorThread(std::shared_ptr<cv::gimpl::GIslandEmitter> emitter,
}
}
// This thread pulls data from the assigned input queues and makes sure that
// all input args are in sync (timestamps are equal), dropping some inputs if required.
// After getting synchronized inputs from all input queues, the thread pushes them to out queues
void syncActorThread(std::vector<Q*> in_queues,
std::vector<std::vector<Q*>> out_queues) {
using timestamp_t = int64_t;
std::vector<bool> pop_nexts(in_queues.size());
std::vector<Cmd> cmds(in_queues.size());
while (true) {
// pop_nexts indicates which queue still contains earlier timestamps and
// needs to be popped at least one more time.
// For each iteration (frame) we need to pull from each input queue at least once,
// so switch all to true when start processing new frame
for (auto&& p : pop_nexts) {
p = true;
}
timestamp_t max_ts = 0u;
// Iterate through all input queues, pop GRunArg's and compare timestamps.
// Continue pulling from queues whose timestamps are smaller.
// Finish when all timestamps are equal.
do {
for (auto&& it : ade::util::indexed(
ade::util::zip(pop_nexts, in_queues, cmds))) {
auto& val = ade::util::value(it);
auto& pop_next = std::get<0>(val);
if (!pop_next) {
continue;
}
auto& q = std::get<1>(val);
auto& cmd = std::get<2>(val);
q->pop(cmd);
if (cv::util::holds_alternative<Stop>(cmd)) {
// We got a stop command from one of the input queues.
// Rewind all input queues till Stop command,
// Push Stop command down the graph, finish the thread
rewindToStop(in_queues, ade::util::index(it));
for (auto &&oqs : out_queues) {
for (auto &&oq : oqs) {
oq->push(Cmd{Stop{}});
}
}
return;
}
// Extract the timestamp
auto& arg = cv::util::get<cv::GRunArg>(cmd);
auto ts = cv::util::any_cast<int64_t>(arg.meta[cv::gapi::streaming::meta_tag::timestamp]);
GAPI_Assert(ts >= 0u);
// TODO: this whole drop logic can be imported via compile args
// to give a user a way to customize it
if (ts < max_ts) {
// Continue popping from this queue
pop_next = true;
} else if (ts == max_ts) {
// Stop popping from this queue
pop_next = false;
} else if (ts > max_ts) {
// We got a timestamp which is greater than timestamps from other queues.
// It means that we need to reiterate through all the queues one more time
// (except the current one)
max_ts = ts;
for (auto&& p : pop_nexts) {
p = true;
}
pop_next = false;
}
}
} while (ade::util::any_of(pop_nexts, [](bool v){ return v; }));
// Finally we got all our inputs synchronized, push them further down the graph
for (auto &&it : ade::util::zip(out_queues, cmds)) {
for (auto &&q : std::get<0>(it)) {
q->push(std::get<1>(it));
}
}
}
}
class StreamingInput final: public cv::gimpl::GIslandExecutable::IInput
{
QueueReader &qr;
......@@ -874,6 +964,85 @@ void check_DesyncObjectConsumedByMultipleIslands(const cv::gimpl::GIslandModel::
} // anonymous namespace
class cv::gimpl::GStreamingExecutor::Synchronizer final {
gapi::streaming::sync_policy m_sync_policy = gapi::streaming::sync_policy::dont_sync;
ade::Graph& m_island_graph;
cv::gimpl::GIslandModel::Graph m_gim;
std::size_t m_queue_capacity = 0u;
std::thread m_thread;
std::vector<ade::NodeHandle> m_synchronized_emitters;
std::vector<stream::SyncQueue> m_sync_queues;
std::vector<stream::Q*> newSyncQueue() {
m_sync_queues.emplace_back(SyncQueue{});
m_sync_queues.back().set_capacity(m_queue_capacity);
return std::vector<Q*>{&m_sync_queues.back()};
}
public:
Synchronizer(gapi::streaming::sync_policy sync_policy,
ade::Graph& island_graph,
std::size_t queue_capacity)
: m_sync_policy(sync_policy)
, m_island_graph(island_graph)
, m_gim(m_island_graph)
, m_queue_capacity(queue_capacity) {
}
void registerVideoEmitters(std::vector<ade::NodeHandle>&& emitters) {
// There is no point to make synchronization for the one video input
// so do nothing in this case
if ( m_sync_policy == cv::gapi::streaming::sync_policy::drop
&& emitters.size() > 1u) {
m_synchronized_emitters = std::move(emitters);
m_sync_queues.reserve(m_synchronized_emitters.size());
}
}
std::vector<stream::Q*> outQueues(const ade::NodeHandle& emitter) {
// If the emitter was registered previously (which means it needs to be synchronized),
// create a new queue for this emitter to push the data to. Sync thread will
// pop from this queue and push data to emitter's readers.
// If the emitter was not registered, direct emitter output to its immediate readers right away
return m_synchronized_emitters.end() != std::find(m_synchronized_emitters.begin(),
m_synchronized_emitters.end(),
emitter)
? newSyncQueue()
: reader_queues(m_island_graph, emitter->outNodes().front());
}
// Start a thread which will handle the synchronization.
// Do nothing if synchronization is not needed
void start() {
if (m_synchronized_emitters.size() != 0) {
GAPI_Assert(m_synchronized_emitters.size() > 1u);
std::vector<Q*> sync_in_queues(m_synchronized_emitters.size());
std::vector<std::vector<Q*>> sync_out_queues(m_synchronized_emitters.size());
for (auto it : ade::util::indexed(m_synchronized_emitters)) {
const auto id = ade::util::index(it);
const auto eh = ade::util::value(it);
sync_in_queues[id] = &m_sync_queues[id];
sync_out_queues[id] = reader_queues(m_island_graph, eh->outNodes().front());
}
m_thread = std::thread(syncActorThread,
std::move(sync_in_queues),
std::move(sync_out_queues));
}
}
void join() {
if (m_synchronized_emitters.size() != 0) {
m_thread.join();
}
}
void clear() {
for (auto &q : m_sync_queues) q.clear();
m_sync_queues.clear();
m_synchronized_emitters.clear();
}
};
// GStreamingExecutor expects compile arguments as input to have possibility to do
// proper graph reshape and islands recompilation
cv::gimpl::GStreamingExecutor::GStreamingExecutor(std::unique_ptr<ade::Graph> &&g_model,
......@@ -911,6 +1080,10 @@ cv::gimpl::GStreamingExecutor::GStreamingExecutor(std::unique_ptr<ade::Graph> &&
return m_gim.metadata(nh).get<NodeKind>().k == NodeKind::ISLAND;
});
auto sync_policy = cv::gimpl::getCompileArg<cv::gapi::streaming::sync_policy>(m_comp_args)
.value_or(cv::gapi::streaming::sync_policy::dont_sync);
m_sync.reset(new Synchronizer(sync_policy, *m_island_graph, queue_capacity));
// If metadata was not passed to compileStreaming, Islands are not compiled at this point.
// It is fine -- Islands are then compiled in setSource (at the first valid call).
const bool islands_compiled = m_gim.metadata().contains<IslandsCompiled>();
......@@ -934,7 +1107,7 @@ cv::gimpl::GStreamingExecutor::GStreamingExecutor(std::unique_ptr<ade::Graph> &&
std::unordered_set<ade::NodeHandle, ade::HandleHasher<ade::Node> > const_ins;
// FIXME: THIS ORDER IS IRRELEVANT TO PROTOCOL OR ANY OTHER ORDER!
// FIXME: SAME APPLIES TO THE REGULAR GEEXECUTOR!!
// FIXME: SAME APPLIES TO THE REGULAR GEXECUTOR!!
auto xtract_in = [&](ade::NodeHandle slot_nh, std::vector<RcDesc> &vec)
{
const auto orig_data_nh
......@@ -1101,19 +1274,6 @@ void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins)
{
GAPI_Assert(state == State::READY || state == State::STOPPED);
const auto is_video = [](const GRunArg &arg)
{
return util::holds_alternative<cv::gapi::wip::IStreamSource::Ptr>(arg);
};
const auto num_videos = std::count_if(ins.begin(), ins.end(), is_video);
if (num_videos > 1)
{
// See below why (another reason - no documented behavior
// on handling videos streams of different length)
util::throw_error(std::logic_error("Only one video source is"
" currently supported!"));
}
GModel::ConstGraph gm(*m_orig_graph);
// Now the tricky-part: completing Islands compilation if compileStreaming
// has been called without meta arguments.
......@@ -1180,6 +1340,8 @@ void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins)
// Walk through the protocol, set-up emitters appropriately
// There's a 1:1 mapping between emitters and corresponding data inputs.
// Also collect video emitter nodes to use them later in synchronization
std::vector<ade::NodeHandle> video_emitters;
for (auto it : ade::util::zip(ade::util::toRange(m_emitters),
ade::util::toRange(ins),
ade::util::iota(m_emitters.size())))
......@@ -1197,6 +1359,9 @@ void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins)
case T::index_of<cv::gapi::wip::IStreamSource::Ptr>():
#if !defined(GAPI_STANDALONE)
emitter.reset(new VideoEmitter{emit_arg});
// Currently all video inputs are syncronized if sync policy is to drop,
// there is no different fps branches etc, so all video emitters are registered
video_emitters.emplace_back(emit_nh);
#else
util::throw_error(std::logic_error("Video is not supported in the "
"standalone mode"));
......@@ -1212,6 +1377,8 @@ void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins)
}
}
m_sync->registerVideoEmitters(std::move(video_emitters));
// FIXME: The below code assumes our graph may have only one
// real video source (and so, only one stream which may really end)
// all other inputs are "constant" generators.
......@@ -1249,7 +1416,7 @@ void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins)
auto emitter = m_gim.metadata(eh).get<Emitter>().object;
// Collect all reader queues from the emitter's the only output object
auto out_queues = reader_queues(*m_island_graph, eh->outNodes().front());
auto out_queues = m_sync->outQueues(eh);
m_threads.emplace_back(emitterActorThread,
emitter,
......@@ -1258,6 +1425,8 @@ void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins)
real_video_completion_cb);
}
m_sync->start();
// Now do this for every island (in a topological order)
for (auto &&op : m_ops)
{
......@@ -1341,6 +1510,7 @@ void cv::gimpl::GStreamingExecutor::wait_shutdown()
// FIXME: Of course it can be designed much better
for (auto &t : m_threads) t.join();
m_threads.clear();
m_sync->join();
// Clear all queues
// If there are constant emitters, internal queues
......@@ -1352,7 +1522,10 @@ void cv::gimpl::GStreamingExecutor::wait_shutdown()
for (auto &q : m_emitter_queues) q.clear();
for (auto &q : m_sink_queues) q->clear();
for (auto &q : m_internal_queues) q->clear();
m_const_emitter_queues.clear();
m_const_vals.clear();
m_out_queue.clear();
m_sync->clear();
for (auto &&op : m_ops) {
op.isl_exec->handleStopStream();
......
......@@ -167,6 +167,9 @@ protected:
std::vector<ade::NodeHandle> m_emitters;
std::vector<ade::NodeHandle> m_sinks;
class Synchronizer;
std::unique_ptr<Synchronizer> m_sync;
std::vector<std::thread> m_threads;
std::vector<stream::SyncQueue> m_emitter_queues;
......
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2021 Intel Corporation
#include "../test_precomp.hpp"
#include <opencv2/gapi/streaming/cap.hpp>
#include <opencv2/gapi/core.hpp>
#include <opencv2/gapi/fluid/imgproc.hpp>
#include <opencv2/gapi/streaming/cap.hpp>
#include <opencv2/gapi/streaming/sync.hpp>
namespace opencv_test {
namespace {
using ts_t = int64_t;
using ts_vec = std::vector<ts_t>;
using cv::gapi::streaming::sync_policy;
ts_t calcLeastCommonMultiple(const ts_vec& values) {
ts_t res = *std::max_element(values.begin(), values.end());
auto isDivisor = [&](ts_t v) { return res % v == 0; };
while(!std::all_of(values.begin(), values.end(), isDivisor)) {
res++;
}
return res;
}
struct TimestampGenerationParams {
const ts_vec frame_times;
sync_policy policy;
ts_t end_time;
TimestampGenerationParams(const ts_vec& ft, sync_policy sp, ts_t et = 25)
: frame_times(ft), policy(sp), end_time(et) {
}
};
class MultiFrameSource {
class SingleSource : public cv::gapi::wip::IStreamSource {
MultiFrameSource& m_source;
std::size_t m_idx;
public:
SingleSource(MultiFrameSource& s, std::size_t idx)
: m_source(s)
, m_idx(idx)
{}
virtual bool pull(cv::gapi::wip::Data& data) {
return m_source.pull(data, m_idx);
}
virtual GMetaArg descr_of() const { return GMetaArg{m_source.desc()}; }
};
TimestampGenerationParams p;
ts_vec m_current_times;
cv::Mat m_mat;
public:
MultiFrameSource(const TimestampGenerationParams& params)
: p(params)
, m_current_times(p.frame_times.size(), 0u)
, m_mat(8, 8, CV_8UC1) {
}
bool pull(cv::gapi::wip::Data& data, std::size_t idx) {
cv::randn(m_mat, 127, 32);
GAPI_Assert(idx < p.frame_times.size());
m_current_times[idx] += p.frame_times[idx];
if (m_current_times[idx] >= p.end_time) {
return false;
}
data = m_mat.clone();
data.meta[cv::gapi::streaming::meta_tag::timestamp] = m_current_times[idx];
return true;
}
cv::gapi::wip::IStreamSource::Ptr getSource(std::size_t idx) {
return cv::gapi::wip::IStreamSource::Ptr{new SingleSource(*this, idx)};
}
GMatDesc desc() const { return cv::descr_of(m_mat); }
};
class TimestampChecker {
TimestampGenerationParams p;
ts_t m_synced_time = 0u;
ts_t m_synced_frame_time = 0u;
public:
TimestampChecker(const TimestampGenerationParams& params)
: p(params)
, m_synced_frame_time(calcLeastCommonMultiple(p.frame_times)) {
}
void checkNext(const ts_vec& timestamps) {
if (p.policy == sync_policy::dont_sync) {
// don't check timestamps if the policy is dont_sync
return;
}
m_synced_time += m_synced_frame_time;
for (const auto& ts : timestamps) {
EXPECT_EQ(m_synced_time, ts);
}
}
std::size_t nFrames() const {
auto frame_time = p.policy == sync_policy::dont_sync
? *std::max_element(p.frame_times.begin(), p.frame_times.end())
: m_synced_frame_time;
auto n_frames = p.end_time / frame_time;
GAPI_Assert(n_frames > 0u);
return n_frames;
}
};
struct TimestampSyncTest : public ::testing::TestWithParam<sync_policy> {
void run(cv::GProtoInputArgs&& ins, cv::GProtoOutputArgs&& outs,
const ts_vec& frame_times) {
auto video_in_n = frame_times.size();
GAPI_Assert(video_in_n <= ins.m_args.size());
// Assume that all remaining inputs are const
auto const_in_n = ins.m_args.size() - video_in_n;
auto out_n = outs.m_args.size();
auto policy = GetParam();
TimestampGenerationParams ts_params(frame_times, policy);
MultiFrameSource source(ts_params);
GRunArgs gins;
for (std::size_t i = 0; i < video_in_n; i++) {
gins += cv::gin(source.getSource(i));
}
auto desc = source.desc();
cv::Mat const_mat = cv::Mat::eye(desc.size.height,
desc.size.width,
CV_MAKE_TYPE(desc.depth, desc.chan));
for (std::size_t i = 0; i < const_in_n; i++) {
gins += cv::gin(const_mat);
}
ts_vec out_timestamps(out_n);
cv::GRunArgsP gouts{};
for (auto& t : out_timestamps) {
gouts += cv::gout(t);
}
auto pipe = cv::GComputation(std::move(ins), std::move(outs))
.compileStreaming(cv::compile_args(policy));
pipe.setSource(std::move(gins));
pipe.start();
std::size_t frames = 0u;
TimestampChecker checker(ts_params);
while(pipe.pull(std::move(gouts))) {
checker.checkNext(out_timestamps);
frames++;
}
EXPECT_EQ(checker.nFrames(), frames);
}
};
} // anonymous namespace
TEST_P(TimestampSyncTest, Basic)
{
cv::GMat in1, in2;
auto out = cv::gapi::add(in1, in2);
auto ts = cv::gapi::streaming::timestamp(out);
run(cv::GIn(in1, in2), cv::GOut(ts), {1,2});
}
TEST_P(TimestampSyncTest, ThreeInputs)
{
cv::GMat in1, in2, in3;
auto tmp = cv::gapi::add(in1, in2);
auto out = cv::gapi::add(tmp, in3);
auto ts = cv::gapi::streaming::timestamp(out);
run(cv::GIn(in1, in2, in3), cv::GOut(ts), {2,4,3});
}
TEST_P(TimestampSyncTest, TwoOutputs)
{
cv::GMat in1, in2, in3;
auto out1 = cv::gapi::add(in1, in3);
auto out2 = cv::gapi::add(in2, in3);
auto ts1 = cv::gapi::streaming::timestamp(out1);
auto ts2 = cv::gapi::streaming::timestamp(out2);
run(cv::GIn(in1, in2, in3), cv::GOut(ts1, ts2), {1,4,2});
}
TEST_P(TimestampSyncTest, ConstInput)
{
cv::GMat in1, in2, in3;
auto out1 = cv::gapi::add(in1, in3);
auto out2 = cv::gapi::add(in2, in3);
auto ts1 = cv::gapi::streaming::timestamp(out1);
auto ts2 = cv::gapi::streaming::timestamp(out2);
run(cv::GIn(in1, in2, in3), cv::GOut(ts1, ts2), {1,2});
}
TEST_P(TimestampSyncTest, ChangeSource)
{
cv::GMat in1, in2, in3;
auto out1 = cv::gapi::add(in1, in3);
auto out2 = cv::gapi::add(in2, in3);
auto ts1 = cv::gapi::streaming::timestamp(out1);
auto ts2 = cv::gapi::streaming::timestamp(out2);
run(cv::GIn(in1, in2, in3), cv::GOut(ts1, ts2), {1,2});
run(cv::GIn(in1, in2, in3), cv::GOut(ts1, ts2), {1,2});
}
INSTANTIATE_TEST_CASE_P(InputSynchronization, TimestampSyncTest,
Values(sync_policy::dont_sync,
sync_policy::drop));
} // namespace opencv_test
......@@ -1104,25 +1104,37 @@ struct GAPI_Streaming_Unit: public ::testing::Test {
// FIXME: (GAPI_Streaming_Types, XChangeOpaque) test is missing here!
// FIXME: (GAPI_Streaming_Types, OutputOpaque) test is missing here!
TEST_F(GAPI_Streaming_Unit, TestTwoVideoSourcesFail)
TEST(GAPI_Streaming, TestTwoVideosDifferentLength)
{
auto c_desc = cv::GMatDesc{CV_8U,3,{768,576}};
auto m_desc = cv::descr_of(m);
auto path = findDataFile("cv/video/768x576.avi");
initTestDataPath();
auto desc = cv::GMatDesc{CV_8U,3,{768,576}};
std::string path1, path2;
try {
sc = cc.compileStreaming(c_desc, m_desc);
// FIXME: it should be EXPECT_NO_THROW()
sc.setSource(cv::gin(gapi::wip::make_src<cv::gapi::wip::GCaptureSource>(path), m));
sc = cc.compileStreaming(m_desc, c_desc);
// FIXME: it should be EXPECT_NO_THROW()
sc.setSource(cv::gin(m, gapi::wip::make_src<cv::gapi::wip::GCaptureSource>(path)));
path1 = findDataFile("cv/video/768x576.avi");
path2 = findDataFile("highgui/video/big_buck_bunny.avi");
} catch(...) {
throw SkipTestException("Video file can not be opened");
throw SkipTestException("Video file can not be found");
}
cv::GMat in1, in2;
auto out = in1 + cv::gapi::resize(in2, desc.size);
cv::GComputation cc(cv::GIn(in1, in2), cv::GOut(out));
auto sc = cc.compileStreaming();
sc.setSource(cv::gin(gapi::wip::make_src<cv::gapi::wip::GCaptureSource>(path1),
gapi::wip::make_src<cv::gapi::wip::GCaptureSource>(path2)));
sc.start();
cv::Mat out_mat;
std::size_t frames = 0u;
while(sc.pull(cv::gout(out_mat))) {
frames++;
}
sc = cc.compileStreaming(c_desc, c_desc);
auto c_ptr = gapi::wip::make_src<cv::gapi::wip::GCaptureSource>(path);
EXPECT_ANY_THROW(sc.setSource(cv::gin(c_ptr, c_ptr)));
// big_buck_bunny.avi has 125 frames, 768x576.avi - 100 frames,
// expect framework to stop after 100 frames
EXPECT_EQ(100u, frames);
}
TEST_F(GAPI_Streaming_Unit, TestStartWithoutnSetSource)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册