提交 c7495e06 编写于 作者: W willzhang4a58

TryActUntilFail

上级 a36c57f2
......@@ -68,7 +68,7 @@ int Actor::HandleWaitUntilReadingCntEqualZero(const ActorMsg& msg) {
return 0;
}
void Actor::TryActUntilFail() {
void Actor::ActUntilFail() {
while (IsReadReady() && IsWriteReady()) { Act(); }
}
......
......@@ -55,7 +55,7 @@ class Actor {
int HandleWaitUntilReadingCntEqualZero(const ActorMsg& msg);
// Act
void TryActUntilFail();
void ActUntilFail();
virtual void Act() = 0;
virtual bool IsReadReady() = 0;
......
......@@ -32,14 +32,14 @@ int BoxingActor::HandleNormal(const ActorMsg& msg) {
// do nothing
}
}
TryActUntilFail();
ActUntilFail();
return 0;
}
int BoxingActor::HandleWaitUntilNoReadableRegst(const ActorMsg& msg) {
CHECK_EQ(TryUpdtStateAsProducedRegst(msg.regst_warpper()->regst_raw_ptr()),
0);
TryActUntilFail();
ActUntilFail();
if (num_of_read_empty_ == num_of_subscribed_regsts_) {
AsyncSendEORDMsgForAllProducedRegstDesc();
if (total_reading_cnt() == 0) {
......
......@@ -59,14 +59,14 @@ int BpDataCompActor::HandleNormal(const ActorMsg& msg) {
read_regst_.at(regst_wp->regst_desc_id()).push(regst_wp);
}
}
TryActUntilFail();
ActUntilFail();
return 0;
}
int BpDataCompActor::HandleWaitUntilNoReadableRegst(const ActorMsg& msg) {
CHECK_EQ(TryUpdtStateAsProducedRegst(msg.regst_warpper()->regst_raw_ptr()),
0);
TryActUntilFail();
ActUntilFail();
if (read_regst_.at(activation_regst_desc_id_).empty()) {
while (!read_regst_.at(model_regst_desc_id_).empty()) {
AsyncSendRegstMsgToProducer(read_regst_.at(model_regst_desc_id_).front());
......
......@@ -23,14 +23,14 @@ int CopyCommNetActor::HandleNormal(const ActorMsg& msg) {
.second);
}
}
TryActUntilFail();
ActUntilFail();
return 0;
}
int CopyCommNetActor::HandleWaitUntilNoReadableRegst(const ActorMsg& msg) {
CHECK_EQ(TryUpdtStateAsProducedRegst(msg.regst_warpper()->regst_raw_ptr()),
0);
TryActUntilFail();
ActUntilFail();
if (piece_id2waiting_in_regst_.empty()) {
AsyncSendEORDMsgForAllProducedRegstDesc();
if (total_reading_cnt() == 0) {
......
......@@ -23,14 +23,14 @@ int CopyHdActor::HandleNormal(const ActorMsg& msg) {
waiting_in_regst_.push(msg.regst_warpper());
}
}
TryActUntilFail();
ActUntilFail();
return 0;
}
int CopyHdActor::HandleWaitUntilNoReadableRegst(const ActorMsg& msg) {
CHECK_EQ(TryUpdtStateAsProducedRegst(msg.regst_warpper()->regst_raw_ptr()),
0);
TryActUntilFail();
ActUntilFail();
if (waiting_in_regst_.empty()) {
AsyncSendEORDMsgForAllProducedRegstDesc();
if (total_reading_cnt() == 0) {
......
......@@ -52,7 +52,7 @@ bool FwDataCompActor::IsReadReady() {
int FwDataCompActor::WaitToStart(const ActorMsg& msg) {
CHECK_EQ(msg.actor_cmd(), ActorCmd::kStart);
TryActUntilFail();
ActUntilFail();
OF_SET_MSG_HANDLE(&FwDataCompActor::HandleWaitUntilNoReadableRegst);
return 0;
}
......@@ -83,14 +83,14 @@ int FwDataCompActor::HandleNormal(const ActorMsg& msg) {
}
}
}
TryActUntilFail();
ActUntilFail();
return 0;
}
int FwDataCompActor::HandleWaitUntilNoReadableRegst(const ActorMsg& msg) {
CHECK_EQ(TryUpdtStateAsProducedRegst(msg.regst_warpper()->regst_raw_ptr()),
0);
TryActUntilFail();
ActUntilFail();
int total_piece_num = JobDesc::Singleton()->total_piece_num();
if ((in_desc_id_ != -1 && in_.empty())
|| expected_piece_id() == total_piece_num) {
......
......@@ -29,14 +29,14 @@ int MdDiffAccActor::HandleNormal(const ActorMsg& msg) {
waiting_in_regst_.push(msg.regst_warpper());
}
}
TryActUntilFail();
ActUntilFail();
return 0;
}
int MdDiffAccActor::HandleWaitUntilNoReadableRegst(const ActorMsg& msg) {
CHECK_EQ(TryUpdtStateAsProducedRegst(msg.regst_warpper()->regst_raw_ptr()),
0);
TryActUntilFail();
ActUntilFail();
if (waiting_in_regst_.empty()) {
AsyncSendEORDMsgForAllProducedRegstDesc();
if (total_reading_cnt() == 0) {
......
......@@ -18,7 +18,7 @@ int MdSaveCompActor::HandleNormal(const ActorMsg& actor_msg) {
return 1;
} else if (actor_msg.msg_type() == ActorMsgType::kRegstMsg) {
regst_warpper_ = actor_msg.regst_warpper();
TryActUntilFail();
ActUntilFail();
} else {
UNEXPECTED_RUN();
}
......
......@@ -73,7 +73,7 @@ int MdUpdtCompActor::HandleNormal(const ActorMsg& actor_msg) {
if (TryUpdtStateAsProducedRegst(regst_warpper->regst_raw_ptr()) != 0) {
waiting_model_diff_acc_queue_.push(regst_warpper);
}
TryActUntilFail();
ActUntilFail();
} else {
UNEXPECTED_RUN();
}
......@@ -84,7 +84,7 @@ int MdUpdtCompActor::HandleWaitUntilNoReadableRegst(const ActorMsg& actor_msg) {
CHECK_EQ(
TryUpdtStateAsProducedRegst(actor_msg.regst_warpper()->regst_raw_ptr()),
0);
TryActUntilFail();
ActUntilFail();
if (waiting_model_diff_acc_queue_.empty()) {
AsyncSendEORDMsgToSubscribers(model_regst_desc_id_);
if (total_reading_cnt() == 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册