diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index b77bb54714c09254b61746f807e2e04e26a8d4cc..13b991e0386e8419b542f0be8406fa7cdd20254d 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -172,7 +172,6 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq); // tq util int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock); -int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem); int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg); int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId, int32_t type, int64_t sver, int64_t ever); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1f6c162b9d162d310315aba8a65cecf02337c0de..30194360a8762f61810014bbc34f1271836cc0c2 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1136,7 +1136,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { pTask->streamTaskId.taskId, pTask->id.idStr); pTask->status.taskStatus = TASK_STATUS__DROPPING; - tqDebug("s-task:%s scan-history-task set status to be dropping", id); + tqDebug("s-task:%s fill-history task set status to be dropping", id); streamMetaSaveTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask); @@ -1166,12 +1166,14 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } if (!streamTaskRecoverScanStep1Finished(pTask)) { - tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " do secondary scan-history-data after halt the related stream task:%s", - id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, id); + STimeWindow* pWindow = &pTask->dataRange.window; + tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64 + ", do secondary scan-history data after halt the related stream task:%s", + id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, id); ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); st = taosGetTimestampMs(); - streamSetParamForStreamScannerStep2(pTask, pRange, &pTask->dataRange.window); + streamSetParamForStreamScannerStep2(pTask, pRange, pWindow); } if (!streamTaskRecoverScanStep2Finished(pTask)) { diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index ba983b1833fa04ca26d6093d07f9f85eafef36ec..675cbe4549d1bf3d0dc916e34bef968951e1363b 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -336,6 +336,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, const char* id) { int32_t len = pReader->pHead->head.bodyLen - sizeof(SMsgHead); extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem); + tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%"PRId64, id, len, ver); } else { ASSERT(0); } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 11bfcf7fc528e0426043eb08df00dc395e923a25..c2e4374505a568afc7809f05afe5a99fbc71e440 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -20,21 +20,6 @@ static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp, int32_t vgId); -int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem) { - int32_t code = tAppendDataToInputQueue(pTask, pQueueItem); - if (code < 0) { - tqError("s-task:%s failed to put into queue, too many", pTask->id.idStr); - return -1; - } - - if (streamSchedExec(pTask) < 0) { - tqError("stream task:%d failed to be launched, code:%s", pTask->id.taskId, tstrerror(terrno)); - return -1; - } - - return TSDB_CODE_SUCCESS; -} - int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq) { pRsp->reqOffset = pReq->reqOffset; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 810f9709f5887d9bb526eeccfdd07e10dc782e9d..2059ed18e5a8010b459a2f86755ad84e4088dbc9 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1800,8 +1800,10 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { } else { pTSInfo->base.cond.startVersion = pStreamInfo->fillHistoryVer.minVer; pTSInfo->base.cond.endVersion = pStreamInfo->fillHistoryVer.maxVer; - qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64", %s", pTSInfo->base.cond.startVersion, - pTSInfo->base.cond.endVersion, id); + pTSInfo->base.cond.twindows = pStreamInfo->fillHistoryWindow; + qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 "-%" PRId64 ", %s", + pTSInfo->base.cond.startVersion, pTSInfo->base.cond.endVersion, pTSInfo->base.cond.twindows.skey, + pTSInfo->base.cond.twindows.ekey, id); pStreamInfo->recoverStep = STREAM_RECOVER_STEP__SCAN2; } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index cae7e10e3e927304c766150468c0c4c84469ef66..1c824db3b074c760cce06ef9fd2a451c4a679f29 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -65,7 +65,7 @@ static void streamSchedByTimer(void* param, void* tmrId) { SStreamTask* pTask = (void*)param; int8_t status = atomic_load_8(&pTask->triggerStatus); - qDebug("s-task:%s in scheduler timer, trigger status:%d", pTask->id.idStr, status); + qDebug("s-task:%s in scheduler, trigger status:%d, next:%dms", pTask->id.idStr, status, (int32_t)pTask->triggerParam); if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { streamMetaReleaseTask(NULL, pTask); @@ -74,23 +74,22 @@ static void streamSchedByTimer(void* param, void* tmrId) { } if (status == TASK_TRIGGER_STATUS__ACTIVE) { - SStreamTrigger* trigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0); - if (trigger == NULL) { + SStreamTrigger* pTrigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0); + if (pTrigger == NULL) { return; } - trigger->type = STREAM_INPUT__GET_RES; - trigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); - if (trigger->pBlock == NULL) { - taosFreeQitem(trigger); + pTrigger->type = STREAM_INPUT__GET_RES; + pTrigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); + if (pTrigger->pBlock == NULL) { + taosFreeQitem(pTrigger); return; } - trigger->pBlock->info.type = STREAM_GET_ALL; atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE); - - if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)trigger) < 0) { - taosFreeQitem(trigger); + pTrigger->pBlock->info.type = STREAM_GET_ALL; + if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pTrigger) < 0) { + taosFreeQitem(pTrigger); taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->schedTimer); return; } @@ -399,6 +398,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) { atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE); + qDebug("s-task:%s new data arrived, active the trigger, trigerStatus:%d", pTask->id.idStr, pTask->triggerStatus); } return 0; diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 92f1fc47abb8211d652e666fccd6f8e5245d8c5f..bb4b842787a640435f561d6e75074869da8885af 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -166,6 +166,8 @@ int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubm } SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) { + terrno = 0; + if (dst->type == STREAM_INPUT__DATA_BLOCK && pElem->type == STREAM_INPUT__DATA_BLOCK) { SStreamDataBlock* pBlock = (SStreamDataBlock*)dst; SStreamDataBlock* pBlockSrc = (SStreamDataBlock*)pElem; @@ -181,7 +183,10 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* return dst; } else if (dst->type == STREAM_INPUT__DATA_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) { SStreamMergedSubmit* pMerged = streamMergedSubmitNew(); - // todo handle error + if (pMerged == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } streamMergeSubmit(pMerged, (SStreamDataSubmit*)dst); streamMergeSubmit(pMerged, (SStreamDataSubmit*)pElem); @@ -189,6 +194,7 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* taosFreeQitem(pElem); return (SStreamQueueItem*)pMerged; } else { + qDebug("block type:%d not merged with existed blocks list, type:%d", pElem->type, dst->type); return NULL; } } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index e7adcf36dc5b4b2873ecb61390162579d9c922de..a1922f355312070b08905a70d33b2e6b6eaf3065 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -407,9 +407,11 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { streamTaskResumeFromHalt(pStreamTask); - pTask->status.taskStatus = TASK_STATUS__DROPPING; qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); + pTask->status.taskStatus = TASK_STATUS__DROPPING; + streamMetaRemoveTask(pMeta, pTask->id.taskId); + // save to disk taosWLockLatch(&pMeta->lock); streamMetaSaveTask(pMeta, pTask); @@ -464,7 +466,12 @@ static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu // todo we need to sort the data block, instead of just appending into the array list. void* newRet = streamMergeQueueItem(*pInput, qItem); if (newRet == NULL) { - qError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks); + if (terrno == 0) { + qDebug("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks); + } else { + qDebug("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks, + tstrerror(terrno)); + } streamQueueProcessFail(pTask->inputQueue); return TSDB_CODE_SUCCESS; }