diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index b0542e350f43f01106aca2e6a6f966b9daea9836..0d9cd4bcff0d7336e43461487dd213342fa613ef 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -343,7 +343,7 @@ tmq_list_t* tmq_list_new() { int32_t tmq_list_append(tmq_list_t* list, const char* src) { SArray* container = &list->container; - char* topic = strDupUnquo(src); + char* topic = strdup(src); if (taosArrayPush(container, &topic) == NULL) return -1; return 0; } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index c2125f75f8f81fc2627a4755fbd1cb44277fdfd8..5777df4fa6b96cdb1c4693ce76ca5c44d50cc80f 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -235,7 +235,8 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64 } static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) { - SNode *pAst = NULL; + SNode *pAst = NULL; + SQueryPlan *pPlan = NULL; mDebug("stream:%s to create", pCreate->name); memcpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN); @@ -293,7 +294,6 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, goto FAIL; } - SQueryPlan *pPlan = NULL; SPlanContext cxt = { .pAstRoot = pAst, .topicQuery = false, @@ -317,6 +317,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, FAIL: if (pAst != NULL) nodesDestroyNode(pAst); + if (pPlan != NULL) qDestroyQueryPlan(pPlan); return 0; } @@ -541,7 +542,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { // build stream obj from request SStreamObj streamObj = {0}; if (mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createStreamReq) < 0) { - ASSERT(0); + /*ASSERT(0);*/ mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); goto _OVER; } @@ -689,7 +690,14 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { terrno = TSDB_CODE_MND_STREAM_ALREADY_EXIST; return -1; } else { - // TODO drop all task on snode +#if 0 + if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) { + mError("stream:%s, failed to drop task since %s", pStream->name, terrstr()); + sdbRelease(pMnode->pSdb, pStream); + sdbCancelFetch(pSdb, pIter); + return -1; + } +#endif if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) { sdbRelease(pSdb, pStream); sdbCancelFetch(pSdb, pIter); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index df80a4c218f2b98aad11de5c3c5cf0113714470a..3b6e9abc67169e770a42c73f8f24a9354778ab17 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -82,6 +82,13 @@ void tqClose(STQ* pTq) { if (pTq) { tqOffsetClose(pTq->pOffsetStore); taosHashCleanup(pTq->handles); + void* pIter = NULL; + while (1) { + pIter = taosHashIterate(pTq->pStreamTasks, pIter); + if (pIter == NULL) break; + SStreamTask* pTask = *(SStreamTask**)pIter; + tFreeSStreamTask(pTask); + } taosHashCleanup(pTq->pStreamTasks); taosHashCleanup(pTq->pushMgr); taosMemoryFree(pTq->path); @@ -608,7 +615,8 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) { streamSetupTrigger(pTask); - tqInfo("deploy stream task id %d child id %d on vgId:%d", pTask->taskId, pTask->selfChildId, TD_VID(pTq->pVnode)); + tqInfo("deploy stream task on vg %d, task id %d, child id %d", TD_VID(pTq->pVnode), pTask->taskId, + pTask->selfChildId); taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)); @@ -634,9 +642,6 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) { pIter = taosHashIterate(pTq->pStreamTasks, pIter); if (pIter == NULL) break; SStreamTask* pTask = *(SStreamTask**)pIter; - if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) { - continue; - } if (!pTask->isDataScan) continue; if (!failed) { @@ -665,11 +670,12 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTaskRunReq* pReq = pMsg->pCont; int32_t taskId = pReq->taskId; SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) { + if (pTask) { + streamProcessRunReq(pTask); return 0; + } else { + return -1; } - streamProcessRunReq(pTask); - return 0; } int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) { @@ -682,55 +688,62 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) { tDecodeStreamDispatchReq(&decoder, &req); int32_t taskId = req.taskId; SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) { + if (pTask) { + SRpcMsg rsp = { + .info = pMsg->info, + .code = 0, + }; + streamProcessDispatchReq(pTask, &req, &rsp); return 0; + } else { + return -1; } - SRpcMsg rsp = { - .info = pMsg->info, - .code = 0, - }; - streamProcessDispatchReq(pTask, &req, &rsp); - return 0; } int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTaskRecoverReq* pReq = pMsg->pCont; int32_t taskId = pReq->taskId; SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) { + if (pTask) { + streamProcessRecoverReq(pTask, pReq, pMsg); return 0; + } else { + return -1; } - streamProcessRecoverReq(pTask, pReq, pMsg); - return 0; } int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t taskId = pRsp->taskId; SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) { + if (pTask) { + streamProcessDispatchRsp(pTask, pRsp); return 0; + } else { + return -1; } - streamProcessDispatchRsp(pTask, pRsp); - return 0; } int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) { SStreamTaskRecoverRsp* pRsp = pMsg->pCont; int32_t taskId = pRsp->taskId; SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) { + if (pTask) { + streamProcessRecoverRsp(pTask, pRsp); return 0; + } else { + return -1; } - streamProcessRecoverRsp(pTask, pRsp); - return 0; } int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t)); - atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING); + if (pTask) { + taosHashRemove(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t)); + atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING); + } // todo // clear queue // push drop req into queue diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 8753ecc47c04f1d5d4e151c502436225cc49e806..dda306ccf8eb69507162dc40d6b33f1a5cafbb65 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -98,8 +98,21 @@ STqReader* tqOpenReader(SVnode* pVnode) { void tqCloseReader(STqReader* pReader) { // close wal reader + if (pReader->pWalReader) { + walCloseReader(pReader->pWalReader); + } // free cached schema + if (pReader->pSchema) { + taosMemoryFree(pReader->pSchema); + } + if (pReader->pSchemaWrapper) { + tDeleteSSchemaWrapper(pReader->pSchemaWrapper); + } + if (pReader->pColIdList) { + taosArrayDestroy(pReader->pColIdList); + } // free hash + taosHashCleanup(pReader->tbIdHash); taosMemoryFree(pReader); } diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index dbbb2b266101d2d21fc7bbbe1590b0f59af82e33..1980f826b0e3b0fe35e9f7e20741e4abce62e0f2 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -146,7 +146,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo const STColumn* pColumn = &pTSchema->columns[k]; SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k); if (colDataIsNull_s(pColData, j)) { - tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, false, pColumn->offset, k); + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, pColumn->offset, k); } else { void* data = colDataGetData(pColData, j); tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, pColumn->offset, k); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 045b4379ca582e82e927bd4a67868ebcc3431b86..24a2ad055822f1a593fdb95daf19da3ae026c531 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -147,7 +147,6 @@ typedef struct { SSDataBlock* pullOverBlk; // for streaming SWalFilterCond cond; int64_t lastScanUid; - SStreamQueue* inputQueue; } SStreamTaskInfo; typedef struct SExecTaskInfo { diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index 6381d20255004a71541f0e6c227ea90fa6613769..d910b8be344be744d72ee176c2cf549eac2c4341 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -44,13 +44,6 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, goto _error; } - if (model == OPTR_EXEC_MODEL_STREAM) { - (*pTask)->streamInfo.inputQueue = streamQueueOpen(); - if ((*pTask)->streamInfo.inputQueue == NULL) { - goto _error; - } - } - SDataSinkMgtCfg cfg = {.maxDataBlockNum = 1000, .maxDataBlockNumPerQuery = 100}; code = dsDataSinkMgtInit(&cfg); if (code != TSDB_CODE_SUCCESS) { @@ -259,12 +252,14 @@ int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) { } } +#if 0 int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM); taosWriteQitem(pTaskInfo->streamInfo.inputQueue->queue, pItem); return 0; } +#endif void* qExtractReaderFromStreamScanner(void* scanner) { SStreamScanInfo* pInfo = scanner; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b14648991e62dddabfac180495f6f790d96f038c..38ac6f942dc8411a6fdb653463e4713afdf18e14 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -884,22 +884,22 @@ static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_ return true; } -static STimeWindow getSlidingWindow(TSKEY* tsCol, SInterval* pInterval, SDataBlockInfo* pDataBlockInfo, int32_t* pRowIndex) { - SResultRowInfo dumyInfo; +static STimeWindow getSlidingWindow(TSKEY* tsCol, SInterval* pInterval, SDataBlockInfo* pDataBlockInfo, + int32_t* pRowIndex) { + SResultRowInfo dumyInfo; dumyInfo.cur.pageId = -1; - STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[*pRowIndex], pInterval, - TSDB_ORDER_ASC); + STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[*pRowIndex], pInterval, TSDB_ORDER_ASC); STimeWindow endWin = win; STimeWindow preWin = win; while (1) { - (*pRowIndex) += getNumOfRowsInTimeWindow(pDataBlockInfo, tsCol, *pRowIndex, endWin.ekey, - binarySearchForKey, NULL, TSDB_ORDER_ASC); + (*pRowIndex) += getNumOfRowsInTimeWindow(pDataBlockInfo, tsCol, *pRowIndex, endWin.ekey, binarySearchForKey, NULL, + TSDB_ORDER_ASC); do { preWin = endWin; getNextTimeWindow(pInterval, &endWin, TSDB_ORDER_ASC); } while (tsCol[(*pRowIndex) - 1] >= endWin.skey); endWin = preWin; - if (win.ekey == endWin.ekey || (*pRowIndex) == pDataBlockInfo->rows ) { + if (win.ekey == endWin.ekey || (*pRowIndex) == pDataBlockInfo->rows) { win.ekey = endWin.ekey; return win; } @@ -933,7 +933,8 @@ static bool prepareDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t t pInfo->updateWin.ekey = tsCols[*pRowIndex - 1]; // win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, TSDB_ORDER_ASC); // (*pRowIndex) += - // getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); + // getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL, + // TSDB_ORDER_ASC); } needRead = true; } else if (isStateWindow(pInfo)) { @@ -1447,6 +1448,28 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNo return NULL; } +static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) { + SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param; +#if 0 + if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) { + STableScanInfo* pTableScanInfo = pStreamScan->pTableScanOp->info; + destroyTableScanOperatorInfo(pTableScanInfo, 1); + } +#endif + if (pStreamScan->tqReader) { + tqCloseReader(pStreamScan->tqReader); + } + if (pStreamScan->pColMatchInfo) { + taosArrayDestroy(pStreamScan->pColMatchInfo); + } + blockDataDestroy(pStreamScan->pRes); + blockDataDestroy(pStreamScan->pUpdateRes); + blockDataDestroy(pStreamScan->pPullDataRes); + blockDataDestroy(pStreamScan->pDeleteDataRes); + taosArrayDestroy(pStreamScan->pBlockLists); + taosMemoryFree(pStreamScan); +} + SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup, uint64_t queryId, uint64_t taskId) { @@ -1561,8 +1584,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->pTaskInfo = pTaskInfo; - pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doStreamScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL); + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamScan, NULL, NULL, destroyStreamScanOperatorInfo, + NULL, NULL, NULL); return pOperator; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index d0d81e3343af3e6933986e376bbd1a66cdc0fb73..635ae820fee964dc6f318548714a14714730514b 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -74,63 +74,62 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) } static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { + int32_t cnt = 0; + void* data = NULL; while (1) { - int32_t cnt = 0; - void* data = NULL; - while (1) { - SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); - if (qItem == NULL) { - qDebug("stream exec over, queue empty"); + SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); + if (qItem == NULL) { + qDebug("stream exec over, queue empty"); + break; + } + if (data == NULL) { + data = qItem; + streamQueueProcessSuccess(pTask->inputQueue); + continue; + } else { + if (streamAppendQueueItem(data, qItem) < 0) { + streamQueueProcessFail(pTask->inputQueue); break; - } - if (data == NULL) { - data = qItem; - streamQueueProcessSuccess(pTask->inputQueue); - continue; } else { - if (streamAppendQueueItem(data, qItem) < 0) { - streamQueueProcessFail(pTask->inputQueue); - break; - } else { - cnt++; - streamQueueProcessSuccess(pTask->inputQueue); - taosArrayDestroy(((SStreamDataBlock*)qItem)->blocks); - taosFreeQitem(qItem); - } + cnt++; + streamQueueProcessSuccess(pTask->inputQueue); + taosArrayDestroy(((SStreamDataBlock*)qItem)->blocks); + taosFreeQitem(qItem); } } - if (data == NULL) break; + } + if (pTask->taskStatus == TASK_STATUS__DROPPING) { + if (data) streamFreeQitem(data); + taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock); + return NULL; + } - qDebug("stream task %d exec begin, batch msg: %d", pTask->taskId, cnt); - streamTaskExecImpl(pTask, data, pRes); - qDebug("stream task %d exec end", pTask->taskId); + if (data == NULL) return pRes; - if (pTask->taskStatus == TASK_STATUS__DROPPING) { - taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock); + qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, cnt); + streamTaskExecImpl(pTask, data, pRes); + qDebug("stream task %d exec end", pTask->taskId); + + if (taosArrayGetSize(pRes) != 0) { + SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); + if (qRes == NULL) { + streamQueueProcessFail(pTask->inputQueue); + taosArrayDestroy(pRes); return NULL; } - - if (taosArrayGetSize(pRes) != 0) { - SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); - if (qRes == NULL) { - streamQueueProcessFail(pTask->inputQueue); - taosArrayDestroy(pRes); - return NULL; - } - qRes->type = STREAM_INPUT__DATA_BLOCK; - qRes->blocks = pRes; - if (streamTaskOutput(pTask, qRes) < 0) { - /*streamQueueProcessFail(pTask->inputQueue);*/ - taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock); - taosFreeQitem(qRes); - return NULL; - } - /*streamQueueProcessSuccess(pTask->inputQueue);*/ - pRes = taosArrayInit(0, sizeof(SSDataBlock)); + qRes->type = STREAM_INPUT__DATA_BLOCK; + qRes->blocks = pRes; + if (streamTaskOutput(pTask, qRes) < 0) { + /*streamQueueProcessFail(pTask->inputQueue);*/ + taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock); + taosFreeQitem(qRes); + return NULL; } - - streamFreeQitem(data); + /*streamQueueProcessSuccess(pTask->inputQueue);*/ + pRes = taosArrayInit(0, sizeof(SSDataBlock)); } + + streamFreeQitem(data); return pRes; } @@ -171,6 +170,11 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) { } FAIL: if (pRes) taosArrayDestroy(pRes); - atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE); - return -1; + if (pTask->taskStatus == TASK_STATUS__DROPPING) { + tFreeSStreamTask(pTask); + return 0; + } else { + atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE); + return -1; + } } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 97aa182dc96eeb402316bb9d54df576312d461e3..7488c009bdc9436621c136c007da12f3ee533cac 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -163,6 +163,6 @@ void tFreeSStreamTask(SStreamTask* pTask) { streamQueueClose(pTask->inputQueue); streamQueueClose(pTask->outputQueue); if (pTask->exec.qmsg) taosMemoryFree(pTask->exec.qmsg); - qDestroyTask(pTask->exec.executor); + if (pTask->exec.executor) qDestroyTask(pTask->exec.executor); taosMemoryFree(pTask); }