提交 c47371c8 编写于 作者: dengyihao's avatar dengyihao

change filter interface

上级 dfbd780b
...@@ -196,7 +196,7 @@ typedef enum { SFLT_NOT_INDEX, SFLT_COARSE_INDEX, SFLT_ACCURATE_INDEX } SIdxFltS ...@@ -196,7 +196,7 @@ typedef enum { SFLT_NOT_INDEX, SFLT_COARSE_INDEX, SFLT_ACCURATE_INDEX } SIdxFltS
SIdxFltStatus idxGetFltStatus(SNode* pFilterNode); SIdxFltStatus idxGetFltStatus(SNode* pFilterNode);
int32_t doFilterTag(const SNode* pFilterNode, SArray* result); int32_t doFilterTag(const SNode* pFilterNode, void* metaHandle, SArray* result);
/* /*
* destory index env * destory index env
* *
......
...@@ -28,13 +28,13 @@ ...@@ -28,13 +28,13 @@
#include "ttime.h" #include "ttime.h"
#include "executorimpl.h" #include "executorimpl.h"
#include "index.h"
#include "query.h" #include "query.h"
#include "tcompare.h" #include "tcompare.h"
#include "tcompression.h" #include "tcompression.h"
#include "thash.h" #include "thash.h"
#include "ttypes.h" #include "ttypes.h"
#include "vnode.h" #include "vnode.h"
#include "index.h"
#define IS_MAIN_SCAN(runtime) ((runtime)->scanFlag == MAIN_SCAN) #define IS_MAIN_SCAN(runtime) ((runtime)->scanFlag == MAIN_SCAN)
#define IS_REVERSE_SCAN(runtime) ((runtime)->scanFlag == REVERSE_SCAN) #define IS_REVERSE_SCAN(runtime) ((runtime)->scanFlag == REVERSE_SCAN)
...@@ -125,7 +125,7 @@ static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput) ...@@ -125,7 +125,7 @@ static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput)
void doSetOperatorCompleted(SOperatorInfo* pOperator) { void doSetOperatorCompleted(SOperatorInfo* pOperator) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
pOperator->cost.totalCost = (taosGetTimestampUs() - pOperator->pTaskInfo->cost.start * 1000)/1000.0; pOperator->cost.totalCost = (taosGetTimestampUs() - pOperator->pTaskInfo->cost.start * 1000) / 1000.0;
if (pOperator->pTaskInfo != NULL) { if (pOperator->pTaskInfo != NULL) {
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
} }
...@@ -2718,7 +2718,7 @@ static void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs) { ...@@ -2718,7 +2718,7 @@ static void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs) {
SExchangeInfo* pExchangeInfo = pOperator->info; SExchangeInfo* pExchangeInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int64_t el = taosGetTimestampUs() - startTs; int64_t el = taosGetTimestampUs() - startTs;
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
pLoadInfo->totalElapsed += el; pLoadInfo->totalElapsed += el;
...@@ -3024,13 +3024,13 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, const SNodeList* p ...@@ -3024,13 +3024,13 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, const SNodeList* p
tsem_init(&pInfo->ready, 0, 0); tsem_init(&pInfo->ready, 0, 0);
pOperator->name = "ExchangeOperator"; pOperator->name = "ExchangeOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->numOfExprs = pBlock->info.numOfCols; pOperator->numOfExprs = pBlock->info.numOfCols;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL, pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL,
destroyExchangeOperatorInfo, NULL, NULL, NULL); destroyExchangeOperatorInfo, NULL, NULL, NULL);
...@@ -3466,7 +3466,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { ...@@ -3466,7 +3466,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0); initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
OPTR_SET_OPENED(pOperator); OPTR_SET_OPENED(pOperator);
pOperator->cost.openCost = (taosGetTimestampUs() - st)/1000.0; pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -3491,10 +3491,10 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { ...@@ -3491,10 +3491,10 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
size_t rows = blockDataGetNumOfRows(pInfo->pRes);//pInfo->pRes : NULL; size_t rows = blockDataGetNumOfRows(pInfo->pRes); // pInfo->pRes : NULL;
pOperator->resultInfo.totalRows += rows; pOperator->resultInfo.totalRows += rows;
return (rows == 0)? NULL:pInfo->pRes; return (rows == 0) ? NULL : pInfo->pRes;
} }
void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasicInfo* pInfo, char** result, void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasicInfo* pInfo, char** result,
...@@ -3779,10 +3779,10 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { ...@@ -3779,10 +3779,10 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
pOperator->resultInfo.totalRows += rows; pOperator->resultInfo.totalRows += rows;
if (pOperator->cost.openCost == 0) { if (pOperator->cost.openCost == 0) {
pOperator->cost.openCost = (taosGetTimestampUs() - st)/ 1000.0; pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
} }
return (rows > 0)? pInfo->pRes:NULL; return (rows > 0) ? pInfo->pRes : NULL;
} }
static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, bool* newgroup, static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, bool* newgroup,
...@@ -4438,7 +4438,8 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT ...@@ -4438,7 +4438,8 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT
} }
static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableGroupInfo* pTableGroupInfo, uint64_t queryId, uint64_t taskId, SNode* pTagNode); STableGroupInfo* pTableGroupInfo, uint64_t queryId, uint64_t taskId,
SNode* pTagNode);
static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo,
uint64_t queryId, uint64_t taskId, SNode* pTagCond); uint64_t queryId, uint64_t taskId, SNode* pTagCond);
...@@ -4471,14 +4472,16 @@ void extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo ...@@ -4471,14 +4472,16 @@ void extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo
} }
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo, SNode* pTagCond) { uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo,
SNode* pTagCond) {
int32_t type = nodeType(pPhyNode); int32_t type = nodeType(pPhyNode);
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId, pTagCond); tsdbReaderT pDataReader =
doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId, pTagCond);
if (pDataReader == NULL && terrno != 0) { if (pDataReader == NULL && terrno != 0) {
return NULL; return NULL;
} }
...@@ -4504,7 +4507,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4504,7 +4507,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
if (pHandle->vnode) { if (pHandle->vnode) {
pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId, pTagCond); pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId, pTagCond);
} else { } else {
doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId, pTagCond); doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId,
pTagCond);
} }
if (pDataReader == NULL && terrno != 0) { if (pDataReader == NULL && terrno != 0) {
...@@ -4669,8 +4673,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4669,8 +4673,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId; int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
pOptr = pOptr = createStreamSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as,
createStreamSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pTaskInfo); pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*)pPhyNode; SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*)pPhyNode;
...@@ -4898,9 +4902,9 @@ int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUi ...@@ -4898,9 +4902,9 @@ int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUi
if (tableType == TSDB_SUPER_TABLE) { if (tableType == TSDB_SUPER_TABLE) {
SArray* res = taosArrayInit(8, sizeof(STableKeyInfo)); SArray* res = taosArrayInit(8, sizeof(STableKeyInfo));
if(pTagCond){ if (pTagCond) {
code = doFilterTag(pTagCond, res); code = doFilterTag(pTagCond, metaHandle, res);
}else{ } else {
code = tsdbGetAllTableList(metaHandle, tableUid, res); code = tsdbGetAllTableList(metaHandle, tableUid, res);
} }
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -4938,8 +4942,8 @@ SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo) { ...@@ -4938,8 +4942,8 @@ SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo) {
tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableGroupInfo* pTableGroupInfo, uint64_t queryId, uint64_t taskId, SNode* pTagNode) { STableGroupInfo* pTableGroupInfo, uint64_t queryId, uint64_t taskId, SNode* pTagNode) {
uint64_t uid = pTableScanNode->scan.uid; uint64_t uid = pTableScanNode->scan.uid;
int32_t code = int32_t code = doCreateTableGroup(pHandle->meta, pTableScanNode->scan.tableType, uid, pTableGroupInfo, queryId,
doCreateTableGroup(pHandle->meta, pTableScanNode->scan.tableType, uid, pTableGroupInfo, queryId, taskId, pTagNode); taskId, pTagNode);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -4974,8 +4978,8 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead ...@@ -4974,8 +4978,8 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
goto _complete; goto _complete;
} }
(*pTaskInfo)->pRoot = (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId,
createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &(*pTaskInfo)->tableqinfoGroupInfo, pPlan->pTagCond); &(*pTaskInfo)->tableqinfoGroupInfo, pPlan->pTagCond);
if (NULL == (*pTaskInfo)->pRoot) { if (NULL == (*pTaskInfo)->pRoot) {
code = terrno; code = terrno;
goto _complete; goto _complete;
...@@ -5176,8 +5180,7 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo ...@@ -5176,8 +5180,7 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, const char* pKey, int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, const char* pKey, const char* pDir) {
const char* pDir) {
pCatchSup->keySize = sizeof(int64_t) + sizeof(int64_t) + sizeof(TSKEY); pCatchSup->keySize = sizeof(int64_t) + sizeof(int64_t) + sizeof(TSKEY);
pCatchSup->pKeyBuf = taosMemoryCalloc(1, pCatchSup->keySize); pCatchSup->pKeyBuf = taosMemoryCalloc(1, pCatchSup->keySize);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
......
...@@ -37,6 +37,8 @@ typedef struct SIFParam { ...@@ -37,6 +37,8 @@ typedef struct SIFParam {
int64_t suid; // add later int64_t suid; // add later
char dbName[TSDB_DB_NAME_LEN]; char dbName[TSDB_DB_NAME_LEN];
char colName[TSDB_COL_NAME_LEN]; char colName[TSDB_COL_NAME_LEN];
void *metaHandle;
} SIFParam; } SIFParam;
typedef struct SIFCtx { typedef struct SIFCtx {
...@@ -561,7 +563,7 @@ static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) { ...@@ -561,7 +563,7 @@ static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) {
SIF_RET(code); SIF_RET(code);
} }
int32_t doFilterTag(const SNode *pFilterNode, SArray *result) { int32_t doFilterTag(const SNode *pFilterNode, void *metaHandle, SArray *result) {
if (pFilterNode == NULL) { if (pFilterNode == NULL) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -570,7 +572,7 @@ int32_t doFilterTag(const SNode *pFilterNode, SArray *result) { ...@@ -570,7 +572,7 @@ int32_t doFilterTag(const SNode *pFilterNode, SArray *result) {
// todo move to the initialization function // todo move to the initialization function
// SIF_ERR_RET(filterInitFromNode((SNode *)pFilterNode, &filter, 0)); // SIF_ERR_RET(filterInitFromNode((SNode *)pFilterNode, &filter, 0));
SIFParam param = {0}; SIFParam param = {.metHandle = metaHandle};
SIF_ERR_RET(sifCalculate((SNode *)pFilterNode, &param)); SIF_ERR_RET(sifCalculate((SNode *)pFilterNode, &param));
taosArrayAddAll(result, param.result); taosArrayAddAll(result, param.result);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册