提交 b6f4cac6 编写于 作者: H Haojun Liao

fix(stream): set the correct offset version.

上级 e4199a62
...@@ -128,7 +128,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { ...@@ -128,7 +128,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
} }
// append the data for the stream // append the data for the stream
tqDebug("vgId:%d wal reader seek to ver:%" PRId64 " %s", vgId, pTask->chkInfo.currentVer, pTask->id.idStr); tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer);
SPackedData packData = {0}; SPackedData packData = {0};
code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData); code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData);
...@@ -147,14 +147,13 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { ...@@ -147,14 +147,13 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
noNewDataInWal = false; noNewDataInWal = false;
tqDebug("s-task:%s submit data extracted from WAL", pTask->id.idStr);
code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)p, packData.ver); code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)p, packData.ver);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
pTask->chkInfo.currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); pTask->chkInfo.currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr,
pTask->chkInfo.currentVer); pTask->chkInfo.currentVer);
} else { } else {
// do nothing tqError("s-task:%s append input queue failed, ver:%"PRId64, pTask->id.idStr, pTask->chkInfo.currentVer);
} }
streamDataSubmitDestroy(p); streamDataSubmitDestroy(p);
......
...@@ -111,7 +111,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* ...@@ -111,7 +111,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
continue; continue;
} }
qDebug("task %d(child %d) executed and get block", pTask->id.taskId, pTask->selfChildId); qDebug("s-task:%s (child %d) executed and get block", pTask->id.idStr, pTask->selfChildId);
SSDataBlock block = {0}; SSDataBlock block = {0};
assignOneDataBlock(&block, output); assignOneDataBlock(&block, output);
...@@ -306,7 +306,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { ...@@ -306,7 +306,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
", checkPoint id:%" PRId64 " -> %" PRId64, ", checkPoint id:%" PRId64 " -> %" PRId64,
pTask->id.idStr, pTask->chkInfo.version, dataVer, pTask->chkInfo.id, ckId); pTask->id.idStr, pTask->chkInfo.version, dataVer, pTask->chkInfo.id, ckId);
pTask->chkInfo = (SCheckpointInfo) {.version = dataVer, .id = ckId}; pTask->chkInfo = (SCheckpointInfo) {.version = dataVer, .id = ckId, .currentVer = pTask->chkInfo.currentVer};
taosWLockLatch(&pTask->pMeta->lock); taosWLockLatch(&pTask->pMeta->lock);
streamMetaSaveTask(pTask->pMeta, pTask); streamMetaSaveTask(pTask->pMeta, pTask);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册