未验证 提交 3eb6c8c3 编写于 作者: wmmhello's avatar wmmhello 提交者: GitHub

Merge pull request #13025 from taosdata/feature/TD-15956

feat:add logic for tag filter
......@@ -53,10 +53,9 @@ typedef enum EStreamType {
} EStreamType;
typedef struct {
uint32_t numOfTables;
SArray* pGroupList;
SArray* pTableList;
SHashObj* map; // speedup acquire the tableQueryInfo by table uid
} STableGroupInfo;
} STableListInfo;
typedef struct SColumnDataAgg {
int16_t colId;
......
......@@ -99,24 +99,19 @@ typedef void *tsdbReaderT;
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
#define BLOCK_LOAD_TABLE_RR_ORDER 3
tsdbReaderT *tsdbQueryTables(SVnode *pVnode, SQueryTableDataCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId,
tsdbReaderT *tsdbQueryTables(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *tableInfoGroup, uint64_t qId,
uint64_t taskId);
tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableGroupInfo *groupList, uint64_t qId,
tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *groupList, uint64_t qId,
void *pMemRef);
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT *pReader, STableBlockDistInfo *pTableBlockInfo);
bool isTsdbCacheLastRow(tsdbReaderT *pReader);
int32_t tsdbQuerySTableByTagCond(void *pMeta, uint64_t uid, TSKEY skey, const char *pTagCond, size_t len,
int16_t tagNameRelType, const char *tbnameCond, STableGroupInfo *pGroupInfo,
SColIndex *pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId);
int32_t tsdbGetAllTableList(SMeta* pMeta, uint64_t uid, SArray* list);
int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT *pHandle);
bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle);
void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo);
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SColumnDataAgg ***pBlockStatis, bool *allHave);
SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList);
void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond *pCond);
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList);
int32_t tsdbGetOneTableGroup(void *pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo);
int32_t tsdbGetTableGroupFromIdList(SVnode *pVnode, SArray *pTableIdList, STableGroupInfo *pGroupInfo);
void tsdbCleanupReadHandle(tsdbReaderT queryHandle);
// tq
......
......@@ -114,11 +114,10 @@ int tsdbCommit(STsdb* pTsdb);
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg);
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
int tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp);
tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
uint64_t taskId);
tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
void* pMemRef);
int32_t tsdbGetTableGroupFromIdListT(STsdb* tsdb, SArray* pTableIdList, STableGroupInfo* pGroupInfo);
int32_t tsdbSnapshotReaderOpen(STsdb* pTsdb, STsdbSnapshotReader** ppReader, int64_t sver, int64_t ever);
int32_t tsdbSnapshotReaderClose(STsdbSnapshotReader* pReader);
int32_t tsdbSnapshotRead(STsdbSnapshotReader* pReader, void** ppData, uint32_t* nData);
......
......@@ -147,16 +147,10 @@ void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId) {
}
// wrapper of tsdb read interface
tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableGroupInfo *groupList, uint64_t qId,
tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo* tableList, uint64_t qId,
void *pMemRef) {
#if 0
return tsdbQueryCacheLastT(pVnode->pTsdb, pCond, groupList, qId, pMemRef);
#endif
return 0;
}
\ No newline at end of file
int32_t tsdbGetTableGroupFromIdList(SVnode *pVnode, SArray *pTableIdList, STableGroupInfo *pGroupInfo) {
#if 0
return tsdbGetTableGroupFromIdListT(pVnode->pTsdb, pTableIdList, pGroupInfo);
#endif
return 0;
}
\ No newline at end of file
......@@ -49,7 +49,7 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int
#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0u)
#define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP)
#define GET_TABLEGROUP(q, _index) ((SArray*)taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index)))
//#define GET_TABLEGROUP(q, _index) ((SArray*)taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index)))
#define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData ? 1 : 0)
......@@ -151,7 +151,7 @@ typedef struct STaskAttr {
int32_t numOfFilterCols;
int64_t* fillVal;
void* tsdb;
STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo>
// STableListInfo tableGroupInfo; // table list
int32_t vgId;
} STaskAttr;
......@@ -191,7 +191,7 @@ typedef struct SExecTaskInfo {
int32_t tversion;
} schemaVer;
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
STableListInfo tableqinfoList; // this is a table list
char* sql; // query sql string
jmp_buf env; // jump to this position when error happens.
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
......@@ -213,7 +213,7 @@ typedef struct STaskRuntimeEnv {
STSCursor cur;
char* tagVal; // tag value of current data block
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
// STableGroupInfo tableqinfoGroupInfo; // this is a table list
struct SOperatorInfo* proot;
SGroupResInfo groupResInfo;
int64_t currentOffset; // dynamic offset value
......@@ -342,7 +342,7 @@ typedef struct STagScanInfo {
SArray *pColMatchInfo;
int32_t curPos;
SReadHandle readHandle;
STableGroupInfo *pTableGroups;
STableListInfo *pTableList;
} STagScanInfo;
typedef enum EStreamScanMode {
......@@ -707,7 +707,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SExecTaskInfo* pTaskInfo);
SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols,
......@@ -720,21 +720,19 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB
SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition,
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo,
const STableGroupInfo* pTableGroupInfo);
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader, SReadHandle* pHandle,
uint64_t uid, SSDataBlock* pResBlock, SArray* pColList,
......@@ -748,14 +746,13 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId, SColumn* pStateKeyCol, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo,
const STableGroupInfo* pTableGroupInfo);
SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput, SSDataBlock* pResBlock, SArray* pColMatchInfo, STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput, SSDataBlock* pResBlock, SArray* pColMatchInfo, STableListInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream,
SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap,
......@@ -771,8 +768,6 @@ void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlo
void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput);
STableQueryInfo* createTableQueryInfo(void* buf, STimeWindow win);
bool isTaskKilled(SExecTaskInfo* pTaskInfo);
int32_t checkForQueryBuf(size_t numOfTables);
......
......@@ -34,6 +34,7 @@
#include "thash.h"
#include "ttypes.h"
#include "vnode.h"
#include "index.h"
#define IS_MAIN_SCAN(runtime) ((runtime)->scanFlag == MAIN_SCAN)
#define IS_REVERSE_SCAN(runtime) ((runtime)->scanFlag == REVERSE_SCAN)
......@@ -87,7 +88,7 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
#endif
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
#define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList)
//#define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList)
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0)
int32_t getMaximumIdleDurationSec() { return tsShellActivityTimer * 2; }
......@@ -1845,12 +1846,6 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
}
}
STableQueryInfo* createTableQueryInfo(void* buf, STimeWindow win) {
STableQueryInfo* pTableQueryInfo = buf;
pTableQueryInfo->lastKey = win.skey;
return pTableQueryInfo;
}
void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo) {
if (pTableQueryInfo == NULL) {
return;
......@@ -2430,7 +2425,7 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
return TSDB_CODE_SUCCESS;
}
static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo);
static void doDestroyTableList(STableListInfo* pTableqinfoList);
static void doTableQueryInfoTimeWindowCheck(SExecTaskInfo* pTaskInfo, STableQueryInfo* pTableQueryInfo, int32_t order) {
#if 0
......@@ -4006,35 +4001,30 @@ void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows) {
}
}
static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInfo) {
if (pTableGroupInfo->numOfTables == 0) {
return NULL;
}
STableQueryInfo* pTableQueryInfo = taosMemoryCalloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo));
if (pTableQueryInfo == NULL) {
return NULL;
}
int32_t index = 0;
for (int32_t i = 0; i < taosArrayGetSize(pTableGroupInfo->pGroupList); ++i) {
SArray* pa = taosArrayGetP(pTableGroupInfo->pGroupList, i);
for (int32_t j = 0; j < taosArrayGetSize(pa); ++j) {
STableKeyInfo* pk = taosArrayGet(pa, j);
STableQueryInfo* pTQueryInfo = &pTableQueryInfo[index++];
pTQueryInfo->lastKey = pk->lastKey;
}
}
STimeWindow win = {0, INT64_MAX};
createTableQueryInfo(pTableQueryInfo, win);
return pTableQueryInfo;
}
//static STableQueryInfo* initTableQueryInfo(const STableListInfo* pTableListInfo) {
// int32_t size = taosArrayGetSize(pTableListInfo->pTableList);
// if (size == 0) {
// return NULL;
// }
//
// STableQueryInfo* pTableQueryInfo = taosMemoryCalloc(size, sizeof(STableQueryInfo));
// if (pTableQueryInfo == NULL) {
// return NULL;
// }
//
// for (int32_t j = 0; j < size; ++j) {
// STableKeyInfo* pk = taosArrayGet(pTableListInfo->pTableList, j);
// STableQueryInfo* pTQueryInfo = &pTableQueryInfo[j];
// pTQueryInfo->lastKey = pk->lastKey;
// }
//
// pTableQueryInfo->lastKey = 0;
// return pTableQueryInfo;
//}
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo,
const STableGroupInfo* pTableGroupInfo) {
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) {
SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -4047,7 +4037,6 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
initResultSizeInfo(pOperator, numOfRows);
int32_t code =
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResultBlock, keyBufSize, pTaskInfo->id.str);
pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......@@ -4445,11 +4434,10 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT
}
static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableGroupInfo* pTableGroupInfo, uint64_t queryId, uint64_t taskId);
STableListInfo* pTableGroupInfo, uint64_t queryId, uint64_t taskId, SNode* pTagCond);
static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo,
uint64_t queryId, uint64_t taskId);
static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo);
static int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STableListInfo* pListInfo, SNode* pTagCond);
static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo);
static SArray* extractColumnInfo(SNodeList* pNodeList);
static SArray* createSortInfo(SNodeList* pNodeList);
......@@ -4478,14 +4466,14 @@ void extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo
}
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
uint64_t queryId, uint64_t taskId, STableListInfo* pTableListInfo, SNode* pTagCond) {
int32_t type = nodeType(pPhyNode);
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond);
if (pDataReader == NULL && terrno != 0) {
return NULL;
}
......@@ -4509,9 +4497,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
tsdbReaderT pDataReader = NULL;
if (pHandle->vnode) {
pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond);
} else {
doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
getTableList(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableListInfo, pTagCond);
}
if (pDataReader == NULL && terrno != 0) {
......@@ -4524,7 +4512,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
SOperatorInfo* pOperatorDumy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
SArray* tableIdList = extractTableIdList(pTableGroupInfo);
SArray* tableIdList = extractTableIdList(pTableListInfo);
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
SArray* pCols =
......@@ -4557,8 +4545,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo,
queryId, taskId);
int32_t code = getTableList(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableListInfo, pTagCond);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
......@@ -4571,7 +4558,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
COL_MATCH_FROM_COL_ID);
SOperatorInfo* pOperator =
createTagScanOperatorInfo(pHandle, pExprInfo, num, pResBlock, colList, pTableGroupInfo, pTaskInfo);
createTagScanOperatorInfo(pHandle, pExprInfo, num, pResBlock, colList, pTableListInfo, pTaskInfo);
return pOperator;
} else {
ASSERT(0);
......@@ -4584,7 +4571,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
for (int32_t i = 0; i < size; ++i) {
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableListInfo, pTagCond);
if (ops[i] == NULL) {
return NULL;
}
......@@ -4613,10 +4600,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
if (pAggNode->pGroupKeys != NULL) {
SArray* pColList = extractColumnInfo(pAggNode->pGroupKeys);
pOptr = createGroupOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions,
pScalarExprInfo, numOfScalarExpr, pTaskInfo, NULL);
pScalarExprInfo, numOfScalarExpr, pTaskInfo);
} else {
pOptr = createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pScalarExprInfo, numOfScalarExpr,
pTaskInfo, pTableGroupInfo);
pTaskInfo);
}
} else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == type || QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) {
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
......@@ -4635,8 +4622,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
.calTrigger = pIntervalPhyNode->window.triggerType};
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTableGroupInfo,
pTaskInfo);
pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode;
......@@ -4685,7 +4671,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &num);
pOptr = createPartitionOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pTaskInfo, NULL);
pOptr = createPartitionOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW == type) {
SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode;
......@@ -4899,46 +4885,57 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
return pList;
}
int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo,
uint64_t queryId, uint64_t taskId) {
int32_t code = 0;
int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid,
STableListInfo* pListInfo, SNode* pTagCond) {
int32_t code = TSDB_CODE_SUCCESS;
pListInfo->pTableList = taosArrayInit(8, sizeof(STableKeyInfo));
if (tableType == TSDB_SUPER_TABLE) {
code = tsdbQuerySTableByTagCond(metaHandle, tableUid, 0, NULL, 0, 0, NULL, pGroupInfo, NULL, 0, queryId, taskId);
if(pTagCond){
SArray* res = taosArrayInit(8, sizeof(uint64_t));
code = doFilterTag(pTagCond, res);
if (code != TSDB_CODE_SUCCESS) {
qError("doFilterTag error:%d", code);
taosArrayDestroy(res);
terrno = code;
return code;
}
for(int i = 0; i < taosArrayGetSize(res); i++){
STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, .uid = *(uint64_t*)taosArrayGet(res, i)};
taosArrayPush(pListInfo->pTableList, &info);
}
taosArrayDestroy(res);
}else{
code = tsdbGetAllTableList(metaHandle, tableUid, pListInfo->pTableList);
}
} else { // Create one table group.
code = tsdbGetOneTableGroup(metaHandle, tableUid, 0, pGroupInfo);
STableKeyInfo info = {.lastKey = 0, .uid = tableUid};
taosArrayPush(pListInfo->pTableList, &info);
}
return code;
}
SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo) {
SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) {
SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t));
if (pTableGroupInfo->numOfTables > 0) {
SArray* pa = taosArrayGetP(pTableGroupInfo->pGroupList, 0);
ASSERT(taosArrayGetSize(pTableGroupInfo->pGroupList) == 1);
// Transfer the Array of STableKeyInfo into uid list.
size_t numOfTables = taosArrayGetSize(pa);
for (int32_t i = 0; i < numOfTables; ++i) {
STableKeyInfo* pkeyInfo = taosArrayGet(pa, i);
for (int32_t i = 0; i < taosArrayGetSize(pTableGroupInfo->pTableList); ++i) {
STableKeyInfo* pkeyInfo = taosArrayGet(pTableGroupInfo->pTableList, i);
taosArrayPush(tableIdList, &pkeyInfo->uid);
}
}
return tableIdList;
}
tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableGroupInfo* pTableGroupInfo, uint64_t queryId, uint64_t taskId) {
uint64_t uid = pTableScanNode->scan.uid;
int32_t code =
doCreateTableGroup(pHandle->meta, pTableScanNode->scan.tableType, uid, pTableGroupInfo, queryId, taskId);
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, SNode* pTagCond) {
int32_t code = getTableList(pHandle->meta, pTableScanNode->scan.tableType, pTableScanNode->scan.uid, pTableListInfo, pTagCond);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
if (pTableGroupInfo->numOfTables == 0) {
if (taosArrayGetSize(pTableListInfo->pTableList) == 0) {
code = 0;
qDebug("no table qualified for query, TID:0x%" PRIx64 ", QID:0x%" PRIx64, taskId, queryId);
goto _error;
......@@ -4950,7 +4947,7 @@ tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle*
goto _error;
}
return tsdbQueryTables(pHandle->vnode, &cond, pTableGroupInfo, queryId, taskId);
return tsdbQueryTables(pHandle->vnode, &cond, pTableListInfo, queryId, taskId);
_error:
terrno = code;
......@@ -4969,7 +4966,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
}
(*pTaskInfo)->pRoot =
createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &(*pTaskInfo)->tableqinfoGroupInfo);
createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &(*pTaskInfo)->tableqinfoList, pPlan->pTagCond);
if (NULL == (*pTaskInfo)->pRoot) {
code = terrno;
goto _complete;
......@@ -5028,34 +5025,18 @@ void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters) {
taosMemoryFree(pFilter);
}
static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo) {
if (pTableqinfoGroupInfo->pGroupList != NULL) {
int32_t numOfGroups = (int32_t)taosArrayGetSize(pTableqinfoGroupInfo->pGroupList);
for (int32_t i = 0; i < numOfGroups; ++i) {
SArray* p = taosArrayGetP(pTableqinfoGroupInfo->pGroupList, i);
size_t num = taosArrayGetSize(p);
for (int32_t j = 0; j < num; ++j) {
STableQueryInfo* item = taosArrayGetP(p, j);
destroyTableQueryInfoImpl(item);
}
taosArrayDestroy(p);
}
}
taosArrayDestroy(pTableqinfoGroupInfo->pGroupList);
taosHashCleanup(pTableqinfoGroupInfo->map);
static void doDestroyTableList(STableListInfo* pTableqinfoList) {
taosArrayDestroy(pTableqinfoList->pTableList);
taosHashCleanup(pTableqinfoList->map);
pTableqinfoGroupInfo->pGroupList = NULL;
pTableqinfoGroupInfo->map = NULL;
pTableqinfoGroupInfo->numOfTables = 0;
pTableqinfoList->pTableList = NULL;
pTableqinfoList->map = NULL;
}
void doDestroyTask(SExecTaskInfo* pTaskInfo) {
qDebug("%s execTask is freed", GET_TASKID(pTaskInfo));
doDestroyTableQueryInfo(&pTaskInfo->tableqinfoGroupInfo);
doDestroyTableList(&pTaskInfo->tableqinfoList);
destroyOperatorInfo(pTaskInfo->pRoot);
// taosArrayDestroy(pTaskInfo->summary.queryProfEvents);
// taosHashCleanup(pTaskInfo->summary.operatorProfResults);
......
......@@ -346,7 +346,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
}
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList,
SNode* pCondition, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) {
SNode* pCondition, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) {
SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -613,7 +613,7 @@ static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) {
}
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList,
SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) {
SExecTaskInfo* pTaskInfo) {
SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......
......@@ -1601,20 +1601,19 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
SExprInfo* pExprInfo = &pOperator->pExpr[0];
SSDataBlock* pRes = pInfo->pRes;
if (taosArrayGetSize(pInfo->pTableGroups->pGroupList) == 0) {
int32_t size = taosArrayGetSize(pInfo->pTableList->pTableList);
if (size == 0) {
setTaskStatus(pTaskInfo, TASK_COMPLETED);
return NULL;
}
SArray* pa = taosArrayGetP(pInfo->pTableGroups->pGroupList, 0);
char str[512] = {0};
int32_t count = 0;
SMetaReader mr = {0};
metaReaderInit(&mr, pInfo->readHandle.meta, 0);
while (pInfo->curPos < pInfo->pTableGroups->numOfTables && count < pOperator->resultInfo.capacity) {
STableKeyInfo* item = taosArrayGet(pa, pInfo->curPos);
while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
STableKeyInfo* item = taosArrayGet(pInfo->pTableList->pTableList, pInfo->curPos);
metaGetTableEntryByUid(&mr, item->uid);
for (int32_t j = 0; j < pOperator->numOfExprs; ++j) {
......@@ -1646,7 +1645,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
}
count += 1;
if (++pInfo->curPos >= pInfo->pTableGroups->numOfTables) {
if (++pInfo->curPos >= size) {
doSetOperatorCompleted(pOperator);
}
}
......@@ -1671,14 +1670,14 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput,
SSDataBlock* pResBlock, SArray* pColMatchInfo,
STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) {
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
pInfo->pTableGroups = pTableGroupInfo;
pInfo->pTableList = pTableListInfo;
pInfo->pColMatchInfo = pColMatchInfo;
pInfo->pRes = pResBlock;
pInfo->readHandle = *pReadHandle;
......
......@@ -1180,8 +1180,7 @@ bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) {
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo,
SExecTaskInfo* pTaskInfo) {
STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) {
SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -1205,8 +1204,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pInfo->invertible = allInvertible(pInfo->binfo.pCtx, numOfCols);
pInfo->invertible = false; // Todo(liuyao): Dependent TSDB API
// pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) {
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......@@ -1266,7 +1264,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
for (int32_t i = 0; i < numOfChild; i++) {
SSDataBlock* chRes = createOneDataBlock(pResBlock, false);
SOperatorInfo* pChildOp = createIntervalOperatorInfo(NULL, pExprInfo, numOfCols,
chRes, pInterval, primaryTsSlotId, pTwAggSupp, NULL, pTaskInfo);
chRes, pInterval, primaryTsSlotId, pTwAggSupp, pTaskInfo);
if (pChildOp && chRes) {
taosArrayPush(pInfo->pChildren, &pChildOp);
continue;
......@@ -1304,8 +1302,7 @@ _error:
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo,
SExecTaskInfo* pTaskInfo) {
STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) {
SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -1327,8 +1324,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
// pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) {
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册