/**
 * \file src/core/impl/comp_node/cpu/comp_node.cpp
 * MegEngine is Licensed under the Apache License, Version 2.0 (the "License")
 *
 * Copyright (c) 2014-2021 Megvii Inc. All rights reserved.
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 */

#include "./comp_node.h"

#include "megbrain/common.h"
#include "megbrain/comp_node_env.h"
#include "megbrain/system.h"
#include "megbrain/utils/arith_helper.h"
#include "megbrain/utils/thread.h"
#include "megbrain/utils/thread_pool.h"
#include "megbrain/utils/timer.h"

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <cstring>

#include <stdlib.h>
#ifndef __APPLE__
#include <malloc.h>
#endif

using namespace mgb;

namespace {
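//! whether to pin worker threads to CPU cores; toggled by
//! CompNode::enable_affinity_for_cpu() near the end of this file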
bool enable_affinity = false;
using Task = CompNodeEnv::CpuEnv::Task;
using MultiThreadingTask = megcore::CPUDispatcher::MultiThreadingTask;

struct TaskElem {
    //! the task to be executed
    MultiThreadingTask task;
    //! degree of parallelism
    size_t nr_parallelism;
};
}  // anonymous namespace

void CpuCompNode::CpuDispatchableBase::add_callback(Task&& task) {
    dispatch(std::move(task));
}

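//! an asynchronous task queue bound to one CPU comp node: tasks run on a
//! dedicated worker thread and, for multithreaded comp nodes, are fanned out
//! to the attached ThreadPool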
class CpuCompNode::WorkerQueue final : public AsyncQueueSC<TaskElem, WorkerQueue> {
    const Locator m_locator;
    std::shared_ptr<ThreadPool> m_thread_pool = nullptr;

    void on_async_queue_worker_thread_start() override {
        mgb_assert(m_locator.device >= 0);
        if (enable_affinity) {
#if !defined(ANDROID) && !defined(__ANDROID__)
            sys::set_cpu_affinity({m_locator.device});
#endif
        }
#if __DEPLOY_ON_XP_SP2__
        __builtin_trap();
#else
        sys::set_thread_name(m_locator.to_string());
#endif
    }

    void on_sync_all_task_finish() override {
        if (m_thread_pool) {
            m_thread_pool->deactive();
        }
    }

public:
    class DispatcherImpl;

    explicit WorkerQueue(Locator locator) : m_locator(locator) {}

    void attach_thread_pool(std::shared_ptr<ThreadPool> thread_pool) {
        m_thread_pool = thread_pool;
    }

    void process_one_task(const TaskElem& task_elem) {
        if (m_thread_pool) {
            m_thread_pool->add_task(task_elem);
        } else {
            for (size_t i = 0; i < task_elem.nr_parallelism; i++) {
                task_elem.task(i, 0);
            }
        }
    }

    int nr_threads() { return m_thread_pool ? m_thread_pool->nr_threads() : 1_z; }

    ThreadPool* get_thread_pool() { return m_thread_pool.get(); }
};

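/*!
 * \brief records tasks dispatched on a CPU comp node so that a computing
 *      sequence can later be replayed without re-running graph execution; at
 *      most one recorder may be active per thread, tracked via m_self_pointer
 */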
class CpuCompNode::SeqRecorderImpl final : public CompNodeSeqRecorder {
    using CpuEnv = CompNodeEnv::CpuEnv;
    bool m_fake_exec = false, m_synchronized = false, m_stopped = false,
         m_first_replay = true;
    SeqRecorderImpl** const m_self_pointer;

    std::vector<TaskElem> m_tasks;
    std::shared_ptr<ThreadPool> m_thread_pool = nullptr;
    const CompNode m_record_compnode;
    /*!
     * \brief check that all recorded tasks belong to this CompNode, to avoid
     *      hooking tasks of other CompNodes into the recorder
     */
    void check_the_same_comp_node(const CompNode& comp_node) const {
        if (mgb_unlikely(comp_node.valid())) {
            mgb_assert(
                    m_record_compnode == comp_node,
                    "CompNode %s can't hook in CompNode %s when recording\n",
                    comp_node.locator().to_string().c_str(),
                    m_record_compnode.locator().to_string().c_str());
        }
    }

public:
    SeqRecorderImpl(
            SeqRecorderImpl** self_pointer, std::shared_ptr<ThreadPool> thread_pool,
            const CompNode& comp_node)
            : m_self_pointer{self_pointer},
              m_thread_pool{thread_pool},
              m_record_compnode{comp_node} {
        mgb_assert(!*m_self_pointer);
        *m_self_pointer = this;
    }

    ~SeqRecorderImpl() {
        if (*m_self_pointer) {
            stop();
        }
    }

    void enter_fake_exec(const CompNode& comp_node) override {
        check_the_same_comp_node(comp_node);
        mgb_assert(!m_stopped && !m_fake_exec);
        m_fake_exec = true;
    }

    void exit_fake_exec(const CompNode& comp_node) override {
        check_the_same_comp_node(comp_node);
        mgb_assert(!m_stopped && m_fake_exec);
        mgb_assert(m_tasks.empty());
        m_fake_exec = false;
        m_synchronized = false;
    }

    void stop(const CompNode& comp_node = {}) override {
        check_the_same_comp_node(comp_node);
        mgb_assert(*m_self_pointer == this);
        mgb_assert(!m_fake_exec);
        *m_self_pointer = nullptr;
        m_stopped = true;
    }

    void replay() override {
        mgb_assert(m_stopped, "not stopped yet");
        if (m_first_replay) {
            // check that dispatch is not called from tasks
            mgb_assert(
                    !*m_self_pointer,
                    "no other seq recorder should be created before first "
                    "replay");
            *m_self_pointer = this;
        }
        MGB_TRY {
            if (m_thread_pool) {
                m_thread_pool->active();
                for (auto&& i : m_tasks) {
                    m_thread_pool->add_task(i);
                }
                m_thread_pool->deactive();
            } else {
                for (auto&& task : m_tasks) {
                    for (size_t i = 0; i < task.nr_parallelism; i++) {
                        task.task(i, 0);
                    }
                }
            }
        }
        MGB_FINALLY({
            if (m_first_replay) {
                stop();
                m_first_replay = false;
            }
        });
    }

    void on_alloc(const CompNode& comp_node) {
        check_the_same_comp_node(comp_node);
        mgb_assert(m_fake_exec, "alloc is disallowed during comp node seq recording");
    }

    void on_free(const CompNode& comp_node) {
        check_the_same_comp_node(comp_node);
        mgb_assert(m_fake_exec, "free is disallowed during comp node seq recording");
    }

    void on_sync(const CompNode& comp_node) {
        check_the_same_comp_node(comp_node);
        m_synchronized = true;
    }

    void dispatch(Task&& task, const CompNode& comp_node) {
        mgb_assert(
                !m_synchronized,
                "no more tasks should be dispatched after synchronization");
        auto kern = [task](size_t, size_t) { task(); };
        dispatch_allow_after_sync(
                {std::move(kern), static_cast<size_t>(1_z)}, comp_node);
    }
    void dispatch_allow_after_sync(Task&& task, const CompNode& comp_node) {
        check_the_same_comp_node(comp_node);
        mgb_assert(
                !m_stopped, "dispatch should not be called after recording is stopped");
        if (!m_fake_exec) {
            auto kern = [task](size_t, size_t) { task(); };
            m_tasks.push_back({std::move(kern), static_cast<size_t>(1_z)});
        }
    }
    void dispatch(TaskElem&& task_elem, const CompNode& comp_node) {
        mgb_assert(
                !m_synchronized,
                "no more tasks should be dispatched after synchronization");
        dispatch_allow_after_sync(std::move(task_elem), comp_node);
    }
    void dispatch_allow_after_sync(TaskElem&& task_elem, const CompNode& comp_node) {
        check_the_same_comp_node(comp_node);
        mgb_assert(
                !m_stopped, "dispatch should not be called after recording is stopped");
        if (!m_fake_exec) {
            m_tasks.push_back(task_elem);
        }
    }
    size_t nr_threads(const CompNode& comp_node) {
        check_the_same_comp_node(comp_node);
        return m_thread_pool ? m_thread_pool->nr_threads() : 1_z;
    }

    ThreadPool* get_thread_pool() { return m_thread_pool.get(); }
};

using CompNodeBaseImpl = CpuCompNode::CompNodeBaseImpl;
using CompNodeDefaultImpl = CpuCompNode::CompNodeDefaultImpl;
using CompNodeRecorderImpl = CpuCompNode::CompNodeRecorderImpl;

//! ==================== CompNodeBaseImpl ======================
class CpuCompNode::CompNodeBaseImpl : public CpuDispatchableBase {
protected:
    Locator m_locator, m_locator_logical;

public:
    CompNodeBaseImpl(
            const Locator& locator, const Locator& locator_logical, free_func_t fd,
            free_func_t fh)
            : CpuDispatchableBase(fd, fh),
              m_locator(locator),
              m_locator_logical(locator_logical) {}

    virtual ~CompNodeBaseImpl() {}

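    //! allocate size bytes aligned to get_mem_addr_alignment(), using the
    //! platform-specific aligned allocator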
    void* mgb_aligned_alloc(size_t size) {
        auto alignment = get_mem_addr_alignment();
#ifdef WIN32
        return _aligned_malloc(size, alignment);
#elif defined(__ANDROID__) || defined(ANDROID)
        return memalign(alignment, size);
#else
        void* ptr = nullptr;
        auto err = posix_memalign(&ptr, alignment, size);
        mgb_assert(!err, "failed to malloc %zu bytes with align %zu", size, alignment);
        return ptr;
#endif
    }

    static void mgb_aligned_free(void* ptr) {
#ifdef WIN32
        _aligned_free(ptr);
#else
        ::free(ptr);
#endif
    }

    void* alloc_device(size_t size) override { return mgb_aligned_alloc(size); }

    void* alloc_host(size_t size) override { return mgb_aligned_alloc(size); }

    void copy_to_host(void* host_ptr, const void* device_ptr, size_t size) override {
        // use lambda capture to avoid memory allocation in std::bind
        auto do_copy = [host_ptr, device_ptr, size]() {
            std::memcpy(host_ptr, device_ptr, size);
        };
        m_env.cpu_env().dispatch(do_copy);
    }

    void copy_to_device(void* device_ptr, const void* host_ptr, size_t size) override {
        // use lambda capture to avoid memory allocation in std::bind
        auto do_copy = [device_ptr, host_ptr, size]() {
            std::memcpy(device_ptr, host_ptr, size);
        };
        m_env.cpu_env().dispatch(do_copy);
    }

    void copy_to_host_ref(
            megdnn::RefPtr& host_ref_ptr, megdnn::RefPtr& device_ref_ptr,
            size_t size) override {
        // use lambda capture to avoid memory allocation in std::bind
        auto do_copy = [host_ref_ptr, device_ref_ptr, size]() {
            std::memcpy(host_ref_ptr.get_ptr(), device_ref_ptr.get_ptr(), size);
        };
        m_env.cpu_env().dispatch(do_copy);
    }

    void copy_to_device_ref(
            megdnn::RefPtr& device_ref_ptr, megdnn::RefPtr& host_ref_ptr,
            size_t size) override {
        // use lambda capture to avoid memory allocation in std::bind
        auto do_copy = [device_ref_ptr, host_ref_ptr, size]() {
            std::memcpy(device_ref_ptr.get_ptr(), host_ref_ptr.get_ptr(), size);
        };
        m_env.cpu_env().dispatch(do_copy);
    }

    void peer_copy_to(
            Impl* dest_impl, void* dest, const void* src, size_t size) override {
        dest_impl->copy_to_device(dest, src, size);
    }

    void peer_copy_to_ref(
            Impl* dest_impl, megdnn::RefPtr& dest, megdnn::RefPtr& src,
            size_t size) override {
        dest_impl->copy_to_device_ref(dest, src, size);
    }

    size_t get_mem_addr_alignment() override { return m_env.property().mem_alignment; }

    void dispatch(Task&& task) override { m_env.cpu_env().dispatch(std::move(task)); }

    MemNode mem_node() override {
        // TODO: numa nodes
        return get_host_cpu_mem_node();
    }

    std::pair<size_t, size_t> get_mem_status_bytes() override {
        return sys::get_ram_status_bytes();
    }

    Locator locator() override { return m_locator; }

    Locator locator_logical() override { return m_locator_logical; }

    void add_callback(Task&& task) override {
        CpuDispatchableBase::add_callback(std::move(task));
    }

    virtual SeqRecorderImpl* cur_recorder() const = 0;
};

//! implementation of CPUDispatcher that is passed to megdnn via megcore
class CpuCompNode::WorkerQueue::DispatcherImpl final : public CPUDispatcher {
    std::atomic_size_t m_nr_task{0};
    std::shared_ptr<WorkerQueue> m_queue;
    //! DispatcherImpl is only used by CompNodeRecorderImpl, but we keep a
    //! CompNodeBaseImpl* here to avoid an incomplete-type error
    CompNodeBaseImpl* const m_comp_node;

public:
    DispatcherImpl(
            const std::shared_ptr<WorkerQueue>& queue, CompNodeBaseImpl* comp_node)
            : m_queue{queue}, m_comp_node{comp_node} {}

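    //! if a seq recorder is active on the comp node, capture the task for
    //! later replay instead of enqueuing it on the worker queue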
    void dispatch(Task&& task) override {
        if (auto recorder = m_comp_node->cur_recorder()) {
            recorder->dispatch(std::move(task), m_comp_node);
        } else {
            m_nr_task.fetch_add(1, std::memory_order_relaxed);
            auto kern = [task](size_t, size_t) { task(); };
            m_queue->add_task({kern, static_cast<size_t>(1_z)});
        }
    }

    void dispatch(MultiThreadingTask&& task, size_t parallelism) override {
        if (auto recorder = m_comp_node->cur_recorder()) {
            recorder->dispatch({std::move(task), parallelism}, m_comp_node);
        } else {
            m_nr_task.fetch_add(1, std::memory_order_relaxed);
            m_queue->add_task({std::move(task), parallelism});
        }
    }

    void sync() override {
        if (auto recorder = m_comp_node->cur_recorder()) {
            recorder->on_sync(m_comp_node);
        } else {
            m_queue->wait_all_task_finish();
        }
    }

    size_t nr_threads() override {
        if (auto recorder = m_comp_node->cur_recorder()) {
            return recorder->nr_threads(m_comp_node);
        } else {
            return m_queue->nr_threads();
        }
    }

    size_t get_nr_dispatched_tasks() const override { return m_nr_task; }

    void set_affinity(AffinityCallBack&& affinity_cb) override {
        auto thread_pool = m_queue->get_thread_pool();
        if (thread_pool) {
            thread_pool->set_affinity(affinity_cb);
        } else {
            auto affinity_run = [affinity_cb](size_t, size_t) { affinity_cb(0); };
            m_queue->add_task({affinity_run, 1_z});
        }
    }
};

//! implementation of InplaceCPUDispatcher
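//! tasks are executed inline on the calling thread, or on the attached
//! ThreadPool if one is provided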
class InplaceCPUDispatcher final : public CPUDispatcher {
    std::atomic_size_t m_nr_task{0};
    std::shared_ptr<ThreadPool> m_thread_pool = nullptr;
    //! InplaceCPUDispatcher may be used by both kinds of comp nodes, so
    //! m_comp_node must be declared with the base class type
    CompNodeBaseImpl* const m_comp_node;

public:
    InplaceCPUDispatcher(
            CompNodeBaseImpl* comp_node,
            std::shared_ptr<ThreadPool> thread_pool = nullptr)
            : m_thread_pool(thread_pool), m_comp_node(comp_node) {}

    void dispatch(Task&& task) override {
        if (auto recorder = m_comp_node->cur_recorder()) {
            recorder->dispatch(std::move(task), m_comp_node);
        } else if (m_thread_pool) {
            m_nr_task.fetch_add(1, std::memory_order_relaxed);
            auto kern = [task](size_t, size_t) { task(); };
            m_thread_pool->add_task({kern, static_cast<size_t>(1_z)});
        } else {
            m_nr_task.fetch_add(1, std::memory_order_relaxed);
            task();
        }
    }

    void dispatch(MultiThreadingTask&& task, size_t parallelism) override {
        if (auto recorder = m_comp_node->cur_recorder()) {
            recorder->dispatch({std::move(task), parallelism}, m_comp_node);
        } else if (m_thread_pool) {
            m_nr_task.fetch_add(1, std::memory_order_relaxed);
            m_thread_pool->add_task({task, parallelism});
        } else {
            m_nr_task.fetch_add(1, std::memory_order_relaxed);
            for (size_t i = 0; i < parallelism; i++) {
                task(i, 0);
            }
        }
    }

    size_t nr_threads() override {
        return m_thread_pool ? m_thread_pool->nr_threads() : 1_z;
    }

    void sync() override {
        if (auto recorder = m_comp_node->cur_recorder()) {
            recorder->on_sync(m_comp_node);
        } else if (m_thread_pool) {
            m_thread_pool->deactive();
        }
    }

    size_t get_nr_dispatched_tasks() const override { return m_nr_task; }

    void set_affinity(AffinityCallBack&& affinity_cb) override {
        if (auto recorder = m_comp_node->cur_recorder()) {
            recorder->get_thread_pool()->set_affinity(affinity_cb);
        } else if (m_thread_pool) {
            m_thread_pool->set_affinity(affinity_cb);
        } else {
            affinity_cb(0);
        }
    }
};

//! ==================== CompNodeDefaultImpl ======================
/**
 * \note CompNodeDefaultImpl reuses most of the implementations in the base
 * class, including alloc_device, alloc_host, copy_to_host, copy_to_device,
 * peer_copy_to, add_callback ...
 */
class CpuCompNode::CompNodeDefaultImpl final : public CompNodeBaseImpl {
    MGB_DYN_TYPE_OBJ_FINAL_DECL;

public:
    //! ptr to default cpu, only used by check_global_finalized
    static CompNodeDefaultImpl* sm_default_cpu_comp_node_ptr;

    static void static_free_device(ImplBase* self, void* ptr) {
        static_cast<CompNodeDefaultImpl*>(self)->free_device(ptr);
    }

    static void static_free_host(ImplBase* self, void* ptr) {
        static_cast<CompNodeDefaultImpl*>(self)->free_host(ptr);
    }
    using CpuEventImpl = CpuDispatchableBase::EventImpl;

    CompNodeDefaultImpl(const Locator& locator, const Locator& locator_logical)
            : CompNodeBaseImpl(
                      locator, locator_logical, static_free_device, static_free_host) {
        mgb_assert(
                locator.type == DeviceType::CPU &&
                        locator.device == Locator::DEVICE_CPU_DEFAULT,
                "CompNodeNoRecorder is only constructed On DEVICE_CPU_DEFAULT");
        auto cn = make_comp_node_from_impl(this);
        m_env.init_cpu({std::make_shared<InplaceCPUDispatcher>(this)}, cn);
        sm_default_cpu_comp_node_ptr = this;
    }

    ~CompNodeDefaultImpl() {
        m_env.fini();
        sm_default_cpu_comp_node_ptr = nullptr;
    }

    //! return whether global finalized, and print warning in such case
    bool check_global_finalized(const char* reason) {
        MGB_MARK_USED_VAR(reason);
        if (!sm_default_cpu_comp_node_ptr) {
            static std::atomic_flag warn_printed = ATOMIC_FLAG_INIT;
            if (!warn_printed.test_and_set()) {
                mgb_log_debug(
                        "cpu comp node method called after global finalize: "
                        "reason=%s",
                        reason);
            }
            return true;
        }
        return false;
    }

    void free_device(void* ptr) {
        if (check_global_finalized("free_device()")) {
            CompNodeBaseImpl::mgb_aligned_free(ptr);
            return;
        } else {
            auto do_free = [ptr]() { CompNodeBaseImpl::mgb_aligned_free(ptr); };
            m_env.cpu_env().dispatch(do_free);
        }
    }

    void free_host(void* ptr) {
        check_global_finalized("free_host()");
        return CompNodeBaseImpl::mgb_aligned_free(ptr);
    }

    std::unique_ptr<Event> create_event(size_t flags) override {
        return std::make_unique<CpuEventImpl>(this, flags);
    }

    void sync() override {}

    std::unique_ptr<CompNodeSeqRecorder> create_seq_recorder(
            cg::ComputingGraph*) override {
        mgb_assert(false, "default_cpu has no ability to record");
        return nullptr;
    }

    SeqRecorderImpl* cur_recorder() const override { return nullptr; }
};
MGB_DYN_TYPE_OBJ_FINAL_IMPL(CompNodeDefaultImpl);
CompNodeDefaultImpl* CompNodeDefaultImpl::sm_default_cpu_comp_node_ptr = nullptr;

//! ==================== CompNodeRecorderImpl ======================
class CpuCompNode::CompNodeRecorderImpl final : public CompNodeBaseImpl {
    MGB_DYN_TYPE_OBJ_FINAL_DECL;
    std::shared_ptr<ThreadPool> m_thread_pool;
    std::shared_ptr<WorkerQueue> m_worker_queue;

    //! used during comp node seq rec
    class CompSeqRecEventImpl final : public CpuDispatchableBase::EventImpl {
        void do_record() override {
            auto impl = static_cast<CompNodeRecorderImpl*>(m_comp_node_impl);
            if (auto rec = impl->cur_recorder()) {
                auto callback = [this]() {
                    incr_nr_req();
                    on_finish();
                };
                rec->dispatch_allow_after_sync(callback, m_comp_node_impl);
            } else {
                EventImpl::do_record();
            }
        }

        void do_device_wait_by(Impl*) override {
            mgb_throw(
                    MegBrainError,
                    "device_wait() should not be called on events created "
                    "during comp node seq recording");
        }

    public:
        using EventImpl::EventImpl;
    };

    class CpuEventImpl final : public CpuDispatchableBase::EventImpl {
#if MGB_HAVE_THREAD
        void host_wait_cv() override {
            CpuDispatchableBase::EventImpl::host_wait_cv();
            auto thread_pool = static_cast<CompNodeRecorderImpl*>(m_comp_node_impl)
                                       ->get_thread_pool();
            if (thread_pool) {
                thread_pool->deactive();
            }
        }
#endif
    public:
        using EventImpl::EventImpl;
    };

#if MGB_HAVE_THREAD
    static MGB_THREAD_LOCAL_PTR(SeqRecorderImpl) sm_cur_recorder;
#else
    SeqRecorderImpl* sm_cur_recorder = nullptr;
#endif

public:
    static void static_free_device(ImplBase* self, void* ptr) {
        static_cast<CompNodeRecorderImpl*>(self)->free_device(ptr);
    }

    static void static_free_host(ImplBase* self, void* ptr) {
        static_cast<CompNodeRecorderImpl*>(self)->free_host(ptr);
    }

    CompNodeRecorderImpl(
            const Locator& locator, const Locator& locator_logical,
            const std::shared_ptr<WorkerQueue>& worker_queue)
            : CompNodeBaseImpl(
                      locator, locator_logical, static_free_device, static_free_host),
              m_worker_queue(worker_queue) {
        auto cn = make_comp_node_from_impl(this);
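        // create the thread pool for multithreaded comp nodes, then select a
        // dispatcher: "default" devices dispatch inplace (optionally through
        // the thread pool), while the others go through the shared worker
        // queue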
        if (locator.type == DeviceType::MULTITHREAD) {
            m_thread_pool = std::shared_ptr<ThreadPool>(
                    new ThreadPool(static_cast<size_t>(locator.nr_threads)));
            mgb_assert(m_thread_pool, "ThreadPool creation failed");
        }
        if (locator.type == DeviceType::CPU) {
            if (locator.device == Locator::DEVICE_CPU_DEFAULT) {
                m_env.init_cpu({std::make_shared<InplaceCPUDispatcher>(this)}, cn);
            } else {
                m_env.init_cpu(
                        {std::make_shared<WorkerQueue::DispatcherImpl>(
                                m_worker_queue, this)},
                        cn);
            }
        } else if (locator.type == DeviceType::MULTITHREAD) {
            if (locator.device == Locator::DEVICE_MULTITHREAD_DEFAULT) {
                m_env.init_cpu(
                        {std::make_shared<InplaceCPUDispatcher>(this, m_thread_pool)},
                        cn);
            } else {
                m_worker_queue->attach_thread_pool(m_thread_pool);
                m_env.init_cpu(
                        {std::make_shared<WorkerQueue::DispatcherImpl>(
                                m_worker_queue, this)},
                        cn);
            }
        }
    }

    ~CompNodeRecorderImpl() {
        if (sm_cur_recorder) {
            sm_cur_recorder->stop();
        }
        if (m_worker_queue) {
            // synchronize before fini
            m_worker_queue->wait_all_task_finish();
        }
        m_env.fini();
        if (m_worker_queue) {
            // wait for new kernels dispatched in fini() (like free_device())
            m_worker_queue->wait_all_task_finish();
        }
    }

    ThreadPool* get_thread_pool() const { return m_thread_pool.get(); }

    //! return whether global finalized, and print warning in such case
    bool check_global_finalized(const char* reason) {
        MGB_MARK_USED_VAR(reason);
        if (!sm_pool) {
            static std::atomic_flag warn_printed = ATOMIC_FLAG_INIT;
            if (!warn_printed.test_and_set()) {
                mgb_log_debug(
                        "cpu comp node method called after global finalize: "
                        "reason=%s",
                        reason);
            }
            return true;
        }
        return false;
    }

    void* alloc_device(size_t size) override {
        if (sm_cur_recorder) {
            sm_cur_recorder->on_alloc(this);
        }
        return CompNodeBaseImpl::alloc_device(size);
    }

    void free_device(void* ptr) {
        if (sm_cur_recorder || check_global_finalized("free_device()")) {
            CompNodeBaseImpl::mgb_aligned_free(ptr);
            if (sm_cur_recorder) {
                sm_cur_recorder->on_free(this);
            }
            return;
        } else {
            auto do_free = [ptr]() { CompNodeBaseImpl::mgb_aligned_free(ptr); };
            m_env.cpu_env().dispatch(do_free);
        }
    }

    void* alloc_host(size_t size) override {
        if (m_worker_queue) {
            m_worker_queue->check_exception();
        }
        return CompNodeBaseImpl::alloc_host(size);
    }

    void free_host(void* ptr) {
        if (check_global_finalized("free_host()")) {
            CompNodeBaseImpl::mgb_aligned_free(ptr);
            return;
        }
        if (m_worker_queue) {
            m_worker_queue->check_exception();
        }
        CompNodeBaseImpl::mgb_aligned_free(ptr);
    }

    void copy_to_host(void* host_ptr, const void* device_ptr, size_t size) override {
        if (m_worker_queue) {
            m_worker_queue->check_exception();
        }
        CompNodeBaseImpl::copy_to_host(host_ptr, device_ptr, size);
    }

    void copy_to_device(void* device_ptr, const void* host_ptr, size_t size) override {
        if (m_worker_queue) {
            m_worker_queue->check_exception();
        }
        CompNodeBaseImpl::copy_to_device(device_ptr, host_ptr, size);
    }

    void copy_to_host_ref(
            megdnn::RefPtr& host_ref_ptr, megdnn::RefPtr& device_ref_ptr,
            size_t size) override {
        if (m_worker_queue) {
            m_worker_queue->check_exception();
        }
        CompNodeBaseImpl::copy_to_host_ref(host_ref_ptr, device_ref_ptr, size);
    }

    void copy_to_device_ref(
            megdnn::RefPtr& device_ref_ptr, megdnn::RefPtr& host_ref_ptr,
            size_t size) override {
        if (m_worker_queue) {
            m_worker_queue->check_exception();
        }
        CompNodeBaseImpl::copy_to_device_ref(device_ref_ptr, host_ref_ptr, size);
    }

    void peer_copy_to(
            Impl* dest_impl, void* dest, const void* src, size_t size) override {
        //! copy to default_cpu
        if (dest_impl->same_type<CpuCompNode::CompNodeDefaultImpl>()) {
            CompNodeBaseImpl::peer_copy_to(dest_impl, dest, src, size);
            return;
        }

        if (!dest_impl->same_type<CpuCompNode::CompNodeRecorderImpl>()) {
            if (dest_impl->env().property().type == DeviceType::ATLAS) {
#if MGB_ATLAS
                dest_impl->copy_to_device(dest, src, size);
                return;
#else
                mgb_throw(
                        MegBrainError,
                        "Atlas comp_node used but "
                        "ATLAS BUILD not enabled");
#endif
            } else if (dest_impl->env().property().type == DeviceType::CAMBRICON) {
#if MGB_CAMBRICON
                dest_impl->copy_to_device(dest, src, size);
                return;
#else
                mgb_throw(
                        MegBrainError,
                        "Cambricon comp_node used but "
                        "CAMBRICON BUILD not enabled");
#endif
            } else {
                mgb_assert(
                        locator().device == Locator::DEVICE_CPU_DEFAULT,
                        "currently only peer copy from default cpu comp "
                        "nodes "
                        "is implemented");
            }
        }
        dest_impl->copy_to_device(dest, src, size);
    }

    void peer_copy_to_ref(
            Impl* dest_impl, megdnn::RefPtr& dest, megdnn::RefPtr& src,
            size_t size) override {
        //! copy to default_cpu
        if (dest_impl->same_type<CpuCompNode::CompNodeDefaultImpl>()) {
            CompNodeBaseImpl::peer_copy_to_ref(dest_impl, dest, src, size);
            return;
        }

        if (!dest_impl->same_type<CpuCompNode::CompNodeRecorderImpl>()) {
            if (dest_impl->env().property().type == DeviceType::ATLAS) {
#if MGB_ATLAS
                dest_impl->copy_to_device(dest.get_ptr(), src.get_ptr(), size);
                return;
#else
                mgb_throw(
                        MegBrainError,
                        "Atlas comp_node used but "
                        "ATLAS BUILD not enabled");
#endif
            } else if (dest_impl->env().property().type == DeviceType::CAMBRICON) {
#if MGB_CAMBRICON
                dest_impl->copy_to_device(dest.get_ptr(), src.get_ptr(), size);
                return;
#else
                mgb_throw(
                        MegBrainError,
                        "Cambricon comp_node used but "
                        "CAMBRICON BUILD not enabled");
#endif
            } else {
                mgb_assert(
                        locator().device == Locator::DEVICE_CPU_DEFAULT,
                        "currently only peer copy from default cpu comp "
                        "nodes "
                        "is implemented");
            }
        }
        dest_impl->copy_to_device_ref(dest, src, size);
    }

    std::unique_ptr<Event> create_event(size_t flags) override {
        if (m_worker_queue) {
            m_worker_queue->check_exception();
        }
        if (sm_cur_recorder) {
            return std::make_unique<CompSeqRecEventImpl>(this, flags);
        } else {
            return std::make_unique<CpuEventImpl>(this, flags);
        }
    }

    void sync() override {
        if (sm_cur_recorder) {
            sm_cur_recorder->on_sync(this);
        } else if (m_worker_queue) {
            m_worker_queue->wait_all_task_finish();
        }
        if (m_thread_pool) {
            m_thread_pool->deactive();
        }
    }

    std::unique_ptr<CompNodeSeqRecorder> create_seq_recorder(
            cg::ComputingGraph*) override {
        return std::make_unique<SeqRecorderImpl>(&sm_cur_recorder, m_thread_pool, this);
    }

    SeqRecorderImpl* cur_recorder() const override { return sm_cur_recorder; }

    void add_callback(Task&& task) override {
        if (!check_global_finalized("add_callback()")) {
            CompNodeBaseImpl::add_callback(std::move(task));
        } else {
            task();
        }
    }
};
MGB_DYN_TYPE_OBJ_FINAL_IMPL(CompNodeRecorderImpl);
#if MGB_HAVE_THREAD
MGB_THREAD_LOCAL_PTR(CpuCompNode::SeqRecorderImpl)
CompNodeRecorderImpl::sm_cur_recorder = nullptr;
#endif

/* ======================== CpuCompNode ======================== */
struct CpuCompNode::Pool {
    static constexpr int MAX_NR_COMP_NODE = 1024;
    struct CompNodeRecorderImplDeleter {
        void operator()(CompNodeRecorderImpl* p) { p->~CompNodeRecorderImpl(); }
    };

    MGB_RECURSIVE_MUTEX mtx;
    // use global memory pool to ensure object memory remains accessible even
    // after global finalize
    std::aligned_storage_t<sizeof(CompNodeRecorderImpl), alignof(CompNodeRecorderImpl)>
            impl_storage[MAX_NR_COMP_NODE];
    size_t nr_used_impl_storage = 0;

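    //! comp nodes sharing a physical location share a WorkerQueue, while each
    //! (physical, logical) locator pair owns one CompNodeRecorderImpl; CPU and
    //! MULTITHREAD comp nodes are kept in separate maps below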
    std::unordered_map<
            CompNode::LocatorPairHashKey,
            std::unique_ptr<CompNodeRecorderImpl, CompNodeRecorderImplDeleter>,
            CompNode::LocatorPairHashKey::Hash>
            locator2impl;
    ThinHashMap<std::pair<int, int>, std::weak_ptr<WorkerQueue>> physical2queue;
    std::unordered_map<
            CompNode::LocatorPairHashKey,
            std::unique_ptr<CompNodeRecorderImpl, CompNodeRecorderImplDeleter>,
            CompNode::LocatorPairHashKey::Hash>
            locator2impl_multi_thread;
    ThinHashMap<std::pair<int, int>, std::weak_ptr<WorkerQueue>>
            physical2queue_multithead;
};
CpuCompNode::Pool* CpuCompNode::sm_pool;
Spinlock CpuCompNode::sm_pool_mtx;

void CpuCompNode::foreach (thin_function<void(CompNode)> callback) {
    if (!sm_pool)
        return;

    for (size_t i = 0;; ++i) {
        CompNode cur;
        {
            MGB_LOCK_GUARD(sm_pool->mtx);
            if (i >= sm_pool->nr_used_impl_storage)
                return;
            cur = make_comp_node_from_impl(
                    reinterpret_cast<CompNodeRecorderImpl*>(&sm_pool->impl_storage[i]));
        }
        callback(cur);
    }
}

void CpuCompNode::finalize() {
    if (sm_pool) {
        sync_all();

        sm_pool->~Pool();
        sm_pool = nullptr;
    }
}

size_t CpuCompNode::get_device_count() {
    return sys::get_cpu_count();
}

CpuCompNode::Impl* CpuCompNode::load_cpu(Locator locator, Locator locator_logical) {
#if !MGB_HAVE_THREAD
    // use only cpu:default and cpu0:1023 comp node when threading is disabled
    mgb_assert(
            locator.device == Locator::DEVICE_CPU_DEFAULT ||
            (locator.device == 0 && locator.stream == 1023));
    locator_logical = {locator_logical.type, locator.device, locator.stream};
#endif
    {
        MGB_LOCK_GUARD(sm_pool_mtx);
        if (!sm_pool) {
            // use static storage so object can be safely accessed even after
            // global finalize
            static std::aligned_storage_t<sizeof(Pool), alignof(Pool)> storage;
            sm_pool = new (&storage) Pool;
        }
    }
    mgb_assert(
            locator.device >= 0 ||
                    (locator.device == Locator::DEVICE_CPU_DEFAULT &&
                     locator.stream == 0) ||
                    locator.device == Locator::DEVICE_MULTITHREAD_DEFAULT,
            "failed to load cpu for device:%d stream:%d", locator.device,
            locator.stream);
    MGB_LOCK_GUARD(sm_pool->mtx);

    // encode both device ID and type into an int
    mgb_assert(
            locator_logical.device >= -1 ||
            locator_logical.device <= Locator::DEVICE_CPU_DEFAULT);
    if (locator_logical.type != CompNode::DeviceType::UNSPEC) {
        mgb_assert(
                locator_logical.type == CompNode::DeviceType::CPU ||
                locator_logical.type == CompNode::DeviceType::MULTITHREAD);
    }
    if (locator.type == DeviceType::CPU) {
        auto&& pqueue_weak = sm_pool->physical2queue[{locator.device, locator.stream}];
        auto pqueue = pqueue_weak.lock();
        if (!pqueue) {
            pqueue = std::make_shared<WorkerQueue>(locator);
            pqueue_weak = pqueue;
        }
        auto&& pimpl = sm_pool->locator2impl[{locator, locator_logical}];
        if (!pimpl) {
            mgb_assert(
                    sm_pool->nr_used_impl_storage < Pool::MAX_NR_COMP_NODE,
                    "too many cpu comp nodes; max %d allowed", Pool::MAX_NR_COMP_NODE);
            pimpl.reset(new (&sm_pool->impl_storage[sm_pool->nr_used_impl_storage++])
                                CompNodeRecorderImpl{locator, locator_logical, pqueue});
        }
        log_comp_node_created(locator, locator_logical);
        return pimpl.get();
    } else {
        mgb_assert(locator.type == DeviceType::MULTITHREAD);
        auto&& pqueue_weak = sm_pool->physical2queue_multithead[{
                locator.device, locator.nr_threads}];
        auto pqueue = pqueue_weak.lock();
        if (!pqueue) {
            pqueue = std::make_shared<WorkerQueue>(locator);
            pqueue_weak = pqueue;
        }
        auto&& pimpl = sm_pool->locator2impl_multi_thread[{locator, locator_logical}];
        if (!pimpl) {
            mgb_assert(
                    sm_pool->nr_used_impl_storage < Pool::MAX_NR_COMP_NODE,
                    "too many cpu multithread comp nodes; max %d allowed",
                    Pool::MAX_NR_COMP_NODE);
            pimpl.reset(new (&sm_pool->impl_storage[sm_pool->nr_used_impl_storage++])
                                CompNodeRecorderImpl{locator, locator_logical, pqueue});
        }
        log_comp_node_created(locator, locator_logical);
        return pimpl.get();
    }
}

void CpuCompNode::sync_all() {
    if (!sm_pool)
        return;

    MGB_LOCK_GUARD(sm_pool->mtx);
    for (auto&& i : sm_pool->locator2impl)
        i.second->sync();
    for (auto&& i : sm_pool->locator2impl_multi_thread)
        i.second->sync();
}

/* ======================== CompNode methods ========================  */
// The CompNode returned by default_cpu() is different from the one produced
// by CompNode::load("cpu:default"): default_cpu() is used for static
// inference and is not allowed to dispatch compute kernels, while
// CompNode::load("cpu:default") is the "inplace cpu" comp node kept in
// CpuCompNode::Pool.
CompNode CompNode::default_cpu() {
    static Locator locator{DeviceType::CPU, Locator::DEVICE_CPU_DEFAULT, {-1}};
    static CompNodeDefaultImpl impl{locator, locator};
    return &impl;
}

bool CompNode::enable_affinity_for_cpu(bool flag) {
    bool old = enable_affinity;
    enable_affinity = flag;
    return old;
}

/* ======================== EventImpl ========================  */
double CpuCompNode::CpuDispatchableBase::EventImpl::do_elapsed_time_until(
        EventImplHelper& end) {
    auto&& f1 = static_cast<EventImpl&>(end).m_prev_finish_time;
    return m_prev_finish_time.time_until_secs(f1);
}

#if MGB_HAVE_THREAD
void CpuCompNode::CpuDispatchableBase::EventImpl::do_device_wait_by(Impl* cn_impl) {
    {
        auto locator = m_comp_node_impl->locator();
        if (locator.device == Locator::DEVICE_CPU_DEFAULT &&
            !static_cast<CpuCompNode::CompNodeRecorderImpl*>(m_comp_node_impl)
                     ->cur_recorder()) {
            auto v0 = m_record_nr_req.load(std::memory_order_relaxed),
                 v1 = m_record_nr_finish.load(std::memory_order_relaxed);
            mgb_assert(
                    v0 && v0 == v1,
                    "event on cpu:default hasn't been recorded inplace.");
            return;
        }
    }

    {
        auto type = cn_impl->env().property().type;
        mgb_throw_if(
                type != CompNode::DeviceType::CPU &&
                        type != CompNode::DeviceType::CUDA &&
                        type != CompNode::DeviceType::ATLAS &&
                        type != CompNode::DeviceType::CAMBRICON,
                MegBrainError,
                "currently CPU can only wait for CPU, CUDA, ATLAS or "
                "CAMBRICON");
    }

    if (cn_impl->env().property().type == CompNode::DeviceType::ATLAS) {
#if MGB_ATLAS
        return m_comp_node_impl->sync();
#else
        mgb_throw(MegBrainError, "Atlas comp_node used but ATLAS BUILD not enabled");
#endif
    } else if (cn_impl->env().property().type == CompNode::DeviceType::CAMBRICON) {
#if MGB_CAMBRICON
        return m_comp_node_impl->sync();
#else
        mgb_throw(
                MegBrainError,
                "Cambricon comp_node used but CAMBRICON BUILD not enabled");
#endif
    }

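    // otherwise enqueue a waiter on cn_impl that blocks until this event's
    // finish counter reaches the version recorded at request time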
    auto version = m_record_nr_req.load(std::memory_order_relaxed);
    mgb_assert(version, "device wait on non-recorded event");

    auto waiter = [this, version]() {
        while (m_record_nr_finish.load(std::memory_order_acquire) < version) {
            std::unique_lock<std::mutex> lk{m_dev_wait_mtx};
            if (m_record_nr_finish.load(std::memory_order_acquire) >= version) {
                break;
            }
            m_dev_wait_cv.wait(lk);
        }
        m_dev_wait_nr_waiter.fetch_sub(1, std::memory_order_release);
    };
    m_dev_wait_nr_waiter.fetch_add(1, std::memory_order_release);
    cn_impl->add_callback(waiter);
}

void CpuCompNode::CpuDispatchableBase::EventImpl::do_record() {
    incr_nr_req();
    auto call_on_finish = [this]() { on_finish(); };
    static_cast<CpuDispatchableBase*>(m_comp_node_impl)->dispatch(call_on_finish);
}

void CpuCompNode::CpuDispatchableBase::EventImpl::on_finish() {
    if (m_create_flags & Flags::NEED_TIMER) {
        auto v0 = m_record_nr_finish.load(std::memory_order_relaxed) + 1,
             v1 = m_record_nr_req.load(std::memory_order_relaxed);
        if (v0 == v1) {
            m_prev_finish_time = RealTimer::get_time();
        }
    }

    m_record_nr_finish.fetch_add(1, std::memory_order_release);
    if (m_dev_wait_nr_waiter.load(std::memory_order_acquire)) {
        MGB_LOCK_GUARD(m_dev_wait_mtx);
        m_dev_wait_cv.notify_all();
    }
}

bool CpuCompNode::CpuDispatchableBase::EventImpl::do_finished() {
    auto v0 = m_record_nr_req.load(std::memory_order_relaxed);
    auto v1 = m_record_nr_finish.load(std::memory_order_acquire);
    return v0 == v1;
}

void CpuCompNode::CpuDispatchableBase::EventImpl::host_wait_cv() {
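    // spin for a bounded number of iterations before falling back to the
    // condition variable, to avoid blocking syscalls for short waits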
    for (size_t i = 0, it = SCQueueSynchronizer::get_default_max_spin() / 20; i < it;
         ++i) {
        if (finished()) {
            return;
        }
    }

    m_dev_wait_nr_waiter.fetch_add(1, std::memory_order_release);
    for (;;) {
        std::unique_lock<std::mutex> lock{m_dev_wait_mtx};
        if (finished()) {
            break;
        }
        m_dev_wait_cv.wait(lock);
    }
    m_dev_wait_nr_waiter.fetch_sub(1, std::memory_order_release);
}

CpuCompNode::CpuDispatchableBase::EventImpl::~EventImpl() noexcept {
    auto check_all_finished = [this]() {
M
1191 1192
    };
    if (!check_all_finished()) {
1193 1194 1195 1196
        mgb_log_debug(
                "event %p has unfinished callbacks when destructed; "
                "waiting ...",
                this);
1197 1198 1199 1200 1201
        while (!check_all_finished()) {
            std::this_thread::yield();
        }
    }
}
#else  // MGB_HAVE_THREAD

void CpuCompNode::CpuDispatchableBase::EventImpl::host_wait_cv() {}

void CpuCompNode::CpuDispatchableBase::EventImpl::do_device_wait_by(Impl*) {}

void CpuCompNode::CpuDispatchableBase::EventImpl::do_record() {
    if (m_create_flags & Flags::NEED_TIMER) {
        m_prev_finish_time = RealTimer::get_time();
    }
}

void CpuCompNode::CpuDispatchableBase::EventImpl::on_finish() {}

bool CpuCompNode::CpuDispatchableBase::EventImpl::do_finished() {
    return true;
}

CpuCompNode::CpuDispatchableBase::EventImpl::~EventImpl() noexcept = default;

#endif  // MGB_HAVE_THREAD

// vim: syntax=cpp.doxygen foldmethod=marker foldmarker=f{{{,f}}}