提交 5a971dd0 编写于 作者: H Haojun Liao

fix(query): handle partition by in table scan operator with limit/offset value.

上级 376e7ea5
...@@ -163,14 +163,6 @@ typedef enum EStreamType { ...@@ -163,14 +163,6 @@ typedef enum EStreamType {
STREAM_PULL_OVER, STREAM_PULL_OVER,
} EStreamType; } EStreamType;
typedef struct {
SArray* pGroupList;
SArray* pTableList;
SHashObj* map; // speedup acquire the tableQueryInfo by table uid
bool needSortTableByGroupId;
uint64_t suid;
} STableListInfo;
#pragma pack(push, 1) #pragma pack(push, 1)
typedef struct SColumnDataAgg { typedef struct SColumnDataAgg {
int16_t colId; int16_t colId;
......
...@@ -152,9 +152,9 @@ typedef struct STsdbReader STsdbReader; ...@@ -152,9 +152,9 @@ typedef struct STsdbReader STsdbReader;
#define CACHESCAN_RETRIEVE_LAST_ROW 0x4 #define CACHESCAN_RETRIEVE_LAST_ROW 0x4
#define CACHESCAN_RETRIEVE_LAST 0x8 #define CACHESCAN_RETRIEVE_LAST 0x8
int32_t tsdbSetTableId(STsdbReader *pReader, int64_t uid); int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num);
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *pTableList, STsdbReader **ppReader, int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
const char *idstr); STsdbReader **ppReader, const char *idstr);
void tsdbReaderClose(STsdbReader *pReader); void tsdbReaderClose(STsdbReader *pReader);
bool tsdbNextDataBlock(STsdbReader *pReader); bool tsdbNextDataBlock(STsdbReader *pReader);
void tsdbRetrieveDataBlockInfo(STsdbReader *pReader, SDataBlockInfo *pDataBlockInfo); void tsdbRetrieveDataBlockInfo(STsdbReader *pReader, SDataBlockInfo *pDataBlockInfo);
...@@ -167,8 +167,8 @@ void *tsdbGetIdx(SMeta *pMeta); ...@@ -167,8 +167,8 @@ void *tsdbGetIdx(SMeta *pMeta);
void *tsdbGetIvtIdx(SMeta *pMeta); void *tsdbGetIvtIdx(SMeta *pMeta);
uint64_t getReaderMaxVersion(STsdbReader *pReader); uint64_t getReaderMaxVersion(STsdbReader *pReader);
int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, uint64_t suid, int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables,
void **pReader); int32_t numOfCols, uint64_t suid, void** pReader);
int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids); int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids);
void *tsdbCacherowsReaderClose(void *pReader); void *tsdbCacherowsReaderClose(void *pReader);
int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid);
......
...@@ -716,7 +716,10 @@ typedef struct SCacheRowsReader { ...@@ -716,7 +716,10 @@ typedef struct SCacheRowsReader {
int32_t numOfCols; int32_t numOfCols;
int32_t type; int32_t type;
int32_t tableIndex; // currently returned result tables int32_t tableIndex; // currently returned result tables
SArray *pTableList; // table id list
STableKeyInfo *pTableList; // table id list
int32_t numOfTables;
SSttBlockLoadInfo *pLoadInfo; SSttBlockLoadInfo *pLoadInfo;
STsdbReadSnap *pReadSnap; STsdbReadSnap *pReadSnap;
SDataFReader *pDataFReader; SDataFReader *pDataFReader;
......
...@@ -162,10 +162,7 @@ int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSub ...@@ -162,10 +162,7 @@ int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSub
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock,
SSubmitBlkRsp* pRsp); SSubmitBlkRsp* pRsp);
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
STsdbReader tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
void* pMemRef);
int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg); int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg);
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list);
// tq // tq
int tqInit(); int tqInit();
......
...@@ -97,8 +97,8 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea ...@@ -97,8 +97,8 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea
} }
} }
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t numOfCols, uint64_t suid, int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols,
void** pReader) { uint64_t suid, void** pReader) {
*pReader = NULL; *pReader = NULL;
SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader)); SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader));
...@@ -111,12 +111,12 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList ...@@ -111,12 +111,12 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList
p->numOfCols = numOfCols; p->numOfCols = numOfCols;
p->suid = suid; p->suid = suid;
if (taosArrayGetSize(pTableIdList) == 0) { if (numOfTables == 0) {
*pReader = p; *pReader = p;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
STableKeyInfo* pKeyInfo = taosArrayGet(pTableIdList, 0); STableKeyInfo* pKeyInfo = &((STableKeyInfo*)pTableIdList)[0];
p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1, 1); p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1, 1);
p->pTableList = pTableIdList; p->pTableList = pTableIdList;
...@@ -205,7 +205,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ...@@ -205,7 +205,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache; SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache;
LRUHandle* h = NULL; LRUHandle* h = NULL;
SArray* pRow = NULL; SArray* pRow = NULL;
size_t numOfTables = taosArrayGetSize(pr->pTableList); // size_t numOfTables = taosArrayGetSize(pr->pTableList);
bool hasRes = false; bool hasRes = false;
SArray* pLastCols = NULL; SArray* pLastCols = NULL;
...@@ -243,8 +243,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ...@@ -243,8 +243,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
// retrieve the only one last row of all tables in the uid list. // retrieve the only one last row of all tables in the uid list.
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) { if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) {
for (int32_t i = 0; i < numOfTables; ++i) { for (int32_t i = 0; i < pr->numOfTables; ++i) {
STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i); STableKeyInfo* pKeyInfo = &pr->pTableList[i];
code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h); code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -308,8 +308,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ...@@ -308,8 +308,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
} }
} else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) { } else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
for (int32_t i = pr->tableIndex; i < numOfTables; ++i) { for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) {
STableKeyInfo* pKeyInfo = (STableKeyInfo*)taosArrayGet(pr->pTableList, i); STableKeyInfo* pKeyInfo = &pr->pTableList[i];
code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h); code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
......
...@@ -3445,13 +3445,18 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e ...@@ -3445,13 +3445,18 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// todo refactor, use arraylist instead // TODO refactor: with createDataBlockScanInfo
int32_t tsdbSetTableId(STsdbReader* pReader, int64_t uid) { int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) {
ASSERT(pReader != NULL); ASSERT(pReader != NULL);
taosHashClear(pReader->status.pTableMap); taosHashClear(pReader->status.pTableMap);
STableBlockScanInfo info = {.lastKey = 0, .uid = uid}; STableKeyInfo* pList = (STableKeyInfo*) pTableList;
taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
for(int32_t i = 0; i < num; ++i) {
STableBlockScanInfo info = {.lastKey = 0, .uid = pList[i].uid};
taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
}
return TDB_CODE_SUCCESS; return TDB_CODE_SUCCESS;
} }
...@@ -3487,8 +3492,8 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) { ...@@ -3487,8 +3492,8 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
} }
// ====================================== EXPOSED APIs ====================================== // ====================================== EXPOSED APIs ======================================
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader, int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
const char* idstr) { STsdbReader** ppReader, const char* idstr) {
STimeWindow window = pCond->twindows; STimeWindow window = pCond->twindows;
if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) { if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
pCond->twindows.skey += 1; pCond->twindows.skey += 1;
...@@ -3557,8 +3562,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl ...@@ -3557,8 +3562,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
STsdbReader* p = pReader->innerReader[0] != NULL ? pReader->innerReader[0] : pReader; STsdbReader* p = pReader->innerReader[0] != NULL ? pReader->innerReader[0] : pReader;
int32_t numOfTables = taosArrayGetSize(pTableList); pReader->status.pTableMap = createDataBlockScanInfo(p, pTableList, numOfTables);
pReader->status.pTableMap = createDataBlockScanInfo(p, pTableList->pData, numOfTables);
if (pReader->status.pTableMap == NULL) { if (pReader->status.pTableMap == NULL) {
tsdbReaderClose(pReader); tsdbReaderClose(pReader);
*ppReader = NULL; *ppReader = NULL;
......
...@@ -28,7 +28,7 @@ ...@@ -28,7 +28,7 @@
do { \ do { \
ASSERT((_c) != -1); \ ASSERT((_c) != -1); \
longjmp((_obj), (_c)); \ longjmp((_obj), (_c)); \
} while (0); } while (0)
#define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \ #define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \
do { \ do { \
...@@ -95,6 +95,21 @@ typedef struct SColMatchInfo { ...@@ -95,6 +95,21 @@ typedef struct SColMatchInfo {
int32_t matchType; // determinate the source according to col id or slot id int32_t matchType; // determinate the source according to col id or slot id
} SColMatchInfo; } SColMatchInfo;
typedef struct {
bool oneTableForEachGroup;
int32_t numOfGroups;
int32_t* groupOffset; // keep the offset value for each group in the tableList
SArray* pTableList;
SHashObj* map; // speedup acquire the tableQueryInfo by table uid
uint64_t suid;
} STableListInfo;
void destroyTableList(STableListInfo* pTableList);
int32_t getNumOfGroups(STableListInfo* pTableList);
uint64_t getTableGroupId(STableListInfo* pTableList, uint64_t tableUid);
int32_t getTablesOfGroup(STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo, int32_t* num);
uint64_t getTotalTables(STableListInfo* pTableList);
struct SqlFunctionCtx; struct SqlFunctionCtx;
size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput); size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
......
...@@ -171,13 +171,12 @@ typedef struct { ...@@ -171,13 +171,12 @@ typedef struct {
} SSchemaInfo; } SSchemaInfo;
typedef struct SExecTaskInfo { typedef struct SExecTaskInfo {
STaskIdInfo id; STaskIdInfo id;
uint32_t status; uint32_t status;
STimeWindow window; int32_t code;
STaskCostInfo cost; STimeWindow window;
int64_t owner; // if it is in execution STaskCostInfo cost;
int32_t code; int64_t owner; // if it is in execution
int64_t version; // used for stream to record wal version int64_t version; // used for stream to record wal version
SStreamTaskInfo streamInfo; SStreamTaskInfo streamInfo;
SSchemaInfo schemaInfo; SSchemaInfo schemaInfo;
...@@ -1071,7 +1070,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN ...@@ -1071,7 +1070,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex); void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex);
bool groupbyTbname(SNodeList* pGroupList); bool groupbyTbname(SNodeList* pGroupList);
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey); int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey, bool groupSort);
void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput); void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput);
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
SGroupResInfo* pGroupResInfo); SGroupResInfo* pGroupResInfo);
......
...@@ -61,11 +61,15 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe ...@@ -61,11 +61,15 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
pInfo->pUidList = taosArrayInit(4, sizeof(int64_t)); pInfo->pUidList = taosArrayInit(4, sizeof(int64_t));
// partition by tbname // partition by tbname, todo opt perf
if (taosArrayGetSize(pTableList->pGroupList) == taosArrayGetSize(pTableList->pTableList)) { if (getNumOfGroups(pTableList) == getTotalTables(pTableList)) {
pInfo->retrieveType = pInfo->retrieveType =
CACHESCAN_RETRIEVE_TYPE_ALL | (pScanNode->ignoreNull ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW); CACHESCAN_RETRIEVE_TYPE_ALL | (pScanNode->ignoreNull ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW);
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pTableList->pTableList,
STableKeyInfo* pList = taosArrayGet(pTableList->pTableList, 0);
size_t num = taosArrayGetSize(pTableList->pTableList);
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
taosArrayGetSize(pInfo->matchInfo.pList), pTableList->suid, &pInfo->pLastrowReader); taosArrayGetSize(pInfo->matchInfo.pList), pTableList->suid, &pInfo->pLastrowReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
...@@ -185,18 +189,25 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { ...@@ -185,18 +189,25 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
} else { } else {
size_t totalGroups = taosArrayGetSize(pTableList->pGroupList); size_t totalGroups = getNumOfGroups(pTableList);
while (pInfo->currentGroupIndex < totalGroups) { while (pInfo->currentGroupIndex < totalGroups) {
SArray* pGroupTableList = taosArrayGetP(pTableList->pGroupList, pInfo->currentGroupIndex);
tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pGroupTableList, STableKeyInfo* pList = NULL;
int32_t num = 0;
int32_t code = getTablesOfGroup(pTableList, pInfo->currentGroupIndex, &pList, &num);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
taosArrayGetSize(pInfo->matchInfo.pList), pTableList->suid, &pInfo->pLastrowReader); taosArrayGetSize(pInfo->matchInfo.pList), pTableList->suid, &pInfo->pLastrowReader);
taosArrayClear(pInfo->pUidList); taosArrayClear(pInfo->pUidList);
int32_t code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList); code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
pInfo->currentGroupIndex += 1; pInfo->currentGroupIndex += 1;
...@@ -206,7 +217,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { ...@@ -206,7 +217,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
if (pInfo->pseudoExprSup.numOfExprs > 0) { if (pInfo->pseudoExprSup.numOfExprs > 0) {
SExprSupp* pSup = &pInfo->pseudoExprSup; SExprSupp* pSup = &pInfo->pseudoExprSup;
STableKeyInfo* pKeyInfo = taosArrayGet(pGroupTableList, 0); STableKeyInfo* pKeyInfo = &((STableKeyInfo*)pTableList)[0];
pInfo->pRes->info.groupId = pKeyInfo->groupId; pInfo->pRes->info.groupId = pKeyInfo->groupId;
if (taosArrayGetSize(pInfo->pUidList) > 0) { if (taosArrayGetSize(pInfo->pUidList) > 0) {
......
...@@ -544,6 +544,7 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis ...@@ -544,6 +544,7 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto end; goto end;
} }
ctx.index = 0; ctx.index = 0;
ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo)); ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo));
if (ctx.cInfoList == NULL) { if (ctx.cInfoList == NULL) {
...@@ -606,6 +607,7 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis ...@@ -606,6 +607,7 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
} else { } else {
void* tag = taosHashGet(tags, uid, sizeof(int64_t)); void* tag = taosHashGet(tags, uid, sizeof(int64_t));
ASSERT(tag); ASSERT(tag);
STagVal tagVal = {0}; STagVal tagVal = {0};
tagVal.cid = pColInfo->info.colId; tagVal.cid = pColInfo->info.colId;
const char* p = metaGetTableTagVal(tag, pColInfo->info.type, &tagVal); const char* p = metaGetTableTagVal(tag, pColInfo->info.type, &tagVal);
...@@ -636,6 +638,7 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis ...@@ -636,6 +638,7 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
} }
} }
} }
pResBlock->info.rows = rows; pResBlock->info.rows = rows;
// int64_t st1 = taosGetTimestampUs(); // int64_t st1 = taosGetTimestampUs();
...@@ -661,10 +664,12 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis ...@@ -661,10 +664,12 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
} }
break; break;
} }
default: default:
code = TSDB_CODE_OPS_NOT_SUPPORT; code = TSDB_CODE_OPS_NOT_SUPPORT;
goto end; goto end;
} }
if (nodeType(pNode) == QUERY_NODE_COLUMN) { if (nodeType(pNode) == QUERY_NODE_COLUMN) {
SColumnNode* pSColumnNode = (SColumnNode*)pNode; SColumnNode* pSColumnNode = (SColumnNode*)pNode;
SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, pSColumnNode->slotId); SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, pSColumnNode->slotId);
...@@ -674,10 +679,12 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis ...@@ -674,10 +679,12 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
} else { } else {
code = scalarCalculate(pNode, pBlockList, &output); code = scalarCalculate(pNode, pBlockList, &output);
} }
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
releaseColInfoData(output.columnData); releaseColInfoData(output.columnData);
goto end; goto end;
} }
taosArrayPush(groupData, &output.columnData); taosArrayPush(groupData, &output.columnData);
} }
...@@ -696,6 +703,7 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis ...@@ -696,6 +703,7 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto end; goto end;
} }
for (int i = 0; i < rows; i++) { for (int i = 0; i < rows; i++) {
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
...@@ -946,14 +954,6 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, ...@@ -946,14 +954,6 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
} }
taosArrayDestroy(res); taosArrayDestroy(res);
pListInfo->pGroupList = taosArrayInit(4, POINTER_BYTES);
if (pListInfo->pGroupList == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
// put into list as default group, remove it if grouping sorting is required later
taosArrayPush(pListInfo->pGroupList, &pListInfo->pTableList);
return code; return code;
} }
...@@ -1604,3 +1604,63 @@ void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimit ...@@ -1604,3 +1604,63 @@ void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimit
pLimitInfo->remainOffset = limit.offset; pLimitInfo->remainOffset = limit.offset;
pLimitInfo->remainGroupOffset = slimit.offset; pLimitInfo->remainGroupOffset = slimit.offset;
} }
uint64_t getTotalTables(STableListInfo* pTableList) {
if (pTableList->map != NULL) {
ASSERT(taosArrayGetSize(pTableList->pTableList) == taosHashGetSize(pTableList->map));
}
return taosArrayGetSize(pTableList->pTableList);
}
uint64_t getTableGroupId(STableListInfo* pTableList, uint64_t tableUid) {
uint64_t* groupId = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid));
if (groupId != NULL) {
return *groupId;
} else {
return 0;
}
}
int32_t getTablesOfGroup(STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo, int32_t* size) {
int32_t total = getNumOfGroups(pTableList);
if (ordinalGroupIndex < 0 || ordinalGroupIndex >= total) {
return TSDB_CODE_INVALID_PARA;
}
// here handle two special cases:
// 1. only one group exists, and 2. one table exists for each group.
if (pTableList->numOfGroups == 1) {
*size = getTotalTables(pTableList);
*pKeyInfo = taosArrayGet(pTableList->pTableList, 0);
return TSDB_CODE_SUCCESS;
} else if (pTableList->numOfGroups == getTotalTables(pTableList)) {
*size = 1;
*pKeyInfo = taosArrayGet(pTableList->pTableList, ordinalGroupIndex);
return TSDB_CODE_SUCCESS;
}
int32_t offset = pTableList->groupOffset[ordinalGroupIndex];
if (ordinalGroupIndex < total - 1) {
*size = pTableList->groupOffset[offset + 1] - pTableList->groupOffset[offset];
} else {
*size = total - pTableList->groupOffset[offset] - 1;
}
*pKeyInfo = taosArrayGet(pTableList->pTableList, offset);
return TSDB_CODE_SUCCESS;
}
int32_t getNumOfGroups(STableListInfo* pTableList) {
return pTableList->numOfGroups;
}
void destroyTableList(STableListInfo* pTableqinfoList) {
pTableqinfoList->pTableList = taosArrayDestroy(pTableqinfoList->pTableList);
taosMemoryFreeClear(pTableqinfoList->groupOffset);
taosHashCleanup(pTableqinfoList->map);
pTableqinfoList->pTableList = NULL;
pTableqinfoList->map = NULL;
}
\ No newline at end of file
...@@ -811,14 +811,18 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -811,14 +811,18 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
ASSERT(found); ASSERT(found);
if (pTableScanInfo->dataReader == NULL) { if (pTableScanInfo->dataReader == NULL) {
if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond,
pTaskInfo->tableqinfoList.pTableList, &pTableScanInfo->dataReader, NULL) < 0 || STableKeyInfo* pList = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0);
int32_t num = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond, pList, num,
&pTableScanInfo->dataReader, NULL) < 0 ||
pTableScanInfo->dataReader == NULL) { pTableScanInfo->dataReader == NULL) {
ASSERT(0); ASSERT(0);
} }
} }
tsdbSetTableId(pTableScanInfo->dataReader, uid); STableKeyInfo tki = {.uid = uid};
tsdbSetTableList(pTableScanInfo->dataReader, &tki, 1);
int64_t oldSkey = pTableScanInfo->cond.twindows.skey; int64_t oldSkey = pTableScanInfo->cond.twindows.skey;
pTableScanInfo->cond.twindows.skey = ts + 1; pTableScanInfo->cond.twindows.skey = ts + 1;
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond); tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
...@@ -848,9 +852,15 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -848,9 +852,15 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo); initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo);
pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts; pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts;
pTaskInfo->tableqinfoList.pTableList = taosArrayInit(1, sizeof(STableKeyInfo));
taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &(STableKeyInfo){.uid = mtInfo.uid, .groupId = 0}); STableListInfo* pListInfo = &pTaskInfo->tableqinfoList;
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pTaskInfo->tableqinfoList.pTableList,
pListInfo->pTableList = taosArrayInit(1, sizeof(STableKeyInfo));
taosArrayPush(pListInfo->pTableList, &(STableKeyInfo){.uid = mtInfo.uid, .groupId = 0});
STableKeyInfo* pList = taosArrayGet(pListInfo->pTableList, 0);
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, taosArrayGetSize(pListInfo->pTableList),
&pInfo->dataReader, NULL); &pInfo->dataReader, NULL);
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
......
...@@ -1739,8 +1739,6 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t ...@@ -1739,8 +1739,6 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void doDestroyTableList(STableListInfo* pTableqinfoList);
typedef struct SFetchRspHandleWrapper { typedef struct SFetchRspHandleWrapper {
uint32_t exchangeId; uint32_t exchangeId;
int32_t sourceIndex; int32_t sourceIndex;
...@@ -3366,62 +3364,116 @@ static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) { ...@@ -3366,62 +3364,116 @@ static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) {
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) { tDeleteSSchemaWrapper(pStreamInfo->schema); } static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) { tDeleteSSchemaWrapper(pStreamInfo->schema); }
static int32_t orderbyGroupIdComparFn(const void* p1, const void* p2) {
STableKeyInfo* pInfo1 = (STableKeyInfo*) p1;
STableKeyInfo* pInfo2 = (STableKeyInfo*) p2;
if (pInfo1->groupId == pInfo2->groupId) {
return 0;
} else {
return pInfo1->groupId < pInfo2->groupId? -1:1;
}
}
static int32_t sortTableGroup(STableListInfo* pTableListInfo) { static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
taosArrayClear(pTableListInfo->pGroupList); int32_t code = TSDB_CODE_SUCCESS;
taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
int32_t size = getTotalTables(pTableListInfo);
SArray* pList = taosArrayInit(4, sizeof(int32_t));
STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, 0);
uint64_t gid = pInfo->groupId;
int32_t start = 0;
taosArrayPush(pList, &start);
for(int32_t i = 1; i < size; ++i) {
pInfo = taosArrayGet(pTableListInfo->pTableList, i);
if (pInfo->groupId != gid) {
taosArrayPush(pList, &i);
gid = pInfo->groupId;
}
}
pTableListInfo->numOfGroups = taosArrayGetSize(pList);
pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfGroups);
memcpy(pTableListInfo->groupOffset, taosArrayGet(pList, 0), sizeof(int32_t) * pTableListInfo->numOfGroups);
taosArrayDestroy(pList);
# if 0
SArray* sortSupport = taosArrayInit(16, sizeof(uint64_t)); SArray* sortSupport = taosArrayInit(16, sizeof(uint64_t));
if (sortSupport == NULL) return TSDB_CODE_OUT_OF_MEMORY; if (sortSupport == NULL) {
for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) { return TSDB_CODE_OUT_OF_MEMORY;
}
size_t num = taosArrayGetSize(pTableListInfo->pTableList);
for (int32_t i = 0; i < num; i++) {
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
uint64_t* groupId = taosHashGet(pTableListInfo->map, &info->uid, sizeof(uint64_t)); uint64_t* groupId = taosHashGet(pTableListInfo->map, &info->uid, sizeof(uint64_t));
int32_t index = taosArraySearchIdx(sortSupport, groupId, compareUint64Val, TD_EQ); int32_t index = taosArraySearchIdx(sortSupport, groupId, compareUint64Val, TD_EQ);
if (index == -1) { if (index == -1) {
void* p = taosArraySearch(sortSupport, groupId, compareUint64Val, TD_GT); void* p = taosArraySearch(sortSupport, groupId, compareUint64Val, TD_GT);
SArray* tGroup = taosArrayInit(8, sizeof(STableKeyInfo)); SArray* tGroup = taosArrayInit(8, sizeof(STableKeyInfo));
if (tGroup == NULL) { if (tGroup == NULL) {
taosArrayDestroy(sortSupport); code = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_OUT_OF_MEMORY; goto _error;
} }
if (taosArrayPush(tGroup, info) == NULL) { if (taosArrayPush(tGroup, info) == NULL) {
qError("taos push info array error"); qError("taos push info array error");
taosArrayDestroy(sortSupport); code = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_QRY_APP_ERROR; goto _error;
} }
if (p == NULL) { if (p == NULL) {
if (taosArrayPush(sortSupport, groupId) == NULL) { if (taosArrayPush(sortSupport, groupId) == NULL) {
qError("taos push support array error"); qError("taos push support array error");
taosArrayDestroy(sortSupport); code = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_QRY_APP_ERROR; goto _error;
} }
if (taosArrayPush(pTableListInfo->pGroupList, &tGroup) == NULL) { if (taosArrayPush(pTableListInfo->pGroupList, &tGroup) == NULL) {
qError("taos push group array error"); qError("taos push group array error");
taosArrayDestroy(sortSupport); code = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_QRY_APP_ERROR; goto _error;
} }
} else { } else {
int32_t pos = TARRAY_ELEM_IDX(sortSupport, p); int32_t pos = TARRAY_ELEM_IDX(sortSupport, p);
if (taosArrayInsert(sortSupport, pos, groupId) == NULL) { if (taosArrayInsert(sortSupport, pos, groupId) == NULL) {
qError("taos insert support array error"); qError("taos insert support array error");
taosArrayDestroy(sortSupport); code = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_QRY_APP_ERROR; goto _error;
} }
if (taosArrayInsert(pTableListInfo->pGroupList, pos, &tGroup) == NULL) { if (taosArrayInsert(pTableListInfo->pGroupList, pos, &tGroup) == NULL) {
qError("taos insert group array error"); qError("taos insert group array error");
taosArrayDestroy(sortSupport); code = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_QRY_APP_ERROR; goto _error;
} }
} }
} else { } else {
SArray* tGroup = (SArray*)taosArrayGetP(pTableListInfo->pGroupList, index); SArray* tGroup = (SArray*)taosArrayGetP(pTableListInfo->pGroupList, index);
if (taosArrayPush(tGroup, info) == NULL) { if (taosArrayPush(tGroup, info) == NULL) {
qError("taos push uid array error"); qError("taos push uid array error");
taosArrayDestroy(sortSupport); code = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_QRY_APP_ERROR; goto _error;
} }
} }
} }
taosArrayDestroy(sortSupport); taosArrayDestroy(sortSupport);
#endif
return TDB_CODE_SUCCESS; return TDB_CODE_SUCCESS;
_error:
// taosArrayDestroy(sortSupport);
return code;
} }
bool groupbyTbname(SNodeList* pGroupList) { bool groupbyTbname(SNodeList* pGroupList) {
...@@ -3437,7 +3489,7 @@ bool groupbyTbname(SNodeList* pGroupList) { ...@@ -3437,7 +3489,7 @@ bool groupbyTbname(SNodeList* pGroupList) {
return bytbname; return bytbname;
} }
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group) { int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group, bool groupSort) {
if (group == NULL) { if (group == NULL) {
return TDB_CODE_SUCCESS; return TDB_CODE_SUCCESS;
} }
...@@ -3447,25 +3499,26 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, ...@@ -3447,25 +3499,26 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
bool assignUid = groupbyTbname(group); bool assignUid = groupbyTbname(group);
size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList); size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
if (assignUid) { if (assignUid) { // in case of group/partition by tbname, the group id is equalled to the uid of table
for (int32_t i = 0; i < numOfTables; i++) { for (int32_t i = 0; i < numOfTables; i++) {
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
info->groupId = info->uid; info->groupId = info->uid;
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t)); taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t));
} }
pTableListInfo->oneTableForEachGroup = true;
pTableListInfo->numOfGroups = numOfTables;
} else { } else {
int32_t code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo); int32_t code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
}
if (pTableListInfo->needSortTableByGroupId) { if (groupSort) {
return sortTableGroup(pTableListInfo); return sortTableGroup(pTableListInfo);
}
} }
return TDB_CODE_SUCCESS; return TDB_CODE_SUCCESS;
...@@ -3505,6 +3558,12 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -3505,6 +3558,12 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
// NOTE: this is an patch to fix the physical plan
// TODO remove it later
if (pTableScanNode->scan.node.pLimit != NULL) {
pTableScanNode->groupSort = true;
}
int32_t code = int32_t code =
createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle, createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle,
pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
...@@ -3604,8 +3663,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -3604,8 +3663,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL; return NULL;
} }
void* pList = taosArrayGet(pTableListInfo->pTableList, 0);
size_t num = taosArrayGetSize(pTableListInfo->pTableList);
STsdbReader* pReader = NULL; STsdbReader* pReader = NULL;
tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo->pTableList, &pReader, ""); tsdbReaderOpen(pHandle->vnode, &cond, pList, num, &pReader, "");
cleanupQueryTableDataCond(&cond); cleanupQueryTableDataCond(&cond);
pOperator = createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo); pOperator = createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo);
...@@ -3639,7 +3701,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -3639,7 +3701,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return pOperator; return pOperator;
} }
int32_t num = 0;
size_t size = LIST_LENGTH(pPhyNode->pChildren); size_t size = LIST_LENGTH(pPhyNode->pChildren);
SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES); SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
...@@ -3974,28 +4035,10 @@ _complete: ...@@ -3974,28 +4035,10 @@ _complete:
return code; return code;
} }
void doDestroyTableList(STableListInfo* pTableqinfoList) {
taosArrayDestroy(pTableqinfoList->pTableList);
taosHashCleanup(pTableqinfoList->map);
if (pTableqinfoList->needSortTableByGroupId) {
for (int32_t i = 0; i < taosArrayGetSize(pTableqinfoList->pGroupList); i++) {
SArray* tmp = taosArrayGetP(pTableqinfoList->pGroupList, i);
if (tmp == pTableqinfoList->pTableList) {
continue;
}
taosArrayDestroy(tmp);
}
}
taosArrayDestroy(pTableqinfoList->pGroupList);
pTableqinfoList->pTableList = NULL;
pTableqinfoList->map = NULL;
}
void doDestroyTask(SExecTaskInfo* pTaskInfo) { void doDestroyTask(SExecTaskInfo* pTaskInfo) {
qDebug("%s execTask is freed", GET_TASKID(pTaskInfo)); qDebug("%s execTask is freed", GET_TASKID(pTaskInfo));
doDestroyTableList(&pTaskInfo->tableqinfoList); destroyTableList(&pTaskInfo->tableqinfoList);
destroyOperatorInfo(pTaskInfo->pRoot); destroyOperatorInfo(pTaskInfo->pRoot);
cleanupTableSchemaInfo(&pTaskInfo->schemaInfo); cleanupTableSchemaInfo(&pTaskInfo->schemaInfo);
cleanupStreamInfo(&pTaskInfo->streamInfo); cleanupStreamInfo(&pTaskInfo->streamInfo);
......
...@@ -159,6 +159,7 @@ static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, S ...@@ -159,6 +159,7 @@ static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, S
// reset the value for a new group data // reset the value for a new group data
// existing rows that belongs to previous group. // existing rows that belongs to previous group.
// TODO refactor with doTableScan
pLimitInfo->numOfOutputRows = 0; pLimitInfo->numOfOutputRows = 0;
pLimitInfo->remainOffset = pLimitInfo->limit.offset; pLimitInfo->remainOffset = pLimitInfo->limit.offset;
} }
......
...@@ -377,8 +377,6 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo ...@@ -377,8 +377,6 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo
blockDataKeepFirstNRows(pBlock, keep); blockDataKeepFirstNRows(pBlock, keep);
qDebug("output limit %"PRId64" has reached, %s", pLimit->limit, GET_TASKID(pTaskInfo)); qDebug("output limit %"PRId64" has reached, %s", pLimit->limit, GET_TASKID(pTaskInfo));
// setTaskStatus(pTaskInfo, TASK_COMPLETED);
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} }
} }
...@@ -682,10 +680,7 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) { ...@@ -682,10 +680,7 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) { if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
pTableScanInfo->scanFlag = REPEAT_SCAN; pTableScanInfo->scanFlag = REPEAT_SCAN;
qDebug( qDebug("start to repeat ascending order scan data blocks due to query func required, %s", GET_TASKID(pTaskInfo));
"%s start to repeat ascending order scan data SELECT last_row(*),hostname from cpu group by hostname;blocks "
"due to query func required",
GET_TASKID(pTaskInfo));
// do prepare for the next round table scan operation // do prepare for the next round table scan operation
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond); tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
...@@ -712,8 +707,7 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) { ...@@ -712,8 +707,7 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
pTableScanInfo->scanFlag = REPEAT_SCAN; pTableScanInfo->scanFlag = REPEAT_SCAN;
qDebug("%s start to repeat descending order scan data blocks due to query func required", qDebug("%s start to repeat descending order scan data blocks", GET_TASKID(pTaskInfo));
GET_TASKID(pTaskInfo));
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond); tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
} }
} }
...@@ -726,7 +720,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -726,7 +720,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
STableScanInfo* pInfo = pOperator->info; STableScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
// if scan table by table // scan table one by one sequentially
if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) { if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
int32_t numOfTables = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList); int32_t numOfTables = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
...@@ -743,54 +737,63 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -743,54 +737,63 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
} }
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable); STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable);
tsdbSetTableId(pInfo->dataReader, pTableInfo->uid); tsdbSetTableList(pInfo->dataReader, pTableInfo, 1);
qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d %s", pTableInfo->uid, numOfTables, qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d %s", pTableInfo->uid, numOfTables,
pInfo->currentTable, pTaskInfo->id.str); pInfo->currentTable, pTaskInfo->id.str);
tsdbReaderReset(pInfo->dataReader, &pInfo->cond); tsdbReaderReset(pInfo->dataReader, &pInfo->cond);
pInfo->scanTimes = 0; pInfo->scanTimes = 0;
} }
} } else { // scan table group by group sequentially
if (pInfo->currentGroupId == -1) {
if ((++pInfo->currentGroupId) >= getNumOfGroups(&pTaskInfo->tableqinfoList)) {
doSetOperatorCompleted(pOperator);
return NULL;
}
if (pInfo->currentGroupId == -1) { int32_t num = 0;
pInfo->currentGroupId++; STableKeyInfo* pList = NULL;
if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) { getTablesOfGroup(&pTaskInfo->tableqinfoList, pInfo->currentGroupId, &pList, &num);
setTaskStatus(pTaskInfo, TASK_COMPLETED); ASSERT(pInfo->dataReader == NULL);
return NULL;
int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, pList, num, (STsdbReader**)&pInfo->dataReader,
GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
} }
SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId); SSDataBlock* result = doTableScanGroup(pOperator);
tsdbReaderClose(pInfo->dataReader); if (result != NULL) {
return result;
}
int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, (STsdbReader**)&pInfo->dataReader, if ((++pInfo->currentGroupId) >= getNumOfGroups(&pTaskInfo->tableqinfoList)) {
GET_TASKID(pTaskInfo)); doSetOperatorCompleted(pOperator);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
return NULL; return NULL;
} }
}
SSDataBlock* result = doTableScanGroup(pOperator); // reset value for the next group data output
if (result) { pOperator->status = OP_OPENED;
return result; pInfo->limitInfo.numOfOutputRows = 0;
} pInfo->limitInfo.remainOffset = pInfo->limitInfo.limit.offset;
pInfo->currentGroupId++; int32_t num = 0;
if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) { STableKeyInfo* pList = NULL;
setTaskStatus(pTaskInfo, TASK_COMPLETED); getTablesOfGroup(&pTaskInfo->tableqinfoList, pInfo->currentGroupId, &pList, &num);
return NULL;
}
tsdbReaderReset(pInfo->dataReader, &pInfo->cond); tsdbSetTableList(pInfo->dataReader, pList, num);
pInfo->scanTimes = 0; tsdbReaderReset(pInfo->dataReader, &pInfo->cond);
pInfo->scanTimes = 0;
result = doTableScanGroup(pOperator); result = doTableScanGroup(pOperator);
if (result) { if (result != NULL) {
return result; return result;
} }
setTaskStatus(pTaskInfo, TASK_COMPLETED); doSetOperatorCompleted(pOperator);
return NULL; return NULL;
}
} }
static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
...@@ -836,7 +839,6 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, ...@@ -836,7 +839,6 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
} }
initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo); initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
...@@ -1078,37 +1080,52 @@ void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) { ...@@ -1078,37 +1080,52 @@ void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) {
pTableScanInfo->currentGroupId = -1; pTableScanInfo->currentGroupId = -1;
} }
static void freeArray(void* array) { taosArrayDestroy(array); } //static void freeArray(void* array) { taosArrayDestroy(array); }
//
static void resetTableScanOperator(SOperatorInfo* pTableScanOp) { //static void resetTableScanOperator(SOperatorInfo* pTableScanOp) {
STableScanInfo* pTableScanInfo = pTableScanOp->info; // STableScanInfo* pTableScanInfo = pTableScanOp->info;
pTableScanInfo->cond.startVersion = -1; // pTableScanInfo->cond.startVersion = -1;
pTableScanInfo->cond.endVersion = -1; // pTableScanInfo->cond.endVersion = -1;
SArray* gpTbls = pTableScanOp->pTaskInfo->tableqinfoList.pGroupList; //// SArray* gpTbls = pTableScanOp->pTaskInfo->tableqinfoList.pGroupList;
SArray* allTbls = pTableScanOp->pTaskInfo->tableqinfoList.pTableList; //// SArray* allTbls = pTableScanOp->pTaskInfo->tableqinfoList.pTableList;
taosArrayClearP(gpTbls, freeArray); //// taosArrayClearP(gpTbls, freeArray);
taosArrayPush(gpTbls, &allTbls); //// taosArrayPush(gpTbls, &allTbls);
STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX}; // STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
resetTableScanInfo(pTableScanOp->info, &win); // resetTableScanInfo(pTableScanOp->info, &win);
} //}
static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs, static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs,
int64_t maxVersion) { int64_t maxVersion) {
SArray* gpTbls = pTableScanOp->pTaskInfo->tableqinfoList.pGroupList;
taosArrayClear(gpTbls);
STableKeyInfo tblInfo = {.uid = tbUid, .groupId = 0}; STableKeyInfo tblInfo = {.uid = tbUid, .groupId = 0};
SArray* tbls = taosArrayInit(1, sizeof(STableKeyInfo));
taosArrayPush(tbls, &tblInfo);
taosArrayPush(gpTbls, &tbls);
STimeWindow win = {.skey = startTs, .ekey = endTs}; STableScanInfo* pTableScanInfo = pTableScanOp->info;
STableScanInfo* pTableScanInfo = pTableScanOp->info; SQueryTableDataCond cond = pTableScanInfo->cond;
pTableScanInfo->cond.startVersion = -1;
pTableScanInfo->cond.endVersion = maxVersion; cond.startVersion = -1;
resetTableScanInfo(pTableScanOp->info, &win); cond.endVersion = maxVersion;
SSDataBlock* pRes = doTableScan(pTableScanOp); cond.twindows = (STimeWindow){.skey = startTs, .ekey = endTs};
resetTableScanOperator(pTableScanOp);
return pRes; SExecTaskInfo* pTaskInfo = pTableScanOp->pTaskInfo;
blockDataCleanup(pTableScanInfo->pResBlock);
STsdbReader* pReader = NULL;
int32_t code = tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &cond, &tblInfo, 1, (STsdbReader**)&pReader,
GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
}
bool hasBlock = tsdbNextDataBlock(pReader);
if (hasBlock) {
SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
relocateColumnData(pTableScanInfo->pResBlock, pTableScanInfo->matchInfo.pList, pCols, true);
doSetTagColumnData(pTableScanInfo, pTableScanInfo->pResBlock, pTaskInfo);
}
tsdbReaderClose(pReader);
return pTableScanInfo->pResBlock->info.rows > 0? pTableScanInfo->pResBlock:NULL;
} }
static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) { static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
...@@ -2322,11 +2339,14 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -2322,11 +2339,14 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pTSInfo->cond.endVersion = pHandle->version; pTSInfo->cond.endVersion = pHandle->version;
} }
SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0); STableKeyInfo* pList = NULL;
int32_t num = 0;
getTablesOfGroup(&pTaskInfo->tableqinfoList, 0, &pList, &num);
if (pHandle->initTableReader) { if (pHandle->initTableReader) {
pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER; pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
pTSInfo->dataReader = NULL; pTSInfo->dataReader = NULL;
if (tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, tableList, &pTSInfo->dataReader, NULL) < 0) { if (tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, pList, num, &pTSInfo->dataReader, NULL) < 0) {
ASSERT(0); ASSERT(0);
} }
} }
...@@ -4134,8 +4154,8 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags ...@@ -4134,8 +4154,8 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
pTableListInfo->needSortTableByGroupId = groupSort; pTableListInfo->numOfGroups = 1;
code = generateGroupIdMap(pTableListInfo, pHandle, pGroupTags); code = generateGroupIdMap(pTableListInfo, pHandle, pGroupTags, groupSort);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -4149,14 +4169,10 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags ...@@ -4149,14 +4169,10 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle, STableListInfo* pTableListInfo, int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle, STableListInfo* pTableListInfo,
int32_t tableStartIdx, int32_t tableEndIdx, SArray* arrayReader, const char* idstr) { int32_t tableStartIdx, int32_t tableEndIdx, SArray* arrayReader, const char* idstr) {
for (int32_t i = tableStartIdx; i <= tableEndIdx; ++i) { for (int32_t i = tableStartIdx; i <= tableEndIdx; ++i) {
SArray* subTableList = taosArrayInit(1, sizeof(STableKeyInfo)); STableKeyInfo* pList = taosArrayGet(pTableListInfo->pTableList, i);
taosArrayPush(subTableList, taosArrayGet(pTableListInfo->pTableList, i)); STsdbReader* pReader = NULL;
tsdbReaderOpen(pHandle->vnode, pQueryCond, pList, 1, &pReader, idstr);
STsdbReader* pReader = NULL;
tsdbReaderOpen(pHandle->vnode, pQueryCond, subTableList, &pReader, idstr);
taosArrayPush(arrayReader, &pReader); taosArrayPush(arrayReader, &pReader);
taosArrayDestroy(subTableList);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -39,6 +39,7 @@ endi ...@@ -39,6 +39,7 @@ endi
$val = $totalNum - 1 $val = $totalNum - 1
sql select * from $stb limit $totalNum offset 1 sql select * from $stb limit $totalNum offset 1
if $rows != $val then if $rows != $val then
print expect $val , actual: $rows
return -1 return -1
endi endi
if $data01 != 1 then if $data01 != 1 then
...@@ -492,3 +493,9 @@ sql select max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), ...@@ -492,3 +493,9 @@ sql select max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8),
if $rows != 6 then if $rows != 6 then
return -1 return -1
endi endi
sql select * from $stb partition by tbname limit 1
if $rows != 10 then
return -1
endi
...@@ -722,6 +722,7 @@ sql select bottom(c1, 1) from $stb where ts >= $ts0 and ts <= $tsu limit 5 offse ...@@ -722,6 +722,7 @@ sql select bottom(c1, 1) from $stb where ts >= $ts0 and ts <= $tsu limit 5 offse
if $rows != 0 then if $rows != 0 then
return -1 return -1
endi endi
sql select bottom(c1, 5) from $stb where ts >= $ts0 and ts <= $tsu limit 3 offset 5 sql select bottom(c1, 5) from $stb where ts >= $ts0 and ts <= $tsu limit 3 offset 5
if $rows != 0 then if $rows != 0 then
return -1 return -1
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册