未验证 提交 f5609de4 编写于 作者: Y Yuang Liu 提交者: GitHub

[fleet_executor] remove should reset (#37862)

上级 7b1bb874
...@@ -64,7 +64,7 @@ void ComputeInterceptor::IncreaseReady(int64_t up_id) { ...@@ -64,7 +64,7 @@ void ComputeInterceptor::IncreaseReady(int64_t up_id) {
// source node has no upstream, data_is_ready is send by carrier or others // source node has no upstream, data_is_ready is send by carrier or others
if (is_source_ && up_id == -1) { if (is_source_ && up_id == -1) {
it->second.second = GetTaskNode()->max_run_times(); it->second.second += GetTaskNode()->max_run_times();
return; return;
} }
...@@ -121,16 +121,6 @@ bool ComputeInterceptor::CanWriteOutput() { ...@@ -121,16 +121,6 @@ bool ComputeInterceptor::CanWriteOutput() {
return true; return true;
} }
// only source node need reset
bool ComputeInterceptor::ShouldReset() {
if (is_source_ && step_ == node_->max_run_times()) {
VLOG(3) << "Interceptor " << GetInterceptorId()
<< " should reset for step: " << step_ << ".";
return true;
}
return false;
}
void ComputeInterceptor::SendDataReadyToDownStream() { void ComputeInterceptor::SendDataReadyToDownStream() {
for (auto& outs : out_buffs_) { for (auto& outs : out_buffs_) {
auto down_id = outs.first; auto down_id = outs.first;
...@@ -186,24 +176,7 @@ void ComputeInterceptor::RunOps() { ...@@ -186,24 +176,7 @@ void ComputeInterceptor::RunOps() {
} }
void ComputeInterceptor::Run() { void ComputeInterceptor::Run() {
// If there is no limit, source interceptor can be executed while (IsInputReady() && CanWriteOutput()) {
// an unlimited number of times.
// Now source node can only run max_run_times.
if (ShouldReset()) {
for (auto& out_buff : out_buffs_) {
// buffer is using
if (out_buff.second.second != 0) {
VLOG(3) << "Interceptor " << GetInterceptorId()
<< " out buffer for downstream: " << out_buff.first
<< "'s counter is: " << out_buff.second.second
<< ". Cannot be reset.";
return;
}
}
step_ = 0; // reset
}
while (IsInputReady() && CanWriteOutput() && !ShouldReset()) {
VLOG(3) << "id=" << GetInterceptorId() << " ComputeInterceptor running"; VLOG(3) << "id=" << GetInterceptorId() << " ComputeInterceptor running";
RunOps(); RunOps();
......
...@@ -39,7 +39,6 @@ class ComputeInterceptor : public Interceptor { ...@@ -39,7 +39,6 @@ class ComputeInterceptor : public Interceptor {
void DecreaseBuff(int64_t down_id); void DecreaseBuff(int64_t down_id);
bool IsInputReady(); bool IsInputReady();
bool CanWriteOutput(); bool CanWriteOutput();
bool ShouldReset();
void Run(); void Run();
void Compute(const InterceptorMessage& msg); void Compute(const InterceptorMessage& msg);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册