提交 fb32ffd9 编写于 作者: H Haojun Liao

fix(stream): update the reference count value to be int32, insead of int8

上级 2af63992
...@@ -345,7 +345,7 @@ typedef struct SStreamMeta { ...@@ -345,7 +345,7 @@ typedef struct SStreamMeta {
FTaskExpand* expandFunc; FTaskExpand* expandFunc;
int32_t vgId; int32_t vgId;
SRWLatch lock; SRWLatch lock;
int8_t walScan; int32_t walScan;
} SStreamMeta; } SStreamMeta;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
......
...@@ -27,31 +27,37 @@ int tqStreamTasksScanWal(STQ* pTq) { ...@@ -27,31 +27,37 @@ int tqStreamTasksScanWal(STQ* pTq) {
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
while (1) { while (1) {
tqInfo("vgId:%d continue check if data in wal are available", vgId); int32_t scan = pMeta->walScan;
tqDebug("vgId:%d continue check if data in wal are available, scan:%d", vgId, scan);
// check all restore tasks // check all restore tasks
bool allFull = true; bool shouldIdle = true;
streamTaskReplayWal(pTq->pStreamMeta, pTq->pOffsetStore, &allFull); streamTaskReplayWal(pTq->pStreamMeta, pTq->pOffsetStore, &shouldIdle);
int32_t times = 0; int32_t times = 0;
if (allFull) { if (shouldIdle) {
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
pMeta->walScan -= 1; pMeta->walScan -= 1;
times = pMeta->walScan; times = pMeta->walScan;
ASSERT(pMeta->walScan >= 0);
if (pMeta->walScan <= 0) { if (pMeta->walScan <= 0) {
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
break; break;
} }
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
tqInfo("vgId:%d scan wal for stream tasks for %d times", vgId, times); tqDebug("vgId:%d scan wal for stream tasks for %d times", vgId, times);
} }
} }
double el = (taosGetTimestampMs() - st) / 1000.0; int64_t el = (taosGetTimestampMs() - st);
tqInfo("vgId:%d scan wal for stream tasks completed, elapsed time:%.2f sec", vgId, el); tqDebug("vgId:%d scan wal for stream tasks completed, elapsed time:%"PRId64" ms", vgId, el);
// restore wal scan flag
// atomic_store_8(&pTq->pStreamMeta->walScan, 0);
return 0; return 0;
} }
...@@ -96,8 +102,8 @@ int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetSto ...@@ -96,8 +102,8 @@ int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetSto
continue; continue;
} }
int8_t status = pTask->status.taskStatus; if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE ||
if (status == TASK_STATUS__RECOVER_PREPARE || status == TASK_STATUS__WAIT_DOWNSTREAM) { pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr,
pTask->status.taskStatus); pTask->status.taskStatus);
continue; continue;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册