diff --git a/docs/zh/12-taos-sql/14-stream.md b/docs/zh/12-taos-sql/14-stream.md index cd726e0a0ea644f575e16c656eeb4bb2cabf425d..932ad30b1a949d172d81819f2432daa42ce331c8 100644 --- a/docs/zh/12-taos-sql/14-stream.md +++ b/docs/zh/12-taos-sql/14-stream.md @@ -72,7 +72,7 @@ SHOW STREAMS; 若要展示更详细的信息,可以使用: ```sql -SELECT * from performance_schema.`perf_streams`; +SELECT * from information_schema.`ins_streams`; ``` ## 流式计算的触发模式 diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index ecd1b6f916c814fe5ddcb2b72591c80b6e6c450a..4099551188cc1c8e75a01a5bb0dec177ad559da7 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -140,15 +140,40 @@ typedef struct { int8_t type; } SStreamCheckpoint; -typedef struct { - int8_t type; -} SStreamTaskDestroy; - typedef struct { int8_t type; SSDataBlock* pBlock; } SStreamTrigger; +typedef struct SStreamQueueNode SStreamQueueNode; + +struct SStreamQueueNode { + SStreamQueueItem* item; + SStreamQueueNode* next; +}; + +typedef struct { + SStreamQueueNode* head; + int64_t size; +} SStreamQueueRes; + +void streamFreeQitem(SStreamQueueItem* data); + +bool streamQueueResEmpty(const SStreamQueueRes* pRes); +int64_t streamQueueResSize(const SStreamQueueRes* pRes); +SStreamQueueNode* streamQueueResFront(SStreamQueueRes* pRes); +SStreamQueueNode* streamQueueResPop(SStreamQueueRes* pRes); +void streamQueueResClear(SStreamQueueRes* pRes); +SStreamQueueRes streamQueueBuildRes(SStreamQueueNode* pNode); + +typedef struct { + SStreamQueueNode* pHead; +} SStreamQueue1; + +bool streamQueueHasTask(const SStreamQueue1* pQueue); +int32_t streamQueuePush(SStreamQueue1* pQueue, SStreamQueueItem* pItem); +SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue); + typedef struct { STaosQueue* queue; STaosQall* qall; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 300251d64d0626859fdb5a79fc7df81bcb016ca4..58f8172282361d5d5a95c6e66b8c6da2cc90b292 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -324,15 +324,15 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { } static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { - SMnode *pMnode = pMsg->info.node; - SMqHbReq req = {0}; + SMnode *pMnode = pMsg->info.node; + SMqHbReq req = {0}; if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - int64_t consumerId = req.consumerId; + int64_t consumerId = req.consumerId; SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); if (pConsumer == NULL) { mError("consumer %" PRId64 " not exist", consumerId); @@ -363,17 +363,17 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { } static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { - SMnode *pMnode = pMsg->info.node; - SMqAskEpReq req = {0}; - SMqAskEpRsp rsp = {0}; + SMnode *pMnode = pMsg->info.node; + SMqAskEpReq req = {0}; + SMqAskEpRsp rsp = {0}; if (tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - int64_t consumerId = req.consumerId; - int32_t epoch = req.epoch; + int64_t consumerId = req.consumerId; + int32_t epoch = req.epoch; SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); if (pConsumer == NULL) { @@ -457,6 +457,8 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { if (topicEp.vgs == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; taosRUnLockLatch(&pConsumer->lock); + taosRUnLockLatch(&pSub->lock); + mndReleaseSubscribe(pMnode, pSub); goto FAIL; } diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index e8428ea4700741254cabbc43dc3b4380a50f23ad..3c1d3f09bf297363b4b74c8971c50f3d3a64408a 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -317,9 +317,9 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0; SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb); ASSERT(pDbObj != NULL); - sdbRelease(pSdb, pDbObj); bool multiTarget = pDbObj->cfg.numOfVgroups > 1; + sdbRelease(pSdb, pDbObj); if (planTotLevel == 2 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) { /*if (true) {*/ @@ -451,7 +451,6 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo)); if (pEpInfo == NULL) { - ASSERT(0); terrno = TSDB_CODE_OUT_OF_MEMORY; sdbRelease(pSdb, pVgroup); qDestroyQueryPlan(pPlan); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 6c1c552ccb56cf6057f1dd2cc8e9c8e9a35414cd..61f027039d3d64b60c9a00be39585465c64cf0a9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -379,7 +379,6 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t m STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey)); if (pHandle) { if (walRefVer(pHandle->pRef, offset.val.version) < 0) { - ASSERT(0); return -1; } } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index cde8346487d641a5d80e033555c7e507ce0ef42d..82398d6e340b7d5439c6ffe8de21df194c429083 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -915,33 +915,39 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { } pDest->info.rows++; if (pInfo->tbnameCalSup.numOfExprs > 0 && i == 0) { - SSDataBlock* pTmpBlock = blockCopyOneRow(pSrc, rowIndex); - SSDataBlock* pResBlock = createDataBlock(); - pResBlock->info.rowSize = TSDB_TABLE_NAME_LEN; - SColumnInfoData data = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_TABLE_NAME_LEN, 0); - taosArrayPush(pResBlock->pDataBlock, &data); - blockDataEnsureCapacity(pResBlock, 1); - projectApplyFunctions(pInfo->tbnameCalSup.pExprInfo, pResBlock, pTmpBlock, pInfo->tbnameCalSup.pCtx, 1, NULL); - ASSERT(pResBlock->info.rows == 1); - ASSERT(taosArrayGetSize(pResBlock->pDataBlock) == 1); - SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, 0); - ASSERT(pCol->info.type == TSDB_DATA_TYPE_VARCHAR); - void* pData = colDataGetVarData(pCol, 0); - // TODO check tbname validity - if (pData != (void*)-1) { - memset(pDest->info.parTbName, 0, TSDB_TABLE_NAME_LEN); - int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1); - memcpy(pDest->info.parTbName, varDataVal(pData), len); - /*pDest->info.parTbName[len + 1] = 0;*/ + void* tbname = NULL; + if (streamStateGetParName(pOperator->pTaskInfo->streamInfo.pState, pParInfo->groupId, &tbname) == 0) { + memcpy(pDest->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); + tdbFree(tbname); } else { - pDest->info.parTbName[0] = 0; - } - if (pParInfo->groupId && pDest->info.parTbName[0]) { - streamStatePutParName(pOperator->pTaskInfo->streamInfo.pState, pParInfo->groupId, pDest->info.parTbName); + SSDataBlock* pTmpBlock = blockCopyOneRow(pSrc, rowIndex); + SSDataBlock* pResBlock = createDataBlock(); + pResBlock->info.rowSize = TSDB_TABLE_NAME_LEN; + SColumnInfoData data = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_TABLE_NAME_LEN, 0); + taosArrayPush(pResBlock->pDataBlock, &data); + blockDataEnsureCapacity(pResBlock, 1); + projectApplyFunctions(pInfo->tbnameCalSup.pExprInfo, pResBlock, pTmpBlock, pInfo->tbnameCalSup.pCtx, 1, NULL); + ASSERT(pResBlock->info.rows == 1); + ASSERT(taosArrayGetSize(pResBlock->pDataBlock) == 1); + SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, 0); + ASSERT(pCol->info.type == TSDB_DATA_TYPE_VARCHAR); + void* pData = colDataGetVarData(pCol, 0); + // TODO check tbname validity + if (pData != (void*)-1) { + memset(pDest->info.parTbName, 0, TSDB_TABLE_NAME_LEN); + int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1); + memcpy(pDest->info.parTbName, varDataVal(pData), len); + /*pDest->info.parTbName[len + 1] = 0;*/ + } else { + pDest->info.parTbName[0] = 0; + } + if (pParInfo->groupId && pDest->info.parTbName[0]) { + streamStatePutParName(pOperator->pTaskInfo->streamInfo.pState, pParInfo->groupId, pDest->info.parTbName); + } + /*printf("\n\n set name %s\n\n", pDest->info.parTbName);*/ + blockDataDestroy(pTmpBlock); + blockDataDestroy(pResBlock); } - /*printf("\n\n set name %s\n\n", pDest->info.parTbName);*/ - blockDataDestroy(pTmpBlock); - blockDataDestroy(pResBlock); } } taosArrayDestroy(pParInfo->rowIds); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 8db450ad50e441059e0fbbf4db7233e89114756e..12015d4fc94d85d0c8bc722b0f00c53affb896f4 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -163,8 +163,8 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro STableScanInfo* pTableScanInfo = pOperator->info; - SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pTableScanInfo->base.pdInfo.pAggSup->pResultRowHashTable, buf, - GET_RES_WINDOW_KEY_LEN(sizeof(groupId))); + SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pTableScanInfo->base.pdInfo.pAggSup->pResultRowHashTable, + buf, GET_RES_WINDOW_KEY_LEN(sizeof(groupId))); if (p1 == NULL) { return NULL; @@ -306,7 +306,7 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder; pCost->totalBlocks += 1; @@ -1312,6 +1312,7 @@ static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pS memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN); varDataSetLen(tbname, strlen(varDataVal(tbname))); + tdbFree(parTbname); } appendOneRowToStreamSpecialBlock(pDestBlock, srcStartTsCol + i, srcEndTsCol + i, srcUidData + i, &groupId, tbname[0] == 0 ? NULL : tbname); @@ -1928,6 +1929,7 @@ FETCH_NEXT_BLOCK: if (pInfo->validBlockIndex >= totBlockNum) { updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo); doClearBufferedBlocks(pInfo); + qDebug("stream scan return empty, consume block %d", totBlockNum); return NULL; } @@ -2562,7 +2564,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { uint32_t status = 0; loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status); -// code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status); + // code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } @@ -2893,7 +2895,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN goto _error; } - initResultSizeInfo(&pOperator->resultInfo, 1024); pInfo->pResBlock = createResDataBlock(pDescNode); blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity); diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index 0fc75c4798c2e24f92893a763077dea373ccade1..5ff49502df60401e1f08ea7aeeaf37f66e717e19 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -47,7 +47,6 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov SEpSet* pEpSet); SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem); -void streamFreeQitem(SStreamQueueItem* data); #ifdef __cplusplus } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index ac10c8258744f178bd2010543176f545b40c88b6..7eafcdc93ea20b96140391c0e555ff5caf5bff93 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -45,3 +45,59 @@ void streamQueueClose(SStreamQueue* queue) { taosCloseQueue(queue->queue); taosMemoryFree(queue); } + +bool streamQueueResEmpty(const SStreamQueueRes* pRes) { + // + return true; +} +int64_t streamQueueResSize(const SStreamQueueRes* pRes) { return pRes->size; } +SStreamQueueNode* streamQueueResFront(SStreamQueueRes* pRes) { return pRes->head; } +SStreamQueueNode* streamQueueResPop(SStreamQueueRes* pRes) { + SStreamQueueNode* pRet = pRes->head; + pRes->head = pRes->head->next; + return pRet; +} + +void streamQueueResClear(SStreamQueueRes* pRes) { + while (pRes->head) { + SStreamQueueNode* pNode = pRes->head; + streamFreeQitem(pRes->head->item); + pRes->head = pNode; + } +} + +SStreamQueueRes streamQueueBuildRes(SStreamQueueNode* pTail) { + int64_t size = 0; + SStreamQueueNode* head = NULL; + + while (pTail) { + SStreamQueueNode* pTmp = pTail->next; + pTail->next = head; + head = pTail; + pTail = pTmp; + size++; + } + + return (SStreamQueueRes){.head = head, .size = size}; +} + +bool streamQueueHasTask(const SStreamQueue1* pQueue) { return atomic_load_ptr(pQueue->pHead); } +int32_t streamQueuePush(SStreamQueue1* pQueue, SStreamQueueItem* pItem) { + SStreamQueueNode* pNode = taosMemoryMalloc(sizeof(SStreamQueueNode)); + pNode->item = pItem; + SStreamQueueNode* pHead = atomic_load_ptr(pQueue->pHead); + while (1) { + pNode->next = pHead; + SStreamQueueNode* pOld = atomic_val_compare_exchange_ptr(pQueue->pHead, pHead, pNode); + if (pOld == pHead) { + break; + } + } + return 0; +} + +SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue) { + SStreamQueueNode* pNode = atomic_exchange_ptr(pQueue->pHead, NULL); + if (pNode) return streamQueueBuildRes(pNode); + return (SStreamQueueRes){0}; +} diff --git a/tests/script/tsim/show/basic.sim b/tests/script/tsim/show/basic.sim index 274476e17c44fb7562fef7d92134b30ec4289263..cae7a66589bac72c352ed855902f3b23bb798046 100644 --- a/tests/script/tsim/show/basic.sim +++ b/tests/script/tsim/show/basic.sim @@ -195,7 +195,7 @@ sql select * from information_schema.ins_stables if $rows != 1 then return -1 endi -#sql select * from performance_schema.perf_streams +#sql select * frominformation_schema.ins_streams sql select * from information_schema.ins_tables if $rows <= 0 then return -1