提交 9b99c697 编写于 作者: H Haojun Liao

fix(query): support partition by + limit push down.

上级 0035bb19
......@@ -164,14 +164,6 @@ typedef enum EStreamType {
STREAM_FILL_OVER,
} EStreamType;
typedef struct {
SArray* pGroupList;
SArray* pTableList;
SHashObj* map; // speedup acquire the tableQueryInfo by table uid
bool needSortTableByGroupId;
uint64_t suid;
} STableListInfo;
#pragma pack(push, 1)
typedef struct SColumnDataAgg {
int16_t colId;
......
......@@ -152,13 +152,13 @@ typedef struct STsdbReader STsdbReader;
#define CACHESCAN_RETRIEVE_LAST_ROW 0x4
#define CACHESCAN_RETRIEVE_LAST 0x8
int32_t tsdbSetTableId(STsdbReader *pReader, int64_t uid);
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *pTableList, STsdbReader **ppReader,
const char *idstr);
int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num);
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
STsdbReader **ppReader, const char *idstr);
void tsdbReaderClose(STsdbReader *pReader);
bool tsdbNextDataBlock(STsdbReader *pReader);
bool tsdbTableNextDataBlock(STsdbReader *pReader, int64_t uid);
bool tsdbTableNextDataBlock(STsdbReader *pReader, uint64_t uid);
void tsdbRetrieveDataBlockInfo(STsdbReader *pReader, SDataBlockInfo *pDataBlockInfo);
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave);
SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
......@@ -170,8 +170,8 @@ void *tsdbGetIdx(SMeta *pMeta);
void *tsdbGetIvtIdx(SMeta *pMeta);
uint64_t getReaderMaxVersion(STsdbReader *pReader);
int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, uint64_t suid,
void **pReader);
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables,
int32_t numOfCols, uint64_t suid, void** pReader);
int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids);
void *tsdbCacherowsReaderClose(void *pReader);
int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid);
......
......@@ -716,7 +716,9 @@ typedef struct SCacheRowsReader {
int32_t numOfCols;
int32_t type;
int32_t tableIndex; // currently returned result tables
SArray *pTableList; // table id list
STableKeyInfo *pTableList; // table id list
int32_t numOfTables;
SSttBlockLoadInfo *pLoadInfo;
STsdbReadSnap *pReadSnap;
SDataFReader *pDataFReader;
......
......@@ -162,10 +162,7 @@ int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSub
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock,
SSubmitBlkRsp* pRsp);
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
STsdbReader tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
void* pMemRef);
int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg);
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list);
// tq
int tqInit();
......
......@@ -671,7 +671,6 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
while (1) {
uint64_t ts;
bool hasMore = false;
// ASSERT(0);
int32_t code = qExecTaskOpt(taskInfo, pResList, &ts, &hasMore, NULL);
if (code < 0) {
if (code == TSDB_CODE_QRY_IN_EXEC) {
......
......@@ -97,10 +97,9 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea
}
}
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t numOfCols, uint64_t suid,
void** pReader) {
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols,
uint64_t suid, void** pReader) {
*pReader = NULL;
SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader));
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -111,14 +110,15 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList
p->numOfCols = numOfCols;
p->suid = suid;
if (taosArrayGetSize(pTableIdList) == 0) {
if (numOfTables == 0) {
*pReader = p;
return TSDB_CODE_SUCCESS;
}
STableKeyInfo* pKeyInfo = taosArrayGet(pTableIdList, 0);
STableKeyInfo* pKeyInfo = &((STableKeyInfo*)pTableIdList)[0];
p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1, 1);
p->pTableList = pTableIdList;
p->numOfTables = numOfTables;
p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES);
if (p->transferBuf == NULL) {
......@@ -205,7 +205,6 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache;
LRUHandle* h = NULL;
SArray* pRow = NULL;
size_t numOfTables = taosArrayGetSize(pr->pTableList);
bool hasRes = false;
SArray* pLastCols = NULL;
......@@ -243,8 +242,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
// retrieve the only one last row of all tables in the uid list.
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) {
for (int32_t i = 0; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i);
for (int32_t i = 0; i < pr->numOfTables; ++i) {
STableKeyInfo* pKeyInfo = &pr->pTableList[i];
code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
if (code != TSDB_CODE_SUCCESS) {
......@@ -308,8 +307,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
}
} else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
for (int32_t i = pr->tableIndex; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = (STableKeyInfo*)taosArrayGet(pr->pTableList, i);
for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) {
STableKeyInfo* pKeyInfo = &pr->pTableList[i];
code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
if (code != TSDB_CODE_SUCCESS) {
return code;
......@@ -334,7 +333,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
code = TSDB_CODE_INVALID_PARA;
}
_end:
_end:
tsdbDataFReaderClose(&pr->pDataFReaderLast);
tsdbDataFReaderClose(&pr->pDataFReader);
......
......@@ -270,24 +270,27 @@ static void resetDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) {
}
}
static void destroyBlockScanInfo(SHashObj* pTableMap) {
STableBlockScanInfo* p = NULL;
static void clearBlockScanInfo(STableBlockScanInfo* p) {
p->iterInit = false;
p->iiter.hasVal = false;
while ((p = taosHashIterate(pTableMap, p)) != NULL) {
p->iterInit = false;
p->iiter.hasVal = false;
if (p->iter.iter != NULL) {
p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
}
if (p->iter.iter != NULL) {
p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
}
if (p->iiter.iter != NULL) {
p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
}
if (p->iiter.iter != NULL) {
p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
}
p->delSkyline = taosArrayDestroy(p->delSkyline);
p->pBlockList = taosArrayDestroy(p->pBlockList);
tMapDataClear(&p->mapData);
}
p->delSkyline = taosArrayDestroy(p->delSkyline);
p->pBlockList = taosArrayDestroy(p->pBlockList);
tMapDataClear(&p->mapData);
static void destroyBlockScanInfo(SHashObj* pTableMap) {
STableBlockScanInfo* p = NULL;
while ((p = taosHashIterate(pTableMap, p)) != NULL) {
clearBlockScanInfo(p);
}
taosHashCleanup(pTableMap);
......@@ -423,7 +426,7 @@ static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
return true;
}
_err:
_err:
return false;
}
......@@ -525,7 +528,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
*ppReader = pReader;
return code;
_end:
_end:
tsdbReaderClose(pReader);
*ppReader = NULL;
return code;
......@@ -577,7 +580,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader,
pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;
_end:
_end:
taosArrayDestroy(aBlockIdx);
return code;
}
......@@ -951,7 +954,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain,
unDumpedRows, pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);
......@@ -978,7 +981,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
", rows:%d, code:%s %s",
", rows:%d, code:%s %s",
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
tstrerror(code), pReader->idStr);
return code;
......@@ -987,7 +990,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
tsdbDebug("%p load file block into buffer, global index:%d, index in table block list:%d, brange:%" PRId64 "-%" PRId64
", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);
......@@ -1402,8 +1405,8 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlock
// log the reason why load the datablock for profile
if (loadDataBlock) {
tsdbDebug("%p uid:%" PRIu64
" need to load the datablock, overlapwithneighborblock:%d, hasDup:%d, partiallyRequired:%d, "
"overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s",
" need to load the datablock, overlapwithneighborblock:%d, hasDup:%d, partiallyRequired:%d, "
"overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s",
pReader, pBlockInfo->uid, info.overlapWithNeighborBlock, info.hasDupTs, info.partiallyRequired,
info.overlapWithKeyInBuf, info.moreThanCapcity, info.overlapWithDelInfo, info.overlapWithLastBlock,
pReader->idStr);
......@@ -1438,7 +1441,7 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64
" - %" PRId64 " %s",
" - %" PRId64 " %s",
pReader, elapsedTime, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey,
pReader->idStr);
......@@ -2036,7 +2039,7 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL);
tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
"-%" PRId64 " %s",
"-%" PRId64 " %s",
pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, d->minKey, d->maxKey, pReader->idStr);
} else {
tsdbError("%p uid:%" PRId64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid,
......@@ -2057,7 +2060,7 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
pBlockScanInfo->iiter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter.iter) != NULL);
tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
"-%" PRId64 " %s",
"-%" PRId64 " %s",
pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, di->minKey, di->maxKey, pReader->idStr);
} else {
tsdbError("%p uid:%" PRId64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid,
......@@ -2292,7 +2295,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
}
}
_end:
_end:
pResBlock->info.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0;
blockDataUpdateTsWindow(pResBlock, 0);
......@@ -2304,7 +2307,7 @@ _end:
if (pResBlock->info.rows > 0) {
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
" rows:%d, elapsed time:%.2f ms %s",
" rows:%d, elapsed time:%.2f ms %s",
pReader, pResBlock->info.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
pResBlock->info.rows, el, pReader->idStr);
}
......@@ -2385,13 +2388,13 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
taosArrayDestroy(pDelData);
pBlockScanInfo->iter.index =
ASCENDING_TRAVERSE(pReader->order) ? 0 : taosArrayGetSize(pBlockScanInfo->delSkyline) - 1;
ASCENDING_TRAVERSE(pReader->order) ? 0 : taosArrayGetSize(pBlockScanInfo->delSkyline) - 1;
pBlockScanInfo->iiter.index = pBlockScanInfo->iter.index;
pBlockScanInfo->fileDelIndex = pBlockScanInfo->iter.index;
pBlockScanInfo->lastBlockDelIndex = pBlockScanInfo->iter.index;
return code;
_err:
_err:
taosArrayDestroy(pDelData);
return code;
}
......@@ -2722,7 +2725,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
if (pBlockIter->numOfBlocks == 0) {
_begin:
_begin:
code = doLoadLastBlockSequentially(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
......@@ -2811,8 +2814,8 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* ret
int8_t precision = pVnode->config.tsdbCfg.precision;
int64_t now = taosGetTimestamp(precision);
int64_t offset = tsQueryRsmaTolerance * ((precision == TSDB_TIME_PRECISION_MILLI) ? 1L
: (precision == TSDB_TIME_PRECISION_MICRO) ? 1000L
: 1000000L);
: (precision == TSDB_TIME_PRECISION_MICRO) ? 1000L
: 1000000L);
for (int8_t i = 0; i < TSDB_RETENTION_MAX; ++i) {
SRetention* pRetention = retentions + level;
......@@ -3452,13 +3455,23 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
return TSDB_CODE_SUCCESS;
}
// todo refactor, use arraylist instead
int32_t tsdbSetTableId(STsdbReader* pReader, int64_t uid) {
// TODO refactor: with createDataBlockScanInfo
int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) {
ASSERT(pReader != NULL);
STableBlockScanInfo* p = NULL;
while ((p = taosHashIterate(pReader->status.pTableMap, p)) != NULL) {
clearBlockScanInfo(p);
}
taosHashClear(pReader->status.pTableMap);
STableBlockScanInfo info = {.lastKey = 0, .uid = uid};
taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
STableKeyInfo* pList = (STableKeyInfo*) pTableList;
for(int32_t i = 0; i < num; ++i) {
STableBlockScanInfo info = {.lastKey = 0, .uid = pList[i].uid};
taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
}
return TDB_CODE_SUCCESS;
}
......@@ -3494,8 +3507,8 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
}
// ====================================== EXPOSED APIs ======================================
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader,
const char* idstr) {
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
STsdbReader** ppReader, const char* idstr) {
STimeWindow window = pCond->twindows;
if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
pCond->twindows.skey += 1;
......@@ -3554,8 +3567,8 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
if (pReader->pSchema == NULL) {
tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:-1, %s", pReader->suid, pReader->idStr);
}
} else if (taosArrayGetSize(pTableList) > 0) {
STableKeyInfo* pKey = taosArrayGet(pTableList, 0);
} else if (numOfTables > 0) {
STableKeyInfo* pKey = pTableList;
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1, 1);
if (pReader->pSchema == NULL) {
tsdbError("failed to get table schema, uid:%" PRIu64 ", ver:-1, %s", pKey->uid, pReader->idStr);
......@@ -3564,8 +3577,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
STsdbReader* p = pReader->innerReader[0] != NULL ? pReader->innerReader[0] : pReader;
int32_t numOfTables = taosArrayGetSize(pTableList);
pReader->status.pTableMap = createDataBlockScanInfo(p, pTableList->pData, numOfTables);
pReader->status.pTableMap = createDataBlockScanInfo(p, pTableList, numOfTables);
if (pReader->status.pTableMap == NULL) {
tsdbReaderClose(pReader);
*ppReader = NULL;
......@@ -3622,7 +3634,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
return code;
_err:
_err:
tsdbError("failed to create data reader, code:%s %s", tstrerror(code), idstr);
return code;
}
......@@ -3775,7 +3787,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
return false;
}
bool tsdbTableNextDataBlock(STsdbReader* pReader, int64_t uid) {
bool tsdbTableNextDataBlock(STsdbReader* pReader, uint64_t uid) {
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid));
if (pBlockScanInfo == NULL) { // no data block for the table of given uid
return false;
......@@ -3964,7 +3976,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
}
tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, skey:%" PRId64 ", query range:%" PRId64 " - %" PRId64
" in query %s",
" in query %s",
pReader, pReader->suid, numOfTables, pCond->twindows.skey, pReader->window.skey, pReader->window.ekey,
pReader->idStr);
......@@ -4155,7 +4167,7 @@ int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap, const char* idStr
}
tsdbTrace("vgId:%d, take read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);
_exit:
_exit:
return code;
}
......@@ -4174,4 +4186,3 @@ void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap, const char* idStr) {
}
tsdbTrace("vgId:%d, untake read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);
}
bool tsdbIsAscendingOrder(STsdbReader* pReader) { return ASCENDING_TRAVERSE(pReader->order); }
......@@ -28,7 +28,7 @@
do { \
ASSERT((_c) != -1); \
longjmp((_obj), (_c)); \
} while (0);
} while (0)
#define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \
do { \
......@@ -95,6 +95,25 @@ typedef struct SColMatchInfo {
int32_t matchType; // determinate the source according to col id or slot id
} SColMatchInfo;
// If the numOfOutputGroups is 1, the data blocks that belongs to different groups will be provided randomly
// The numOfOutputGroups is specified by physical plan. and will not be affect by numOfGroups
typedef struct STableListInfo {
bool oneTableForEachGroup;
int32_t numOfOuputGroups; // the data block will be generated one by one
int32_t* groupOffset; // keep the offset value for each group in the tableList
SArray* pTableList;
SHashObj* map; // speedup acquire the tableQueryInfo by table uid
uint64_t suid;
} STableListInfo;
void destroyTableList(STableListInfo* pTableList);
int32_t getNumOfOutputGroups(const STableListInfo* pTableList);
bool oneTableForEachGroup(const STableListInfo* pTableList);
uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid);
int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t gid);
int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo, int32_t* num);
uint64_t getTotalTables(const STableListInfo* pTableList);
struct SqlFunctionCtx;
size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
......
......@@ -1077,7 +1077,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex);
bool groupbyTbname(SNodeList* pGroupList);
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey);
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group, bool groupSort);
void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput);
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
SGroupResInfo* pGroupResInfo);
......
......@@ -48,6 +48,10 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
int32_t numOfCols = 0;
code =
extractColMatchInfo(pScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
removeRedundantTsCol(pScanNode, &pInfo->matchInfo);
code = extractCacheScanSlotId(pInfo->matchInfo.pList, pTaskInfo, &pInfo->pSlotIds);
......@@ -61,11 +65,15 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
pInfo->pUidList = taosArrayInit(4, sizeof(int64_t));
// partition by tbname
if (taosArrayGetSize(pTableList->pGroupList) == taosArrayGetSize(pTableList->pTableList)) {
// partition by tbname, todo opt perf
if (oneTableForEachGroup(pTableList) || (getTotalTables(pTableList) == 1)) {
pInfo->retrieveType =
CACHESCAN_RETRIEVE_TYPE_ALL | (pScanNode->ignoreNull ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW);
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pTableList->pTableList,
STableKeyInfo* pList = taosArrayGet(pTableList->pTableList, 0);
size_t num = taosArrayGetSize(pTableList->pTableList);
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
taosArrayGetSize(pInfo->matchInfo.pList), pTableList->suid, &pInfo->pLastrowReader);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
......@@ -98,7 +106,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
pOperator->cost.openCost = 0;
return pOperator;
_error:
_error:
pTaskInfo->code = code;
destroyLastrowScanOperator(pInfo);
taosMemoryFree(pOperator);
......@@ -167,16 +175,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
}
}
if (pTableList->map != NULL) {
int64_t* groupId = taosHashGet(pTableList->map, &pInfo->pRes->info.uid, sizeof(int64_t));
if (groupId != NULL) {
pInfo->pRes->info.groupId = *groupId;
}
} else {
ASSERT(taosArrayGetSize(pTableList->pTableList) == 1);
STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, 0);
pInfo->pRes->info.groupId = pKeyInfo->groupId;
}
pInfo->pRes->info.groupId = getTableGroupId(pTableList, pInfo->pRes->info.uid);
pInfo->indexOfBufferedRes += 1;
return pInfo->pRes;
......@@ -185,18 +184,25 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
return NULL;
}
} else {
size_t totalGroups = taosArrayGetSize(pTableList->pGroupList);
size_t totalGroups = getNumOfOutputGroups(pTableList);
while (pInfo->currentGroupIndex < totalGroups) {
SArray* pGroupTableList = taosArrayGetP(pTableList->pGroupList, pInfo->currentGroupIndex);
tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pGroupTableList,
STableKeyInfo* pList = NULL;
int32_t num = 0;
int32_t code = getTablesOfGroup(pTableList, pInfo->currentGroupIndex, &pList, &num);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
taosArrayGetSize(pInfo->matchInfo.pList), pTableList->suid, &pInfo->pLastrowReader);
taosArrayClear(pInfo->pUidList);
int32_t code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList);
code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
T_LONG_JMP(pTaskInfo->env, code);
}
pInfo->currentGroupIndex += 1;
......@@ -206,7 +212,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
if (pInfo->pseudoExprSup.numOfExprs > 0) {
SExprSupp* pSup = &pInfo->pseudoExprSup;
STableKeyInfo* pKeyInfo = taosArrayGet(pGroupTableList, 0);
STableKeyInfo* pKeyInfo = &((STableKeyInfo*)pTableList)[0];
pInfo->pRes->info.groupId = pKeyInfo->groupId;
if (taosArrayGetSize(pInfo->pUidList) > 0) {
......
......@@ -337,8 +337,8 @@ static EDealRes getColumn(SNode** pNode, void* pContext) {
taosHashPut(pData->colHash, &pSColumnNode->colId, sizeof(pSColumnNode->colId), pNode, sizeof((*pNode)));
pSColumnNode->slotId = pData->index++;
SColumnInfo cInfo = {.colId = pSColumnNode->colId,
.type = pSColumnNode->node.resType.type,
.bytes = pSColumnNode->node.resType.bytes};
.type = pSColumnNode->node.resType.type,
.bytes = pSColumnNode->node.resType.bytes};
#if TAG_FILTER_DEBUG
qDebug("tagfilter build column info, slotId:%d, colId:%d, type:%d", pSColumnNode->slotId, cInfo.colId, cInfo.type);
#endif
......@@ -507,7 +507,7 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, int64_t suid, SArray*
// int64_t st2 = taosGetTimestampUs();
// qDebug("calculate tag block rows:%d, cost:%ld us", rows, st2-st1);
end:
end:
taosHashCleanup(tags);
taosHashCleanup(ctx.colHash);
taosArrayDestroy(ctx.cInfoList);
......@@ -544,6 +544,7 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
ctx.index = 0;
ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo));
if (ctx.cInfoList == NULL) {
......@@ -606,6 +607,7 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
} else {
void* tag = taosHashGet(tags, uid, sizeof(int64_t));
ASSERT(tag);
STagVal tagVal = {0};
tagVal.cid = pColInfo->info.colId;
const char* p = metaGetTableTagVal(tag, pColInfo->info.type, &tagVal);
......@@ -636,6 +638,7 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
}
}
}
pResBlock->info.rows = rows;
// int64_t st1 = taosGetTimestampUs();
......@@ -661,10 +664,12 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
}
break;
}
default:
code = TSDB_CODE_OPS_NOT_SUPPORT;
goto end;
}
if (nodeType(pNode) == QUERY_NODE_COLUMN) {
SColumnNode* pSColumnNode = (SColumnNode*)pNode;
SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, pSColumnNode->slotId);
......@@ -674,10 +679,12 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
} else {
code = scalarCalculate(pNode, pBlockList, &output);
}
if (code != TSDB_CODE_SUCCESS) {
releaseColInfoData(output.columnData);
goto end;
}
taosArrayPush(groupData, &output.columnData);
}
......@@ -696,6 +703,7 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
for (int i = 0; i < rows; i++) {
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
......@@ -739,7 +747,7 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
// int64_t st2 = taosGetTimestampUs();
// qDebug("calculate tag block rows:%d, cost:%ld us", rows, st2-st1);
end:
end:
taosMemoryFreeClear(keyBuf);
taosHashCleanup(tags);
taosHashCleanup(ctx.colHash);
......@@ -817,38 +825,86 @@ static int32_t removeInvalidTable(SArray* uids, SHashObj* tags) {
taosArrayDestroy(validUid);
return 0;
}
static int32_t nameComparFn(const void* p1, const void* p2) {
const char* pName1 = *(const char**) p1;
const char* pName2 = *(const char**) p2;
int32_t ret = strcmp(pName1, pName2);
if (ret == 0) {
return 0;
} else {
return (ret > 0)? 1:-1;
}
}
static SArray* getTableNameList(const SNodeListNode* pList) {
int32_t len = LIST_LENGTH(pList->pNodeList);
SListCell* cell = pList->pNodeList->pHead;
SArray* pTbList = taosArrayInit(len, POINTER_BYTES);
for (int i = 0; i < pList->pNodeList->length; i++) {
SValueNode* valueNode = (SValueNode*) cell->pNode;
if (!IS_VAR_DATA_TYPE(valueNode->node.resType.type)) {
terrno = TSDB_CODE_INVALID_PARA;
taosArrayDestroy(pTbList);
return NULL;
}
char* name = varDataVal(valueNode->datum.p);
taosArrayPush(pTbList, &name);
cell = cell->pNext;
}
size_t numOfTables = taosArrayGetSize(pTbList);
// order the name
taosArraySort(pTbList, nameComparFn);
// remove the duplicates
SArray* pNewList = taosArrayInit(taosArrayGetSize(pTbList), sizeof(void*));
taosArrayPush(pNewList, taosArrayGet(pTbList, 0));
for (int32_t i = 1; i < numOfTables; ++i) {
char** name = taosArrayGetLast(pNewList);
char** nameInOldList = taosArrayGet(pTbList, i);
if (strcmp(*name, *nameInOldList) == 0) {
continue;
}
taosArrayPush(pNewList, nameInOldList);
}
taosArrayDestroy(pTbList);
return pNewList;
}
static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond) {
if (nodeType(pTagCond) != QUERY_NODE_OPERATOR) {
return -1;
}
SOperatorNode* pNode = (SOperatorNode*)pTagCond;
if (pNode->opType != OP_TYPE_IN) {
return -1;
}
if ((pNode->pLeft != NULL && nodeType(pNode->pLeft) == QUERY_NODE_COLUMN &&
((SColumnNode*)pNode->pLeft)->colType == COLUMN_TYPE_TBNAME) &&
(pNode->pRight != NULL && nodeType(pNode->pRight) == QUERY_NODE_NODE_LIST)) {
SNodeListNode* pList = (SNodeListNode*)pNode->pRight;
int32_t len = LIST_LENGTH(pList->pNodeList);
if (len <= 0) return -1;
if (len <= 0) {
return -1;
}
SListCell* cell = pList->pNodeList->pHead;
SArray* pTbList = getTableNameList(pList);
int32_t numOfTables = taosArrayGetSize(pTbList);
SArray* pTbList = taosArrayInit(len, sizeof(void*));
for (int i = 0; i < pList->pNodeList->length; i++) {
SValueNode* valueNode = (SValueNode*)cell->pNode;
if (!IS_VAR_DATA_TYPE(valueNode->node.resType.type)) {
taosArrayDestroy(pTbList);
return -1;
}
char* name = varDataVal(valueNode->datum.p);
taosArrayPush(pTbList, &name);
cell = cell->pNext;
}
for (int i = 0; i < numOfTables; i++) {
char* name = taosArrayGetP(pTbList, i);
for (int i = 0; i < taosArrayGetSize(pTbList); i++) {
char* name = taosArrayGetP(pTbList, i);
uint64_t uid = 0;
if (metaGetTableUidByName(metaHandle, name, &uid) == 0) {
ETableType tbType = TSDB_TABLE_MAX;
......@@ -863,11 +919,14 @@ static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray*
terrno = 0;
}
}
taosArrayDestroy(pTbList);
return 0;
}
return -1;
}
int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
STableListInfo* pListInfo) {
int32_t code = TSDB_CODE_SUCCESS;
......@@ -946,14 +1005,6 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
}
taosArrayDestroy(res);
pListInfo->pGroupList = taosArrayInit(4, POINTER_BYTES);
if (pListInfo->pGroupList == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
// put into list as default group, remove it if grouping sorting is required later
taosArrayPush(pListInfo->pGroupList, &pListInfo->pTableList);
return code;
}
......@@ -1686,4 +1737,4 @@ void destroyTableList(STableListInfo* pTableqinfoList) {
pTableqinfoList->pTableList = NULL;
pTableqinfoList->map = NULL;
}
}
\ No newline at end of file
......@@ -293,9 +293,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
qDebug("add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), pTaskInfo->id.str);
}
if (pListInfo->map == NULL) {
pListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
}
// traverse to the stream scanner node to add this table id
SOperatorInfo* pInfo = pTaskInfo->pRoot;
......@@ -307,8 +305,10 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
SStreamScanInfo* pScanInfo = pInfo->info;
if (isAdd) { // add new table id
SArray* qa = filterUnqualifiedTables(pScanInfo, tableIdList, GET_TASKID(pTaskInfo));
int32_t numOfQualifiedTables = taosArrayGetSize(qa);
qDebug(" %d qualified child tables added into stream scanner", numOfQualifiedTables);
qDebug(" %d qualified child tables added into stream scanner", (int32_t)taosArrayGetSize(qa));
code = tqReaderAddTbUidList(pScanInfo->tqReader, qa);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(qa);
......@@ -328,7 +328,9 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
}
}
for (int32_t i = 0; i < taosArrayGetSize(qa); ++i) {
STableListInfo* pTableListInfo = &pTaskInfo->tableqinfoList;
for (int32_t i = 0; i < numOfQualifiedTables; ++i) {
uint64_t* uid = taosArrayGet(qa, i);
STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0};
......@@ -358,8 +360,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
if (!exists) {
#endif
taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo);
taosHashPut(pTaskInfo->tableqinfoList.map, uid, sizeof(*uid), &keyInfo.groupId, sizeof(keyInfo.groupId));
addTableIntoTableList(pTableListInfo, keyInfo.uid, keyInfo.groupId);
}
if (keyBuf != NULL) {
......@@ -439,7 +440,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, pSubplan->id.queryId);
_error:
_error:
// if failed to add ref for all tables in this query, abort current query
return code;
}
......@@ -935,7 +936,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
/*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/
/*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
int32_t numOfTables = getTotalTables(&pTaskInfo->tableqinfoList);
#ifndef NDEBUG
qDebug("switch to next table %" PRId64 " (cursor %d), %" PRId64 " rows returned", uid,
......@@ -944,7 +945,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
#endif
bool found = false;
for (int32_t i = 0; i < tableSz; i++) {
for (int32_t i = 0; i < numOfTables; i++) {
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i);
if (pTableInfo->uid == uid) {
found = true;
......@@ -957,14 +958,17 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
ASSERT(found);
if (pTableScanInfo->dataReader == NULL) {
if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond,
pTaskInfo->tableqinfoList.pTableList, &pTableScanInfo->dataReader, NULL) < 0 ||
pTableScanInfo->dataReader == NULL) {
STableKeyInfo* pList = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0);
int32_t num = getTotalTables(&pTaskInfo->tableqinfoList);
if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond, pList, num,
&pTableScanInfo->dataReader, NULL) < 0 || pTableScanInfo->dataReader == NULL) {
ASSERT(0);
}
}
tsdbSetTableId(pTableScanInfo->dataReader, uid);
STableKeyInfo tki = {.uid = uid};
tsdbSetTableList(pTableScanInfo->dataReader, &tki, 1);
int64_t oldSkey = pTableScanInfo->cond.twindows.skey;
pTableScanInfo->cond.twindows.skey = ts + 1;
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
......@@ -972,7 +976,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
pTableScanInfo->scanTimes = 0;
qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid,
ts, pTableScanInfo->currentTable, tableSz);
ts, pTableScanInfo->currentTable, numOfTables);
/*}*/
} else {
ASSERT(0);
......@@ -994,9 +998,15 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo);
pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts;
pTaskInfo->tableqinfoList.pTableList = taosArrayInit(1, sizeof(STableKeyInfo));
taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &(STableKeyInfo){.uid = mtInfo.uid, .groupId = 0});
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pTaskInfo->tableqinfoList.pTableList,
STableListInfo* pListInfo = &pTaskInfo->tableqinfoList;
pListInfo->pTableList = taosArrayInit(1, sizeof(STableKeyInfo));
taosArrayPush(pListInfo->pTableList, &(STableKeyInfo){.uid = mtInfo.uid, .groupId = 0});
STableKeyInfo* pList = taosArrayGet(pListInfo->pTableList, 0);
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, taosArrayGetSize(pListInfo->pTableList),
&pInfo->dataReader, NULL);
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
......
......@@ -494,7 +494,7 @@ static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int
// todo: refactor this
if (fmIsImplicitTsFunc(pCtx[i].functionId) && (j == pOneExpr->base.numOfParams - 1)) {
pInput->pPTS = pInput->pData[j]; // in case of merge function, this is not always the ts column data.
// ASSERT(pInput->pPTS->info.type == TSDB_DATA_TYPE_TIMESTAMP);
// ASSERT(pInput->pPTS->info.type == TSDB_DATA_TYPE_TIMESTAMP);
}
ASSERT(pInput->pData[j] != NULL);
} else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
......@@ -815,7 +815,7 @@ bool isTaskKilled(SExecTaskInfo* pTaskInfo) {
// abort current query execution.
if (pTaskInfo->owner != 0 &&
((taosGetTimestampSec() - pTaskInfo->cost.start / 1000) > 10 * getMaximumIdleDurationSec())
/*(!needBuildResAfterQueryComplete(pTaskInfo))*/) {
/*(!needBuildResAfterQueryComplete(pTaskInfo))*/) {
assert(pTaskInfo->cost.start != 0);
// qDebug("QInfo:%" PRIu64 " retrieve not arrive beyond %d ms, abort current query execution, start:%" PRId64
// ", current:%d", pQInfo->qId, 1, pQInfo->startExecTs, taosGetTimestampSec());
......@@ -1739,8 +1739,6 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
return TSDB_CODE_SUCCESS;
}
static void doDestroyTableList(STableListInfo* pTableqinfoList);
typedef struct SFetchRspHandleWrapper {
uint32_t exchangeId;
int32_t sourceIndex;
......@@ -1965,7 +1963,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
if (pRsp->numOfRows == 0) {
qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64
", totalRows:%" PRIu64 ", completed:%d try next %d/%" PRIzu,
", totalRows:%" PRIu64 ", completed:%d try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows,
pExchangeInfo->loadInfo.totalRows, completed + 1, i + 1, totalSources);
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
......@@ -1992,10 +1990,10 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
if (pRsp->completed == 1) {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
" execId:%d"
" index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
", total:%.2f Kb,"
" completed:%d try next %d/%" PRIzu,
" execId:%d"
" index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
", total:%.2f Kb,"
" completed:%d try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks,
pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
completed + 1, i + 1, totalSources);
......@@ -2003,7 +2001,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
} else {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
" execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb",
" execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb",
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfBlocks,
pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
}
......@@ -2030,7 +2028,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
sched_yield();
}
_error:
_error:
pTaskInfo->code = code;
}
......@@ -2091,7 +2089,7 @@ static int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
if (pRsp->numOfRows == 0) {
qDebug("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d %d of total completed, rowsOfSource:%" PRIu64
", totalRows:%" PRIu64 " try next",
", totalRows:%" PRIu64 " try next",
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pExchangeInfo->current + 1,
pDataInfo->totalRows, pLoadInfo->totalRows);
......@@ -2108,7 +2106,7 @@ static int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
if (pRsp->completed == 1) {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, rowsOfSource:%" PRIu64
", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows,
pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1,
totalSources);
......@@ -2117,7 +2115,7 @@ static int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
pExchangeInfo->current += 1;
} else {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64
", totalBytes:%" PRIu64,
", totalBytes:%" PRIu64,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows,
pLoadInfo->totalRows, pLoadInfo->totalSize);
}
......@@ -2305,7 +2303,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL, destroyExchangeOperatorInfo, NULL);
return pOperator;
_error:
_error:
if (pInfo != NULL) {
doDestroyExchangeOperatorInfo(pInfo);
}
......@@ -3042,7 +3040,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN
return pOperator;
_error:
_error:
if (pInfo != NULL) {
destroyAggOperatorInfo(pInfo);
}
......@@ -3212,8 +3210,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
SInterval* pInterval =
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType
? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
: &((SIntervalAggOperatorInfo*)downstream->info)->interval;
? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
: &((SIntervalAggOperatorInfo*)downstream->info)->interval;
int32_t order = (pPhyFillNode->inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
int32_t type = convertFillType(pPhyFillNode->mode);
......@@ -3258,7 +3256,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
_error:
_error:
if (pInfo != NULL) {
destroyFillOperatorInfo(pInfo);
}
......@@ -3366,62 +3364,116 @@ static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) {
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) { tDeleteSSchemaWrapper(pStreamInfo->schema); }
static int32_t orderbyGroupIdComparFn(const void* p1, const void* p2) {
STableKeyInfo* pInfo1 = (STableKeyInfo*) p1;
STableKeyInfo* pInfo2 = (STableKeyInfo*) p2;
if (pInfo1->groupId == pInfo2->groupId) {
return 0;
} else {
return pInfo1->groupId < pInfo2->groupId? -1:1;
}
}
static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
taosArrayClear(pTableListInfo->pGroupList);
int32_t code = TSDB_CODE_SUCCESS;
taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
int32_t size = getTotalTables(pTableListInfo);
SArray* pList = taosArrayInit(4, sizeof(int32_t));
STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, 0);
uint64_t gid = pInfo->groupId;
int32_t start = 0;
taosArrayPush(pList, &start);
for(int32_t i = 1; i < size; ++i) {
pInfo = taosArrayGet(pTableListInfo->pTableList, i);
if (pInfo->groupId != gid) {
taosArrayPush(pList, &i);
gid = pInfo->groupId;
}
}
pTableListInfo->numOfOuputGroups = taosArrayGetSize(pList);
pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
memcpy(pTableListInfo->groupOffset, taosArrayGet(pList, 0), sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
taosArrayDestroy(pList);
# if 0
SArray* sortSupport = taosArrayInit(16, sizeof(uint64_t));
if (sortSupport == NULL) return TSDB_CODE_OUT_OF_MEMORY;
for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) {
if (sortSupport == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
size_t num = taosArrayGetSize(pTableListInfo->pTableList);
for (int32_t i = 0; i < num; i++) {
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
uint64_t* groupId = taosHashGet(pTableListInfo->map, &info->uid, sizeof(uint64_t));
int32_t index = taosArraySearchIdx(sortSupport, groupId, compareUint64Val, TD_EQ);
if (index == -1) {
void* p = taosArraySearch(sortSupport, groupId, compareUint64Val, TD_GT);
SArray* tGroup = taosArrayInit(8, sizeof(STableKeyInfo));
if (tGroup == NULL) {
taosArrayDestroy(sortSupport);
return TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
if (taosArrayPush(tGroup, info) == NULL) {
qError("taos push info array error");
taosArrayDestroy(sortSupport);
return TSDB_CODE_QRY_APP_ERROR;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
if (p == NULL) {
if (taosArrayPush(sortSupport, groupId) == NULL) {
qError("taos push support array error");
taosArrayDestroy(sortSupport);
return TSDB_CODE_QRY_APP_ERROR;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
if (taosArrayPush(pTableListInfo->pGroupList, &tGroup) == NULL) {
qError("taos push group array error");
taosArrayDestroy(sortSupport);
return TSDB_CODE_QRY_APP_ERROR;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
} else {
int32_t pos = TARRAY_ELEM_IDX(sortSupport, p);
if (taosArrayInsert(sortSupport, pos, groupId) == NULL) {
qError("taos insert support array error");
taosArrayDestroy(sortSupport);
return TSDB_CODE_QRY_APP_ERROR;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
if (taosArrayInsert(pTableListInfo->pGroupList, pos, &tGroup) == NULL) {
qError("taos insert group array error");
taosArrayDestroy(sortSupport);
return TSDB_CODE_QRY_APP_ERROR;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
}
} else {
SArray* tGroup = (SArray*)taosArrayGetP(pTableListInfo->pGroupList, index);
if (taosArrayPush(tGroup, info) == NULL) {
qError("taos push uid array error");
taosArrayDestroy(sortSupport);
return TSDB_CODE_QRY_APP_ERROR;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
}
}
taosArrayDestroy(sortSupport);
#endif
return TDB_CODE_SUCCESS;
_error:
// taosArrayDestroy(sortSupport);
return code;
}
bool groupbyTbname(SNodeList* pGroupList) {
......@@ -3437,38 +3489,44 @@ bool groupbyTbname(SNodeList* pGroupList) {
return bytbname;
}
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group) {
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group, bool groupSort) {
int32_t code = TSDB_CODE_SUCCESS;
if (group == NULL) {
return TDB_CODE_SUCCESS;
return code;
}
pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (pTableListInfo->map == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
bool assignUid = groupbyTbname(group);
bool assignUid = groupbyTbname(group);
size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
if (assignUid) {
if (assignUid) { // in case of group/partition by tbname, the group id is equalled to the uid of table
for (int32_t i = 0; i < numOfTables; i++) {
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
info->groupId = info->uid;
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t));
}
pTableListInfo->oneTableForEachGroup = true;
if (groupSort) {
pTableListInfo->numOfOuputGroups = numOfTables;
}
} else {
int32_t code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo);
code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
if (pTableListInfo->needSortTableByGroupId) {
return sortTableGroup(pTableListInfo);
if (groupSort) {
code = sortTableGroup(pTableListInfo);
}
}
return TDB_CODE_SUCCESS;
return code;
}
static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) {
......@@ -3505,6 +3563,12 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
// NOTE: this is an patch to fix the physical plan
// TODO remove it later
if (pTableScanNode->scan.node.pLimit != NULL) {
pTableScanNode->groupSort = true;
}
int32_t code =
createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle,
pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
......@@ -3563,8 +3627,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t sz = taosArrayGetSize(pTableListInfo->pTableList);
for (int32_t i = 0; i < sz; i++) {
STableKeyInfo* pKeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
qDebug("creating stream task: add table %" PRId64, pKeyInfo->uid);
qDebug("creating stream task: add table uid:%" PRIu64, pKeyInfo->uid);
}
qDebug("table in hashmap, %d", (int32_t) getTotalTables(pTableListInfo));
#endif
}
......@@ -3599,13 +3665,20 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}
SQueryTableDataCond cond = {0};
int32_t code = initTableblockDistQueryCond(pBlockNode->suid, &cond);
int32_t code = initTableblockDistQueryCond(pBlockNode->suid, &cond);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
size_t num = getTotalTables(pTableListInfo);
void* pList = NULL;
if (num > 0) {
pList = taosArrayGet(pTableListInfo->pTableList, 0);
}
STsdbReader* pReader = NULL;
tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo->pTableList, &pReader, "");
tsdbReaderOpen(pHandle->vnode, &cond, pList, num, &pReader, "");
cleanupQueryTableDataCond(&cond);
pOperator = createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo);
......@@ -3639,7 +3712,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return pOperator;
}
size_t size = LIST_LENGTH(pPhyNode->pChildren);
size_t size = LIST_LENGTH(pPhyNode->pChildren);
SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
for (int32_t i = 0; i < size; ++i) {
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
......@@ -3848,7 +3922,7 @@ int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length, int32
*length = *(int32_t*)(*result);
}
_downstream:
_downstream:
for (int32_t i = 0; i < ops->numOfDownstream; ++i) {
code = encodeOperator(ops->pDownstream[i], result, length, nOptrWithVal);
if (code != TDB_CODE_SUCCESS) {
......@@ -3968,35 +4042,17 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
return code;
_complete:
_complete:
taosMemoryFree(sql);
doDestroyTask(*pTaskInfo);
terrno = code;
return code;
}
void doDestroyTableList(STableListInfo* pTableqinfoList) {
taosArrayDestroy(pTableqinfoList->pTableList);
taosHashCleanup(pTableqinfoList->map);
if (pTableqinfoList->needSortTableByGroupId) {
for (int32_t i = 0; i < taosArrayGetSize(pTableqinfoList->pGroupList); i++) {
SArray* tmp = taosArrayGetP(pTableqinfoList->pGroupList, i);
if (tmp == pTableqinfoList->pTableList) {
continue;
}
taosArrayDestroy(tmp);
}
}
taosArrayDestroy(pTableqinfoList->pGroupList);
pTableqinfoList->pTableList = NULL;
pTableqinfoList->map = NULL;
}
void doDestroyTask(SExecTaskInfo* pTaskInfo) {
qDebug("%s execTask is freed", GET_TASKID(pTaskInfo));
doDestroyTableList(&pTaskInfo->tableqinfoList);
destroyTableList(&pTaskInfo->tableqinfoList);
destroyOperatorInfo(pTaskInfo->pRoot);
cleanupTableSchemaInfo(&pTaskInfo->schemaInfo);
cleanupStreamInfo(&pTaskInfo->streamInfo);
......@@ -4125,8 +4181,8 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat
int32_t size = 0;
void* pVal = NULL;
SWinKey key = {
.ts = *(TSKEY*)pPos->key,
.groupId = pPos->groupId,
.ts = *(TSKEY*)pPos->key,
.groupId = pPos->groupId,
};
int32_t code = streamStateGet(pState, &key, &pVal, &size);
ASSERT(code == 0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册