diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 89e330b78d53eed4d83fac81c293d07022d51f64..3739897ec0d3f823ee78deab387ae2d8c7e0da3d 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -215,10 +215,10 @@ int32_t tqCheckColModifiable(STQ* pTq, int32_t colId) { if (pIter == NULL) break; STqHandle* pExec = (STqHandle*)pIter; if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - int32_t sz = taosArrayGetSize(pExec->colIdList); + int32_t sz = pExec->execHandle.pSchemaWrapper->nCols; for (int32_t i = 0; i < sz; i++) { - int32_t forbidColId = *(int32_t*)taosArrayGet(pExec->colIdList, i); - if (forbidColId == colId) { + SSchema* pSchema = &pExec->execHandle.pSchemaWrapper->pSchema[i]; + if (pSchema->colId == colId) { taosHashCancelIterate(pTq->handles, pIter); return -1; } @@ -523,7 +523,8 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { .version = ver, }; pHandle->execHandle.execCol.task = - qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols, &pHandle->execHandle.pSchemaWrapper); + qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols, + &pHandle->execHandle.pSchemaWrapper); ASSERT(pHandle->execHandle.execCol.task); void* scanner = NULL; qExtractStreamScanner(pHandle->execHandle.execCol.task, &scanner); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index d04b7d036fc03d5619eed7705c4d20b8f43404ad..9fc51cb59d013ec6fa99d9df1150c85d0bcc8f32 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -108,6 +108,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa } if (pRsp->blockNum == 0 && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { + tqDebug("vgId: %d, tsdb consume over, switch to wal, ver %ld", TD_VID(pTq->pVnode), pHandle->snapshotVer + 1); tqOffsetResetToLog(pOffset, pHandle->snapshotVer); qStreamPrepareScan(task, pOffset); continue; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index b00dc9dba54db4ed2851cf09101f8fb244aed266..c5aa90e0eb1947eb26cc4f3ac18340577ec5841d 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -120,7 +120,8 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO return code; } -qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchemaWrapper) { +qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, + SSchemaWrapper** pSchemaWrapper) { if (msg == NULL) { // TODO create raw scan return NULL; @@ -146,7 +147,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n SDataBlockDescNode* pDescNode = pPlan->pNode->pOutputDataBlockDesc; *numOfCols = 0; - SNode* pNode; + SNode* pNode; FOREACH(pNode, pDescNode->pSlots) { SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode; if (pSlotDesc->output) { @@ -249,9 +250,11 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo // add to qTaskInfo // todo refactor STableList - for(int32_t i = 0; i < taosArrayGetSize(qa); ++i) { + for (int32_t i = 0; i < taosArrayGetSize(qa); ++i) { uint64_t* uid = taosArrayGet(qa, i); + qDebug("table %ld added to task info", *uid); + STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0}; taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo); } diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index e0020a496ea9d7bc0ab268f11c78d2823f690e55..27d8b3f1cc8113b4ec9f7139955ffb34f23f10f0 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -341,11 +341,20 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) { return -1; } } + /*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/ /*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/ STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList); - bool found = false; + +#ifndef NDEBUG + + qDebug("switch to next table %ld (cursor %d), %ld rows returned", uid, + pTableScanInfo->currentTable, pInfo->pTableScanOp->resultInfo.totalRows); + pInfo->pTableScanOp->resultInfo.totalRows = 0; +#endif + + bool found = false; for (int32_t i = 0; i < tableSz; i++) { STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i); if (pTableInfo->uid == uid) { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index c42d477b33ec1c96c5bb03aa9dad726aa6be7c10..07bd368c90622e442382878865823c45aae8bf0e 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -376,9 +376,7 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow colDataAppendInt64(pColData, 4, &pQueryWindow->ekey); } -void cleanupExecTimeWindowInfo(SColumnInfoData* pColData) { - colDataDestroy(pColData); -} +void cleanupExecTimeWindowInfo(SColumnInfoData* pColData) { colDataDestroy(pColData); } void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, @@ -524,8 +522,8 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt // NOTE: the last parameter is the primary timestamp column // todo: refactor this if (fmIsTimelineFunc(pCtx[i].functionId) && (j == pOneExpr->base.numOfParams - 1)) { - pInput->pPTS = pInput->pData[j]; // in case of merge function, this is not always the ts column data. -// ASSERT(pInput->pPTS->info.type == TSDB_DATA_TYPE_TIMESTAMP); + pInput->pPTS = pInput->pData[j]; // in case of merge function, this is not always the ts column data. + // ASSERT(pInput->pPTS->info.type == TSDB_DATA_TYPE_TIMESTAMP); } ASSERT(pInput->pData[j] != NULL); } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) { @@ -633,7 +631,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc ASSERT(pResult->info.capacity > 0); colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows); colDataDestroy(&idata); - + numOfRows = dest.numOfRows; taosArrayDestroy(pBlockList); } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) { @@ -835,7 +833,7 @@ void setTaskKilled(SExecTaskInfo* pTaskInfo) { pTaskInfo->code = TSDB_CODE_TSC_Q ///////////////////////////////////////////////////////////////////////////////////////////// STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key) { - STimeWindow win = {0}; + STimeWindow win = {0}; win.skey = taosTimeTruncate(key, pInterval, precision); /* @@ -2378,7 +2376,7 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) { return NULL; } - while(1) { + while (1) { SSDataBlock* pBlock = doLoadRemoteDataImpl(pOperator); if (pBlock == NULL) { return NULL; @@ -3431,13 +3429,13 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { blockDataUpdateTsWindow(pBlock, pInfo->primaryTsCol); if (pInfo->curGroupId == 0 || pInfo->curGroupId == pBlock->info.groupId) { - pInfo->curGroupId = pBlock->info.groupId; // the first data block + pInfo->curGroupId = pBlock->info.groupId; // the first data block pInfo->totalInputRows += pBlock->info.rows; taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, pBlock->info.window.ekey); taosFillSetInputDataBlock(pInfo->pFillInfo, pBlock); - } else if (pInfo->curGroupId != pBlock->info.groupId) { // the new group data block + } else if (pInfo->curGroupId != pBlock->info.groupId) { // the new group data block pInfo->existNewGroupBlock = pBlock; // Fill the previous group data block, before handle the data block of new group. @@ -3511,7 +3509,7 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) { static void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) { for (int32_t i = 0; i < numOfExprs; ++i) { SExprInfo* pExprInfo = &pExpr[i]; - for(int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) { + for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) { if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_COLUMN) { taosMemoryFreeClear(pExprInfo->base.pParam[j].pCol); } @@ -3604,7 +3602,7 @@ int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInf return TSDB_CODE_SUCCESS; } -void initResultSizeInfo(SResultInfo * pResultInfo, int32_t numOfRows) { +void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows) { ASSERT(numOfRows != 0); pResultInfo->capacity = numOfRows; pResultInfo->threshold = numOfRows * 0.75; @@ -3724,7 +3722,6 @@ void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) { taosMemoryFreeClear(param); } - static void freeItem(void* pItem) { void** p = pItem; if (*p != NULL) { @@ -4051,8 +4048,8 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t w = getFirstQualifiedTimeWindow(win.skey, &w, pInterval, TSDB_ORDER_ASC); int32_t order = TSDB_ORDER_ASC; - pInfo->pFillInfo = taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval, - fillType, pColInfo, pInfo->primaryTsCol, id); + pInfo->pFillInfo = + taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval, fillType, pColInfo, pInfo->primaryTsCol, id); pInfo->win = win; pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES); @@ -4066,7 +4063,8 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t } } -SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, + SExecTaskInfo* pTaskInfo) { SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -4149,8 +4147,8 @@ int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, metaReaderInit(&mr, pHandle->meta, 0); int32_t code = metaGetTableEntryByUid(&mr, pScanNode->uid); if (code != TSDB_CODE_SUCCESS) { - qError("failed to get the table meta, uid:0x%"PRIx64", suid:0x%"PRIx64 ", %s", pScanNode->uid, pScanNode->suid, - GET_TASKID(pTaskInfo)); + qError("failed to get the table meta, uid:0x%" PRIx64 ", suid:0x%" PRIx64 ", %s", pScanNode->uid, pScanNode->suid, + GET_TASKID(pTaskInfo)); metaReaderClear(&mr); return terrno; @@ -4180,11 +4178,11 @@ int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, } SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) { - int32_t numOfCols = LIST_LENGTH(pScanNode->pScanCols); + int32_t numOfCols = LIST_LENGTH(pScanNode->pScanCols); SSchemaWrapper* pqSw = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); pqSw->pSchema = taosMemoryCalloc(numOfCols, sizeof(SSchema)); - for(int32_t i = 0; i < numOfCols; ++i) { + for (int32_t i = 0; i < numOfCols; ++i) { STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanCols, i); SColumnNode* pColNode = (SColumnNode*)pNode->pExpr; @@ -4387,21 +4385,23 @@ static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pC pCond->suid = uid; pCond->type = BLOCK_LOAD_OFFSET_ORDER; pCond->startVersion = -1; - pCond->endVersion = -1; + pCond->endVersion = -1; return TSDB_CODE_SUCCESS; } SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, - STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, const char* pUser) { + STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, + const char* pUser) { int32_t type = nodeType(pPhyNode); if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; - int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, - pTableScanNode->groupSort, pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); + int32_t code = + createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle, + pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); if (code) { pTaskInfo->code = code; return NULL; @@ -4420,8 +4420,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) { STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode; - int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, - pTableScanNode->groupSort, pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); + int32_t code = + createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle, + pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); if (code) { pTaskInfo->code = code; return NULL; @@ -4433,8 +4434,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return NULL; } - SOperatorInfo* pOperator = - createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo); + SOperatorInfo* pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo); STableScanInfo* pScanInfo = pOperator->info; pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; @@ -4445,13 +4445,22 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; if (pHandle->vnode) { - int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, - pTableScanNode->groupSort, pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); + int32_t code = + createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, + pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); if (code) { pTaskInfo->code = code; return NULL; } + +#ifndef NDEBUG + int32_t sz = taosArrayGetSize(pTableListInfo->pTableList); + for (int32_t i = 0; i < sz; i++) { + STableKeyInfo* pKeyInfo = taosArrayGet(pTableListInfo->pTableList, i); + qDebug("creating stream task: add table %ld", pKeyInfo->uid); + } } +#endif pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan); SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo); @@ -4486,7 +4495,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } SQueryTableDataCond cond = {0}; - int32_t code = initTableblockDistQueryCond(pBlockNode->suid, &cond); + int32_t code = initTableblockDistQueryCond(pBlockNode->suid, &cond); if (code != TSDB_CODE_SUCCESS) { return NULL; } @@ -4499,7 +4508,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) { SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode; - int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); + int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo, + pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; return NULL; @@ -4961,7 +4971,8 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead (*pTaskInfo)->sql = sql; (*pTaskInfo)->pSubplan = pPlan; - (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, &(*pTaskInfo)->tableqinfoList, pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user); + (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, &(*pTaskInfo)->tableqinfoList, + pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user); if (NULL == (*pTaskInfo)->pRoot) { code = (*pTaskInfo)->code; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index a8192b49f338b5060b8329b23edfea8678733e7e..52b610228eb7bdd2253a47b97457032d9aa0a1a1 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -159,6 +159,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { if (data == NULL) { data = qItem; streamQueueProcessSuccess(pTask->inputQueue); + if (pTask->execType == TASK_EXEC__NONE) break; /*if (qItem->type == STREAM_INPUT__DATA_BLOCK) {*/ /*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/ /*}*/