diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 07fbf34793408bc11185378dc4333bdd77db28a3..a8e30bfd525638be4fb8d07fe865c0ef67b132d2 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1,5 +1,4 @@ -#include "../../libs/scheduler/inc/schedulerInt.h" #include "clientInt.h" #include "clientLog.h" #include "parser.h" diff --git a/source/dnode/vnode/inc/tsdb.h b/source/dnode/vnode/inc/tsdb.h index 5f5acc1b05cf4ff5c40d234b0e4cee681ad28ed1..af81cd20467b63c16085b0ad102f0124dca88760 100644 --- a/source/dnode/vnode/inc/tsdb.h +++ b/source/dnode/vnode/inc/tsdb.h @@ -220,7 +220,7 @@ int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGro * clean up the query handle * @param queryHandle */ -void tsdbCleanupQueryHandle(tsdbReadHandleT queryHandle); +void tsdbCleanupReadHandle(tsdbReadHandleT queryHandle); #ifdef __cplusplus } diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 6fd7a99f544da5329752acf043c6b2440ef2cbc6..e5ee7bcebe4f4c98ad35b33014d6da13540d39bf 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -208,8 +208,8 @@ static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle* pReadHandle, SA pReadHandle->pColIdList = pColIdList; } -static FORCE_INLINE void tqReadHandleSetTbUid(STqReadHandle* pHandle, uint64_t tbUid) { - pHandle->tbUid = tbUid; +static FORCE_INLINE void tqReadHandleSetTbUid(STqReadHandle* pHandle, const SArray* pTableIdList) { +// pHandle->tbUid = pTableIdList; } static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle* pHandle, SArray* tbUidList) { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 1bca7aea1028f115c1f4752e5387c16e719cf417..964848f0184603bdfe5bcc90f801670d8ab0eed7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -427,7 +427,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond, return (tsdbReadHandleT)pReadHandle; _end: - tsdbCleanupQueryHandle(pReadHandle); + tsdbCleanupReadHandle(pReadHandle); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return NULL; } @@ -445,7 +445,7 @@ tsdbReadHandleT* tsdbQueryTables(STsdb* tsdb, STsdbQueryCond* pCond, STableGroup // todo apply the lastkey of table check to avoid to load header file pTsdbReadHandle->pTableCheckInfo = createCheckInfoFromTableGroup(pTsdbReadHandle, groupList); if (pTsdbReadHandle->pTableCheckInfo == NULL) { -// tsdbCleanupQueryHandle(pTsdbReadHandle); +// tsdbCleanupReadHandle(pTsdbReadHandle); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return NULL; } @@ -525,7 +525,7 @@ void tsdbResetQueryHandleForNewTable(tsdbReadHandleT queryHandle, STsdbQueryCond pTsdbReadHandle->pTableCheckInfo = NULL;//createCheckInfoFromTableGroup(pTsdbReadHandle, groupList, pMeta, &pTable); if (pTsdbReadHandle->pTableCheckInfo == NULL) { -// tsdbCleanupQueryHandle(pTsdbReadHandle); +// tsdbCleanupReadHandle(pTsdbReadHandle); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; } @@ -3051,7 +3051,7 @@ bool tsdbNextDataBlock(tsdbReadHandleT pHandle) { // } // //out_of_memory: -// tsdbCleanupQueryHandle(pSecQueryHandle); +// tsdbCleanupReadHandle(pSecQueryHandle); // return terrno; //} @@ -3722,26 +3722,19 @@ int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const ch return terrno; } -#if 0 int32_t tsdbGetOneTableGroup(STsdb* tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo* pGroupInfo) { - if (tsdbRLockRepoMeta(tsdb) < 0) goto _error; - - STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid); - if (pTable == NULL) { + STbCfg* pTbCfg = metaGetTbInfoByUid(tsdb->pMeta, uid); + if (pTbCfg == NULL) { terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; - tsdbUnlockRepoMeta(tsdb); goto _error; } - assert(pTable->type == TSDB_CHILD_TABLE || pTable->type == TSDB_NORMAL_TABLE || pTable->type == TSDB_STREAM_TABLE); - if (tsdbUnlockRepoMeta(tsdb) < 0) goto _error; - pGroupInfo->numOfTables = 1; pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES); SArray* group = taosArrayInit(1, sizeof(STableKeyInfo)); - STableKeyInfo info = {.pTable = pTable, .lastKey = startKey}; + STableKeyInfo info = {.lastKey = startKey, .uid = uid}; taosArrayPush(group, &info); taosArrayPush(pGroupInfo->pGroupList, &group); @@ -3751,6 +3744,7 @@ int32_t tsdbGetOneTableGroup(STsdb* tsdb, uint64_t uid, TSKEY startKey, STableGr return terrno; } +#if 0 int32_t tsdbGetTableGroupFromIdList(STsdb* tsdb, SArray* pTableIdList, STableGroupInfo* pGroupInfo) { if (tsdbRLockRepoMeta(tsdb) < 0) { return terrno; @@ -3826,7 +3820,7 @@ static void* destroyTableCheckInfo(SArray* pTableCheckInfo) { } -void tsdbCleanupQueryHandle(tsdbReadHandleT queryHandle) { +void tsdbCleanupReadHandle(tsdbReadHandleT queryHandle) { STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)queryHandle; if (pTsdbReadHandle == NULL) { return; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 6efd7ef87e0ef1d8d67528c8a451b0d142b3478e..a45b9d1fb41c98fde9ea29de0311e3a270306c53 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2371,7 +2371,7 @@ _clean: static void doFreeQueryHandle(STaskRuntimeEnv* pRuntimeEnv) { STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; -// tsdbCleanupQueryHandle(pRuntimeEnv->pTsdbReadHandle); +// tsdbCleanupReadHandle(pRuntimeEnv->pTsdbReadHandle); pRuntimeEnv->pTsdbReadHandle = NULL; // SMemRef* pMemRef = &pQueryAttr->memRef; @@ -5454,7 +5454,7 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRunt return pOperator; } -SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExprInfo, uint64_t uid, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExprInfo, SArray* pTableIdList, SExecTaskInfo* pTaskInfo) { SStreamBlockScanInfo* pInfo = calloc(1, sizeof(SStreamBlockScanInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -5477,7 +5477,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExp // set the extract column id to streamHandle tqReadHandleSetColIdList((STqReadHandle* )streamReadHandle, pColList); - tqReadHandleSetTbUid(streamReadHandle, uid); + tqReadHandleSetTbUid(streamReadHandle, pTableIdList); pInfo->readerHandle = streamReadHandle; @@ -7750,20 +7750,21 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId) { } static tsdbReadHandleT doCreateDataReadHandle(STableScanPhyNode* pTableScanNode, void* readerHandle, uint64_t queryId, uint64_t taskId); +static int32_t doCreateTableGroup(void* readerHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId); SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTaskInfo, void* readerHandle, uint64_t queryId, uint64_t taskId) { if (pPhyNode->pChildren == NULL || taosArrayGetSize(pPhyNode->pChildren) == 0) { if (pPhyNode->info.type == OP_TableScan) { - SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode; - size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets); + SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode; + size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets); tsdbReadHandleT tReaderHandle = doCreateDataReadHandle((STableScanPhyNode*) pPhyNode, readerHandle, (uint64_t) queryId, taskId); return createTableScanOperatorInfo(tReaderHandle, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pTaskInfo); } else if (pPhyNode->info.type == OP_DataBlocksOptScan) { - SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode; - size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets); + SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode; + size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets); tsdbReadHandleT tReaderHandle = doCreateDataReadHandle((STableScanPhyNode*) pPhyNode, readerHandle, (uint64_t) queryId, taskId); @@ -7772,8 +7773,13 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask SExchangePhyNode* pEx = (SExchangePhyNode*) pPhyNode; return createExchangeOperatorInfo(pEx->pSrcEndPoints, pEx->node.pTargets, pTaskInfo); } else if (pPhyNode->info.type == OP_StreamScan) { - SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode; // simple child table. - return createStreamScanOperatorInfo(readerHandle, pPhyNode->pTargets, pScanPhyNode->uid, pTaskInfo); + SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode; // simple child table. + STableGroupInfo groupInfo = {0}; + int32_t code = doCreateTableGroup(readerHandle, pScanPhyNode->tableType, pScanPhyNode->uid, &groupInfo, queryId, taskId); + + SArray* pa = taosArrayGetP(groupInfo.pGroupList, 0); + ASSERT(taosArrayGetSize(groupInfo.pGroupList) == 1); + return createStreamScanOperatorInfo(readerHandle, pPhyNode->pTargets, pa, pTaskInfo); } } @@ -7811,29 +7817,24 @@ static tsdbReadHandleT createDataReadHandle(STableScanPhyNode* pTableScanNode, S return tsdbQueryTables(readerHandle, &cond, pGroupInfo, queryId, taskId); } -static tsdbReadHandleT doCreateDataReadHandle(STableScanPhyNode* pTableScanNode, void* readerHandle, uint64_t queryId, uint64_t taskId) { - int32_t code = 0; - STableGroupInfo groupInfo = {0}; - - uint64_t uid = pTableScanNode->scan.uid; - STimeWindow window = pTableScanNode->window; - int32_t tableType = pTableScanNode->scan.tableType; - +static int32_t doCreateTableGroup(void* readerHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId) { + int32_t code = 0; if (tableType == TSDB_SUPER_TABLE) { - code = - tsdbQuerySTableByTagCond(readerHandle, uid, window.skey, NULL, 0, 0, NULL, &groupInfo, NULL, 0, queryId, taskId); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + code = tsdbQuerySTableByTagCond(readerHandle, tableUid, 0, NULL, 0, 0, NULL, pGroupInfo, NULL, 0, queryId, taskId); } else { // Create one table group. - groupInfo.numOfTables = 1; - groupInfo.pGroupList = taosArrayInit(1, POINTER_BYTES); + code = tsdbGetOneTableGroup(readerHandle, tableUid, 0, pGroupInfo); + } - SArray* pa = taosArrayInit(1, sizeof(STableKeyInfo)); + return code; +} - STableKeyInfo info = {.lastKey = 0, .uid = uid}; - taosArrayPush(pa, &info); - taosArrayPush(groupInfo.pGroupList, &pa); +static tsdbReadHandleT doCreateDataReadHandle(STableScanPhyNode* pTableScanNode, void* readerHandle, uint64_t queryId, uint64_t taskId) { + STableGroupInfo groupInfo = {0}; + + uint64_t uid = pTableScanNode->scan.uid; + int32_t code = doCreateTableGroup(readerHandle, pTableScanNode->scan.tableType, uid, &groupInfo, queryId, taskId); + if (code != TSDB_CODE_SUCCESS) { + goto _error; } if (groupInfo.numOfTables == 0) {