未验证 提交 79bb5f82 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #17767 from taosdata/feature/3_liaohj

refactor: do some internal refactor.
...@@ -1867,7 +1867,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1867,7 +1867,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == key) { if (minKey == key) {
init = true; init = true;
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema); code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -1881,7 +1881,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1881,7 +1881,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
tRowMerge(&merge, &fRow1); tRowMerge(&merge, &fRow1);
} else { } else {
init = true; init = true;
int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema); code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include "tpagedbuf.h" #include "tpagedbuf.h"
#include "tsimplehash.h" #include "tsimplehash.h"
#include "vnode.h" #include "vnode.h"
#include "executor.h"
#define T_LONG_JMP(_obj, _c) \ #define T_LONG_JMP(_obj, _c) \
do { \ do { \
...@@ -95,27 +96,24 @@ typedef struct SColMatchInfo { ...@@ -95,27 +96,24 @@ typedef struct SColMatchInfo {
int32_t matchType; // determinate the source according to col id or slot id int32_t matchType; // determinate the source according to col id or slot id
} SColMatchInfo; } SColMatchInfo;
// If the numOfOutputGroups is 1, the data blocks that belongs to different groups will be provided randomly typedef struct STableListInfo STableListInfo;
// The numOfOutputGroups is specified by physical plan. and will not be affect by numOfGroups
typedef struct STableListInfo {
bool oneTableForEachGroup;
int32_t numOfOuputGroups; // the data block will be generated one by one
int32_t* groupOffset; // keep the offset value for each group in the tableList
SArray* pTableList;
SHashObj* map; // speedup acquire the tableQueryInfo by table uid
uint64_t suid;
} STableListInfo;
void destroyTableList(STableListInfo* pTableList);
int32_t getNumOfOutputGroups(const STableListInfo* pTableList);
bool oneTableForEachGroup(const STableListInfo* pTableList);
uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid);
int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t gid);
int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo, int32_t* num);
uint64_t getTotalTables(const STableListInfo* pTableList);
struct SqlFunctionCtx; struct SqlFunctionCtx;
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, const char* id);
STableListInfo* tableListCreate();
void* tableListDestroy(STableListInfo* pTableListInfo);
void tableListClear(STableListInfo* pTableListInfo);
int32_t tableListGetOutputGroups(const STableListInfo* pTableList);
bool oneTableForEachGroup(const STableListInfo* pTableList);
uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid);
int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid);
int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo, int32_t* num);
uint64_t tableListGetSize(const STableListInfo* pTableList);
uint64_t tableListGetSuid(const STableListInfo* pTableList);
STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index);
size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput); size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
void initResultRowInfo(SResultRowInfo* pResultRowInfo); void initResultRowInfo(SResultRowInfo* pResultRowInfo);
void closeResultRow(SResultRow* pResultRow); void closeResultRow(SResultRow* pResultRow);
...@@ -128,6 +126,7 @@ static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRo ...@@ -128,6 +126,7 @@ static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRo
if (forUpdate) { if (forUpdate) {
setBufPageDirty(bufPage, true); setBufPageDirty(bufPage, true);
} }
SResultRow* pRow = (SResultRow*)((char*)bufPage + pos->offset); SResultRow* pRow = (SResultRow*)((char*)bufPage + pos->offset);
return pRow; return pRow;
} }
...@@ -143,10 +142,7 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo); ...@@ -143,10 +142,7 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode); SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode);
EDealRes doTranslateTagExpr(SNode** pNode, void* pContext); EDealRes doTranslateTagExpr(SNode** pNode, void* pContext);
int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
STableListInfo* pListInfo);
int32_t getGroupIdFromTagsVal(void* pMeta, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId); int32_t getGroupIdFromTagsVal(void* pMeta, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId);
int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableListInfo* pTableListInfo);
size_t getTableTagsBufLen(const SNodeList* pGroups); size_t getTableTagsBufLen(const SNodeList* pGroups);
SArray* createSortInfo(SNodeList* pNodeList); SArray* createSortInfo(SNodeList* pNodeList);
...@@ -169,9 +165,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi ...@@ -169,9 +165,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
void cleanupQueryTableDataCond(SQueryTableDataCond* pCond); void cleanupQueryTableDataCond(SQueryTableDataCond* pCond);
int32_t convertFillType(int32_t mode); int32_t convertFillType(int32_t mode);
int32_t resultrowComparAsc(const void* p1, const void* p2); int32_t resultrowComparAsc(const void* p1, const void* p2);
int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, bool* pQualified); int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, bool* pQualified);
#endif // TDENGINE_QUERYUTIL_H #endif // TDENGINE_QUERYUTIL_H
...@@ -184,7 +184,7 @@ typedef struct SExecTaskInfo { ...@@ -184,7 +184,7 @@ typedef struct SExecTaskInfo {
int64_t version; // used for stream to record wal version int64_t version; // used for stream to record wal version
SStreamTaskInfo streamInfo; SStreamTaskInfo streamInfo;
SSchemaInfo schemaInfo; SSchemaInfo schemaInfo;
STableListInfo tableqinfoList; // this is a table list STableListInfo* pTableInfoList; // this is a table list
const char* sql; // query sql string const char* sql; // query sql string
jmp_buf env; // jump to this position when error happens. jmp_buf env; // jump to this position when error happens.
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
...@@ -555,7 +555,7 @@ typedef struct SSysTableScanInfo { ...@@ -555,7 +555,7 @@ typedef struct SSysTableScanInfo {
typedef struct SBlockDistInfo { typedef struct SBlockDistInfo {
SSDataBlock* pResBlock; SSDataBlock* pResBlock;
void* pHandle; STsdbReader* pHandle;
SReadHandle readHandle; SReadHandle readHandle;
uint64_t uid; // table uid uint64_t uid; // table uid
} SBlockDistInfo; } SBlockDistInfo;
...@@ -970,8 +970,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -970,8 +970,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode, SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid, SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode,
SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);
...@@ -1065,10 +1065,6 @@ uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, S ...@@ -1065,10 +1065,6 @@ uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, S
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup, int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
const char* idstr);
SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode,
SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo* pTableListInfo, SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo* pTableListInfo,
...@@ -1077,7 +1073,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN ...@@ -1077,7 +1073,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex); void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex);
bool groupbyTbname(SNodeList* pGroupList); bool groupbyTbname(SNodeList* pGroupList);
int32_t setGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group, bool groupSort);
void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput); void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput);
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
SGroupResInfo* pGroupResInfo); SGroupResInfo* pGroupResInfo);
......
...@@ -59,22 +59,23 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe ...@@ -59,22 +59,23 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
goto _error; goto _error;
} }
STableListInfo* pTableList = &pTaskInfo->tableqinfoList; STableListInfo* pTableList = pTaskInfo->pTableInfoList;
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
pInfo->pUidList = taosArrayInit(4, sizeof(int64_t)); pInfo->pUidList = taosArrayInit(4, sizeof(int64_t));
// partition by tbname, todo opt perf // partition by tbname, todo opt perf
if (oneTableForEachGroup(pTableList) || (getTotalTables(pTableList) == 1)) { if (oneTableForEachGroup(pTableList) || (tableListGetSize(pTableList) == 1)) {
pInfo->retrieveType = pInfo->retrieveType =
CACHESCAN_RETRIEVE_TYPE_ALL | (pScanNode->ignoreNull ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW); CACHESCAN_RETRIEVE_TYPE_ALL | (pScanNode->ignoreNull ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW);
STableKeyInfo* pList = taosArrayGet(pTableList->pTableList, 0); STableKeyInfo* pList = tableListGetInfo(pTableList, 0);
size_t num = taosArrayGetSize(pTableList->pTableList);
size_t num = tableListGetSize(pTableList);
uint64_t suid = tableListGetSuid(pTableList);
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num, code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
taosArrayGetSize(pInfo->matchInfo.pList), pTableList->suid, &pInfo->pLastrowReader); taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -120,8 +121,10 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { ...@@ -120,8 +121,10 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
SLastrowScanInfo* pInfo = pOperator->info; SLastrowScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableListInfo* pTableList = &pTaskInfo->tableqinfoList; STableListInfo* pTableList = pTaskInfo->pTableInfoList;
int32_t size = taosArrayGetSize(pTableList->pTableList);
uint64_t suid = tableListGetSuid(pTableList);
int32_t size = tableListGetSize(pTableList);
if (size == 0) { if (size == 0) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
return NULL; return NULL;
...@@ -184,20 +187,19 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { ...@@ -184,20 +187,19 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
} else { } else {
size_t totalGroups = getNumOfOutputGroups(pTableList); size_t totalGroups = tableListGetOutputGroups(pTableList);
while (pInfo->currentGroupIndex < totalGroups) { while (pInfo->currentGroupIndex < totalGroups) {
STableKeyInfo* pList = NULL; STableKeyInfo* pList = NULL;
int32_t num = 0; int32_t num = 0;
int32_t code = getTablesOfGroup(pTableList, pInfo->currentGroupIndex, &pList, &num); int32_t code = tableListGetGroupList(pTableList, pInfo->currentGroupIndex, &pList, &num);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num, tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
taosArrayGetSize(pInfo->matchInfo.pList), pTableList->suid, &pInfo->pLastrowReader); taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader);
taosArrayClear(pInfo->pUidList); taosArrayClear(pInfo->pUidList);
code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList); code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList);
......
...@@ -26,9 +26,31 @@ ...@@ -26,9 +26,31 @@
#include "executorimpl.h" #include "executorimpl.h"
#include "tcompression.h" #include "tcompression.h"
static int32_t removeInvalidTable(SArray* list, SHashObj* tags); // If the numOfOutputGroups is 1, the data blocks that belongs to different groups will be provided randomly
// The numOfOutputGroups is specified by physical plan. and will not be affect by numOfGroups
struct STableListInfo {
bool oneTableForEachGroup;
int32_t numOfOuputGroups; // the data block will be generated one by one
int32_t* groupOffset; // keep the offset value for each group in the tableList
SArray* pTableList;
SHashObj* map; // speedup acquire the tableQueryInfo by table uid
uint64_t suid;
};
typedef struct tagFilterAssist {
SHashObj* colHash;
int32_t index;
SArray* cInfoList;
} tagFilterAssist;
static int32_t removeInvalidTable(SArray* uids, SHashObj* tags);
static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond, SHashObj* tags); static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond, SHashObj* tags);
static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond); static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond);
static int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond,
SNode* pTagIndexCond, STableListInfo* pListInfo);
static int64_t getLimit(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->limit; }
static int64_t getOffset(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->offset; }
void initResultRowInfo(SResultRowInfo* pResultRowInfo) { void initResultRowInfo(SResultRowInfo* pResultRowInfo) {
pResultRowInfo->size = 0; pResultRowInfo->size = 0;
...@@ -301,12 +323,6 @@ int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, ...@@ -301,12 +323,6 @@ int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
typedef struct tagFilterAssist {
SHashObj* colHash;
int32_t index;
SArray* cInfoList;
} tagFilterAssist;
static EDealRes getColumn(SNode** pNode, void* pContext) { static EDealRes getColumn(SNode** pNode, void* pContext) {
SColumnNode* pSColumnNode = NULL; SColumnNode* pSColumnNode = NULL;
if (QUERY_NODE_COLUMN == nodeType((*pNode))) { if (QUERY_NODE_COLUMN == nodeType((*pNode))) {
...@@ -482,6 +498,7 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, int64_t suid, SArray* ...@@ -482,6 +498,7 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, int64_t suid, SArray*
} }
} }
} }
pResBlock->info.rows = rows; pResBlock->info.rows = rows;
// int64_t st1 = taosGetTimestampUs(); // int64_t st1 = taosGetTimestampUs();
...@@ -766,6 +783,7 @@ static int tableUidCompare(const void* a, const void* b) { ...@@ -766,6 +783,7 @@ static int tableUidCompare(const void* a, const void* b) {
} }
return u1 < u2 ? -1 : 1; return u1 < u2 ? -1 : 1;
} }
static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* cond, SHashObj* tags) { static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* cond, SHashObj* tags) {
int32_t ret = -1; int32_t ret = -1;
if (nodeType(cond) == QUERY_NODE_OPERATOR) { if (nodeType(cond) == QUERY_NODE_OPERATOR) {
...@@ -820,6 +838,7 @@ static int32_t removeInvalidTable(SArray* uids, SHashObj* tags) { ...@@ -820,6 +838,7 @@ static int32_t removeInvalidTable(SArray* uids, SHashObj* tags) {
taosArrayPush(validUid, uid); taosArrayPush(validUid, uid);
} }
} }
taosArraySwap(uids, validUid); taosArraySwap(uids, validUid);
taosArrayDestroy(validUid); taosArrayDestroy(validUid);
return 0; return 0;
...@@ -930,11 +949,6 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, ...@@ -930,11 +949,6 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
STableListInfo* pListInfo) { STableListInfo* pListInfo) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
pListInfo->pTableList = taosArrayInit(8, sizeof(STableKeyInfo));
if (pListInfo->pTableList == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
uint64_t tableUid = pScanNode->uid; uint64_t tableUid = pScanNode->uid;
pListInfo->suid = pScanNode->suid; pListInfo->suid = pScanNode->suid;
SArray* res = taosArrayInit(8, sizeof(uint64_t)); SArray* res = taosArrayInit(8, sizeof(uint64_t));
...@@ -994,13 +1008,14 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, ...@@ -994,13 +1008,14 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
size_t numOfTables = taosArrayGetSize(res); size_t numOfTables = taosArrayGetSize(res);
for (int i = 0; i < numOfTables; i++) { for (int i = 0; i < numOfTables; i++) {
STableKeyInfo info = {.uid = *(uint64_t*)taosArrayGet(res, i), .groupId = 0}; STableKeyInfo info = {.uid = *(uint64_t*)taosArrayGet(res, i), .groupId = 0};
void* p = taosArrayPush(pListInfo->pTableList, &info);
void* p = taosArrayPush(pListInfo->pTableList, &info);
if (p == NULL) { if (p == NULL) {
taosArrayDestroy(res); taosArrayDestroy(res);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
qDebug("tagfilter get uid:%" PRId64 "", info.uid); qDebug("tagfilter get uid:%" PRIu64 "", info.uid);
} }
taosArrayDestroy(res); taosArrayDestroy(res);
...@@ -1642,9 +1657,6 @@ bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo) { ...@@ -1642,9 +1657,6 @@ bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo) {
pLimitInfo->slimit.offset != -1); pLimitInfo->slimit.offset != -1);
} }
static int64_t getLimit(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->limit; }
static int64_t getOffset(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->offset; }
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo) { void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo) {
SLimit limit = {.limit = getLimit(pLimit), .offset = getOffset(pLimit)}; SLimit limit = {.limit = getLimit(pLimit), .offset = getOffset(pLimit)};
SLimit slimit = {.limit = getLimit(pSLimit), .offset = getOffset(pSLimit)}; SLimit slimit = {.limit = getLimit(pSLimit), .offset = getOffset(pSLimit)};
...@@ -1655,11 +1667,23 @@ void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimit ...@@ -1655,11 +1667,23 @@ void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimit
pLimitInfo->remainGroupOffset = slimit.offset; pLimitInfo->remainGroupOffset = slimit.offset;
} }
uint64_t getTotalTables(const STableListInfo* pTableList) { uint64_t tableListGetSize(const STableListInfo* pTableList) {
ASSERT(taosArrayGetSize(pTableList->pTableList) == taosHashGetSize(pTableList->map)); ASSERT(taosArrayGetSize(pTableList->pTableList) == taosHashGetSize(pTableList->map));
return taosArrayGetSize(pTableList->pTableList); return taosArrayGetSize(pTableList->pTableList);
} }
uint64_t tableListGetSuid(const STableListInfo* pTableList) {
return pTableList->suid;
}
STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index) {
if (taosArrayGetSize(pTableList->pTableList) == 0) {
return NULL;
}
return taosArrayGet(pTableList->pTableList, index);
}
uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) { uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid)); int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid));
ASSERT(pTableList->map != NULL && slot != NULL); ASSERT(pTableList->map != NULL && slot != NULL);
...@@ -1670,7 +1694,8 @@ uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) { ...@@ -1670,7 +1694,8 @@ uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
return pKeyInfo->groupId; return pKeyInfo->groupId;
} }
int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t gid) { // TODO handle the group offset info, fix it, the rule of group output will be broken by this function
int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid) {
if (pTableList->map == NULL) { if (pTableList->map == NULL) {
ASSERT(taosArrayGetSize(pTableList->pTableList) == 0); ASSERT(taosArrayGetSize(pTableList->pTableList) == 0);
pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
...@@ -1686,9 +1711,9 @@ int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t ...@@ -1686,9 +1711,9 @@ int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo, int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo,
int32_t* size) { int32_t* size) {
int32_t total = getNumOfOutputGroups(pTableList); int32_t total = tableListGetOutputGroups(pTableList);
if (ordinalGroupIndex < 0 || ordinalGroupIndex >= total) { if (ordinalGroupIndex < 0 || ordinalGroupIndex >= total) {
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
...@@ -1696,10 +1721,10 @@ int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalGroupI ...@@ -1696,10 +1721,10 @@ int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalGroupI
// here handle two special cases: // here handle two special cases:
// 1. only one group exists, and 2. one table exists for each group. // 1. only one group exists, and 2. one table exists for each group.
if (total == 1) { if (total == 1) {
*size = getTotalTables(pTableList); *size = tableListGetSize(pTableList);
*pKeyInfo = (*size == 0)? NULL:taosArrayGet(pTableList->pTableList, 0); *pKeyInfo = (*size == 0)? NULL:taosArrayGet(pTableList->pTableList, 0);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (total == getTotalTables(pTableList)) { } else if (total == tableListGetSize(pTableList)) {
*size = 1; *size = 1;
*pKeyInfo = taosArrayGet(pTableList->pTableList, ordinalGroupIndex); *pKeyInfo = taosArrayGet(pTableList->pTableList, ordinalGroupIndex);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1716,16 +1741,178 @@ int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalGroupI ...@@ -1716,16 +1741,178 @@ int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalGroupI
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t getNumOfOutputGroups(const STableListInfo* pTableList) { return pTableList->numOfOuputGroups; } int32_t tableListGetOutputGroups(const STableListInfo* pTableList) { return pTableList->numOfOuputGroups; }
bool oneTableForEachGroup(const STableListInfo* pTableList) { return pTableList->oneTableForEachGroup; } bool oneTableForEachGroup(const STableListInfo* pTableList) { return pTableList->oneTableForEachGroup; }
void destroyTableList(STableListInfo* pTableqinfoList) { STableListInfo* tableListCreate() {
pTableqinfoList->pTableList = taosArrayDestroy(pTableqinfoList->pTableList); STableListInfo* pListInfo = taosMemoryCalloc(1, sizeof(STableListInfo));
taosMemoryFreeClear(pTableqinfoList->groupOffset); if (pListInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
if (pListInfo->pTableList == NULL) {
goto _error;
}
pListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (pListInfo->map == NULL) {
goto _error;
}
pListInfo->numOfOuputGroups = 1;
return pListInfo;
_error:
tableListDestroy(pListInfo);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
void* tableListDestroy(STableListInfo* pTableListInfo) {
if (pTableListInfo == NULL) {
return NULL;
}
pTableListInfo->pTableList = taosArrayDestroy(pTableListInfo->pTableList);
taosMemoryFreeClear(pTableListInfo->groupOffset);
taosHashCleanup(pTableListInfo->map);
pTableListInfo->pTableList = NULL;
pTableListInfo->map = NULL;
taosMemoryFree(pTableListInfo);
return NULL;
}
void tableListClear(STableListInfo* pTableListInfo) {
if (pTableListInfo == NULL) {
return;
}
taosArrayClear(pTableListInfo->pTableList);
taosHashClear(pTableListInfo->map);
taosMemoryFree(pTableListInfo->groupOffset);
pTableListInfo->numOfOuputGroups = 1;
pTableListInfo->oneTableForEachGroup = false;
}
static int32_t orderbyGroupIdComparFn(const void* p1, const void* p2) {
STableKeyInfo* pInfo1 = (STableKeyInfo*) p1;
STableKeyInfo* pInfo2 = (STableKeyInfo*) p2;
if (pInfo1->groupId == pInfo2->groupId) {
return 0;
} else {
return pInfo1->groupId < pInfo2->groupId? -1:1;
}
}
static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
int32_t code = TSDB_CODE_SUCCESS;
taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
int32_t size = taosArrayGetSize(pTableListInfo->pTableList);
SArray* pList = taosArrayInit(4, sizeof(int32_t));
STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, 0);
uint64_t gid = pInfo->groupId;
int32_t start = 0;
taosArrayPush(pList, &start);
for(int32_t i = 1; i < size; ++i) {
pInfo = taosArrayGet(pTableListInfo->pTableList, i);
if (pInfo->groupId != gid) {
taosArrayPush(pList, &i);
gid = pInfo->groupId;
}
}
pTableListInfo->numOfOuputGroups = taosArrayGetSize(pList);
pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
memcpy(pTableListInfo->groupOffset, taosArrayGet(pList, 0), sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
taosArrayDestroy(pList);
return TDB_CODE_SUCCESS;
}
int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group, bool groupSort) {
int32_t code = TSDB_CODE_SUCCESS;
ASSERT(pTableListInfo->map != NULL);
bool groupByTbname = groupbyTbname(group);
size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
if (group == NULL || groupByTbname) {
for (int32_t i = 0; i < numOfTables; i++) {
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
info->groupId = groupByTbname? info->uid:0;
}
pTableListInfo->oneTableForEachGroup = groupByTbname;
taosHashCleanup(pTableqinfoList->map); if (groupSort && groupByTbname) {
taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
pTableListInfo->numOfOuputGroups = numOfTables;
} else {
pTableListInfo->numOfOuputGroups = 1;
}
} else {
code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pTableqinfoList->pTableList = NULL; if (groupSort) {
pTableqinfoList->map = NULL; code = sortTableGroup(pTableListInfo);
} }
\ No newline at end of file }
// add all table entry in the hash map
size_t size = taosArrayGetSize(pTableListInfo->pTableList);
for(int32_t i = 0; i < size; ++i) {
STableKeyInfo* p = taosArrayGet(pTableListInfo->pTableList, i);
taosHashPut(pTableListInfo->map, &p->uid, sizeof(uint64_t), &i, sizeof(int32_t));
}
return code;
}
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
const char* idStr) {
int64_t st = taosGetTimestampUs();
if (pHandle == NULL) {
qError("invalid handle, in creating operator tree, %s", idStr);
return TSDB_CODE_INVALID_PARA;
}
int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to getTableList, code: %s", tstrerror(code));
return code;
}
ASSERT(pTableListInfo->numOfOuputGroups == 1);
int64_t st1 = taosGetTimestampUs();
qDebug("generate queried table list completed, elapsed time:%.2f ms %s", (st1 - st) / 1000.0, idStr);
if (taosArrayGetSize(pTableListInfo->pTableList) == 0) {
qDebug("no table qualified for query, %s" PRIx64, idStr);
return TSDB_CODE_SUCCESS;
}
code = buildGroupIdMapForAllTables(pTableListInfo, pHandle, pGroupTags, groupSort);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
int64_t st2 = taosGetTimestampUs();
qDebug("generate group id map completed, elapsed time:%.2f ms %s", (st2 - st1) / 1000.0, idStr);
return TSDB_CODE_SUCCESS;
}
...@@ -287,14 +287,11 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S ...@@ -287,14 +287,11 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S
int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) { int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
STableListInfo* pListInfo = &pTaskInfo->tableqinfoList;
if (isAdd) { if (isAdd) {
qDebug("add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), pTaskInfo->id.str); qDebug("add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), pTaskInfo->id.str);
} }
// traverse to the stream scanner node to add this table id // traverse to the stream scanner node to add this table id
SOperatorInfo* pInfo = pTaskInfo->pRoot; SOperatorInfo* pInfo = pTaskInfo->pRoot;
while (pInfo->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { while (pInfo->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
...@@ -328,7 +325,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo ...@@ -328,7 +325,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
} }
} }
STableListInfo* pTableListInfo = &pTaskInfo->tableqinfoList; STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;
for (int32_t i = 0; i < numOfQualifiedTables; ++i) { for (int32_t i = 0; i < numOfQualifiedTables; ++i) {
uint64_t* uid = taosArrayGet(qa, i); uint64_t* uid = taosArrayGet(qa, i);
...@@ -361,7 +358,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo ...@@ -361,7 +358,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
if (!exists) { if (!exists) {
#endif #endif
addTableIntoTableList(pTableListInfo, keyInfo.uid, keyInfo.groupId); tableListAddTableInfo(pTableListInfo, keyInfo.uid, keyInfo.groupId);
} }
if (keyBuf != NULL) { if (keyBuf != NULL) {
...@@ -925,8 +922,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -925,8 +922,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
int64_t ts = pOffset->ts; int64_t ts = pOffset->ts;
if (uid == 0) { if (uid == 0) {
if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) { if (tableListGetSize(pTaskInfo->pTableInfoList) != 0) {
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0); STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, 0);
uid = pTableInfo->uid; uid = pTableInfo->uid;
ts = INT64_MIN; ts = INT64_MIN;
} else { } else {
...@@ -937,7 +934,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -937,7 +934,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
/*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/ /*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/
/*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/ /*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
int32_t numOfTables = getTotalTables(&pTaskInfo->tableqinfoList); int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);
#ifndef NDEBUG #ifndef NDEBUG
qDebug("switch to next table %" PRId64 " (cursor %d), %" PRId64 " rows returned", uid, qDebug("switch to next table %" PRId64 " (cursor %d), %" PRId64 " rows returned", uid,
...@@ -947,7 +944,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -947,7 +944,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
bool found = false; bool found = false;
for (int32_t i = 0; i < numOfTables; i++) { for (int32_t i = 0; i < numOfTables; i++) {
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i); STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, i);
if (pTableInfo->uid == uid) { if (pTableInfo->uid == uid) {
found = true; found = true;
pTableScanInfo->currentTable = i; pTableScanInfo->currentTable = i;
...@@ -959,8 +956,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -959,8 +956,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
ASSERT(found); ASSERT(found);
if (pTableScanInfo->dataReader == NULL) { if (pTableScanInfo->dataReader == NULL) {
STableKeyInfo* pList = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0); STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0);
int32_t num = getTotalTables(&pTaskInfo->tableqinfoList); int32_t num = tableListGetSize(pTaskInfo->pTableInfoList);
if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond, pList, num, if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond, pList, num,
&pTableScanInfo->dataReader, NULL) < 0 || pTableScanInfo->dataReader == NULL) { &pTableScanInfo->dataReader, NULL) < 0 || pTableScanInfo->dataReader == NULL) {
...@@ -993,22 +990,28 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -993,22 +990,28 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
SMetaTableInfo mtInfo = getUidfromSnapShot(sContext); SMetaTableInfo mtInfo = getUidfromSnapShot(sContext);
tsdbReaderClose(pInfo->dataReader); tsdbReaderClose(pInfo->dataReader);
pInfo->dataReader = NULL; pInfo->dataReader = NULL;
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
taosArrayDestroy(pTaskInfo->tableqinfoList.pTableList); tableListClear(pTaskInfo->pTableInfoList);
if (mtInfo.uid == 0) return 0; // no data
if (mtInfo.uid == 0) {
return 0; // no data
}
initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo); initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo);
pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts; pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts;
STableListInfo* pListInfo = &pTaskInfo->tableqinfoList; if (pTaskInfo->pTableInfoList == NULL) {
pTaskInfo->pTableInfoList = tableListCreate();
}
pListInfo->pTableList = taosArrayInit(1, sizeof(STableKeyInfo)); tableListAddTableInfo(pTaskInfo->pTableInfoList, mtInfo.uid, 0);
taosArrayPush(pListInfo->pTableList, &(STableKeyInfo){.uid = mtInfo.uid, .groupId = 0});
STableKeyInfo* pList = taosArrayGet(pListInfo->pTableList, 0); STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0);
int32_t size = tableListGetSize(pTaskInfo->pTableInfoList);
ASSERT(size == 1);
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, taosArrayGetSize(pListInfo->pTableList), tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, &pInfo->dataReader, NULL);
&pInfo->dataReader, NULL);
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName); strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName);
......
...@@ -3273,6 +3273,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT ...@@ -3273,6 +3273,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT
pTaskInfo->cost.created = taosGetTimestampMs(); pTaskInfo->cost.created = taosGetTimestampMs();
pTaskInfo->id.queryId = queryId; pTaskInfo->id.queryId = queryId;
pTaskInfo->execModel = model; pTaskInfo->execModel = model;
pTaskInfo->pTableInfoList = tableListCreate();
char* p = taosMemoryCalloc(1, 128); char* p = taosMemoryCalloc(1, 128);
snprintf(p, 128, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId); snprintf(p, 128, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId);
...@@ -3364,117 +3365,6 @@ static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) { ...@@ -3364,117 +3365,6 @@ static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) {
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) { tDeleteSSchemaWrapper(pStreamInfo->schema); } static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) { tDeleteSSchemaWrapper(pStreamInfo->schema); }
static int32_t orderbyGroupIdComparFn(const void* p1, const void* p2) {
STableKeyInfo* pInfo1 = (STableKeyInfo*) p1;
STableKeyInfo* pInfo2 = (STableKeyInfo*) p2;
if (pInfo1->groupId == pInfo2->groupId) {
return 0;
} else {
return pInfo1->groupId < pInfo2->groupId? -1:1;
}
}
static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
int32_t code = TSDB_CODE_SUCCESS;
taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
int32_t size = taosArrayGetSize(pTableListInfo->pTableList);
SArray* pList = taosArrayInit(4, sizeof(int32_t));
STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, 0);
uint64_t gid = pInfo->groupId;
int32_t start = 0;
taosArrayPush(pList, &start);
for(int32_t i = 1; i < size; ++i) {
pInfo = taosArrayGet(pTableListInfo->pTableList, i);
if (pInfo->groupId != gid) {
taosArrayPush(pList, &i);
gid = pInfo->groupId;
}
}
pTableListInfo->numOfOuputGroups = taosArrayGetSize(pList);
pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
memcpy(pTableListInfo->groupOffset, taosArrayGet(pList, 0), sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
taosArrayDestroy(pList);
# if 0
SArray* sortSupport = taosArrayInit(16, sizeof(uint64_t));
if (sortSupport == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
size_t num = taosArrayGetSize(pTableListInfo->pTableList);
for (int32_t i = 0; i < num; i++) {
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
uint64_t* groupId = taosHashGet(pTableListInfo->map, &info->uid, sizeof(uint64_t));
int32_t index = taosArraySearchIdx(sortSupport, groupId, compareUint64Val, TD_EQ);
if (index == -1) {
void* p = taosArraySearch(sortSupport, groupId, compareUint64Val, TD_GT);
SArray* tGroup = taosArrayInit(8, sizeof(STableKeyInfo));
if (tGroup == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
if (taosArrayPush(tGroup, info) == NULL) {
qError("taos push info array error");
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
if (p == NULL) {
if (taosArrayPush(sortSupport, groupId) == NULL) {
qError("taos push support array error");
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
if (taosArrayPush(pTableListInfo->pGroupList, &tGroup) == NULL) {
qError("taos push group array error");
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
} else {
int32_t pos = TARRAY_ELEM_IDX(sortSupport, p);
if (taosArrayInsert(sortSupport, pos, groupId) == NULL) {
qError("taos insert support array error");
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
if (taosArrayInsert(pTableListInfo->pGroupList, pos, &tGroup) == NULL) {
qError("taos insert group array error");
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
}
} else {
SArray* tGroup = (SArray*)taosArrayGetP(pTableListInfo->pGroupList, index);
if (taosArrayPush(tGroup, info) == NULL) {
qError("taos push uid array error");
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
}
}
taosArrayDestroy(sortSupport);
#endif
return TDB_CODE_SUCCESS;
_error:
// taosArrayDestroy(sortSupport);
return code;
}
bool groupbyTbname(SNodeList* pGroupList) { bool groupbyTbname(SNodeList* pGroupList) {
bool bytbname = false; bool bytbname = false;
if (LIST_LENGTH(pGroupList) == 1) { if (LIST_LENGTH(pGroupList) == 1) {
...@@ -3488,80 +3378,11 @@ bool groupbyTbname(SNodeList* pGroupList) { ...@@ -3488,80 +3378,11 @@ bool groupbyTbname(SNodeList* pGroupList) {
return bytbname; return bytbname;
} }
int32_t setGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group, bool groupSort) { SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond,
int32_t code = TSDB_CODE_SUCCESS; SNode* pTagIndexCond, const char* pUser) {
pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (pTableListInfo->map == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
bool groupByTbname = groupbyTbname(group);
size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
if (group == NULL || groupByTbname) {
for (int32_t i = 0; i < numOfTables; i++) {
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
info->groupId = groupByTbname? info->uid:0;
}
pTableListInfo->oneTableForEachGroup = groupByTbname;
if (groupSort && groupByTbname) {
taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
pTableListInfo->numOfOuputGroups = numOfTables;
} else {
pTableListInfo->numOfOuputGroups = 1;
}
} else {
code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (groupSort) {
code = sortTableGroup(pTableListInfo);
}
}
// add all table entry in the hash map
size_t size = taosArrayGetSize(pTableListInfo->pTableList);
for(int32_t i = 0; i < size; ++i) {
STableKeyInfo* p = taosArrayGet(pTableListInfo->pTableList, i);
taosHashPut(pTableListInfo->map, &p->uid, sizeof(uint64_t), &i, sizeof(int32_t));
}
return code;
}
static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) {
memset(pCond, 0, sizeof(SQueryTableDataCond));
pCond->order = TSDB_ORDER_ASC;
pCond->numOfCols = 1;
pCond->colList = taosMemoryCalloc(1, sizeof(SColumnInfo));
if (pCond->colList == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return terrno;
}
pCond->colList->colId = 1;
pCond->colList->type = TSDB_DATA_TYPE_TIMESTAMP;
pCond->colList->bytes = sizeof(TSKEY);
pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
pCond->suid = uid;
pCond->type = TIMEWINDOW_RANGE_CONTAINED;
pCond->startVersion = -1;
pCond->endVersion = -1;
return TSDB_CODE_SUCCESS;
}
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
const char* pUser) {
int32_t type = nodeType(pPhyNode); int32_t type = nodeType(pPhyNode);
STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;
const char* idstr = GET_TASKID(pTaskInfo);
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
SOperatorInfo* pOperator = NULL; SOperatorInfo* pOperator = NULL;
...@@ -3576,10 +3397,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -3576,10 +3397,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t code = int32_t code =
createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle, createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle,
pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); pTableListInfo, pTagCond, pTagIndexCond, idstr);
if (code) { if (code) {
pTaskInfo->code = code; pTaskInfo->code = code;
qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), GET_TASKID(pTaskInfo)); qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), idstr);
return NULL; return NULL;
} }
...@@ -3596,7 +3417,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -3596,7 +3417,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode; STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
int32_t code = int32_t code =
createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, /*pTableScanNode->groupSort*/true, pHandle, createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, /*pTableScanNode->groupSort*/true, pHandle,
pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); pTableListInfo, pTagCond, pTagIndexCond, idstr);
if (code) { if (code) {
pTaskInfo->code = code; pTaskInfo->code = code;
qError("failed to createScanTableListInfo, code: %s", tstrerror(code)); qError("failed to createScanTableListInfo, code: %s", tstrerror(code));
...@@ -3621,7 +3442,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -3621,7 +3442,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
if (pHandle->vnode) { if (pHandle->vnode) {
int32_t code = int32_t code =
createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort,
pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); pHandle, pTableListInfo, pTagCond, pTagIndexCond, idstr);
if (code) { if (code) {
pTaskInfo->code = code; pTaskInfo->code = code;
qError("failed to createScanTableListInfo, code: %s", tstrerror(code)); qError("failed to createScanTableListInfo, code: %s", tstrerror(code));
...@@ -3629,11 +3450,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -3629,11 +3450,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} }
#ifndef NDEBUG #ifndef NDEBUG
int32_t sz = taosArrayGetSize(pTableListInfo->pTableList); int32_t sz = tableListGetSize(pTableListInfo);
qDebug("create stream task, total:%d", sz); qDebug("create stream task, total:%d", sz);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
STableKeyInfo* pKeyInfo = taosArrayGet(pTableListInfo->pTableList, i); STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i);
qDebug("add table uid:%" PRIu64", gid:%"PRIu64, pKeyInfo->uid, pKeyInfo->groupId); qDebug("add table uid:%" PRIu64", gid:%"PRIu64, pKeyInfo->uid, pKeyInfo->groupId);
} }
#endif #endif
...@@ -3646,7 +3467,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -3646,7 +3467,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOperator = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo); pOperator = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode; STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;
int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanPhyNode, pTagCond, pTagIndexCond, pTableListInfo);
int32_t code = createScanTableListInfo(pScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond,
pTagIndexCond, idstr);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code; pTaskInfo->code = code;
qError("failed to getTableList, code: %s", tstrerror(code)); qError("failed to getTableList, code: %s", tstrerror(code));
...@@ -3656,8 +3479,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -3656,8 +3479,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo); pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode; SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
pTableListInfo->numOfOuputGroups = 1;
if (pBlockNode->tableType == TSDB_SUPER_TABLE) { if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
SArray* pList = taosArrayInit(4, sizeof(STableKeyInfo)); SArray* pList = taosArrayInit(4, sizeof(STableKeyInfo));
...@@ -3667,38 +3488,21 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -3667,38 +3488,21 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL; return NULL;
} }
for(int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); ++i) { for(int32_t i = 0; i < tableListGetSize(pTableListInfo); ++i) {
STableKeyInfo* p = taosArrayGet(pList, i); STableKeyInfo* p = taosArrayGet(pList, i);
addTableIntoTableList(pTableListInfo, p->uid, 0); tableListAddTableInfo(pTableListInfo, p->uid, 0);
} }
taosArrayDestroy(pList); taosArrayDestroy(pList);
} else { // Create one table group. } else { // Create group with only one table
addTableIntoTableList(pTableListInfo, pBlockNode->uid, 0); tableListAddTableInfo(pTableListInfo, pBlockNode->uid, 0);
} }
SQueryTableDataCond cond = {0}; pOperator = createDataBlockInfoScanOperator(pHandle, pBlockNode, pTaskInfo);
int32_t code = initTableblockDistQueryCond(pBlockNode->suid, &cond);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
size_t num = getTotalTables(pTableListInfo);
void* pList = NULL;
if (num > 0) {
pList = taosArrayGet(pTableListInfo->pTableList, 0);
}
STsdbReader* pReader = NULL;
tsdbReaderOpen(pHandle->vnode, &cond, pList, num, &pReader, "");
cleanupQueryTableDataCond(&cond);
pOperator = createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode; SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;
int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo, int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo,
pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); pTagCond, pTagIndexCond, idstr);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
...@@ -3725,17 +3529,18 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -3725,17 +3529,18 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} }
size_t size = LIST_LENGTH(pPhyNode->pChildren); size_t size = LIST_LENGTH(pPhyNode->pChildren);
SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES); SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
if (ops == NULL) {
return NULL;
}
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pUser); ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, pTagCond, pTagIndexCond, pUser);
if (ops[i] == NULL) { if (ops[i] == NULL) {
taosMemoryFree(ops); taosMemoryFree(ops);
return NULL; return NULL;
} }
ops[i]->resultDataBlockId = pChildNode->pOutputDataBlockDesc->dataBlockId;
} }
SOperatorInfo* pOptr = NULL; SOperatorInfo* pOptr = NULL;
...@@ -4000,15 +3805,18 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pT ...@@ -4000,15 +3805,18 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pT
if (NULL == pDeleterParam) { if (NULL == pDeleterParam) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
int32_t tbNum = taosArrayGetSize(pTask->tableqinfoList.pTableList); int32_t tbNum = tableListGetSize(pTask->pTableInfoList);
pDeleterParam->suid = pTask->tableqinfoList.suid; pDeleterParam->suid = tableListGetSuid(pTask->pTableInfoList);
// TODO extract uid list
pDeleterParam->pUidList = taosArrayInit(tbNum, sizeof(uint64_t)); pDeleterParam->pUidList = taosArrayInit(tbNum, sizeof(uint64_t));
if (NULL == pDeleterParam->pUidList) { if (NULL == pDeleterParam->pUidList) {
taosMemoryFree(pDeleterParam); taosMemoryFree(pDeleterParam);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
for (int32_t i = 0; i < tbNum; ++i) { for (int32_t i = 0; i < tbNum; ++i) {
STableKeyInfo* pTable = taosArrayGet(pTask->tableqinfoList.pTableList, i); STableKeyInfo* pTable = tableListGetInfo(pTask->pTableInfoList, i);
taosArrayPush(pDeleterParam->pUidList, &pTable->uid); taosArrayPush(pDeleterParam->pUidList, &pTable->uid);
} }
...@@ -4044,8 +3852,8 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead ...@@ -4044,8 +3852,8 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
sql = NULL; sql = NULL;
(*pTaskInfo)->pSubplan = pPlan; (*pTaskInfo)->pSubplan = pPlan;
(*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, &(*pTaskInfo)->tableqinfoList, (*pTaskInfo)->pRoot =
pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user); createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user);
if (NULL == (*pTaskInfo)->pRoot) { if (NULL == (*pTaskInfo)->pRoot) {
code = (*pTaskInfo)->code; code = (*pTaskInfo)->code;
...@@ -4064,7 +3872,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead ...@@ -4064,7 +3872,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
void doDestroyTask(SExecTaskInfo* pTaskInfo) { void doDestroyTask(SExecTaskInfo* pTaskInfo) {
qDebug("%s execTask is freed", GET_TASKID(pTaskInfo)); qDebug("%s execTask is freed", GET_TASKID(pTaskInfo));
destroyTableList(&pTaskInfo->tableqinfoList); pTaskInfo->pTableInfoList = tableListDestroy(pTaskInfo->pTableInfoList);
destroyOperatorInfo(pTaskInfo->pRoot); destroyOperatorInfo(pTaskInfo->pRoot);
cleanupTableSchemaInfo(&pTaskInfo->schemaInfo); cleanupTableSchemaInfo(&pTaskInfo->schemaInfo);
cleanupStreamInfo(&pTaskInfo->streamInfo); cleanupStreamInfo(&pTaskInfo->streamInfo);
......
...@@ -623,7 +623,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { ...@@ -623,7 +623,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
pBlock->info = binfo; pBlock->info = binfo;
ASSERT(binfo.uid != 0); ASSERT(binfo.uid != 0);
pBlock->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, pBlock->info.uid); pBlock->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.uid);
uint32_t status = 0; uint32_t status = 0;
int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status); int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status);
...@@ -719,7 +719,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -719,7 +719,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
// scan table one by one sequentially // scan table one by one sequentially
if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) { if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
int32_t numOfTables = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList); int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);
while (1) { while (1) {
SSDataBlock* result = doTableScanGroup(pOperator); SSDataBlock* result = doTableScanGroup(pOperator);
...@@ -733,7 +733,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -733,7 +733,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable); STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->currentTable);
tsdbSetTableList(pInfo->dataReader, pTableInfo, 1); tsdbSetTableList(pInfo->dataReader, pTableInfo, 1);
qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d %s", pTableInfo->uid, numOfTables, qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d %s", pTableInfo->uid, numOfTables,
pInfo->currentTable, pTaskInfo->id.str); pInfo->currentTable, pTaskInfo->id.str);
...@@ -743,14 +743,14 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -743,14 +743,14 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
} }
} else { // scan table group by group sequentially } else { // scan table group by group sequentially
if (pInfo->currentGroupId == -1) { if (pInfo->currentGroupId == -1) {
if ((++pInfo->currentGroupId) >= getNumOfOutputGroups(&pTaskInfo->tableqinfoList)) { if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pTaskInfo->pTableInfoList)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
return NULL; return NULL;
} }
int32_t num = 0; int32_t num = 0;
STableKeyInfo* pList = NULL; STableKeyInfo* pList = NULL;
getTablesOfGroup(&pTaskInfo->tableqinfoList, pInfo->currentGroupId, &pList, &num); tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num);
ASSERT(pInfo->dataReader == NULL); ASSERT(pInfo->dataReader == NULL);
int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, pList, num, (STsdbReader**)&pInfo->dataReader, int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, pList, num, (STsdbReader**)&pInfo->dataReader,
...@@ -766,7 +766,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -766,7 +766,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
return result; return result;
} }
if ((++pInfo->currentGroupId) >= getNumOfOutputGroups(&pTaskInfo->tableqinfoList)) { if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pTaskInfo->pTableInfoList)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
return NULL; return NULL;
} }
...@@ -778,7 +778,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -778,7 +778,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
int32_t num = 0; int32_t num = 0;
STableKeyInfo* pList = NULL; STableKeyInfo* pList = NULL;
getTablesOfGroup(&pTaskInfo->tableqinfoList, pInfo->currentGroupId, &pList, &num); tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num);
tsdbSetTableList(pInfo->dataReader, pList, num); tsdbSetTableList(pInfo->dataReader, pList, num);
tsdbReaderReset(pInfo->dataReader, &pInfo->cond); tsdbReaderReset(pInfo->dataReader, &pInfo->cond);
...@@ -1000,8 +1000,31 @@ static void destroyBlockDistScanOperatorInfo(void* param) { ...@@ -1000,8 +1000,31 @@ static void destroyBlockDistScanOperatorInfo(void* param) {
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid, static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) {
SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo) { memset(pCond, 0, sizeof(SQueryTableDataCond));
pCond->order = TSDB_ORDER_ASC;
pCond->numOfCols = 1;
pCond->colList = taosMemoryCalloc(1, sizeof(SColumnInfo));
if (pCond->colList == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return terrno;
}
pCond->colList->colId = 1;
pCond->colList->type = TSDB_DATA_TYPE_TIMESTAMP;
pCond->colList->bytes = sizeof(TSKEY);
pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
pCond->suid = uid;
pCond->type = TIMEWINDOW_RANGE_CONTAINED;
pCond->startVersion = -1;
pCond->endVersion = -1;
return TSDB_CODE_SUCCESS;
}
SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo) {
SBlockDistInfo* pInfo = taosMemoryCalloc(1, sizeof(SBlockDistInfo)); SBlockDistInfo* pInfo = taosMemoryCalloc(1, sizeof(SBlockDistInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
...@@ -1009,9 +1032,24 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* re ...@@ -1009,9 +1032,24 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* re
goto _error; goto _error;
} }
pInfo->pHandle = dataReader; {
SQueryTableDataCond cond = {0};
int32_t code = initTableblockDistQueryCond(pBlockScanNode->suid, &cond);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;
size_t num = tableListGetSize(pTableListInfo);
void* pList = tableListGetInfo(pTableListInfo, 0);
tsdbReaderOpen(readHandle->vnode, &cond, pList, num, &pInfo->pHandle, pTaskInfo->id.str);
cleanupQueryTableDataCond(&cond);
}
pInfo->readHandle = *readHandle; pInfo->readHandle = *readHandle;
pInfo->uid = uid; pInfo->uid = pBlockScanNode->suid;
pInfo->pResBlock = createResDataBlock(pBlockScanNode->node.pOutputDataBlockDesc); pInfo->pResBlock = createResDataBlock(pBlockScanNode->node.pOutputDataBlockDesc);
int32_t numOfCols = 0; int32_t numOfCols = 0;
...@@ -1119,7 +1157,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU ...@@ -1119,7 +1157,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true); relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true);
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo); doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo);
pBlock->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, binfo.uid); pBlock->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, binfo.uid);
} }
tsdbReaderClose(pReader); tsdbReaderClose(pReader);
...@@ -1139,8 +1177,8 @@ static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, ...@@ -1139,8 +1177,8 @@ static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts,
} }
static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) { static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) {
return getTableGroupId(&pInfo->pTableScanOp->pTaskInfo->tableqinfoList, uid); return getTableGroupId(pInfo->pTableScanOp->pTaskInfo->pTableInfoList, uid);
// SHashObj* map = pInfo->pTableScanOp->pTaskInfo->tableqinfoList.map; // SHashObj* map = pInfo->pTableScanOp->pTaskInfo->pTableInfoList.map;
// uint64_t* groupId = taosHashGet(map, &uid, sizeof(int64_t)); // uint64_t* groupId = taosHashGet(map, &uid, sizeof(int64_t));
// if (groupId) { // if (groupId) {
// return *groupId; // return *groupId;
...@@ -1564,7 +1602,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock ...@@ -1564,7 +1602,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
pInfo->pRes->info.type = STREAM_NORMAL; pInfo->pRes->info.type = STREAM_NORMAL;
pInfo->pRes->info.version = pBlock->info.version; pInfo->pRes->info.version = pBlock->info.version;
pInfo->pRes->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, pBlock->info.uid); pInfo->pRes->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.uid);
// todo extract method // todo extract method
for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
...@@ -2076,12 +2114,13 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -2076,12 +2114,13 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
} }
} }
static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) { static SArray* extractTableIdList(const STableListInfo* pTableListInfo) {
SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t)); SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t));
// Transfer the Array of STableKeyInfo into uid list. // Transfer the Array of STableKeyInfo into uid list.
for (int32_t i = 0; i < taosArrayGetSize(pTableGroupInfo->pTableList); ++i) { size_t size = tableListGetSize(pTableListInfo);
STableKeyInfo* pkeyInfo = taosArrayGet(pTableGroupInfo->pTableList, i); for (int32_t i = 0; i < size; ++i) {
STableKeyInfo* pkeyInfo = tableListGetInfo(pTableListInfo, i);
taosArrayPush(tableIdList, &pkeyInfo->uid); taosArrayPush(tableIdList, &pkeyInfo->uid);
} }
...@@ -2350,7 +2389,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -2350,7 +2389,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
STableKeyInfo* pList = NULL; STableKeyInfo* pList = NULL;
int32_t num = 0; int32_t num = 0;
getTablesOfGroup(&pTaskInfo->tableqinfoList, 0, &pList, &num); tableListGetGroupList(pTaskInfo->pTableInfoList, 0, &pList, &num);
if (pHandle->initTableReader) { if (pHandle->initTableReader) {
pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER; pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
...@@ -2383,7 +2422,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -2383,7 +2422,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
// set the extract column id to streamHandle // set the extract column id to streamHandle
tqReaderSetColIdList(pInfo->tqReader, pColIds); tqReaderSetColIdList(pInfo->tqReader, pColIds);
SArray* tableIdList = extractTableIdList(&pTaskInfo->tableqinfoList); SArray* tableIdList = extractTableIdList(pTaskInfo->pTableInfoList);
code = tqReaderSetTbUidList(pInfo->tqReader, tableIdList); code = tqReaderSetTbUidList(pInfo->tqReader, tableIdList);
if (code != 0) { if (code != 0) {
taosArrayDestroy(tableIdList); taosArrayDestroy(tableIdList);
...@@ -4082,7 +4121,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { ...@@ -4082,7 +4121,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
SSDataBlock* pRes = pInfo->pRes; SSDataBlock* pRes = pInfo->pRes;
blockDataCleanup(pRes); blockDataCleanup(pRes);
int32_t size = taosArrayGetSize(pInfo->pTableList->pTableList); int32_t size = tableListGetSize(pTaskInfo->pTableInfoList);
if (size == 0) { if (size == 0) {
setTaskStatus(pTaskInfo, TASK_COMPLETED); setTaskStatus(pTaskInfo, TASK_COMPLETED);
return NULL; return NULL;
...@@ -4094,7 +4133,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { ...@@ -4094,7 +4133,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
metaReaderInit(&mr, pInfo->readHandle.meta, 0); metaReaderInit(&mr, pInfo->readHandle.meta, 0);
while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) { while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
STableKeyInfo* item = taosArrayGet(pInfo->pTableList->pTableList, pInfo->curPos); STableKeyInfo* item = tableListGetInfo(pInfo->pTableList, pInfo->curPos);
int32_t code = metaGetTableEntryByUid(&mr, item->uid); int32_t code = metaGetTableEntryByUid(&mr, item->uid);
tDecoderClear(&mr.coder); tDecoderClear(&mr.coder);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -4209,47 +4248,10 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi ...@@ -4209,47 +4248,10 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
return NULL; return NULL;
} }
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
const char* idStr) {
int64_t st = taosGetTimestampUs();
if (pHandle == NULL) {
qError("invalid handle, in creating operator tree, %s", idStr);
return TSDB_CODE_INVALID_PARA;
}
int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to getTableList, code: %s", tstrerror(code));
return code;
}
pTableListInfo->numOfOuputGroups = 1;
int64_t st1 = taosGetTimestampUs();
qDebug("generate queried table list completed, elapsed time:%.2f ms %s", (st1 - st) / 1000.0, idStr);
if (taosArrayGetSize(pTableListInfo->pTableList) == 0) {
qDebug("no table qualified for query, %s" PRIx64, idStr);
return TSDB_CODE_SUCCESS;
}
code = setGroupIdMapForAllTables(pTableListInfo, pHandle, pGroupTags, groupSort);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
int64_t st2 = taosGetTimestampUs();
qDebug("generate group id map completed, elapsed time:%.2f ms %s", (st2 - st1) / 1000.0, idStr);
return TSDB_CODE_SUCCESS;
}
int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle, STableListInfo* pTableListInfo, int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle, STableListInfo* pTableListInfo,
int32_t tableStartIdx, int32_t tableEndIdx, SArray* arrayReader, const char* idstr) { int32_t tableStartIdx, int32_t tableEndIdx, SArray* arrayReader, const char* idstr) {
for (int32_t i = tableStartIdx; i <= tableEndIdx; ++i) { for (int32_t i = tableStartIdx; i <= tableEndIdx; ++i) {
STableKeyInfo* pList = taosArrayGet(pTableListInfo->pTableList, i); STableKeyInfo* pList = tableListGetInfo(pTableListInfo, i);
STsdbReader* pReader = NULL; STsdbReader* pReader = NULL;
tsdbReaderOpen(pHandle->vnode, pQueryCond, pList, 1, &pReader, idstr); tsdbReaderOpen(pHandle->vnode, pQueryCond, pList, 1, &pReader, idstr);
taosArrayPush(arrayReader, &pReader); taosArrayPush(arrayReader, &pReader);
...@@ -4262,7 +4264,7 @@ int32_t createMultipleDataReaders2(SQueryTableDataCond* pQueryCond, SReadHandle* ...@@ -4262,7 +4264,7 @@ int32_t createMultipleDataReaders2(SQueryTableDataCond* pQueryCond, SReadHandle*
STableListInfo* pTableListInfo, int32_t tableStartIdx, int32_t tableEndIdx, STableListInfo* pTableListInfo, int32_t tableStartIdx, int32_t tableEndIdx,
STsdbReader** ppReader, const char* idstr) { STsdbReader** ppReader, const char* idstr) {
STsdbReader* pReader = NULL; STsdbReader* pReader = NULL;
void* pStart = taosArrayGet(pTableListInfo->pTableList, tableStartIdx); void* pStart = tableListGetInfo(pTableListInfo, tableStartIdx);
int32_t num = tableEndIdx - tableStartIdx + 1; int32_t num = tableEndIdx - tableStartIdx + 1;
int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, pStart, num, &pReader, idstr); int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, pStart, num, &pReader, idstr);
...@@ -4522,7 +4524,7 @@ static SSDataBlock* getTableDataBlockTemp(void* param) { ...@@ -4522,7 +4524,7 @@ static SSDataBlock* getTableDataBlockTemp(void* param) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
void* p =taosArrayGet(pInfo->tableListInfo->pTableList, readIdx + pInfo->tableStartIndex); void* p = tableListGetInfo(pInfo->tableListInfo, readIdx + pInfo->tableStartIndex);
SReadHandle* pHandle = &pInfo->readHandle; SReadHandle* pHandle = &pInfo->readHandle;
tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, &pInfo->pReader, GET_TASKID(pTaskInfo)); tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, &pInfo->pReader, GET_TASKID(pTaskInfo));
...@@ -4565,7 +4567,7 @@ static SSDataBlock* getTableDataBlockTemp(void* param) { ...@@ -4565,7 +4567,7 @@ static SSDataBlock* getTableDataBlockTemp(void* param) {
continue; continue;
} }
pBlock->info.groupId = getTableGroupId(&pOperator->pTaskInfo->tableqinfoList, pBlock->info.uid); pBlock->info.groupId = getTableGroupId(pOperator->pTaskInfo->pTableInfoList, pBlock->info.uid);
pOperator->resultInfo.totalRows += pBlock->info.rows; // pTableScanInfo->readRecorder.totalRows; pOperator->resultInfo.totalRows += pBlock->info.rows; // pTableScanInfo->readRecorder.totalRows;
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
...@@ -4622,7 +4624,7 @@ static SSDataBlock* getTableDataBlock2(void* param) { ...@@ -4622,7 +4624,7 @@ static SSDataBlock* getTableDataBlock2(void* param) {
continue; continue;
} }
pBlock->info.groupId = getTableGroupId(&pOperator->pTaskInfo->tableqinfoList, pBlock->info.uid); pBlock->info.groupId = getTableGroupId(pOperator->pTaskInfo->pTableInfoList, pBlock->info.uid);
pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows; pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
...@@ -4676,7 +4678,7 @@ static SSDataBlock* getTableDataBlock(void* param) { ...@@ -4676,7 +4678,7 @@ static SSDataBlock* getTableDataBlock(void* param) {
continue; continue;
} }
pBlock->info.groupId = getTableGroupId(&pOperator->pTaskInfo->tableqinfoList, pBlock->info.uid); pBlock->info.groupId = getTableGroupId(pOperator->pTaskInfo->pTableInfoList, pBlock->info.uid);
pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows; pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
...@@ -4719,10 +4721,10 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { ...@@ -4719,10 +4721,10 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
{ {
size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList); size_t numOfTables = tableListGetSize(pInfo->tableListInfo);
int32_t i = pInfo->tableStartIndex + 1; int32_t i = pInfo->tableStartIndex + 1;
for (; i < tableListSize; ++i) { for (; i < numOfTables; ++i) {
STableKeyInfo* tableKeyInfo = taosArrayGet(pInfo->tableListInfo->pTableList, i); STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->tableListInfo, i);
if (tableKeyInfo->groupId != pInfo->groupId) { if (tableKeyInfo->groupId != pInfo->groupId) {
break; break;
} }
...@@ -4847,7 +4849,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { ...@@ -4847,7 +4849,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList); size_t tableListSize = tableListGetSize(pInfo->tableListInfo);
if (!pInfo->hasGroupId) { if (!pInfo->hasGroupId) {
pInfo->hasGroupId = true; pInfo->hasGroupId = true;
...@@ -4856,7 +4858,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { ...@@ -4856,7 +4858,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
pInfo->tableStartIndex = 0; pInfo->tableStartIndex = 0;
pInfo->groupId = ((STableKeyInfo*)taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex))->groupId; pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->tableListInfo, pInfo->tableStartIndex))->groupId;
startGroupTableMergeScan(pOperator); startGroupTableMergeScan(pOperator);
} }
...@@ -4875,8 +4877,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { ...@@ -4875,8 +4877,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
break; break;
} }
pInfo->tableStartIndex = pInfo->tableEndIndex + 1; pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
pInfo->groupId = pInfo->groupId = tableListGetInfo(pInfo->tableListInfo, pInfo->tableStartIndex)->groupId;
((STableKeyInfo*)taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex))->groupId;
startGroupTableMergeScan(pOperator); startGroupTableMergeScan(pOperator);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册