提交 d2649862 编写于 作者: W willzhang4a58

total_reading_cnt_

上级 ca421d43
......@@ -41,6 +41,7 @@ void Actor::Init(const TaskProto& task_proto) {
produced_regst2reading_cnt_[regst.get()] = 0;
}
writeable_produced_regst_desc_num_ = writeable_produced_regst_.size();
total_reading_cnt_ = 0;
}
void Actor::AsyncWardKernelAndSendMsgToRegstReader(
......@@ -64,6 +65,7 @@ void Actor::AsyncWardKernelAndSendMsgToRegstReader(
}
});
produced_regst2reading_cnt_.at(regst) = regst->subscribers_actor_id().size();
total_reading_cnt_ += regst->subscribers_actor_id().size();
if (!regst->subscribers_actor_id().empty()) { pair.second.pop(); }
if (pair.second.empty()) { writeable_produced_regst_desc_num_ -= 1; }
}
......@@ -74,6 +76,7 @@ int Actor::TryUpdtStateAsFromRegstReader(Regst* regst) {
if (reading_cnt_it == produced_regst2reading_cnt_.end()) { return -1; }
CHECK_GE(reading_cnt_it->second, 1);
reading_cnt_it->second -= 1;
total_reading_cnt_ -= 1;
if (reading_cnt_it->second != 0) { return 0; }
auto writeable_it = writeable_produced_regst_.find(regst->regst_desc_id());
if (writeable_it == writeable_produced_regst_.end()) { return 0; }
......
......@@ -54,6 +54,7 @@ class Actor {
if (!it->second.empty()) { writeable_produced_regst_desc_num_ -= 1; }
writeable_produced_regst_.erase(it);
}
int64_t total_reading_cnt() const { return total_reading_cnt_; }
private:
uint64_t actor_id_;
......@@ -67,6 +68,7 @@ class Actor {
HashMap<uint64_t, std::queue<Regst*>> writeable_produced_regst_; // <regst_desc_id, regst>
uint64_t writeable_produced_regst_desc_num_;
HashMap<Regst*, int64_t> produced_regst2reading_cnt_;
int64_t total_reading_cnt_;
};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册