diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index ba349e11f1f3204b0c4ac8e6b3bc46d87498edae..78fd9bed5ddff78b11383c096769e767f92a6648 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -273,6 +273,7 @@ typedef struct SStreamId { typedef struct SCheckpointInfo { int64_t id; int64_t version; // offset in WAL + int64_t currentVer;// current offset in WAL, not serialize it } SCheckpointInfo; typedef struct SStreamStatus { @@ -537,6 +538,7 @@ void streamTaskInputFail(SStreamTask* pTask); int32_t streamTryExec(SStreamTask* pTask); int32_t streamSchedExec(SStreamTask* pTask); int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock); +bool streamTaskShouldStop(const SStreamStatus* pStatus); int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 0244a4fd6e34c8f362047a9829ecc56285ff31fa..16e7ffc5367c857dad1414c788dc46d9f6263b42 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -119,6 +119,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal) pVnode->pFetchQ->threadId); while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10); + tqNotifyClose(pVnode->pImpl->pTq); dInfo("vgId:%d, wait for vnode stream queue:%p is empty", pVnode->vgId, pVnode->pStreamQ); while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10); @@ -141,7 +142,6 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal) dInfo("vgId:%d, vnode is closed", pVnode->vgId); if (commitAndRemoveWal) { - char path[TSDB_FILENAME_LEN] = {0}; snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d%swal", TD_DIRSEP, pVnode->vgId, TD_DIRSEP); dInfo("vgId:%d, remove all wals, path:%s", pVnode->vgId, path); tfsRmdir(pMgmt->pTfs, path); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index db17e4f533128191474690d2ace5764cf6c4b3b2..acc0d29382a17a0fef955bdec489b50d4a7a2233 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -180,16 +180,9 @@ int32_t tqStreamTasksScanWal(STQ* pTq); // tq util char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); -void createStreamTaskOffsetKey(char* dst, uint64_t streamId, uint32_t taskId); int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver); -int32_t launchTaskForWalBlock(SStreamTask* pTask, SFetchRet* pRet, STqOffset* pOffset); int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg); -void doSaveTaskOffset(STqOffsetStore* pOffsetStore, const char* pKey, int64_t ver); -void saveOffsetForAllTasks(STQ* pTq, int64_t ver); -void initOffsetForAllRestoreTasks(STQ* pTq); -int32_t transferToWalReadTask(SStreamMeta* pStreamMeta, SArray* pTaskList); - #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 81f7c3d52ae37a14bb95e636d3c94171ae27c1bb..416bc6cdc73a794b77eac85c724f6f21aec83a47 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -190,6 +190,7 @@ int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg); int tqInit(); void tqCleanUp(); STQ* tqOpen(const char* path, SVnode* pVnode); +void tqNotifyClose(STQ*); void tqClose(STQ*); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); int tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp, diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 962b89732e2a391e6f82a98855fe5f3c502c3bbe..ae52db163fb3038f4983f5a63f2a0099986ef628 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -154,6 +154,31 @@ void tqClose(STQ* pTq) { taosMemoryFree(pTq); } +void tqNotifyClose(STQ* pTq) { + if (pTq != NULL) { + taosWLockLatch(&pTq->pStreamMeta->lock); + + void* pIter = NULL; + while (1) { + pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter); + if (pIter == NULL) { + break; + } + + SStreamTask* pTask = *(SStreamTask**)pIter; + tqDebug("vgId:%d s-task:%s set dropping flag", pTq->pStreamMeta->vgId, pTask->id.idStr); + pTask->status.taskStatus = TASK_STATUS__STOP; + + int64_t st = taosGetTimestampMs(); + qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS); + int64_t el = taosGetTimestampMs() - st; + tqDebug("vgId:%d s-task:%s is closed in %" PRId64 "ms", pTq->pStreamMeta->vgId, pTask->id.idStr, el); + } + + taosWUnLockLatch(&pTq->pStreamMeta->lock); + } +} + static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId, int32_t type) { int32_t len = 0; @@ -573,6 +598,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->pMsgCb = &pTq->pVnode->msgCb; pTask->pMeta = pTq->pStreamMeta; pTask->chkInfo.version = ver; + pTask->chkInfo.currentVer = ver; // expand executor if (pTask->fillHistory) { @@ -966,14 +992,21 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { int32_t* pRef = taosMemoryMalloc(sizeof(int32_t)); *pRef = 1; + taosWLockLatch(&pTq->pStreamMeta->lock); + void* pIter = NULL; while (1) { pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter); - if (pIter == NULL) break; + if (pIter == NULL) { + break; + } + SStreamTask* pTask = *(SStreamTask**)pIter; - if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue; + if (pTask->taskLevel != TASK_LEVEL__SOURCE) { + continue; + } - qDebug("delete req enqueue stream task: %d, ver: %" PRId64, pTask->id.taskId, ver); + qDebug("s-task:%s delete req enqueue, ver: %" PRId64, pTask->id.idStr, ver); if (!failed) { SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); @@ -983,15 +1016,13 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { atomic_add_fetch_32(pRefBlock->dataRef, 1); if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pRefBlock) < 0) { - qError("stream task input del failed, task id %d", pTask->id.taskId); - atomic_sub_fetch_32(pRef, 1); taosFreeQitem(pRefBlock); continue; } if (streamSchedExec(pTask) < 0) { - qError("stream task launch failed, task id %d", pTask->id.taskId); + qError("s-task:%s stream task launch failed", pTask->id.idStr); continue; } @@ -1000,8 +1031,9 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { } } + taosWUnLockLatch(&pTq->pStreamMeta->lock); + int32_t ref = atomic_sub_fetch_32(pRef, 1); - /*A(ref >= 0);*/ if (ref == 0) { blockDataDestroy(pDelBlock); taosMemoryFree(pRef); @@ -1032,23 +1064,9 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { } blockDataDestroy(pDelBlock); #endif - return 0; } -static int32_t addSubmitBlockNLaunchTask(STqOffsetStore* pOffsetStore, SStreamTask* pTask, SStreamDataSubmit2* pSubmit, - const char* key, int64_t ver) { - doSaveTaskOffset(pOffsetStore, key, ver); - int32_t code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)pSubmit, ver); - - // remove the offset, if all functions are completed successfully. - if (code == TSDB_CODE_SUCCESS) { - tqOffsetDelete(pOffsetStore, key); - } - - return code; -} - int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { #if 0 void* pIter = NULL; @@ -1310,8 +1328,6 @@ int32_t tqStartStreamTasks(STQ* pTq) { } tqDebug("vgId:%d start wal scan stream tasks, tasks:%d", vgId, numOfTasks); - initOffsetForAllRestoreTasks(pTq); - pRunReq->head.vgId = vgId; pRunReq->streamId = 0; pRunReq->taskId = WAL_READ_TASKS_ID; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 25ab7209d2ca33c928ba5b648348bcc329189d1f..2cda12c0e10fb02bd5928b4f349f75b7d1fad12b 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -1023,6 +1023,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { } // update the table list handle for each stream scanner/wal reader + taosWLockLatch(&pTq->pStreamMeta->lock); while (1) { pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter); if (pIter == NULL) { @@ -1039,5 +1040,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { } } + taosWUnLockLatch(&pTq->pStreamMeta->lock); + return 0; } diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 56f0a80b9e079518e332b950162f71838773adb4..c164d037e0bc6e124aa5edfbe6a3570502afbfcf 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -15,12 +15,12 @@ #include "tq.h" -static int32_t doCreateReqsByScanWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, bool* pScanIdle); +static int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle); // this function should be executed by stream threads. // there is a case that the WAL increases more fast than the restore procedure, and this restore procedure // will not stop eventually. -int tqStreamTasksScanWal(STQ* pTq) { +int32_t tqStreamTasksScanWal(STQ* pTq) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; int64_t st = taosGetTimestampMs(); @@ -31,7 +31,7 @@ int tqStreamTasksScanWal(STQ* pTq) { // check all restore tasks bool shouldIdle = true; - doCreateReqsByScanWal(pTq->pStreamMeta, pTq->pOffsetStore, &shouldIdle); + createStreamRunReq(pTq->pStreamMeta, &shouldIdle); int32_t times = 0; @@ -76,7 +76,7 @@ static SArray* extractTaskIdList(SStreamMeta* pStreamMeta, int32_t numOfTasks) { return pTaskIdList; } -int32_t doCreateReqsByScanWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, bool* pScanIdle) { +int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { *pScanIdle = true; bool noNewDataInWal = true; int32_t vgId = pStreamMeta->vgId; @@ -89,6 +89,8 @@ int32_t doCreateReqsByScanWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetS tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks); SArray* pTaskIdList = extractTaskIdList(pStreamMeta, numOfTasks); + // update the new task number + numOfTasks = taosArrayGetSize(pTaskIdList); for (int32_t i = 0; i < numOfTasks; ++i) { int32_t* pTaskId = taosArrayGet(pTaskIdList, i); SStreamTask* pTask = streamMetaAcquireTask(pStreamMeta, *pTaskId); @@ -97,21 +99,19 @@ int32_t doCreateReqsByScanWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetS } int32_t status = pTask->status.taskStatus; - if ((pTask->taskLevel != TASK_LEVEL__SOURCE) || (status == TASK_STATUS__DROPPING)) { + if (pTask->taskLevel != TASK_LEVEL__SOURCE) { + tqDebug("s-task:%s not source task, no need to start", pTask->id.idStr); streamMetaReleaseTask(pStreamMeta, pTask); continue; } - if (status == TASK_STATUS__RECOVER_PREPARE || status == TASK_STATUS__WAIT_DOWNSTREAM) { + if (streamTaskShouldStop(&pTask->status) || status == TASK_STATUS__RECOVER_PREPARE || + status == TASK_STATUS__WAIT_DOWNSTREAM) { tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, status); streamMetaReleaseTask(pStreamMeta, pTask); continue; } - // check if offset value exists - char key[128] = {0}; - createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId); - if (tInputQueueIsFull(pTask)) { tqDebug("vgId:%d s-task:%s input queue is full, do nothing", vgId, pTask->id.idStr); streamMetaReleaseTask(pStreamMeta, pTask); @@ -120,19 +120,15 @@ int32_t doCreateReqsByScanWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetS *pScanIdle = false; - // check if offset value exists - STqOffset* pOffset = tqOffsetRead(pOffsetStore, key); - ASSERT(pOffset != NULL); - // seek the stored version and extract data from WAL - int32_t code = walReadSeekVer(pTask->exec.pWalReader, pOffset->val.version); + int32_t code = walReadSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit streamMetaReleaseTask(pStreamMeta, pTask); continue; } // append the data for the stream - tqDebug("vgId:%d wal reader seek to ver:%" PRId64 " %s", vgId, pOffset->val.version, pTask->id.idStr); + tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); SPackedData packData = {0}; code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData); @@ -151,14 +147,13 @@ int32_t doCreateReqsByScanWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetS noNewDataInWal = false; - tqDebug("s-task:%s submit data extracted from WAL", pTask->id.idStr); code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)p, packData.ver); if (code == TSDB_CODE_SUCCESS) { - pOffset->val.version = walReaderGetCurrentVer(pTask->exec.pWalReader); + pTask->chkInfo.currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, - pOffset->val.version); + pTask->chkInfo.currentVer); } else { - // do nothing + tqError("s-task:%s append input queue failed, ver:%"PRId64, pTask->id.idStr, pTask->chkInfo.currentVer); } streamDataSubmitDestroy(p); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 5ac747947f8c2058a94315e4684883b3d138c1e2..017b479c1b8ca55aaace9c99bbf5420554b88370 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -25,21 +25,6 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { return taosStrdup(buf); } -// stream_task:stream_id:task_id -void createStreamTaskOffsetKey(char* dst, uint64_t streamId, uint32_t taskId) { - int32_t n = 12; - char* p = dst; - - memcpy(p, "stream_task:", n); - p += n; - - int32_t inc = tintToHex(streamId, p); - p += inc; - - *(p++) = ':'; - tintToHex(taskId, p); -} - int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver) { int32_t code = tAppendDataToInputQueue(pTask, pQueueItem); if (code < 0) { @@ -55,75 +40,6 @@ int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueI return TSDB_CODE_SUCCESS; } -void initOffsetForAllRestoreTasks(STQ* pTq) { - void* pIter = NULL; - - while(1) { - pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter); - if (pIter == NULL) { - break; - } - - SStreamTask* pTask = *(SStreamTask**)pIter; - if (pTask->taskLevel != TASK_LEVEL__SOURCE) { - continue; - } - - if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { - tqDebug("s-task:%s skip push data, since not ready, status %d", pTask->id.idStr, pTask->status.taskStatus); - continue; - } - - char key[128] = {0}; - createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId); - - STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key); - if (pOffset == NULL) { - doSaveTaskOffset(pTq->pOffsetStore, key, pTask->chkInfo.version); - } - } -} - -void saveOffsetForAllTasks(STQ* pTq, int64_t ver) { - void* pIter = NULL; - - while(1) { - pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter); - if (pIter == NULL) { - break; - } - - SStreamTask* pTask = *(SStreamTask**)pIter; - if (pTask->taskLevel != TASK_LEVEL__SOURCE) { - continue; - } - - if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { - tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, - pTask->status.taskStatus); - continue; - } - - char key[128] = {0}; - createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId); - - STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key); - if (pOffset == NULL) { - doSaveTaskOffset(pTq->pOffsetStore, key, ver); - } - } -} - -void doSaveTaskOffset(STqOffsetStore* pOffsetStore, const char* pKey, int64_t ver) { - STqOffset offset = {0}; - tqOffsetResetToLog(&offset.val, ver); - - tstrncpy(offset.subKey, pKey, tListLen(offset.subKey)); - - // keep the offset info in the offset store - tqOffsetWrite(pOffsetStore, &offset); -} - static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t subType) { pRsp->reqOffset = pReq->reqOffset; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 007a6f63d16d59da4b648f43803a058963a6c487..be2bd0e6e2ac0d8e099a24f8c077d54db868f102 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2541,6 +2541,20 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } while (1) { + if (isTaskKilled(pTaskInfo)) { + + if (pInfo->pUpdated != NULL) { + pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated); + } + + if (pInfo->pUpdatedMap != NULL) { + tSimpleHashCleanup(pInfo->pUpdatedMap); + pInfo->pUpdatedMap = NULL; + } + + T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); + } + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { pOperator->status = OP_RES_TO_RETURN; @@ -2635,6 +2649,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { while ((pIte = tSimpleHashIterate(pInfo->pUpdatedMap, pIte, &iter)) != NULL) { taosArrayPush(pInfo->pUpdated, pIte); } + tSimpleHashCleanup(pInfo->pUpdatedMap); pInfo->pUpdatedMap = NULL; taosArraySort(pInfo->pUpdated, winKeyCmprImpl); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index f52af663872650107d34ddf126108cfe22db1bf5..325d315262c5cfc64ee3b5e9b9ff0e77230405e5 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -17,6 +17,11 @@ #define STREAM_EXEC_MAX_BATCH_NUM 100 +bool streamTaskShouldStop(const SStreamStatus* pStatus) { + int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus); + return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING); +} + static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) { int32_t code = TSDB_CODE_SUCCESS; void* pExecutor = pTask->exec.pExecutor; @@ -66,7 +71,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* // pExecutor while (1) { - if (pTask->status.taskStatus == TASK_STATUS__DROPPING) { + if (streamTaskShouldStop(&pTask->status)) { return 0; } @@ -106,7 +111,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* continue; } - qDebug("task %d(child %d) executed and get block", pTask->id.taskId, pTask->selfChildId); + qDebug("s-task:%s (child %d) executed and get block", pTask->id.idStr, pTask->selfChildId); SSDataBlock block = {0}; assignOneDataBlock(&block, output); @@ -134,7 +139,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { int32_t batchCnt = 0; while (1) { - if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) { + if (streamTaskShouldStop(&pTask->status)) { taosArrayDestroy(pRes); return 0; } @@ -270,7 +275,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { } } - if (pTask->status.taskStatus == TASK_STATUS__DROPPING) { + if (streamTaskShouldStop(&pTask->status)) { if (pInput) { streamFreeQitem(pInput); } @@ -301,7 +306,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { ", checkPoint id:%" PRId64 " -> %" PRId64, pTask->id.idStr, pTask->chkInfo.version, dataVer, pTask->chkInfo.id, ckId); - pTask->chkInfo = (SCheckpointInfo) {.version = dataVer, .id = ckId}; + pTask->chkInfo = (SCheckpointInfo) {.version = dataVer, .id = ckId, .currentVer = pTask->chkInfo.currentVer}; taosWLockLatch(&pTask->pMeta->lock); streamMetaSaveTask(pTask->pMeta, pTask); @@ -368,7 +373,7 @@ int32_t streamTryExec(SStreamTask* pTask) { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); qDebug("s-task:%s exec completed", pTask->id.idStr); - if (!taosQueueEmpty(pTask->inputQueue->queue) && (pTask->status.taskStatus != TASK_STATUS__DROPPING)) { + if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status))) { streamSchedExec(pTask); } } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 90c06dbd69af3c6d743256bfeca94ef8bcee19b8..065e9d280f41834a35fff2409a9006e8a61bbd6f 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -191,10 +191,12 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) { taosRLockLatch(&pMeta->lock); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); - if (ppTask != NULL && (atomic_load_8(&((*ppTask)->status.taskStatus)) != TASK_STATUS__DROPPING)) { - atomic_add_fetch_32(&(*ppTask)->refCnt, 1); - taosRUnLockLatch(&pMeta->lock); - return *ppTask; + if (ppTask != NULL) { + if (!streamTaskShouldStop(&(*ppTask)->status)) { + atomic_add_fetch_32(&(*ppTask)->refCnt, 1); + taosRUnLockLatch(&pMeta->lock); + return *ppTask; + } } taosRUnLockLatch(&pMeta->lock); @@ -205,7 +207,7 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) { int32_t left = atomic_sub_fetch_32(&pTask->refCnt, 1); ASSERT(left >= 0); if (left == 0) { - ASSERT(atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING); + ASSERT(streamTaskShouldStop(&pTask->status)); tFreeStreamTask(pTask); } } @@ -216,11 +218,8 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { SStreamTask* pTask = *ppTask; taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t)); tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn); - /*if (pTask->timer) { - * taosTmrStop(pTask->timer);*/ - /*pTask->timer = NULL;*/ - /*}*/ - atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING); + + atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__STOP); taosWLockLatch(&pMeta->lock); streamMetaReleaseTask(pMeta, pTask); diff --git a/tests/script/tsim/stream/basic1.sim b/tests/script/tsim/stream/basic1.sim index 15ca6bf7c924bb7cffba8f0607cb0a5c1fea95cb..65032817b31c6aa2060b7c681d87801d73f7c84b 100644 --- a/tests/script/tsim/stream/basic1.sim +++ b/tests/script/tsim/stream/basic1.sim @@ -37,7 +37,7 @@ if $loop_count == 20 then endi if $rows != 4 then - print =====rows=$rows, expect 4 + print =====rows=$rows expect 4 goto loop0 endi