提交 79328fa0 编写于 作者: M Megvii Engine Team

refactor(imperative): add notes for CompNodeSyncManager

GitOrigin-RevId: 895a01326313f5908fa8431c5cc738acdd368991
上级 d2bbbb30
...@@ -49,6 +49,7 @@ template <typename... Ts> ...@@ -49,6 +49,7 @@ template <typename... Ts>
using Map = NoThrowMovable<std::map<Ts...>>; using Map = NoThrowMovable<std::map<Ts...>>;
class CompNodeSyncManager { class CompNodeSyncManager {
//! synchronization information for each compnode
struct CompNodeData { struct CompNodeData {
template <typename T> template <typename T>
class ReleaseQueue { class ReleaseQueue {
...@@ -66,22 +67,40 @@ class CompNodeSyncManager { ...@@ -66,22 +67,40 @@ class CompNodeSyncManager {
uint64_t next = 1; uint64_t next = 1;
//! last completed virtual event //! last completed virtual event
uint64_t completed = 0; uint64_t completed = 0;
//! virtual event to real event //! virtual event to real event, the map is ORDERLY, that means, in events {a:
//! e1, b: e2}, if b>a, e2 is later than e1
Map<uint64_t, EventPtr> events; Map<uint64_t, EventPtr> events;
//! ordering information at some virtual events: //! ordering information at some virtual events: what virtual events on other
//! what virtual events on other comp nodes is _sequenced before_ this virtual //! comp nodes is _sequenced before_ this virtual event. concretely, the key is
//! event //! the virtual event id `t` on this comp node, and the value is the ordering
//! information. the ordering is a vector, the index of the vector is the
//! compnode id, and the value of the vector is the event id which the event `t`
//! wait for on the compnode
Map<uint64_t, std::vector<uint64_t>> ordering; Map<uint64_t, std::vector<uint64_t>> ordering;
//! release queue for dev storage, keyed by releaser. this comp node is the //! in megengine, each compnode manager their own resources, and the resource
//! **receiver** //! can be used by other compnode. for example, we have compnodes: cn1, cn2,
//! cn1 can allocate a tensor alpha, and the tensor alpha can be used on cn2. we
//! want to release the tensor alpha after cn2 used, and the release maybe
//! asynchronized. so in each CompNodeData cn-i, we setup release queues for
//! each compnode. if other compnode cn-j use the resource on cn-i, we hold the
//! resource (refcnt) in the release queue cn-j in CompNodeData cn-i. if cn-j
//! have complete its task and do not need the resource on cn-i, we release the
//! resource(refcnt) in the queue cn-j in CompNodeData cn-i. here is the release
//! queues, which is a vector. the index of the vector is the compnode id, and
//! the value of the vector is the release queue
std::vector<ReleaseQueue<BlobPtr>> release_queues; std::vector<ReleaseQueue<BlobPtr>> release_queues;
//! release queue for host storage. this comp node is the **releaser** //! different from the device resource, if the host resource is used by a
//! compnode, then the compnode is responsible for the resource release rather
//! than the host. that means if cn-i used a host resource, then cn-i add the
//! resource(refcnt) to its host_release_queue. if the task is completed, cn-i
//! remove the resource in its host_release_queue
ReleaseQueue<HostTensorStorage::RawStorage> host_release_queue; ReleaseQueue<HostTensorStorage::RawStorage> host_release_queue;
}; };
std::mutex m_mtx; std::mutex m_mtx;
std::condition_variable m_cv; std::condition_variable m_cv;
bool m_should_stop = false; bool m_should_stop = false;
//! to realize the async release, we create a new thread to polling and release
std::thread m_polling_thread; std::thread m_polling_thread;
std::unordered_map<CompNode, size_t, CompNodeHash> m_cn2id; std::unordered_map<CompNode, size_t, CompNodeHash> m_cn2id;
std::vector<CompNodeData> m_cndata; std::vector<CompNodeData> m_cndata;
...@@ -93,6 +112,7 @@ class CompNodeSyncManager { ...@@ -93,6 +112,7 @@ class CompNodeSyncManager {
e->record(); e->record();
lock.lock(); lock.lock();
auto& cndata = m_cndata[cnid]; auto& cndata = m_cndata[cnid];
//! add the event to the event map of compnodedata[cnid]
return cndata.events.emplace_hint(cndata.events.end(), cndata.next++, e); return cndata.events.emplace_hint(cndata.events.end(), cndata.next++, e);
} }
...@@ -115,6 +135,7 @@ class CompNodeSyncManager { ...@@ -115,6 +135,7 @@ class CompNodeSyncManager {
return it->second; return it->second;
} }
//! the implementation of polling thread
void monitor_events() { void monitor_events() {
#if defined(__APPLE__) #if defined(__APPLE__)
pthread_setname_np("CompNodeSync"); pthread_setname_np("CompNodeSync");
...@@ -150,8 +171,9 @@ class CompNodeSyncManager { ...@@ -150,8 +171,9 @@ class CompNodeSyncManager {
updated.resize(m_cndata.size(), false); updated.resize(m_cndata.size(), false);
// copy events to a temporary storage so that we may unlock while polling // copy events to a temporary storage so that we may unlock while polling
stats.resize(m_cndata.size()); stats.resize(m_cndata.size());
// for each compnode
for (size_t cnid = 0; cnid < m_cndata.size(); ++cnid) { for (size_t cnid = 0; cnid < m_cndata.size(); ++cnid) {
// decide max number of events to query // decide max number of events to query for each compnode
// rule c: #successful // rule c: #successful
size_t n = stats[cnid].num_success; size_t n = stats[cnid].num_success;
if (n == stats[cnid].num_attempts) { if (n == stats[cnid].num_attempts) {
...@@ -162,7 +184,7 @@ class CompNodeSyncManager { ...@@ -162,7 +184,7 @@ class CompNodeSyncManager {
// rule b: 1 // rule b: 1
n = 1; n = 1;
} }
// now copy upto n events // now copy upto n events to todos for each compnode
auto& events = m_cndata[cnid].events; auto& events = m_cndata[cnid].events;
size_t i = 0; size_t i = 0;
for (auto it = events.begin(); i < n && it != events.end(); ++i, ++it) { for (auto it = events.begin(); i < n && it != events.end(); ++i, ++it) {
...@@ -186,6 +208,7 @@ class CompNodeSyncManager { ...@@ -186,6 +208,7 @@ class CompNodeSyncManager {
last_result = item.it->second->finished(); last_result = item.it->second->finished();
if (last_result) { if (last_result) {
stats[item.cnid].num_success++; stats[item.cnid].num_success++;
// the last finished event iterator
stats[item.cnid].it = item.it; stats[item.cnid].it = item.it;
} }
} }
...@@ -198,8 +221,10 @@ class CompNodeSyncManager { ...@@ -198,8 +221,10 @@ class CompNodeSyncManager {
if (stat.num_success == 0) { if (stat.num_success == 0) {
continue; continue;
} }
// the last finished event id of compnode `cnid`
auto t = stat.it->first; auto t = stat.it->first;
auto& cndata = m_cndata[cnid]; auto& cndata = m_cndata[cnid];
// update the complete information of compnode `cnid`
if (cndata.completed < t) { if (cndata.completed < t) {
cndata.completed = t; cndata.completed = t;
updated[cnid] = true; updated[cnid] = true;
...@@ -208,7 +233,12 @@ class CompNodeSyncManager { ...@@ -208,7 +233,12 @@ class CompNodeSyncManager {
// degradation even if some completion events are missed by our query // degradation even if some completion events are missed by our query
auto it = cndata.ordering.upper_bound(t); auto it = cndata.ordering.upper_bound(t);
if (it != cndata.ordering.begin()) { if (it != cndata.ordering.begin()) {
// get the ordering information of event t, if event t is
// finished, that means the events on other compnode which event
// t wait for are also finished, so we can update these compnode
// complete information
it = std::prev(it); it = std::prev(it);
// for each compnode and event which event t wait for
for (auto [cnid, t] : views::enumerate(it->second)) { for (auto [cnid, t] : views::enumerate(it->second)) {
auto& cndata = m_cndata[cnid]; auto& cndata = m_cndata[cnid];
if (cndata.completed < t) { if (cndata.completed < t) {
...@@ -221,24 +251,35 @@ class CompNodeSyncManager { ...@@ -221,24 +251,35 @@ class CompNodeSyncManager {
} }
// release dev storage // release dev storage
// receiver is the resource owner and the releaser is the resource user
// for each resource owner
for (size_t receiver_cnid = 0; receiver_cnid < m_cndata.size(); for (size_t receiver_cnid = 0; receiver_cnid < m_cndata.size();
++receiver_cnid) { ++receiver_cnid) {
// for each resource user
for (size_t releaser_cnid = 0; for (size_t releaser_cnid = 0;
releaser_cnid < m_cndata[receiver_cnid].release_queues.size(); releaser_cnid < m_cndata[receiver_cnid].release_queues.size();
++releaser_cnid) { ++releaser_cnid) {
// if the user has not updated its completed, that means no event
// are finished on resource user, the resource owner still should
// hold the resource for these unfinished events, skip
if (!(releaser_cnid < updated.size() && updated[releaser_cnid])) { if (!(releaser_cnid < updated.size() && updated[releaser_cnid])) {
continue; continue;
} }
// if some events are finished on resource user, the resource owner
// does not need hold resource for these events, release them
auto& q = m_cndata[receiver_cnid].release_queues[releaser_cnid]; auto& q = m_cndata[receiver_cnid].release_queues[releaser_cnid];
q.release(m_cndata[releaser_cnid].completed); q.release(m_cndata[releaser_cnid].completed);
} }
} }
// for each compnode
for (size_t cnid = 0; cnid < updated.size(); ++cnid) { for (size_t cnid = 0; cnid < updated.size(); ++cnid) {
if (!updated[cnid]) { if (!updated[cnid]) {
continue; continue;
} }
auto& cndata = m_cndata[cnid]; auto& cndata = m_cndata[cnid];
// if event `t` on compnode `cnid` is finished, the host resource which
// reserve for the events `<=t` can be release
auto t = cndata.completed; auto t = cndata.completed;
// release host storage // release host storage
cndata.host_release_queue.release(t); cndata.host_release_queue.release(t);
...@@ -275,40 +316,60 @@ public: ...@@ -275,40 +316,60 @@ public:
static CompNodeSyncManager& inst(); static CompNodeSyncManager& inst();
//! record an event on cn
uint64_t record(CompNode cn, bool doitnow = false) { uint64_t record(CompNode cn, bool doitnow = false) {
std::unique_lock lock(m_mtx); std::unique_lock lock(m_mtx);
auto cnid = get_cnid_unsafe(cn); auto cnid = get_cnid_unsafe(cn);
if (doitnow) { if (doitnow) {
return do_record(cn, cnid, lock)->first; return do_record(cn, cnid, lock)->first;
} }
//! if we do not DOITNOW, we only increase the counter, and then the get_event()
//! function will do the actual recording
return m_cndata[cnid].next++; return m_cndata[cnid].next++;
} }
//! try to async release a resource until `cn` complete event `t`
void async_release(CompNode cn, uint64_t t, BlobPtr blob) { void async_release(CompNode cn, uint64_t t, BlobPtr blob) {
MGB_LOCK_GUARD(m_mtx); MGB_LOCK_GUARD(m_mtx);
//! the releaser can be seen as a resource user, the receiver can be seen as a
//! resource owner so we represent releaser as user and represent
//! receiver as owner
auto releaser_cnid = get_cnid_unsafe(cn); auto releaser_cnid = get_cnid_unsafe(cn);
//! if the user has complete event t, so we do not need to hold a blob
//! reference
if (t <= m_cndata[releaser_cnid].completed) { if (t <= m_cndata[releaser_cnid].completed) {
return; return;
} }
//! the owner is the compnode of the blob
auto receiver_cnid = get_cnid_unsafe(blob->comp_node()); auto receiver_cnid = get_cnid_unsafe(blob->comp_node());
//! resource owner hold queues for each resource user, which
//! is represented as a vector, the index is the user compnode id, and
//! the value is the queue which hold the blobs for the correspond user
//! compnode to use until the event `t` of user compnode completed
auto& qs = m_cndata[receiver_cnid].release_queues; auto& qs = m_cndata[receiver_cnid].release_queues;
if (releaser_cnid >= qs.size()) { if (releaser_cnid >= qs.size()) {
qs.resize(releaser_cnid + 1); qs.resize(releaser_cnid + 1);
} }
//! get the releaser/user queue of compnode `releaser_cnid`
auto& q = qs[releaser_cnid]; auto& q = qs[releaser_cnid];
//! add the blob and the event `t` to the compnode `releaser_cnid` queue
q.emplace(t, std::move(blob)); q.emplace(t, std::move(blob));
} }
void async_release(CompNode cn, uint64_t t, HostTensorStorage::RawStorage storage) { void async_release(CompNode cn, uint64_t t, HostTensorStorage::RawStorage storage) {
MGB_LOCK_GUARD(m_mtx); MGB_LOCK_GUARD(m_mtx);
// the releaser is the resource user
auto releaser_cnid = get_cnid_unsafe(cn); auto releaser_cnid = get_cnid_unsafe(cn);
// if the resource user have complete event `t`, do not hold anything
if (t <= m_cndata[releaser_cnid].completed) { if (t <= m_cndata[releaser_cnid].completed) {
return; return;
} }
// hold the host tensor resource in the user compnode host_release_queue
auto& q = m_cndata[releaser_cnid].host_release_queue; auto& q = m_cndata[releaser_cnid].host_release_queue;
q.emplace(t, std::move(storage)); q.emplace(t, std::move(storage));
} }
//! the `waiter` compnode wait the completion of the event `t` on `waitee` compnode
void device_wait(CompNode waiter, CompNode waitee, uint64_t t) { void device_wait(CompNode waiter, CompNode waitee, uint64_t t) {
std::unique_lock lock(m_mtx); std::unique_lock lock(m_mtx);
...@@ -317,42 +378,73 @@ public: ...@@ -317,42 +378,73 @@ public:
auto& waiter_data = m_cndata.at(waiter_id); auto& waiter_data = m_cndata.at(waiter_id);
auto& waitee_data = m_cndata.at(waitee_id); auto& waitee_data = m_cndata.at(waitee_id);
//! waitee has already completed the event t, so waiter does not need to wait
if (t <= waitee_data.completed) { if (t <= waitee_data.completed) {
return; return;
} }
//! the ordering are orderly, so the rbegin() of ordering is the last event of
//! compnode. if the last event of waiter is alreadty waiting for the event
//! which is later than event t(>=t) of waitee, we do not need to add the new
//! device_wait information
if (waiter_data.ordering.size() && if (waiter_data.ordering.size() &&
waitee_id < waiter_data.ordering.rbegin()->second.size() && waitee_id < waiter_data.ordering.rbegin()->second.size() &&
t <= waiter_data.ordering.rbegin()->second[waitee_id]) { t <= waiter_data.ordering.rbegin()->second[waitee_id]) {
return; return;
} }
//! get the virtual event t and the corresponding real event of waitee
//! you can think of the t_waitee as the virtual event t on the waitee compnode
auto [t_waitee, e] = get_event(waitee, waitee_id, t, lock); auto [t_waitee, e] = get_event(waitee, waitee_id, t, lock);
// DO NOT unlock around this line! Event* could be invalidated! //! DO NOT unlock around this line! Event* could be invalidated!
e->device_wait_by(waiter); e->device_wait_by(waiter);
//! add a new event t_waiter on the waiter compnode, and t_waiter event wait for
//! t_waitee event
auto t_waiter = waiter_data.next++; auto t_waiter = waiter_data.next++;
std::vector<uint64_t> ordering(m_cndata.size(), 0); //! try to add an ordering information to the waiter.ordering, this ordering
//! describe the event t_waiter wait what events on other compnodes, the
//! ordering is a vector, the index of the vector is the compnode id, and
//! the value of the vector is the event id which the event t_waiter wait for
//! on the compnode
std::vector<uint64_t> t_waiter_ordering(m_cndata.size(), 0);
if (!waiter_data.ordering.empty()) { if (!waiter_data.ordering.empty()) {
//! if the last event of waiter has already waited for some events, and the
//! new event t_waiter is later than the last event,so the new event
//! t_waiter also need to wait for these events
auto& o = waiter_data.ordering.rbegin()->second; auto& o = waiter_data.ordering.rbegin()->second;
std::copy(o.begin(), o.end(), ordering.begin()); std::copy(o.begin(), o.end(), t_waiter_ordering.begin());
} }
ordering[waitee_id] = t_waitee; //! on the waitee compnode, the new event t_waiter wait for the event t_waitee
ordering[waiter_id] = t_waiter; t_waiter_ordering[waitee_id] = t_waitee;
//! on the waiter compnode itself, the new event t_waiter wait for itself
t_waiter_ordering[waiter_id] = t_waiter;
{ {
//! get all the events ordering information on the waitee compnode which are
//! ahead of t_waitee, event t_waiter should also wait for all the events
//! which are ahead of t_waitee on the waitee compnode,
auto it = waitee_data.ordering.upper_bound(t_waitee); auto it = waitee_data.ordering.upper_bound(t_waitee);
if (it != waitee_data.ordering.begin()) { if (it != waitee_data.ordering.begin()) {
for (auto [a, b] : views::zip(ordering, std::prev(it)->second)) { //! these events ahead of t_waitee maybe wait for other events which is
//! recorded in their ordering information, so we update the ordering
//! information of event t_waiter according to the ordering information
//! of these events
for (auto [a, b] :
views::zip(t_waiter_ordering, std::prev(it)->second)) {
static_assert(std::is_lvalue_reference_v<decltype(a)>); static_assert(std::is_lvalue_reference_v<decltype(a)>);
a = std::max(a, b); a = std::max(a, b);
} }
} }
} }
//! add the new event t_waiter and its ordering information to the waiter compnode
waiter_data.ordering.emplace_hint( waiter_data.ordering.emplace_hint(
waiter_data.ordering.end(), t_waiter, ordering); waiter_data.ordering.end(), t_waiter, t_waiter_ordering);
for (auto [t, q] : views::zip(ordering, waiter_data.release_queues)) { //! the event t_waiter is completed because the above code
//! `e->wait_device_by()`, that means all its depentent events on other compnode
//! are also completed, so we can release
for (auto [t, q] : views::zip(t_waiter_ordering, waiter_data.release_queues)) {
q.release(t); q.release(t);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册