提交 4bb78df2 编写于 作者: H Haojun Liao

fix(stream): reduce the sleep time.

上级 32fddeff
......@@ -156,6 +156,29 @@ void tqClose(STQ* pTq) {
taosMemoryFree(pTq);
}
static bool hasStreamTaskInTimer(SStreamMeta* pMeta) {
bool inTimer = false;
taosWLockLatch(&pMeta->lock);
void* pIter = NULL;
while(1) {
pIter = taosHashIterate(pMeta->pTasks, pIter);
if (pIter == NULL) {
break;
}
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->status.timerActive == 1) {
inTimer = true;
}
}
taosWUnLockLatch(&pMeta->lock);
return inTimer;
}
void tqNotifyClose(STQ* pTq) {
if (pTq != NULL) {
taosWLockLatch(&pTq->pStreamMeta->lock);
......@@ -183,38 +206,14 @@ void tqNotifyClose(STQ* pTq) {
tqDebug("vgId:%d start to check all tasks", pTq->pStreamMeta->vgId);
int64_t st = taosGetTimestampMs();
while(1) {
taosMsleep(1000);
bool inTimer = false;
taosWLockLatch(&pTq->pStreamMeta->lock);
pIter = NULL;
while(1) {
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
if (pIter == NULL) {
break;
}
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->status.timerActive == 1) {
inTimer = true;
}
}
taosWUnLockLatch(&pTq->pStreamMeta->lock);
if (inTimer) {
tqDebug("vgId:%d some tasks in timer, wait for 1sec and recheck", pTq->pStreamMeta->vgId);
} else {
break;
}
while(hasStreamTaskInTimer(pTq->pStreamMeta)) {
tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pTq->pStreamMeta->vgId);
taosMsleep(100);
}
int64_t el = taosGetTimestampMs() - st;
tqDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%"PRId64" ms", pTq->pStreamMeta->vgId, el);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册