gstreamingexecutor.hpp 6.8 KB
Newer Older
1 2 3 4
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
5
// Copyright (C) 2019-2020 Intel Corporation
6 7 8 9 10 11 12 13 14 15 16

#ifndef OPENCV_GAPI_GSTREAMING_EXECUTOR_HPP
#define OPENCV_GAPI_GSTREAMING_EXECUTOR_HPP

#ifdef _MSC_VER
#pragma warning(disable: 4503)  // "decorated name length exceeded"
                                // on concurrent_bounded_queue
#endif

#include <memory> // unique_ptr, shared_ptr
#include <thread> // thread
17 18
#include <vector>
#include <unordered_map>
19 20 21 22 23 24 25 26

#if defined(HAVE_TBB)
#  include <tbb/concurrent_queue.h> // FIXME: drop it from here!
template<typename T> using QueueClass = tbb::concurrent_bounded_queue<T>;
#else
#  include "executor/conc_queue.hpp"
template<typename T> using QueueClass = cv::gapi::own::concurrent_bounded_queue<T>;
#endif // TBB
27
#include "executor/last_value.hpp"
28 29 30 31 32 33 34 35 36 37

#include <ade/graph.hpp>

#include "backends/common/gbackend.hpp"

namespace cv {
namespace gimpl {

namespace stream {
struct Start {};
38 39 40 41 42 43 44
struct Stop {
    enum class Kind {
        HARD, // a hard-stop: end-of-pipeline reached or stop() called
        CNST, // a soft-stop emitted for/by constant sources (see QueueReader)
    } kind = Kind::HARD;
    cv::GRunArg cdata; // const data for CNST stop
};
45

46 47 48 49 50
struct Result {
    cv::GRunArgs      args;  // Full results vector
    std::vector<bool> flags; // Availability flags (in case of desync)
};

51 52 53 54 55
using Cmd = cv::util::variant
    < cv::util::monostate
    , Start        // Tells emitters to start working. Not broadcasted to workers.
    , Stop         // Tells emitters to stop working. Broadcasted to workers.
    , cv::GRunArg  // Workers data payload to process.
56
    , Result       // Pipeline's data for gout()
57
    >;
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100

// Interface over a queue. The underlying queue implementation may be
// different. This class is mainly introduced to bring some
// abstraction over the real queues (bounded in-order) and a
// desynchronized data slots (see required to implement
// cv::gapi::desync)

class Q {
public:
    virtual void push(const Cmd &cmd) = 0;
    virtual void pop(Cmd &cmd) = 0;
    virtual bool try_pop(Cmd &cmd) = 0;
    virtual void clear() = 0;
    virtual ~Q() = default;
};

// A regular queue implementation
class SyncQueue final: public Q {
    QueueClass<Cmd> m_q;    // FIXME: OWN or WRAP??

public:
    virtual void push(const Cmd &cmd) override { m_q.push(cmd); }
    virtual void pop(Cmd &cmd)        override { m_q.pop(cmd);  }
    virtual bool try_pop(Cmd &cmd)    override { return m_q.try_pop(cmd); }
    virtual void clear()              override { m_q.clear(); }

    void set_capacity(std::size_t c) { m_q.set_capacity(c);}
};

// Desynchronized "queue" implementation
// Every push overwrites value which is not yet popped
// This container can hold 0 or 1 element
// Special handling for Stop is implemented (FIXME: not really)
class DesyncQueue final: public Q {
    cv::gapi::own::last_written_value<Cmd> m_v;

public:
    virtual void push(const Cmd &cmd) override { m_v.push(cmd); }
    virtual void pop(Cmd &cmd)        override { m_v.pop(cmd);  }
    virtual bool try_pop(Cmd &cmd)    override { return m_v.try_pop(cmd); }
    virtual void clear()              override { m_v.clear(); }
};

101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
} // namespace stream

// FIXME: Currently all GExecutor comments apply also
// to this one. Please document it separately in the future.

class GStreamingExecutor final
{
protected:
    // GStreamingExecutor is a state machine described as follows
    //
    //              setSource() called
    //   STOPPED:  - - - - - - - - - ->READY:
    //   --------                      ------
    //   Initial state                 Input data specified
    //   No threads running            Threads are created and IDLE
    //   ^                             (Currently our emitter threads
    //   :                             are bounded to input data)
    //   : stop() called               No processing happending
    //   : OR                          :
    //   : end-of-stream reached       :  start() called
    //   : during pull()/try_pull()    V
    //   :                             RUNNING:
    //   :                             --------
    //   :                             Actual pipeline execution
    //    - - - - - - - - - - - - - -  Threads are running
    //
    enum class State {
        STOPPED,
        READY,
        RUNNING,
    } state = State::STOPPED;

