未验证 提交 d0c7b573 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #17755 from taosdata/feature/3_liaohj

Feature/3 liaohj
...@@ -164,14 +164,6 @@ typedef enum EStreamType { ...@@ -164,14 +164,6 @@ typedef enum EStreamType {
STREAM_FILL_OVER, STREAM_FILL_OVER,
} EStreamType; } EStreamType;
typedef struct {
SArray* pGroupList;
SArray* pTableList;
SHashObj* map; // speedup acquire the tableQueryInfo by table uid
bool needSortTableByGroupId;
uint64_t suid;
} STableListInfo;
#pragma pack(push, 1) #pragma pack(push, 1)
typedef struct SColumnDataAgg { typedef struct SColumnDataAgg {
int16_t colId; int16_t colId;
......
...@@ -152,26 +152,25 @@ typedef struct STsdbReader STsdbReader; ...@@ -152,26 +152,25 @@ typedef struct STsdbReader STsdbReader;
#define CACHESCAN_RETRIEVE_LAST_ROW 0x4 #define CACHESCAN_RETRIEVE_LAST_ROW 0x4
#define CACHESCAN_RETRIEVE_LAST 0x8 #define CACHESCAN_RETRIEVE_LAST 0x8
int32_t tsdbSetTableId(STsdbReader *pReader, int64_t uid); int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t num);
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *pTableList, STsdbReader **ppReader, int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
const char *idstr); STsdbReader **ppReader, const char *idstr);
void tsdbReaderClose(STsdbReader *pReader); void tsdbReaderClose(STsdbReader *pReader);
bool tsdbNextDataBlock(STsdbReader *pReader); bool tsdbNextDataBlock(STsdbReader *pReader);
bool tsdbTableNextDataBlock(STsdbReader *pReader, int64_t uid); bool tsdbTableNextDataBlock(STsdbReader *pReader, uint64_t uid);
void tsdbRetrieveDataBlockInfo(STsdbReader *pReader, SDataBlockInfo *pDataBlockInfo); void tsdbRetrieveDataBlockInfo(STsdbReader *pReader, SDataBlockInfo *pDataBlockInfo);
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave); int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave);
SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList); SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond); int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond);
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo); int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle); int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
bool tsdbIsAscendingOrder(STsdbReader *pReader);
void *tsdbGetIdx(SMeta *pMeta); void *tsdbGetIdx(SMeta *pMeta);
void *tsdbGetIvtIdx(SMeta *pMeta); void *tsdbGetIvtIdx(SMeta *pMeta);
uint64_t getReaderMaxVersion(STsdbReader *pReader); uint64_t getReaderMaxVersion(STsdbReader *pReader);
int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, uint64_t suid, int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
void **pReader); uint64_t suid, void **pReader);
int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids); int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids);
void *tsdbCacherowsReaderClose(void *pReader); void *tsdbCacherowsReaderClose(void *pReader);
int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid);
......
...@@ -716,7 +716,9 @@ typedef struct SCacheRowsReader { ...@@ -716,7 +716,9 @@ typedef struct SCacheRowsReader {
int32_t numOfCols; int32_t numOfCols;
int32_t type; int32_t type;
int32_t tableIndex; // currently returned result tables int32_t tableIndex; // currently returned result tables
SArray *pTableList; // table id list
STableKeyInfo *pTableList; // table id list
int32_t numOfTables;
SSttBlockLoadInfo *pLoadInfo; SSttBlockLoadInfo *pLoadInfo;
STsdbReadSnap *pReadSnap; STsdbReadSnap *pReadSnap;
SDataFReader *pDataFReader; SDataFReader *pDataFReader;
......
...@@ -162,10 +162,7 @@ int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSub ...@@ -162,10 +162,7 @@ int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSub
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock,
SSubmitBlkRsp* pRsp); SSubmitBlkRsp* pRsp);
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
STsdbReader tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
void* pMemRef);
int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg); int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg);
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list);
// tq // tq
int tqInit(); int tqInit();
......
...@@ -701,25 +701,28 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma ...@@ -701,25 +701,28 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
#endif #endif
for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) {
SSDataBlock *output = taosArrayGetP(pResList, i); SSDataBlock *output = taosArrayGetP(pResList, i);
smaDebug("result block, uid:%"PRIu64", groupid:%"PRIu64", rows:%d", output->info.uid, output->info.groupId,
output->info.rows);
STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]);
SSubmitReq *pReq = NULL; SSubmitReq *pReq = NULL;
// TODO: the schema update should be handled later(TD-17965) // TODO: the schema update should be handled later(TD-17965)
if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, SMA_VID(pSma), suid) < 0) { if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, SMA_VID(pSma), suid) < 0) {
smaError("vgId:%d, build submit req for rsma table %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma), smaError("vgId:%d, build submit req for rsma table suid:%" PRIu64 ", uid:%"PRIu64", level %" PRIi8 " failed since %s", SMA_VID(pSma),
suid, pItem->level, terrstr()); suid, output->info.groupId, pItem->level, terrstr());
goto _err; goto _err;
} }
if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) { if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) {
taosMemoryFreeClear(pReq); taosMemoryFreeClear(pReq);
smaError("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " failed since %s", smaError("vgId:%d, process submit req for rsma suid:%"PRIu64", uid:%" PRIu64 " level %" PRIi8 " failed since %s",
SMA_VID(pSma), suid, pItem->level, terrstr()); SMA_VID(pSma), suid, output->info.groupId, pItem->level, terrstr());
goto _err; goto _err;
} }
smaDebug("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " ver %" PRIi64 " len %" PRIu32, smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%"PRIu64", level %" PRIi8 " ver %" PRIi64 " len %" PRIu32,
SMA_VID(pSma), suid, pItem->level, output->info.version, htonl(pReq->header.contLen)); SMA_VID(pSma), suid, output->info.groupId, pItem->level, output->info.version, htonl(pReq->header.contLen));
taosMemoryFreeClear(pReq); taosMemoryFreeClear(pReq);
} }
......
...@@ -97,10 +97,9 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea ...@@ -97,10 +97,9 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea
} }
} }
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t numOfCols, uint64_t suid, int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols,
void** pReader) { uint64_t suid, void** pReader) {
*pReader = NULL; *pReader = NULL;
SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader)); SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader));
if (p == NULL) { if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
...@@ -111,14 +110,15 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList ...@@ -111,14 +110,15 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList
p->numOfCols = numOfCols; p->numOfCols = numOfCols;
p->suid = suid; p->suid = suid;
if (taosArrayGetSize(pTableIdList) == 0) { if (numOfTables == 0) {
*pReader = p; *pReader = p;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
STableKeyInfo* pKeyInfo = taosArrayGet(pTableIdList, 0); STableKeyInfo* pKeyInfo = &((STableKeyInfo*)pTableIdList)[0];
p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1, 1); p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1, 1);
p->pTableList = pTableIdList; p->pTableList = pTableIdList;
p->numOfTables = numOfTables;
p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES); p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES);
if (p->transferBuf == NULL) { if (p->transferBuf == NULL) {
...@@ -205,7 +205,6 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ...@@ -205,7 +205,6 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache; SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache;
LRUHandle* h = NULL; LRUHandle* h = NULL;
SArray* pRow = NULL; SArray* pRow = NULL;
size_t numOfTables = taosArrayGetSize(pr->pTableList);
bool hasRes = false; bool hasRes = false;
SArray* pLastCols = NULL; SArray* pLastCols = NULL;
...@@ -243,8 +242,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ...@@ -243,8 +242,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
// retrieve the only one last row of all tables in the uid list. // retrieve the only one last row of all tables in the uid list.
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) { if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) {
for (int32_t i = 0; i < numOfTables; ++i) { for (int32_t i = 0; i < pr->numOfTables; ++i) {
STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i); STableKeyInfo* pKeyInfo = &pr->pTableList[i];
code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h); code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -308,8 +307,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ...@@ -308,8 +307,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
} }
} else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) { } else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
for (int32_t i = pr->tableIndex; i < numOfTables; ++i) { for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) {
STableKeyInfo* pKeyInfo = (STableKeyInfo*)taosArrayGet(pr->pTableList, i); STableKeyInfo* pKeyInfo = &pr->pTableList[i];
code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h); code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
...@@ -334,7 +333,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ...@@ -334,7 +333,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
code = TSDB_CODE_INVALID_PARA; code = TSDB_CODE_INVALID_PARA;
} }
_end: _end:
tsdbDataFReaderClose(&pr->pDataFReaderLast); tsdbDataFReaderClose(&pr->pDataFReaderLast);
tsdbDataFReaderClose(&pr->pDataFReader); tsdbDataFReaderClose(&pr->pDataFReader);
......
...@@ -270,10 +270,7 @@ static void resetDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) { ...@@ -270,10 +270,7 @@ static void resetDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) {
} }
} }
static void destroyBlockScanInfo(SHashObj* pTableMap) { static void clearBlockScanInfo(STableBlockScanInfo* p) {
STableBlockScanInfo* p = NULL;
while ((p = taosHashIterate(pTableMap, p)) != NULL) {
p->iterInit = false; p->iterInit = false;
p->iiter.hasVal = false; p->iiter.hasVal = false;
...@@ -288,6 +285,12 @@ static void destroyBlockScanInfo(SHashObj* pTableMap) { ...@@ -288,6 +285,12 @@ static void destroyBlockScanInfo(SHashObj* pTableMap) {
p->delSkyline = taosArrayDestroy(p->delSkyline); p->delSkyline = taosArrayDestroy(p->delSkyline);
p->pBlockList = taosArrayDestroy(p->pBlockList); p->pBlockList = taosArrayDestroy(p->pBlockList);
tMapDataClear(&p->mapData); tMapDataClear(&p->mapData);
}
static void destroyBlockScanInfo(SHashObj* pTableMap) {
STableBlockScanInfo* p = NULL;
while ((p = taosHashIterate(pTableMap, p)) != NULL) {
clearBlockScanInfo(p);
} }
taosHashCleanup(pTableMap); taosHashCleanup(pTableMap);
...@@ -3452,13 +3455,23 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e ...@@ -3452,13 +3455,23 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// todo refactor, use arraylist instead // TODO refactor: with createDataBlockScanInfo
int32_t tsdbSetTableId(STsdbReader* pReader, int64_t uid) { int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) {
ASSERT(pReader != NULL); ASSERT(pReader != NULL);
STableBlockScanInfo* p = NULL;
while ((p = taosHashIterate(pReader->status.pTableMap, p)) != NULL) {
clearBlockScanInfo(p);
}
taosHashClear(pReader->status.pTableMap); taosHashClear(pReader->status.pTableMap);
STableBlockScanInfo info = {.lastKey = 0, .uid = uid}; STableKeyInfo* pList = (STableKeyInfo*) pTableList;
for(int32_t i = 0; i < num; ++i) {
STableBlockScanInfo info = {.lastKey = 0, .uid = pList[i].uid};
taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info)); taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
}
return TDB_CODE_SUCCESS; return TDB_CODE_SUCCESS;
} }
...@@ -3495,8 +3508,8 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) { ...@@ -3495,8 +3508,8 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
} }
// ====================================== EXPOSED APIs ====================================== // ====================================== EXPOSED APIs ======================================
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader, int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
const char* idstr) { STsdbReader** ppReader, const char* idstr) {
STimeWindow window = pCond->twindows; STimeWindow window = pCond->twindows;
if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) { if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
pCond->twindows.skey += 1; pCond->twindows.skey += 1;
...@@ -3555,8 +3568,8 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl ...@@ -3555,8 +3568,8 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
if (pReader->pSchema == NULL) { if (pReader->pSchema == NULL) {
tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:-1, %s", pReader->suid, pReader->idStr); tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:-1, %s", pReader->suid, pReader->idStr);
} }
} else if (taosArrayGetSize(pTableList) > 0) { } else if (numOfTables > 0) {
STableKeyInfo* pKey = taosArrayGet(pTableList, 0); STableKeyInfo* pKey = pTableList;
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1, 1); pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1, 1);
if (pReader->pSchema == NULL) { if (pReader->pSchema == NULL) {
tsdbError("failed to get table schema, uid:%" PRIu64 ", ver:-1, %s", pKey->uid, pReader->idStr); tsdbError("failed to get table schema, uid:%" PRIu64 ", ver:-1, %s", pKey->uid, pReader->idStr);
...@@ -3565,8 +3578,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl ...@@ -3565,8 +3578,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
STsdbReader* p = pReader->innerReader[0] != NULL ? pReader->innerReader[0] : pReader; STsdbReader* p = pReader->innerReader[0] != NULL ? pReader->innerReader[0] : pReader;
int32_t numOfTables = taosArrayGetSize(pTableList); pReader->status.pTableMap = createDataBlockScanInfo(p, pTableList, numOfTables);
pReader->status.pTableMap = createDataBlockScanInfo(p, pTableList->pData, numOfTables);
if (pReader->status.pTableMap == NULL) { if (pReader->status.pTableMap == NULL) {
tsdbReaderClose(pReader); tsdbReaderClose(pReader);
*ppReader = NULL; *ppReader = NULL;
...@@ -3613,7 +3625,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl ...@@ -3613,7 +3625,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr); tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
return code; return code;
_err: _err:
tsdbError("failed to create data reader, code:%s %s", tstrerror(code), idstr); tsdbError("failed to create data reader, code:%s %s", tstrerror(code), idstr);
return code; return code;
} }
...@@ -3778,7 +3790,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { ...@@ -3778,7 +3790,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
return false; return false;
} }
bool tsdbTableNextDataBlock(STsdbReader* pReader, int64_t uid) { bool tsdbTableNextDataBlock(STsdbReader* pReader, uint64_t uid) {
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid)); STableBlockScanInfo* pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid));
if (pBlockScanInfo == NULL) { // no data block for the table of given uid if (pBlockScanInfo == NULL) { // no data block for the table of given uid
return false; return false;
...@@ -4158,7 +4170,7 @@ int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap, const char* idStr ...@@ -4158,7 +4170,7 @@ int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap, const char* idStr
} }
tsdbTrace("vgId:%d, take read snapshot, %s", TD_VID(pTsdb->pVnode), idStr); tsdbTrace("vgId:%d, take read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);
_exit: _exit:
return code; return code;
} }
...@@ -4177,4 +4189,3 @@ void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap, const char* idStr) { ...@@ -4177,4 +4189,3 @@ void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap, const char* idStr) {
} }
tsdbTrace("vgId:%d, untake read snapshot, %s", TD_VID(pTsdb->pVnode), idStr); tsdbTrace("vgId:%d, untake read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);
} }
bool tsdbIsAscendingOrder(STsdbReader* pReader) { return ASCENDING_TRAVERSE(pReader->order); }
...@@ -28,7 +28,7 @@ ...@@ -28,7 +28,7 @@
do { \ do { \
ASSERT((_c) != -1); \ ASSERT((_c) != -1); \
longjmp((_obj), (_c)); \ longjmp((_obj), (_c)); \
} while (0); } while (0)
#define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \ #define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \
do { \ do { \
...@@ -95,6 +95,25 @@ typedef struct SColMatchInfo { ...@@ -95,6 +95,25 @@ typedef struct SColMatchInfo {
int32_t matchType; // determinate the source according to col id or slot id int32_t matchType; // determinate the source according to col id or slot id
} SColMatchInfo; } SColMatchInfo;
// If the numOfOutputGroups is 1, the data blocks that belongs to different groups will be provided randomly
// The numOfOutputGroups is specified by physical plan. and will not be affect by numOfGroups
typedef struct STableListInfo {
bool oneTableForEachGroup;
int32_t numOfOuputGroups; // the data block will be generated one by one
int32_t* groupOffset; // keep the offset value for each group in the tableList
SArray* pTableList;
SHashObj* map; // speedup acquire the tableQueryInfo by table uid
uint64_t suid;
} STableListInfo;
void destroyTableList(STableListInfo* pTableList);
int32_t getNumOfOutputGroups(const STableListInfo* pTableList);
bool oneTableForEachGroup(const STableListInfo* pTableList);
uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid);
int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t gid);
int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo, int32_t* num);
uint64_t getTotalTables(const STableListInfo* pTableList);
struct SqlFunctionCtx; struct SqlFunctionCtx;
size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput); size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
......
...@@ -1077,7 +1077,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN ...@@ -1077,7 +1077,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex); void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex);
bool groupbyTbname(SNodeList* pGroupList); bool groupbyTbname(SNodeList* pGroupList);
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey); int32_t setGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group, bool groupSort);
void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput); void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput);
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
SGroupResInfo* pGroupResInfo); SGroupResInfo* pGroupResInfo);
......
...@@ -48,6 +48,10 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe ...@@ -48,6 +48,10 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
int32_t numOfCols = 0; int32_t numOfCols = 0;
code = code =
extractColMatchInfo(pScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); extractColMatchInfo(pScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
removeRedundantTsCol(pScanNode, &pInfo->matchInfo); removeRedundantTsCol(pScanNode, &pInfo->matchInfo);
code = extractCacheScanSlotId(pInfo->matchInfo.pList, pTaskInfo, &pInfo->pSlotIds); code = extractCacheScanSlotId(pInfo->matchInfo.pList, pTaskInfo, &pInfo->pSlotIds);
...@@ -61,11 +65,15 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe ...@@ -61,11 +65,15 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
pInfo->pUidList = taosArrayInit(4, sizeof(int64_t)); pInfo->pUidList = taosArrayInit(4, sizeof(int64_t));
// partition by tbname // partition by tbname, todo opt perf
if (taosArrayGetSize(pTableList->pGroupList) == taosArrayGetSize(pTableList->pTableList)) { if (oneTableForEachGroup(pTableList) || (getTotalTables(pTableList) == 1)) {
pInfo->retrieveType = pInfo->retrieveType =
CACHESCAN_RETRIEVE_TYPE_ALL | (pScanNode->ignoreNull ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW); CACHESCAN_RETRIEVE_TYPE_ALL | (pScanNode->ignoreNull ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW);
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pTableList->pTableList,
STableKeyInfo* pList = taosArrayGet(pTableList->pTableList, 0);
size_t num = taosArrayGetSize(pTableList->pTableList);
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
taosArrayGetSize(pInfo->matchInfo.pList), pTableList->suid, &pInfo->pLastrowReader); taosArrayGetSize(pInfo->matchInfo.pList), pTableList->suid, &pInfo->pLastrowReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
...@@ -98,7 +106,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe ...@@ -98,7 +106,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
pOperator->cost.openCost = 0; pOperator->cost.openCost = 0;
return pOperator; return pOperator;
_error: _error:
pTaskInfo->code = code; pTaskInfo->code = code;
destroyLastrowScanOperator(pInfo); destroyLastrowScanOperator(pInfo);
taosMemoryFree(pOperator); taosMemoryFree(pOperator);
...@@ -167,16 +175,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { ...@@ -167,16 +175,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
} }
} }
if (pTableList->map != NULL) { pInfo->pRes->info.groupId = getTableGroupId(pTableList, pInfo->pRes->info.uid);
int64_t* groupId = taosHashGet(pTableList->map, &pInfo->pRes->info.uid, sizeof(int64_t));
if (groupId != NULL) {
pInfo->pRes->info.groupId = *groupId;
}
} else {
ASSERT(taosArrayGetSize(pTableList->pTableList) == 1);
STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, 0);
pInfo->pRes->info.groupId = pKeyInfo->groupId;
}
pInfo->indexOfBufferedRes += 1; pInfo->indexOfBufferedRes += 1;
return pInfo->pRes; return pInfo->pRes;
...@@ -185,18 +184,25 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { ...@@ -185,18 +184,25 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
} else { } else {
size_t totalGroups = taosArrayGetSize(pTableList->pGroupList); size_t totalGroups = getNumOfOutputGroups(pTableList);
while (pInfo->currentGroupIndex < totalGroups) { while (pInfo->currentGroupIndex < totalGroups) {
SArray* pGroupTableList = taosArrayGetP(pTableList->pGroupList, pInfo->currentGroupIndex);
tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pGroupTableList, STableKeyInfo* pList = NULL;
int32_t num = 0;
int32_t code = getTablesOfGroup(pTableList, pInfo->currentGroupIndex, &pList, &num);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
taosArrayGetSize(pInfo->matchInfo.pList), pTableList->suid, &pInfo->pLastrowReader); taosArrayGetSize(pInfo->matchInfo.pList), pTableList->suid, &pInfo->pLastrowReader);
taosArrayClear(pInfo->pUidList); taosArrayClear(pInfo->pUidList);
int32_t code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList); code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
pInfo->currentGroupIndex += 1; pInfo->currentGroupIndex += 1;
...@@ -206,7 +212,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { ...@@ -206,7 +212,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
if (pInfo->pseudoExprSup.numOfExprs > 0) { if (pInfo->pseudoExprSup.numOfExprs > 0) {
SExprSupp* pSup = &pInfo->pseudoExprSup; SExprSupp* pSup = &pInfo->pseudoExprSup;
STableKeyInfo* pKeyInfo = taosArrayGet(pGroupTableList, 0); STableKeyInfo* pKeyInfo = &((STableKeyInfo*)pTableList)[0];
pInfo->pRes->info.groupId = pKeyInfo->groupId; pInfo->pRes->info.groupId = pKeyInfo->groupId;
if (taosArrayGetSize(pInfo->pUidList) > 0) { if (taosArrayGetSize(pInfo->pUidList) > 0) {
......
...@@ -544,6 +544,7 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis ...@@ -544,6 +544,7 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto end; goto end;
} }
ctx.index = 0; ctx.index = 0;
ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo)); ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo));
if (ctx.cInfoList == NULL) { if (ctx.cInfoList == NULL) {
...@@ -606,6 +607,7 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis ...@@ -606,6 +607,7 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
} else { } else {
void* tag = taosHashGet(tags, uid, sizeof(int64_t)); void* tag = taosHashGet(tags, uid, sizeof(int64_t));
ASSERT(tag); ASSERT(tag);
STagVal tagVal = {0}; STagVal tagVal = {0};
tagVal.cid = pColInfo->info.colId; tagVal.cid = pColInfo->info.colId;
const char* p = metaGetTableTagVal(tag, pColInfo->info.type, &tagVal); const char* p = metaGetTableTagVal(tag, pColInfo->info.type, &tagVal);
...@@ -636,6 +638,7 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis ...@@ -636,6 +638,7 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
} }
} }
} }
pResBlock->info.rows = rows; pResBlock->info.rows = rows;
// int64_t st1 = taosGetTimestampUs(); // int64_t st1 = taosGetTimestampUs();
...@@ -661,10 +664,12 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis ...@@ -661,10 +664,12 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
} }
break; break;
} }
default: default:
code = TSDB_CODE_OPS_NOT_SUPPORT; code = TSDB_CODE_OPS_NOT_SUPPORT;
goto end; goto end;
} }
if (nodeType(pNode) == QUERY_NODE_COLUMN) { if (nodeType(pNode) == QUERY_NODE_COLUMN) {
SColumnNode* pSColumnNode = (SColumnNode*)pNode; SColumnNode* pSColumnNode = (SColumnNode*)pNode;
SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, pSColumnNode->slotId); SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, pSColumnNode->slotId);
...@@ -674,10 +679,12 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis ...@@ -674,10 +679,12 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
} else { } else {
code = scalarCalculate(pNode, pBlockList, &output); code = scalarCalculate(pNode, pBlockList, &output);
} }
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
releaseColInfoData(output.columnData); releaseColInfoData(output.columnData);
goto end; goto end;
} }
taosArrayPush(groupData, &output.columnData); taosArrayPush(groupData, &output.columnData);
} }
...@@ -696,6 +703,7 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis ...@@ -696,6 +703,7 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto end; goto end;
} }
for (int i = 0; i < rows; i++) { for (int i = 0; i < rows; i++) {
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
...@@ -733,7 +741,6 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis ...@@ -733,7 +741,6 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
int32_t len = (int32_t)(pStart - (char*)keyBuf); int32_t len = (int32_t)(pStart - (char*)keyBuf);
info->groupId = calcGroupId(keyBuf, len); info->groupId = calcGroupId(keyBuf, len);
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t));
} }
// int64_t st2 = taosGetTimestampUs(); // int64_t st2 = taosGetTimestampUs();
...@@ -817,38 +824,86 @@ static int32_t removeInvalidTable(SArray* uids, SHashObj* tags) { ...@@ -817,38 +824,86 @@ static int32_t removeInvalidTable(SArray* uids, SHashObj* tags) {
taosArrayDestroy(validUid); taosArrayDestroy(validUid);
return 0; return 0;
} }
static int32_t nameComparFn(const void* p1, const void* p2) {
const char* pName1 = *(const char**)p1;
const char* pName2 = *(const char**)p2;
int32_t ret = strcmp(pName1, pName2);
if (ret == 0) {
return 0;
} else {
return (ret > 0) ? 1 : -1;
}
}
static SArray* getTableNameList(const SNodeListNode* pList) {
int32_t len = LIST_LENGTH(pList->pNodeList);
SListCell* cell = pList->pNodeList->pHead;
SArray* pTbList = taosArrayInit(len, POINTER_BYTES);
for (int i = 0; i < pList->pNodeList->length; i++) {
SValueNode* valueNode = (SValueNode*)cell->pNode;
if (!IS_VAR_DATA_TYPE(valueNode->node.resType.type)) {
terrno = TSDB_CODE_INVALID_PARA;
taosArrayDestroy(pTbList);
return NULL;
}
char* name = varDataVal(valueNode->datum.p);
taosArrayPush(pTbList, &name);
cell = cell->pNext;
}
size_t numOfTables = taosArrayGetSize(pTbList);
// order the name
taosArraySort(pTbList, nameComparFn);
// remove the duplicates
SArray* pNewList = taosArrayInit(taosArrayGetSize(pTbList), sizeof(void*));
taosArrayPush(pNewList, taosArrayGet(pTbList, 0));
for (int32_t i = 1; i < numOfTables; ++i) {
char** name = taosArrayGetLast(pNewList);
char** nameInOldList = taosArrayGet(pTbList, i);
if (strcmp(*name, *nameInOldList) == 0) {
continue;
}
taosArrayPush(pNewList, nameInOldList);
}
taosArrayDestroy(pTbList);
return pNewList;
}
static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond) { static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond) {
if (nodeType(pTagCond) != QUERY_NODE_OPERATOR) { if (nodeType(pTagCond) != QUERY_NODE_OPERATOR) {
return -1; return -1;
} }
SOperatorNode* pNode = (SOperatorNode*)pTagCond; SOperatorNode* pNode = (SOperatorNode*)pTagCond;
if (pNode->opType != OP_TYPE_IN) { if (pNode->opType != OP_TYPE_IN) {
return -1; return -1;
} }
if ((pNode->pLeft != NULL && nodeType(pNode->pLeft) == QUERY_NODE_COLUMN && if ((pNode->pLeft != NULL && nodeType(pNode->pLeft) == QUERY_NODE_COLUMN &&
((SColumnNode*)pNode->pLeft)->colType == COLUMN_TYPE_TBNAME) && ((SColumnNode*)pNode->pLeft)->colType == COLUMN_TYPE_TBNAME) &&
(pNode->pRight != NULL && nodeType(pNode->pRight) == QUERY_NODE_NODE_LIST)) { (pNode->pRight != NULL && nodeType(pNode->pRight) == QUERY_NODE_NODE_LIST)) {
SNodeListNode* pList = (SNodeListNode*)pNode->pRight; SNodeListNode* pList = (SNodeListNode*)pNode->pRight;
int32_t len = LIST_LENGTH(pList->pNodeList); int32_t len = LIST_LENGTH(pList->pNodeList);
if (len <= 0) return -1; if (len <= 0) {
SListCell* cell = pList->pNodeList->pHead;
SArray* pTbList = taosArrayInit(len, sizeof(void*));
for (int i = 0; i < pList->pNodeList->length; i++) {
SValueNode* valueNode = (SValueNode*)cell->pNode;
if (!IS_VAR_DATA_TYPE(valueNode->node.resType.type)) {
taosArrayDestroy(pTbList);
return -1; return -1;
} }
char* name = varDataVal(valueNode->datum.p);
taosArrayPush(pTbList, &name);
cell = cell->pNext;
}
for (int i = 0; i < taosArrayGetSize(pTbList); i++) { SArray* pTbList = getTableNameList(pList);
int32_t numOfTables = taosArrayGetSize(pTbList);
for (int i = 0; i < numOfTables; i++) {
char* name = taosArrayGetP(pTbList, i); char* name = taosArrayGetP(pTbList, i);
uint64_t uid = 0; uint64_t uid = 0;
if (metaGetTableUidByName(metaHandle, name, &uid) == 0) { if (metaGetTableUidByName(metaHandle, name, &uid) == 0) {
ETableType tbType = TSDB_TABLE_MAX; ETableType tbType = TSDB_TABLE_MAX;
...@@ -863,11 +918,14 @@ static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray* ...@@ -863,11 +918,14 @@ static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray*
terrno = 0; terrno = 0;
} }
} }
taosArrayDestroy(pTbList); taosArrayDestroy(pTbList);
return 0; return 0;
} }
return -1; return -1;
} }
int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond, int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
STableListInfo* pListInfo) { STableListInfo* pListInfo) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -946,14 +1004,6 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, ...@@ -946,14 +1004,6 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
} }
taosArrayDestroy(res); taosArrayDestroy(res);
pListInfo->pGroupList = taosArrayInit(4, POINTER_BYTES);
if (pListInfo->pGroupList == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
// put into list as default group, remove it if grouping sorting is required later
taosArrayPush(pListInfo->pGroupList, &pListInfo->pTableList);
return code; return code;
} }
...@@ -1411,7 +1461,7 @@ void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray ...@@ -1411,7 +1461,7 @@ void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray
while (i < numOfSrcCols && j < taosArrayGetSize(pColMatchInfo)) { while (i < numOfSrcCols && j < taosArrayGetSize(pColMatchInfo)) {
SColumnInfoData* p = taosArrayGet(pCols, i); SColumnInfoData* p = taosArrayGet(pCols, i);
SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, j); SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, j);
/* if (!outputEveryColumn && pmInfo->reserved) { /* if (!outputEveryColumn && pmInfo->reserved) {
j++; j++;
continue; continue;
}*/ }*/
...@@ -1604,3 +1654,78 @@ void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimit ...@@ -1604,3 +1654,78 @@ void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimit
pLimitInfo->remainOffset = limit.offset; pLimitInfo->remainOffset = limit.offset;
pLimitInfo->remainGroupOffset = slimit.offset; pLimitInfo->remainGroupOffset = slimit.offset;
} }
uint64_t getTotalTables(const STableListInfo* pTableList) {
ASSERT(taosArrayGetSize(pTableList->pTableList) == taosHashGetSize(pTableList->map));
return taosArrayGetSize(pTableList->pTableList);
}
uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid));
ASSERT(pTableList->map != NULL && slot != NULL);
STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, *slot);
ASSERT(pKeyInfo->uid == tableUid);
return pKeyInfo->groupId;
}
int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t gid) {
if (pTableList->map == NULL) {
ASSERT(taosArrayGetSize(pTableList->pTableList) == 0);
pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
}
STableKeyInfo keyInfo = {.uid = uid, .groupId = gid};
taosArrayPush(pTableList->pTableList, &keyInfo);
int32_t slot = (int32_t)taosArrayGetSize(pTableList->pTableList) - 1;
taosHashPut(pTableList->map, &uid, sizeof(uid), &slot, sizeof(slot));
qDebug("uid:%" PRIu64 ", groupId:%" PRIu64 " added into table list, slot:%d, total:%d", uid, gid, slot, slot + 1);
return TSDB_CODE_SUCCESS;
}
int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo,
int32_t* size) {
int32_t total = getNumOfOutputGroups(pTableList);
if (ordinalGroupIndex < 0 || ordinalGroupIndex >= total) {
return TSDB_CODE_INVALID_PARA;
}
// here handle two special cases:
// 1. only one group exists, and 2. one table exists for each group.
if (total == 1) {
*size = getTotalTables(pTableList);
*pKeyInfo = (*size == 0)? NULL:taosArrayGet(pTableList->pTableList, 0);
return TSDB_CODE_SUCCESS;
} else if (total == getTotalTables(pTableList)) {
*size = 1;
*pKeyInfo = taosArrayGet(pTableList->pTableList, ordinalGroupIndex);
return TSDB_CODE_SUCCESS;
}
int32_t offset = pTableList->groupOffset[ordinalGroupIndex];
if (ordinalGroupIndex < total - 1) {
*size = pTableList->groupOffset[offset + 1] - pTableList->groupOffset[offset];
} else {
*size = total - pTableList->groupOffset[offset] - 1;
}
*pKeyInfo = taosArrayGet(pTableList->pTableList, offset);
return TSDB_CODE_SUCCESS;
}
int32_t getNumOfOutputGroups(const STableListInfo* pTableList) { return pTableList->numOfOuputGroups; }
bool oneTableForEachGroup(const STableListInfo* pTableList) { return pTableList->oneTableForEachGroup; }
void destroyTableList(STableListInfo* pTableqinfoList) {
pTableqinfoList->pTableList = taosArrayDestroy(pTableqinfoList->pTableList);
taosMemoryFreeClear(pTableqinfoList->groupOffset);
taosHashCleanup(pTableqinfoList->map);
pTableqinfoList->pTableList = NULL;
pTableqinfoList->map = NULL;
}
\ No newline at end of file
...@@ -293,9 +293,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo ...@@ -293,9 +293,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
qDebug("add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), pTaskInfo->id.str); qDebug("add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), pTaskInfo->id.str);
} }
if (pListInfo->map == NULL) {
pListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
}
// traverse to the stream scanner node to add this table id // traverse to the stream scanner node to add this table id
SOperatorInfo* pInfo = pTaskInfo->pRoot; SOperatorInfo* pInfo = pTaskInfo->pRoot;
...@@ -307,8 +305,10 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo ...@@ -307,8 +305,10 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
SStreamScanInfo* pScanInfo = pInfo->info; SStreamScanInfo* pScanInfo = pInfo->info;
if (isAdd) { // add new table id if (isAdd) { // add new table id
SArray* qa = filterUnqualifiedTables(pScanInfo, tableIdList, GET_TASKID(pTaskInfo)); SArray* qa = filterUnqualifiedTables(pScanInfo, tableIdList, GET_TASKID(pTaskInfo));
int32_t numOfQualifiedTables = taosArrayGetSize(qa);
qDebug(" %d qualified child tables added into stream scanner", numOfQualifiedTables);
qDebug(" %d qualified child tables added into stream scanner", (int32_t)taosArrayGetSize(qa));
code = tqReaderAddTbUidList(pScanInfo->tqReader, qa); code = tqReaderAddTbUidList(pScanInfo->tqReader, qa);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(qa); taosArrayDestroy(qa);
...@@ -328,7 +328,9 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo ...@@ -328,7 +328,9 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
} }
} }
for (int32_t i = 0; i < taosArrayGetSize(qa); ++i) { STableListInfo* pTableListInfo = &pTaskInfo->tableqinfoList;
for (int32_t i = 0; i < numOfQualifiedTables; ++i) {
uint64_t* uid = taosArrayGet(qa, i); uint64_t* uid = taosArrayGet(qa, i);
STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0}; STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0};
...@@ -358,8 +360,8 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo ...@@ -358,8 +360,8 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
if (!exists) { if (!exists) {
#endif #endif
taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo);
taosHashPut(pTaskInfo->tableqinfoList.map, uid, sizeof(*uid), &keyInfo.groupId, sizeof(keyInfo.groupId)); addTableIntoTableList(pTableListInfo, keyInfo.uid, keyInfo.groupId);
} }
if (keyBuf != NULL) { if (keyBuf != NULL) {
...@@ -439,7 +441,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, ...@@ -439,7 +441,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, pSubplan->id.queryId); qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, pSubplan->id.queryId);
_error: _error:
// if failed to add ref for all tables in this query, abort current query // if failed to add ref for all tables in this query, abort current query
return code; return code;
} }
...@@ -935,7 +937,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -935,7 +937,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
/*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/ /*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/
/*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/ /*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList); int32_t numOfTables = getTotalTables(&pTaskInfo->tableqinfoList);
#ifndef NDEBUG #ifndef NDEBUG
qDebug("switch to next table %" PRId64 " (cursor %d), %" PRId64 " rows returned", uid, qDebug("switch to next table %" PRId64 " (cursor %d), %" PRId64 " rows returned", uid,
...@@ -944,7 +946,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -944,7 +946,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
#endif #endif
bool found = false; bool found = false;
for (int32_t i = 0; i < tableSz; i++) { for (int32_t i = 0; i < numOfTables; i++) {
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i); STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i);
if (pTableInfo->uid == uid) { if (pTableInfo->uid == uid) {
found = true; found = true;
...@@ -957,14 +959,17 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -957,14 +959,17 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
ASSERT(found); ASSERT(found);
if (pTableScanInfo->dataReader == NULL) { if (pTableScanInfo->dataReader == NULL) {
if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond, STableKeyInfo* pList = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0);
pTaskInfo->tableqinfoList.pTableList, &pTableScanInfo->dataReader, NULL) < 0 || int32_t num = getTotalTables(&pTaskInfo->tableqinfoList);
pTableScanInfo->dataReader == NULL) {
if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond, pList, num,
&pTableScanInfo->dataReader, NULL) < 0 || pTableScanInfo->dataReader == NULL) {
ASSERT(0); ASSERT(0);
} }
} }
tsdbSetTableId(pTableScanInfo->dataReader, uid); STableKeyInfo tki = {.uid = uid};
tsdbSetTableList(pTableScanInfo->dataReader, &tki, 1);
int64_t oldSkey = pTableScanInfo->cond.twindows.skey; int64_t oldSkey = pTableScanInfo->cond.twindows.skey;
pTableScanInfo->cond.twindows.skey = ts + 1; pTableScanInfo->cond.twindows.skey = ts + 1;
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond); tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
...@@ -972,7 +977,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -972,7 +977,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
pTableScanInfo->scanTimes = 0; pTableScanInfo->scanTimes = 0;
qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid, qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid,
ts, pTableScanInfo->currentTable, tableSz); ts, pTableScanInfo->currentTable, numOfTables);
/*}*/ /*}*/
} else { } else {
ASSERT(0); ASSERT(0);
...@@ -994,9 +999,15 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -994,9 +999,15 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo); initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo);
pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts; pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts;
pTaskInfo->tableqinfoList.pTableList = taosArrayInit(1, sizeof(STableKeyInfo));
taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &(STableKeyInfo){.uid = mtInfo.uid, .groupId = 0}); STableListInfo* pListInfo = &pTaskInfo->tableqinfoList;
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pTaskInfo->tableqinfoList.pTableList,
pListInfo->pTableList = taosArrayInit(1, sizeof(STableKeyInfo));
taosArrayPush(pListInfo->pTableList, &(STableKeyInfo){.uid = mtInfo.uid, .groupId = 0});
STableKeyInfo* pList = taosArrayGet(pListInfo->pTableList, 0);
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, taosArrayGetSize(pListInfo->pTableList),
&pInfo->dataReader, NULL); &pInfo->dataReader, NULL);
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
......
...@@ -1739,8 +1739,6 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t ...@@ -1739,8 +1739,6 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void doDestroyTableList(STableListInfo* pTableqinfoList);
typedef struct SFetchRspHandleWrapper { typedef struct SFetchRspHandleWrapper {
uint32_t exchangeId; uint32_t exchangeId;
int32_t sourceIndex; int32_t sourceIndex;
...@@ -2030,7 +2028,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn ...@@ -2030,7 +2028,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
sched_yield(); sched_yield();
} }
_error: _error:
pTaskInfo->code = code; pTaskInfo->code = code;
} }
...@@ -2305,7 +2303,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode ...@@ -2305,7 +2303,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL, destroyExchangeOperatorInfo, NULL); createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL, destroyExchangeOperatorInfo, NULL);
return pOperator; return pOperator;
_error: _error:
if (pInfo != NULL) { if (pInfo != NULL) {
doDestroyExchangeOperatorInfo(pInfo); doDestroyExchangeOperatorInfo(pInfo);
} }
...@@ -3042,7 +3040,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN ...@@ -3042,7 +3040,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN
return pOperator; return pOperator;
_error: _error:
if (pInfo != NULL) { if (pInfo != NULL) {
destroyAggOperatorInfo(pInfo); destroyAggOperatorInfo(pInfo);
} }
...@@ -3258,7 +3256,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* ...@@ -3258,7 +3256,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
_error: _error:
if (pInfo != NULL) { if (pInfo != NULL) {
destroyFillOperatorInfo(pInfo); destroyFillOperatorInfo(pInfo);
} }
...@@ -3366,62 +3364,115 @@ static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) { ...@@ -3366,62 +3364,115 @@ static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) {
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) { tDeleteSSchemaWrapper(pStreamInfo->schema); } static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) { tDeleteSSchemaWrapper(pStreamInfo->schema); }
static int32_t orderbyGroupIdComparFn(const void* p1, const void* p2) {
STableKeyInfo* pInfo1 = (STableKeyInfo*) p1;
STableKeyInfo* pInfo2 = (STableKeyInfo*) p2;
if (pInfo1->groupId == pInfo2->groupId) {
return 0;
} else {
return pInfo1->groupId < pInfo2->groupId? -1:1;
}
}
static int32_t sortTableGroup(STableListInfo* pTableListInfo) { static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
taosArrayClear(pTableListInfo->pGroupList); int32_t code = TSDB_CODE_SUCCESS;
taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
int32_t size = taosArrayGetSize(pTableListInfo->pTableList);
SArray* pList = taosArrayInit(4, sizeof(int32_t));
STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, 0);
uint64_t gid = pInfo->groupId;
int32_t start = 0;
taosArrayPush(pList, &start);
for(int32_t i = 1; i < size; ++i) {
pInfo = taosArrayGet(pTableListInfo->pTableList, i);
if (pInfo->groupId != gid) {
taosArrayPush(pList, &i);
gid = pInfo->groupId;
}
}
pTableListInfo->numOfOuputGroups = taosArrayGetSize(pList);
pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
memcpy(pTableListInfo->groupOffset, taosArrayGet(pList, 0), sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
taosArrayDestroy(pList);
# if 0
SArray* sortSupport = taosArrayInit(16, sizeof(uint64_t)); SArray* sortSupport = taosArrayInit(16, sizeof(uint64_t));
if (sortSupport == NULL) return TSDB_CODE_OUT_OF_MEMORY; if (sortSupport == NULL) {
for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) { return TSDB_CODE_OUT_OF_MEMORY;
}
size_t num = taosArrayGetSize(pTableListInfo->pTableList);
for (int32_t i = 0; i < num; i++) {
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
uint64_t* groupId = taosHashGet(pTableListInfo->map, &info->uid, sizeof(uint64_t)); uint64_t* groupId = taosHashGet(pTableListInfo->map, &info->uid, sizeof(uint64_t));
int32_t index = taosArraySearchIdx(sortSupport, groupId, compareUint64Val, TD_EQ); int32_t index = taosArraySearchIdx(sortSupport, groupId, compareUint64Val, TD_EQ);
if (index == -1) { if (index == -1) {
void* p = taosArraySearch(sortSupport, groupId, compareUint64Val, TD_GT); void* p = taosArraySearch(sortSupport, groupId, compareUint64Val, TD_GT);
SArray* tGroup = taosArrayInit(8, sizeof(STableKeyInfo)); SArray* tGroup = taosArrayInit(8, sizeof(STableKeyInfo));
if (tGroup == NULL) { if (tGroup == NULL) {
taosArrayDestroy(sortSupport); code = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_OUT_OF_MEMORY; goto _error;
} }
if (taosArrayPush(tGroup, info) == NULL) { if (taosArrayPush(tGroup, info) == NULL) {
qError("taos push info array error"); qError("taos push info array error");
taosArrayDestroy(sortSupport); code = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_QRY_APP_ERROR; goto _error;
} }
if (p == NULL) { if (p == NULL) {
if (taosArrayPush(sortSupport, groupId) == NULL) { if (taosArrayPush(sortSupport, groupId) == NULL) {
qError("taos push support array error"); qError("taos push support array error");
taosArrayDestroy(sortSupport); code = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_QRY_APP_ERROR; goto _error;
} }
if (taosArrayPush(pTableListInfo->pGroupList, &tGroup) == NULL) { if (taosArrayPush(pTableListInfo->pGroupList, &tGroup) == NULL) {
qError("taos push group array error"); qError("taos push group array error");
taosArrayDestroy(sortSupport); code = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_QRY_APP_ERROR; goto _error;
} }
} else { } else {
int32_t pos = TARRAY_ELEM_IDX(sortSupport, p); int32_t pos = TARRAY_ELEM_IDX(sortSupport, p);
if (taosArrayInsert(sortSupport, pos, groupId) == NULL) { if (taosArrayInsert(sortSupport, pos, groupId) == NULL) {
qError("taos insert support array error"); qError("taos insert support array error");
taosArrayDestroy(sortSupport); code = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_QRY_APP_ERROR; goto _error;
} }
if (taosArrayInsert(pTableListInfo->pGroupList, pos, &tGroup) == NULL) { if (taosArrayInsert(pTableListInfo->pGroupList, pos, &tGroup) == NULL) {
qError("taos insert group array error"); qError("taos insert group array error");
taosArrayDestroy(sortSupport); code = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_QRY_APP_ERROR; goto _error;
} }
} }
} else { } else {
SArray* tGroup = (SArray*)taosArrayGetP(pTableListInfo->pGroupList, index); SArray* tGroup = (SArray*)taosArrayGetP(pTableListInfo->pGroupList, index);
if (taosArrayPush(tGroup, info) == NULL) { if (taosArrayPush(tGroup, info) == NULL) {
qError("taos push uid array error"); qError("taos push uid array error");
taosArrayDestroy(sortSupport); code = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_QRY_APP_ERROR; goto _error;
} }
} }
} }
taosArrayDestroy(sortSupport); taosArrayDestroy(sortSupport);
#endif
return TDB_CODE_SUCCESS; return TDB_CODE_SUCCESS;
_error:
// taosArrayDestroy(sortSupport);
return code;
} }
bool groupbyTbname(SNodeList* pGroupList) { bool groupbyTbname(SNodeList* pGroupList) {
...@@ -3437,38 +3488,50 @@ bool groupbyTbname(SNodeList* pGroupList) { ...@@ -3437,38 +3488,50 @@ bool groupbyTbname(SNodeList* pGroupList) {
return bytbname; return bytbname;
} }
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group) { int32_t setGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group, bool groupSort) {
if (group == NULL) { int32_t code = TSDB_CODE_SUCCESS;
return TDB_CODE_SUCCESS;
}
pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (pTableListInfo->map == NULL) { if (pTableListInfo->map == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
return code;
} }
bool assignUid = groupbyTbname(group); bool groupByTbname = groupbyTbname(group);
size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList); size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
if (group == NULL || groupByTbname) {
if (assignUid) {
for (int32_t i = 0; i < numOfTables; i++) { for (int32_t i = 0; i < numOfTables; i++) {
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
info->groupId = info->uid; info->groupId = groupByTbname? info->uid:0;
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t));
} }
pTableListInfo->oneTableForEachGroup = groupByTbname;
if (groupSort && groupByTbname) {
taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
pTableListInfo->numOfOuputGroups = numOfTables;
} else { } else {
int32_t code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo); pTableListInfo->numOfOuputGroups = 1;
}
} else {
code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
if (groupSort) {
code = sortTableGroup(pTableListInfo);
}
} }
if (pTableListInfo->needSortTableByGroupId) { // add all table entry in the hash map
return sortTableGroup(pTableListInfo); size_t size = taosArrayGetSize(pTableListInfo->pTableList);
for(int32_t i = 0; i < size; ++i) {
STableKeyInfo* p = taosArrayGet(pTableListInfo->pTableList, i);
taosHashPut(pTableListInfo->map, &p->uid, sizeof(uint64_t), &i, sizeof(int32_t));
} }
return TDB_CODE_SUCCESS; return code;
} }
static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) { static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) {
...@@ -3505,6 +3568,12 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -3505,6 +3568,12 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
// NOTE: this is an patch to fix the physical plan
// TODO remove it later
if (pTableScanNode->scan.node.pLimit != NULL) {
pTableScanNode->groupSort = true;
}
int32_t code = int32_t code =
createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle, createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle,
pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
...@@ -3526,7 +3595,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -3526,7 +3595,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode; STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
int32_t code = int32_t code =
createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle, createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, /*pTableScanNode->groupSort*/true, pHandle,
pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
if (code) { if (code) {
pTaskInfo->code = code; pTaskInfo->code = code;
...@@ -3561,9 +3630,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -3561,9 +3630,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
#ifndef NDEBUG #ifndef NDEBUG
int32_t sz = taosArrayGetSize(pTableListInfo->pTableList); int32_t sz = taosArrayGetSize(pTableListInfo->pTableList);
qDebug("create stream task, total:%d", sz);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
STableKeyInfo* pKeyInfo = taosArrayGet(pTableListInfo->pTableList, i); STableKeyInfo* pKeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
qDebug("creating stream task: add table %" PRId64, pKeyInfo->uid); qDebug("add table uid:%" PRIu64", gid:%"PRIu64, pKeyInfo->uid, pKeyInfo->groupId);
} }
#endif #endif
} }
...@@ -3586,26 +3657,40 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -3586,26 +3657,40 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode; SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo)); pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
pTableListInfo->numOfOuputGroups = 1;
if (pBlockNode->tableType == TSDB_SUPER_TABLE) { if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
int32_t code = vnodeGetAllTableList(pHandle->vnode, pBlockNode->uid, pTableListInfo->pTableList); SArray* pList = taosArrayInit(4, sizeof(STableKeyInfo));
int32_t code = vnodeGetAllTableList(pHandle->vnode, pBlockNode->uid, pList);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = terrno; pTaskInfo->code = terrno;
return NULL; return NULL;
} }
for(int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); ++i) {
STableKeyInfo* p = taosArrayGet(pList, i);
addTableIntoTableList(pTableListInfo, p->uid, 0);
}
taosArrayDestroy(pList);
} else { // Create one table group. } else { // Create one table group.
STableKeyInfo info = {.uid = pBlockNode->uid, .groupId = 0}; addTableIntoTableList(pTableListInfo, pBlockNode->uid, 0);
taosArrayPush(pTableListInfo->pTableList, &info);
} }
SQueryTableDataCond cond = {0}; SQueryTableDataCond cond = {0};
int32_t code = initTableblockDistQueryCond(pBlockNode->suid, &cond); int32_t code = initTableblockDistQueryCond(pBlockNode->suid, &cond);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return NULL; return NULL;
} }
size_t num = getTotalTables(pTableListInfo);
void* pList = NULL;
if (num > 0) {
pList = taosArrayGet(pTableListInfo->pTableList, 0);
}
STsdbReader* pReader = NULL; STsdbReader* pReader = NULL;
tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo->pTableList, &pReader, ""); tsdbReaderOpen(pHandle->vnode, &cond, pList, num, &pReader, "");
cleanupQueryTableDataCond(&cond); cleanupQueryTableDataCond(&cond);
pOperator = createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo); pOperator = createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo);
...@@ -3640,6 +3725,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -3640,6 +3725,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} }
size_t size = LIST_LENGTH(pPhyNode->pChildren); size_t size = LIST_LENGTH(pPhyNode->pChildren);
SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES); SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
...@@ -3848,7 +3934,7 @@ int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length, int32 ...@@ -3848,7 +3934,7 @@ int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length, int32
*length = *(int32_t*)(*result); *length = *(int32_t*)(*result);
} }
_downstream: _downstream:
for (int32_t i = 0; i < ops->numOfDownstream; ++i) { for (int32_t i = 0; i < ops->numOfDownstream; ++i) {
code = encodeOperator(ops->pDownstream[i], result, length, nOptrWithVal); code = encodeOperator(ops->pDownstream[i], result, length, nOptrWithVal);
if (code != TDB_CODE_SUCCESS) { if (code != TDB_CODE_SUCCESS) {
...@@ -3968,35 +4054,17 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead ...@@ -3968,35 +4054,17 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
return code; return code;
_complete: _complete:
taosMemoryFree(sql); taosMemoryFree(sql);
doDestroyTask(*pTaskInfo); doDestroyTask(*pTaskInfo);
terrno = code; terrno = code;
return code; return code;
} }
void doDestroyTableList(STableListInfo* pTableqinfoList) {
taosArrayDestroy(pTableqinfoList->pTableList);
taosHashCleanup(pTableqinfoList->map);
if (pTableqinfoList->needSortTableByGroupId) {
for (int32_t i = 0; i < taosArrayGetSize(pTableqinfoList->pGroupList); i++) {
SArray* tmp = taosArrayGetP(pTableqinfoList->pGroupList, i);
if (tmp == pTableqinfoList->pTableList) {
continue;
}
taosArrayDestroy(tmp);
}
}
taosArrayDestroy(pTableqinfoList->pGroupList);
pTableqinfoList->pTableList = NULL;
pTableqinfoList->map = NULL;
}
void doDestroyTask(SExecTaskInfo* pTaskInfo) { void doDestroyTask(SExecTaskInfo* pTaskInfo) {
qDebug("%s execTask is freed", GET_TASKID(pTaskInfo)); qDebug("%s execTask is freed", GET_TASKID(pTaskInfo));
doDestroyTableList(&pTaskInfo->tableqinfoList); destroyTableList(&pTaskInfo->tableqinfoList);
destroyOperatorInfo(pTaskInfo->pRoot); destroyOperatorInfo(pTaskInfo->pRoot);
cleanupTableSchemaInfo(&pTaskInfo->schemaInfo); cleanupTableSchemaInfo(&pTaskInfo->schemaInfo);
cleanupStreamInfo(&pTaskInfo->streamInfo); cleanupStreamInfo(&pTaskInfo->streamInfo);
......
...@@ -5384,6 +5384,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5384,6 +5384,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
maxTs = TMAX(maxTs, pBlock->info.window.ekey); maxTs = TMAX(maxTs, pBlock->info.window.ekey);
minTs = TMIN(minTs, pBlock->info.window.skey); minTs = TMIN(minTs, pBlock->info.window.skey);
doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap); doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap);
} }
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
......
...@@ -46,7 +46,7 @@ class TDTestCase: ...@@ -46,7 +46,7 @@ class TDTestCase:
def init(self, conn, logSql): def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False) tdSql.init(conn.cursor(), True)
@property @property
def create_databases_sql_err(self): def create_databases_sql_err(self):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册