未验证 提交 d0e26d2e 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #21216 from taosdata/fix/TS-3358

fix:after restarting taosd, stream does not work.
...@@ -109,6 +109,15 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { ...@@ -109,6 +109,15 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
// seek the stored version and extract data from WAL // seek the stored version and extract data from WAL
int32_t code = walReadSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); int32_t code = walReadSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer);
if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit
SWal *pWal = pTask->exec.pWalReader->pWal;
if (pTask->chkInfo.currentVer < pWal->vers.firstVer ) {
pTask->chkInfo.currentVer = pWal->vers.firstVer;
code = walReadSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer);
if (code != TSDB_CODE_SUCCESS) {
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
}
streamMetaReleaseTask(pStreamMeta, pTask); streamMetaReleaseTask(pStreamMeta, pTask);
continue; continue;
} }
......
...@@ -188,6 +188,11 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* ...@@ -188,6 +188,11 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask*
return -1; return -1;
} }
if (streamMetaCommit(pMeta) < 0) {
tFreeStreamTask(pTask);
return -1;
}
taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, POINTER_BYTES); taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, POINTER_BYTES);
taosArrayPush(pMeta->pTaskList, &pTask->id.taskId); taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
return 0; return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册