gstreaming.hpp 14.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
// Copyright (C) 2018 Intel Corporation


#include <memory>
12 13 14 15
#include <vector>

#include <opencv2/gapi/opencv_includes.hpp>
#include <opencv2/gapi/own/assert.hpp>
#include <opencv2/gapi/util/optional.hpp>
17 18 19 20 21
#include <opencv2/gapi/garg.hpp>
#include <opencv2/gapi/streaming/source.hpp>

namespace cv {

22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
// Convenience alias: makes cv::util::optional<T> available as cv::optional<T>.
template<class T> using optional = cv::util::optional<T>;

namespace detail {
template<typename T> struct wref_spec {
    using type = T;
template<typename T> struct wref_spec<std::vector<T> > {
    using type = T;

template<typename RefHolder>
struct OptRef {
    struct OptHolder {
        virtual void mov(RefHolder &h) = 0;
        virtual void reset() = 0;
        virtual ~OptHolder() = default;
        using Ptr = std::shared_ptr<OptHolder>;
    template<class T> struct Holder final: OptHolder {
        std::reference_wrapper<cv::optional<T> > m_opt_ref;

        explicit Holder(cv::optional<T>& opt) : m_opt_ref(std::ref(opt)) {
        virtual void mov(RefHolder &h) override {
            using U = typename wref_spec<T>::type;
            m_opt_ref.get() = cv::util::make_optional(std::move(h.template wref<U>()));
        virtual void reset() override {
    /// Wraps the caller-owned optional @p t in a type-erased Holder<T>.
    /// The holder keeps only a reference to @p t (see Holder::m_opt_ref), so
    /// @p t is expected to stay alive for as long as this OptRef is used.
    /// std::make_shared is used instead of a naked `new` so that the holder
    /// and its control block are created in a single allocation.
    template<class T>
    explicit OptRef(cv::optional<T>& t) : m_opt{std::make_shared<Holder<T> >(t)} {}
    /// Forwards to the type-erased holder's mov(), passing @p h through.
    void mov(RefHolder &h)
    {
        m_opt->mov(h);
    }
    /// Forwards to the type-erased holder's reset().
    void reset()
    {
        m_opt->reset();
    }
    typename OptHolder::Ptr m_opt;
// OptRef instantiated for cv::detail::VectorRef holders.
using OptionalVectorRef = OptRef<cv::detail::VectorRef>;
// OptRef instantiated for cv::detail::OpaqueRef holders.
using OptionalOpaqueRef = OptRef<cv::detail::OpaqueRef>;
} // namespace detail

// TODO: Keep it in sync with GRunArgP (derive the type automatically?)
using GOptRunArgP = util::variant<
using GOptRunArgsP = std::vector<GOptRunArgP>;

74 75 76 77 78 79 80 81 82
using GOptRunArg = util::variant<
using GOptRunArgs = std::vector<GOptRunArg>;

83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
namespace detail {

template<typename T> inline GOptRunArgP wrap_opt_arg(optional<T>& arg) {
    // By default, T goes to an OpaqueRef. All other types are specialized
    return GOptRunArgP{OptionalOpaqueRef(arg)};

template<typename T> inline GOptRunArgP wrap_opt_arg(optional<std::vector<T> >& arg) {
    return GOptRunArgP{OptionalVectorRef(arg)};

template<> inline GOptRunArgP wrap_opt_arg(optional<cv::Mat> &m) {
    return GOptRunArgP{&m};

template<> inline GOptRunArgP wrap_opt_arg(optional<cv::Scalar> &s) {
    return GOptRunArgP{&s};

} // namespace detail

// Now cv::gout() may produce an empty vector (see "dynamic graphs"), so
// there may be a conflict between these two. State here that Opt version
// _must_ have at least one input for this overload
template<typename T, typename... Ts>
inline GOptRunArgsP gout(optional<T>&arg, optional<Ts>&... args)
    return GOptRunArgsP{ detail::wrap_opt_arg(arg), detail::wrap_opt_arg(args)... };

113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
 * \addtogroup gapi_main_classes
 * @{
 * @brief Represents a computation (graph) compiled for streaming.
 * This class represents a product of graph compilation (calling
 * cv::GComputation::compileStreaming()). Objects of this class
 * actually do stream processing, and the whole pipeline execution
 * complexity is encapsulated into objects of this class. Execution
 * model has two levels: at the very top, the execution of a
 * heterogeneous graph is aggressively pipelined; at the very bottom
 * the execution of every internal block is determined by its
 * associated backend. Backends are selected based on kernel packages
 * passed via compilation arguments ( see @ref gapi_compile_args,
 * GNetworkPackage, GKernelPackage for details).
 * GStreamingCompiled objects have "player" semantics -- there are
 * methods like start() and stop(). GStreamingCompiled has full
 * control over a video stream and so is stateful. You need to specify the
 * input stream data using setSource() and then call start() to
 * actually start processing. After that, use pull() or try_pull() to
 * obtain next processed data frame from the graph in a blocking or
 * non-blocking way, respectively.
 * Currently a single GStreamingCompiled can process only one video
 * stream at a time. Produce multiple GStreamingCompiled objects to run the
 * same graph on multiple video streams.
 * @sa GCompiled
class GAPI_EXPORTS_W_SIMPLE GStreamingCompiled
146 147 148
    class GAPI_EXPORTS Priv;
    GAPI_WRAP GStreamingCompiled();
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191

    // FIXME: More overloads?
     * @brief Specify the input data to GStreamingCompiled for
     * processing, a generic version.
     * Use gin() to create an input parameter vector.
     * Input vectors must have the same number of elements as defined
     * in the cv::GComputation protocol (at the moment of its
     * construction). Shapes of elements also must conform to protocol
     * (e.g. cv::Mat needs to be passed where cv::GMat has been
     * declared as input, and so on). Run-time exception is generated
     * on type mismatch.
     * In contrast with regular GCompiled, user can also pass an
     * object of type GVideoCapture for a GMat parameter of the parent
     * GComputation.  The compiled pipeline will start fetching data
     * from that GVideoCapture and feeding it into the
     * pipeline. Pipeline stops when a GVideoCapture marks end of the
     * stream (or when stop() is called).
     * Passing a regular Mat for a GMat parameter makes it an "infinite"
     * source -- pipeline may run forever feeding with this Mat until
     * stopped explicitly.
     * Currently only a single GVideoCapture is supported as input. If
     * the parent GComputation is declared with multiple input GMat's,
     * one of those can be specified as GVideoCapture but all others
     * must be regular Mat objects.
     * Throws if pipeline is already running. Use stop() and then
     * setSource() to run the graph on a new video stream.
     * @note This method is not thread-safe (with respect to the user
     * side) at the moment. Protect the access if
     * start()/stop()/setSource() may be called on the same object in
     * multiple threads in your application.
     * @param ins vector of inputs to process.
     * @sa gin
192 193 194 195
    void setSource(GRunArgs &&ins);

    /// @private -- Exclude this function from OpenCV documentation
    GAPI_WRAP void setSource(const cv::detail::ExtractArgsCallback& callback);
196 197 198 199 200 201 202 203 204 205 206 207

     * @brief Specify an input video stream for a single-input
     * computation pipeline.
     * Throws if pipeline is already running. Use stop() and then
     * setSource() to run the graph on a new video stream.
     * @overload
     * @param s a shared pointer to IStreamSource representing the
     * input video stream.
    void setSource(const gapi::wip::IStreamSource::Ptr& s);
209 210

211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226
     * @brief Constructs and specifies an input video stream for a
     * single-input computation pipeline with the given parameters.
     * Throws if pipeline is already running. Use stop() and then
     * setSource() to run the graph on a new video stream.
     * @overload
     * @param args arguments used to construct and initialize a stream
     * source.
    template<typename T, typename... Args>
    void setSource(Args&&... args) {

227 228 229 230 231 232 233 234 235 236 237 238 239 240
     * @brief Start the pipeline execution.
     * Use pull()/try_pull() to obtain data. Throws an exception if
     * a video source was not specified.
     * setSource() must be called first, even if the pipeline has been
     * working already and then stopped (explicitly via stop() or due to
     * stream completion)
     * @note This method is not thread-safe (with respect to the user
     * side) at the moment. Protect the access if
     * start()/stop()/setSource() may be called on the same object in
     * multiple threads in your application.
    GAPI_WRAP void start();
242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264

     * @brief Get the next processed frame from the pipeline.
     * Use gout() to create an output parameter vector.
     * Output vectors must have the same number of elements as defined
     * in the cv::GComputation protocol (at the moment of its
     * construction). Shapes of elements also must conform to protocol
     * (e.g. cv::Mat needs to be passed where cv::GMat has been
     * declared as output, and so on). Run-time exception is generated
     * on type mismatch.
     * This method writes new data into objects passed via output
     * vector.  If there is no data ready yet, this method blocks. Use
     * try_pull() if you need a non-blocking version.
     * @param outs vector of output parameters to obtain.
     * @return true if next result has been obtained,
     *    false marks end of the stream.
    bool pull(cv::GRunArgsP &&outs);

    // NB: Used from python
    /// @private -- Exclude this function from OpenCV documentation
    GAPI_WRAP std::tuple<bool, cv::util::variant<cv::GRunArgs, cv::GOptRunArgs>> pull();

269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306
     * @brief Get some next available data from the pipeline.
     * This method takes a vector of cv::optional objects. An object is
     * assigned to some value if this value is available (ready) at
     * the time of the call, and resets the object to empty() if it is
     * not.
     * This is a blocking method which guarantees that some data has
     * been written to the output vector on return.
     * Using this method only makes sense if the graph has
     * desynchronized parts (see cv::gapi::desync). If there are no
     * desynchronized parts in the graph, the behavior of this
     * method is identical to the regular pull() (all data objects are
     * produced synchronously in the output vector).
     * Use gout() to create an output parameter vector.
     * Output vectors must have the same number of elements as defined
     * in the cv::GComputation protocol (at the moment of its
     * construction). Shapes of elements also must conform to protocol
     * (e.g. cv::optional<cv::Mat> needs to be passed where cv::GMat
     * has been declared as output, and so on). Run-time exception is
     * generated on type mismatch.
     * This method writes new data into objects passed via output
     * vector.  If there is no data ready yet, this method blocks. Use
     * try_pull() if you need a non-blocking version.
     * @param outs vector of output parameters to obtain.
     * @return true if next result has been obtained,
     *    false marks end of the stream.
     * @sa cv::gapi::desync
    bool pull(cv::GOptRunArgsP &&outs);

307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328
     * @brief Try to get the next processed frame from the pipeline.
     * Use gout() to create an output parameter vector.
     * This method writes new data into objects passed via output
     * vector.  If there is no data ready yet, the output vector
     * remains unchanged and false is returned.
     * @return true if data has been obtained, and false if it was
     *    not. Note: false here doesn't mark the end of the stream.
    bool try_pull(cv::GRunArgsP &&outs);

     * @brief Stop (abort) processing the pipeline.
     * Note - it is not pause but a complete stop. Calling start()
     * will cause G-API to start processing the stream from the very beginning.
     * Throws if the pipeline is not running.
    GAPI_WRAP void stop();
330 331 332 333 334 335 336 337 338 339 340

     * @brief Test if the pipeline is running.
     * @note This method is not thread-safe (with respect to the user
     * side) at the moment. Protect the access if
     * start()/stop()/setSource() may be called on the same object in
     * multiple threads in your application.
     * @return true if the current stream is not over yet.
    GAPI_WRAP bool running() const;
342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382

    /// @private
    Priv& priv();

     * @brief Check if compiled object is valid (non-empty)
     * @return true if the object is runnable (valid), false otherwise
    explicit operator bool () const;

     * @brief Vector of metadata this graph was compiled for.
     * @return Unless _reshape_ is not supported, return value is the
     * same vector which was passed to cv::GComputation::compile() to
     * produce this compiled object. Otherwise, it is the latest
     * metadata vector passed to reshape() (if that call was
     * successful).
    const GMetaArgs& metas() const; // Meta passed to compile()

     * @brief Vector of metadata descriptions of graph outputs
     * @return vector with formats/resolutions of graph's output
     * objects, auto-inferred from input metadata vector by
     * operations which form this computation.
     * @note GCompiled objects produced from the same
     * cv::GComputation graph with different input metas may return
     * different values in this vector.
    const GMetaArgs& outMetas() const;

    /// @private
    std::shared_ptr<Priv> m_priv;
/** @} */

namespace gapi {
Dmitry Matveev 已提交
384 385 386 387 388 389 390 391

 * @brief This namespace contains G-API functions, structures, and
 * symbols related to the Streaming execution mode.
 * Some of the operations defined in this namespace (e.g. size(),
 * BGR(), etc.) can be used in the traditional execution mode too.
392 393 394 395 396 397 398
namespace streaming {
 * @brief Specify queue capacity for streaming execution.
 * In the streaming mode the pipeline steps are connected with queues
 * and this compile argument controls every queue's size.
struct GAPI_EXPORTS_W_SIMPLE queue_capacity
    /// Creates the compile argument, storing @p cap in the `capacity` member.
    /// @param cap requested queue capacity; 1 when the argument is omitted.
    explicit queue_capacity(size_t cap = 1)
        : capacity(cap)
    {
    }
404 405 406 407 408 409 410 411 412 413 414 415 416 417
    size_t capacity;
/** @} */
} // namespace streaming
} // namespace gapi

namespace detail
template<> struct CompileArgTag<cv::gapi::streaming::queue_capacity>
    /// Returns the string tag associated with the
    /// cv::gapi::streaming::queue_capacity compile argument.
    static const char* tag()
    {
        return "gapi.queue_capacity";
    }

418 419 420