    std::unique_ptr<ade::Graph> m_orig_graph;
    std::shared_ptr<ade::Graph> m_island_graph;
135 136 137
    cv::GCompileArgs m_comp_args;
    cv::GMetaArgs m_last_metas;
    util::optional<bool> m_reshapable;
138 139

    cv::gimpl::GIslandModel::Graph m_gim; // FIXME: make const?
140
    const bool m_desync;
141 142 143 144 145 146 147 148 149 150

    // FIXME: Naive executor details are here for now
    // but then it should be moved to another place
    struct OpDesc
    {
        std::vector<RcDesc> in_objects;
        std::vector<RcDesc> out_objects;
        cv::GMetaArgs       out_metas;
        ade::NodeHandle     nh;

151
        cv::GRunArgs in_constants;
152 153 154 155 156 157 158 159 160 161 162 163

        std::shared_ptr<GIslandExecutable> isl_exec;
    };
    std::vector<OpDesc> m_ops;

    struct DataDesc
    {
        ade::NodeHandle slot_nh;
        ade::NodeHandle data_nh;
    };
    std::vector<DataDesc> m_slots;

164 165
    cv::GRunArgs m_const_vals;

166 167 168 169
    // Order in these vectors follows the GComputaion's protocol
    std::vector<ade::NodeHandle> m_emitters;
    std::vector<ade::NodeHandle> m_sinks;

170 171 172
    class Synchronizer;
    std::unique_ptr<Synchronizer> m_sync;

173
    std::vector<std::thread> m_threads;
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194
    std::vector<stream::SyncQueue>   m_emitter_queues;

    // a view over m_emitter_queues
    std::vector<stream::SyncQueue*>  m_const_emitter_queues;

    std::vector<stream::Q*>          m_sink_queues;

    // desync path tags for outputs. -1 means that output
    // doesn't belong to a desync path
    std::vector<int>                 m_sink_sync;

    std::unordered_set<stream::Q*>   m_internal_queues;
    stream::SyncQueue m_out_queue;

    // Describes mapping from desync paths to collector threads
    struct CollectorThreadInfo {
        std::vector<stream::Q*>  queues;
        std::vector<int> mapping;
    };
    std::unordered_map<int, CollectorThreadInfo> m_collector_map;

195 196 197

    void wait_shutdown();

M
Maxim Pashchenkov 已提交
198 199
    cv::GTypesInfo out_info;

200
public:
201 202
    explicit GStreamingExecutor(std::unique_ptr<ade::Graph> &&g_model,
                                const cv::GCompileArgs &comp_args);
203 204 205 206
    ~GStreamingExecutor();
    void setSource(GRunArgs &&args);
    void start();
    bool pull(cv::GRunArgsP &&outs);
207
    bool pull(cv::GOptRunArgsP &&outs);
M
Maxim Pashchenkov 已提交
208
    std::tuple<bool, cv::util::variant<cv::GRunArgs, cv::GOptRunArgs>> pull();
209 210 211 212 213 214 215 216 217
    bool try_pull(cv::GRunArgsP &&outs);
    void stop();
    bool running() const;
};

} // namespace gimpl
} // namespace cv

#endif // OPENCV_GAPI_GSTREAMING_EXECUTOR_HPP