diff --git a/include/libs/index/index.h b/include/libs/index/index.h index 05db99db0f199169ce71e4a76d56899361aa403b..890908b0ed750840739614cd6a998a7f905d71bf 100644 --- a/include/libs/index/index.h +++ b/include/libs/index/index.h @@ -196,7 +196,7 @@ typedef enum { SFLT_NOT_INDEX, SFLT_COARSE_INDEX, SFLT_ACCURATE_INDEX } SIdxFltS SIdxFltStatus idxGetFltStatus(SNode* pFilterNode); -int32_t doFilterTag(const SNode* pFilterNode, SArray* result); +int32_t doFilterTag(const SNode* pFilterNode, void* metaHandle, SArray* result); /* * destory index env * diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 8c6c6f0f646485d4154f5acaf848f51fbd2ed20a..af96bc34d18481c08c9410f81e6b625a64f718ce 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -28,13 +28,13 @@ #include "ttime.h" #include "executorimpl.h" +#include "index.h" #include "query.h" #include "tcompare.h" #include "tcompression.h" #include "thash.h" #include "ttypes.h" #include "vnode.h" -#include "index.h" #define IS_MAIN_SCAN(runtime) ((runtime)->scanFlag == MAIN_SCAN) #define IS_REVERSE_SCAN(runtime) ((runtime)->scanFlag == REVERSE_SCAN) @@ -125,7 +125,7 @@ static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput) void doSetOperatorCompleted(SOperatorInfo* pOperator) { pOperator->status = OP_EXEC_DONE; - pOperator->cost.totalCost = (taosGetTimestampUs() - pOperator->pTaskInfo->cost.start * 1000)/1000.0; + pOperator->cost.totalCost = (taosGetTimestampUs() - pOperator->pTaskInfo->cost.start * 1000) / 1000.0; if (pOperator->pTaskInfo != NULL) { setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); } @@ -2718,7 +2718,7 @@ static void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs) { SExchangeInfo* pExchangeInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - int64_t el = taosGetTimestampUs() - startTs; + int64_t el = taosGetTimestampUs() - startTs; SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; pLoadInfo->totalElapsed += el; @@ -3024,13 +3024,13 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, const SNodeList* p tsem_init(&pInfo->ready, 0, 0); - pOperator->name = "ExchangeOperator"; + pOperator->name = "ExchangeOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->numOfExprs = pBlock->info.numOfCols; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfExprs = pBlock->info.numOfCols; + pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL, destroyExchangeOperatorInfo, NULL, NULL, NULL); @@ -3466,7 +3466,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0); OPTR_SET_OPENED(pOperator); - pOperator->cost.openCost = (taosGetTimestampUs() - st)/1000.0; + pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; return TSDB_CODE_SUCCESS; } @@ -3491,10 +3491,10 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { doSetOperatorCompleted(pOperator); } - size_t rows = blockDataGetNumOfRows(pInfo->pRes);//pInfo->pRes : NULL; + size_t rows = blockDataGetNumOfRows(pInfo->pRes); // pInfo->pRes : NULL; pOperator->resultInfo.totalRows += rows; - return (rows == 0)? NULL:pInfo->pRes; + return (rows == 0) ? NULL : pInfo->pRes; } void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasicInfo* pInfo, char** result, @@ -3779,10 +3779,10 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { pOperator->resultInfo.totalRows += rows; if (pOperator->cost.openCost == 0) { - pOperator->cost.openCost = (taosGetTimestampUs() - st)/ 1000.0; + pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; } - return (rows > 0)? pInfo->pRes:NULL; + return (rows > 0) ? pInfo->pRes : NULL; } static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, bool* newgroup, @@ -4438,7 +4438,8 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT } static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, - STableGroupInfo* pTableGroupInfo, uint64_t queryId, uint64_t taskId, SNode* pTagNode); + STableGroupInfo* pTableGroupInfo, uint64_t queryId, uint64_t taskId, + SNode* pTagNode); static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId, SNode* pTagCond); @@ -4471,14 +4472,16 @@ void extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo } SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, - uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo, SNode* pTagCond) { + uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo, + SNode* pTagCond) { int32_t type = nodeType(pPhyNode); if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; - tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId, pTagCond); + tsdbReaderT pDataReader = + doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId, pTagCond); if (pDataReader == NULL && terrno != 0) { return NULL; } @@ -4504,7 +4507,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo if (pHandle->vnode) { pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId, pTagCond); } else { - doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId, pTagCond); + doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId, + pTagCond); } if (pDataReader == NULL && terrno != 0) { @@ -4669,8 +4673,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId; - pOptr = - createStreamSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pTaskInfo); + pOptr = createStreamSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, + pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) { SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*)pPhyNode; @@ -4898,9 +4902,9 @@ int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUi if (tableType == TSDB_SUPER_TABLE) { SArray* res = taosArrayInit(8, sizeof(STableKeyInfo)); - if(pTagCond){ - code = doFilterTag(pTagCond, res); - }else{ + if (pTagCond) { + code = doFilterTag(pTagCond, metaHandle, res); + } else { code = tsdbGetAllTableList(metaHandle, tableUid, res); } if (code != TSDB_CODE_SUCCESS) { @@ -4938,8 +4942,8 @@ SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo) { tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableGroupInfo* pTableGroupInfo, uint64_t queryId, uint64_t taskId, SNode* pTagNode) { uint64_t uid = pTableScanNode->scan.uid; - int32_t code = - doCreateTableGroup(pHandle->meta, pTableScanNode->scan.tableType, uid, pTableGroupInfo, queryId, taskId, pTagNode); + int32_t code = doCreateTableGroup(pHandle->meta, pTableScanNode->scan.tableType, uid, pTableGroupInfo, queryId, + taskId, pTagNode); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -4974,8 +4978,8 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead goto _complete; } - (*pTaskInfo)->pRoot = - createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &(*pTaskInfo)->tableqinfoGroupInfo, pPlan->pTagCond); + (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, + &(*pTaskInfo)->tableqinfoGroupInfo, pPlan->pTagCond); if (NULL == (*pTaskInfo)->pRoot) { code = terrno; goto _complete; @@ -5176,8 +5180,7 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo return TSDB_CODE_SUCCESS; } -int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, const char* pKey, - const char* pDir) { +int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, const char* pKey, const char* pDir) { pCatchSup->keySize = sizeof(int64_t) + sizeof(int64_t) + sizeof(TSKEY); pCatchSup->pKeyBuf = taosMemoryCalloc(1, pCatchSup->keySize); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); diff --git a/source/libs/index/src/indexFilter.c b/source/libs/index/src/indexFilter.c index 0273867ccf040f3d3344066270ef3b8aa6a3bae2..15323cc80797d756580993bdfb926b512efbef07 100644 --- a/source/libs/index/src/indexFilter.c +++ b/source/libs/index/src/indexFilter.c @@ -37,6 +37,8 @@ typedef struct SIFParam { int64_t suid; // add later char dbName[TSDB_DB_NAME_LEN]; char colName[TSDB_COL_NAME_LEN]; + + void *metaHandle; } SIFParam; typedef struct SIFCtx { @@ -561,7 +563,7 @@ static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) { SIF_RET(code); } -int32_t doFilterTag(const SNode *pFilterNode, SArray *result) { +int32_t doFilterTag(const SNode *pFilterNode, void *metaHandle, SArray *result) { if (pFilterNode == NULL) { return TSDB_CODE_SUCCESS; } @@ -570,7 +572,7 @@ int32_t doFilterTag(const SNode *pFilterNode, SArray *result) { // todo move to the initialization function // SIF_ERR_RET(filterInitFromNode((SNode *)pFilterNode, &filter, 0)); - SIFParam param = {0}; + SIFParam param = {.metHandle = metaHandle}; SIF_ERR_RET(sifCalculate((SNode *)pFilterNode, ¶m)); taosArrayAddAll(result, param.result);