diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index d200ed56a5346427d6b2480d50f32297b47e24f0..bed6e93e5a20fc2edf581f6e1b798acfe4bab29f 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -152,17 +152,20 @@ typedef struct STsdbReader STsdbReader; #define CACHESCAN_RETRIEVE_LAST_ROW 0x4 #define CACHESCAN_RETRIEVE_LAST 0x8 -int32_t tsdbSetTableId(STsdbReader *pReader, int64_t uid); -int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *pTableList, STsdbReader **ppReader, - const char *idstr); +int32_t tsdbSetTableId(STsdbReader *pReader, int64_t uid); +int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *pTableList, STsdbReader **ppReader, + const char *idstr); + void tsdbReaderClose(STsdbReader *pReader); bool tsdbNextDataBlock(STsdbReader *pReader); +bool tsdbTableNextDataBlock(STsdbReader *pReader, int64_t uid); void tsdbRetrieveDataBlockInfo(STsdbReader *pReader, SDataBlockInfo *pDataBlockInfo); int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave); SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList); int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond); int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo); int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle); +bool tsdbIsAscendingOrder(STsdbReader *pReader); void *tsdbGetIdx(SMeta *pMeta); void *tsdbGetIvtIdx(SMeta *pMeta); uint64_t getReaderMaxVersion(STsdbReader *pReader); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 98de107ad85c1eccdea9f9b17803e39141d2b805..5a495f263e53332f2d5764cb10e1036f8f551ca5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3775,7 +3775,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { return false; } -bool tsdbTableNextDataBlock(STsdbReader* pReader, uint64_t uid) { +bool tsdbTableNextDataBlock(STsdbReader* pReader, int64_t uid) { STableBlockScanInfo* pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid)); if (pBlockScanInfo == NULL) { // no data block for the table of given uid return false; @@ -4174,3 +4174,4 @@ void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap, const char* idStr) { } tsdbTrace("vgId:%d, untake read snapshot, %s", TD_VID(pTsdb->pVnode), idStr); } +bool tsdbIsAscendingOrder(STsdbReader* pReader) { return ASCENDING_TRAVERSE(pReader->order); } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 1d9f26b7edacaa8e1bc7c10b46732e03db367bd6..8769e8ac2fd764825b3f8ba16535e5bd02490e08 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -341,21 +341,23 @@ typedef struct STableScanInfo { } STableScanInfo; typedef struct STableMergeScanInfo { - STableListInfo* tableListInfo; - int32_t tableStartIndex; - int32_t tableEndIndex; - bool hasGroupId; - uint64_t groupId; - SArray* dataReaders; // array of tsdbReaderT* - SReadHandle readHandle; - int32_t bufPageSize; - uint32_t sortBufSize; // max buffer size for in-memory sort - SArray* pSortInfo; - SSortHandle* pSortHandle; - SSDataBlock* pSortInputBlock; - int64_t startTs; // sort start time - SArray* sortSourceParams; - SLimitInfo limitInfo; + STableListInfo* tableListInfo; + int32_t tableStartIndex; + int32_t tableEndIndex; + bool hasGroupId; + uint64_t groupId; + SArray* dataReaders; // array of tsdbReaderT* + SArray* queryConds; // array of queryTableDataCond + STsdbReader* pReader; + SReadHandle readHandle; + int32_t bufPageSize; + uint32_t sortBufSize; // max buffer size for in-memory sort + SArray* pSortInfo; + SSortHandle* pSortHandle; + SSDataBlock* pSortInputBlock; + int64_t startTs; // sort start time + SArray* sortSourceParams; + SLimitInfo limitInfo; SFileBlockLoadRecorder readRecorder; int64_t numOfRows; SScanInfo scanInfo; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 9518e97d0003c2141a7acae945472145fba5939c..e2f3b1c6c4ed9aee1392cc71c1584c832c76b775 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -42,7 +42,7 @@ static int32_t buildDbTableInfoBlock(bool sysInfo, const SSDataBlock* p, const S static char* SYSTABLE_IDX_COLUMN[] = {"table_name", "db_name", "create_time", "columns", "ttl", "stable_name", "vgroup_id', 'uid", "type"}; -static char* SYSTABLE_IDX_EXCEPT[] = {"db_name", "vgroup_id"}; +static char* SYSTABLE_SPECIAL_COL[] = {"db_name", "vgroup_id"}; typedef int32_t (*__sys_filte)(void* pMeta, SNode* cond, SArray* result); typedef int32_t (*__sys_check)(SNode* cond); @@ -363,7 +363,8 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo if (pLimitInfo->remainOffset >= pBlock->info.rows) { pLimitInfo->remainOffset -= pBlock->info.rows; pBlock->info.rows = 0; - qDebug("current block ignore due to offset, current:%"PRId64", %s", pLimitInfo->remainOffset, GET_TASKID(pTaskInfo)); + qDebug("current block ignore due to offset, current:%" PRId64 ", %s", pLimitInfo->remainOffset, + GET_TASKID(pTaskInfo)); } else { blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset); pLimitInfo->remainOffset = 0; @@ -376,9 +377,9 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo int32_t keep = pBlock->info.rows - overflowRows; blockDataKeepFirstNRows(pBlock, keep); - qDebug("output limit %"PRId64" has reached, %s", pLimit->limit, GET_TASKID(pTaskInfo)); + qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, GET_TASKID(pTaskInfo)); -// setTaskStatus(pTaskInfo, TASK_COMPLETED); + // setTaskStatus(pTaskInfo, TASK_COMPLETED); pOperator->status = OP_EXEC_DONE; } } @@ -3251,18 +3252,80 @@ static int tableUidCompare(const void* a, const void* b) { } return u1 < u2 ? -1 : 1; } + +typedef struct MergeIndex { + int idx; + int len; +} MergeIndex; + +static FORCE_INLINE int optSysBinarySearch(SArray* arr, int s, int e, uint64_t k) { + uint64_t v; + int32_t m; + while (s <= e) { + m = s + (e - s) / 2; + v = *(uint64_t*)taosArrayGet(arr, m); + if (v >= k) { + e = m - 1; + } else { + s = m + 1; + } + } + return s; +} + +void optSysIntersection(SArray* in, SArray* out) { + int32_t sz = (int32_t)taosArrayGetSize(in); + if (sz <= 0) { + return; + } + MergeIndex* mi = taosMemoryCalloc(sz, sizeof(MergeIndex)); + for (int i = 0; i < sz; i++) { + SArray* t = taosArrayGetP(in, i); + mi[i].len = (int32_t)taosArrayGetSize(t); + mi[i].idx = 0; + } + + SArray* base = taosArrayGetP(in, 0); + for (int i = 0; i < taosArrayGetSize(base); i++) { + uint64_t tgt = *(uint64_t*)taosArrayGet(base, i); + bool has = true; + for (int j = 1; j < taosArrayGetSize(in); j++) { + SArray* oth = taosArrayGetP(in, j); + int mid = optSysBinarySearch(oth, mi[j].idx, mi[j].len - 1, tgt); + if (mid >= 0 && mid < mi[j].len) { + uint64_t val = *(uint64_t*)taosArrayGet(oth, mid); + has = (val == tgt ? true : false); + mi[j].idx = mid; + } else { + has = false; + } + } + if (has == true) { + taosArrayPush(out, &tgt); + } + } + taosMemoryFreeClear(mi); +} + static int32_t optSysMergeRslt(SArray* mRslt, SArray* rslt) { // TODO, find comm mem from mRslt for (int i = 0; i < taosArrayGetSize(mRslt); i++) { - SArray* aRslt = taosArrayGetP(mRslt, i); - taosArrayAddAll(rslt, aRslt); + SArray* arslt = taosArrayGetP(mRslt, i); + taosArraySort(arslt, tableUidCompare); + } + optSysIntersection(mRslt, rslt); + return 0; +} +static int32_t optSysSpecialColumn(SNode* cond) { + SOperatorNode* pOper = (SOperatorNode*)cond; + SColumnNode* pCol = (SColumnNode*)pOper->pLeft; + for (int i = 0; i < sizeof(SYSTABLE_SPECIAL_COL) / sizeof(SYSTABLE_SPECIAL_COL[0]); i++) { + if (0 == strcmp(pCol->colName, SYSTABLE_SPECIAL_COL[i])) { + return 1; + } } - taosArraySort(rslt, tableUidCompare); - taosArrayRemoveDuplicate(rslt, tableUidCompare, NULL); - return 0; } - static int32_t optSysTabFilte(void* arg, SNode* cond, SArray* result) { int ret = -1; if (nodeType(cond) == QUERY_NODE_OPERATOR) { @@ -3286,7 +3349,6 @@ static int32_t optSysTabFilte(void* arg, SNode* cond, SArray* result) { SNodeList* pList = (SNodeList*)pNode->pParameterList; int32_t len = LIST_LENGTH(pList); - if (len <= 0) return ret; bool hasIdx = false; bool hasRslt = true; @@ -3297,12 +3359,16 @@ static int32_t optSysTabFilte(void* arg, SNode* cond, SArray* result) { if (cell == NULL) break; SArray* aRslt = taosArrayInit(16, sizeof(int64_t)); - ret = optSysTabFilteImpl(arg, cell->pNode, aRslt); if (ret == 0) { // has index hasIdx = true; - taosArrayPush(mRslt, &aRslt); + if (optSysSpecialColumn(cell->pNode) == 0) { + taosArrayPush(mRslt, &aRslt); + } else { + // db_name/vgroup not result + taosArrayDestroy(aRslt); + } } else if (ret == -2) { // current vg hasIdx = true; @@ -4172,6 +4238,131 @@ int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* return TSDB_CODE_SUCCESS; } +int32_t createMultipleDataReaders2(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle, + STableListInfo* pTableListInfo, int32_t tableStartIdx, int32_t tableEndIdx, + STsdbReader** ppReader, const char* idstr) { + STsdbReader* pReader = NULL; + SArray* subTableList = taosArrayInit(1, sizeof(STableKeyInfo)); + for (int32_t i = tableStartIdx; i <= tableEndIdx; ++i) { + taosArrayPush(subTableList, taosArrayGet(pTableListInfo->pTableList, i)); + } + int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, subTableList, &pReader, idstr); + if (code != 0) { + taosArrayDestroy(subTableList); + return code; + } + *ppReader = pReader; + return TSDB_CODE_SUCCESS; +} + +static int32_t loadDataBlockFromOneTable2(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo, + SSDataBlock* pBlock, uint32_t* status) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + STableMergeScanInfo* pInfo = pOperator->info; + + uint64_t uid = pBlock->info.uid; + + SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder; + + pCost->totalBlocks += 1; + pCost->totalRows += pBlock->info.rows; + + *status = pInfo->dataBlockLoadFlag; + if (pTableScanInfo->pFilterNode != NULL || + overlapWithTimeWindow(&pTableScanInfo->interval, &pBlock->info, pTableScanInfo->cond.order)) { + (*status) = FUNC_DATA_REQUIRED_DATA_LOAD; + } + + SDataBlockInfo* pBlockInfo = &pBlock->info; + taosMemoryFreeClear(pBlock->pBlockAgg); + + if (*status == FUNC_DATA_REQUIRED_FILTEROUT) { + qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), + pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); + pCost->filterOutBlocks += 1; + return TSDB_CODE_SUCCESS; + } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) { + qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), + pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); + pCost->skipBlocks += 1; + + // clear all data in pBlock that are set when handing the previous block + for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) { + SColumnInfoData* pcol = taosArrayGet(pBlock->pDataBlock, i); + pcol->pData = NULL; + } + + return TSDB_CODE_SUCCESS; + } else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) { + pCost->loadBlockStatis += 1; + + bool allColumnsHaveAgg = true; + SColumnDataAgg** pColAgg = NULL; + STsdbReader* reader = pTableScanInfo->pReader; + tsdbRetrieveDatablockSMA(reader, &pColAgg, &allColumnsHaveAgg); + + if (allColumnsHaveAgg == true) { + int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); + + // todo create this buffer during creating operator + if (pBlock->pBlockAgg == NULL) { + pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES); + } + + for (int32_t i = 0; i < numOfCols; ++i) { + SColMatchItem* pColMatchInfo = taosArrayGet(pTableScanInfo->matchInfo.pList, i); + if (!pColMatchInfo->needOutput) { + continue; + } + pBlock->pBlockAgg[pColMatchInfo->dstSlotId] = pColAgg[i]; + } + + return TSDB_CODE_SUCCESS; + } else { // failed to load the block sma data, data block statistics does not exist, load data block instead + *status = FUNC_DATA_REQUIRED_DATA_LOAD; + } + } + + ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD); + + pCost->totalCheckedRows += pBlock->info.rows; + pCost->loadBlocks += 1; + + STsdbReader* reader = pTableScanInfo->pReader; + SArray* pCols = tsdbRetrieveDataBlock(reader, NULL); + if (pCols == NULL) { + return terrno; + } + + relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true); + + // currently only the tbname pseudo column + if (pTableScanInfo->pseudoSup.numOfExprs > 0) { + int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo, + pTableScanInfo->pseudoSup.numOfExprs, pBlock, GET_TASKID(pTaskInfo)); + if (code != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, code); + } + } + + if (pTableScanInfo->pFilterNode != NULL) { + int64_t st = taosGetTimestampMs(); + doFilter(pTableScanInfo->pFilterNode, pBlock, &pTableScanInfo->matchInfo, NULL); + + double el = (taosGetTimestampUs() - st) / 1000.0; + pTableScanInfo->readRecorder.filterTime += el; + + if (pBlock->info.rows == 0) { + pCost->filterOutBlocks += 1; + qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%.2f ms", + GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el); + } else { + qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el); + } + } + return TSDB_CODE_SUCCESS; +} + // todo refactor static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo, int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) { @@ -4255,7 +4446,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc pCost->totalCheckedRows += pBlock->info.rows; pCost->loadBlocks += 1; - STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx); + STsdbReader* reader = pTableScanInfo->pReader; // taosArrayGetP(pTableScanInfo->dataReaders, readerIdx); SArray* pCols = tsdbRetrieveDataBlock(reader, NULL); if (pCols == NULL) { return terrno; @@ -4294,9 +4485,143 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc typedef struct STableMergeScanSortSourceParam { SOperatorInfo* pOperator; int32_t readerIdx; + int64_t uid; SSDataBlock* inputBlock; } STableMergeScanSortSourceParam; +static SSDataBlock* getTableDataBlockTemp(void* param) { + STableMergeScanSortSourceParam* source = param; + SOperatorInfo* pOperator = source->pOperator; + STableMergeScanInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + int32_t readIdx = source->readerIdx; + SSDataBlock* pBlock = source->inputBlock; + STableMergeScanInfo* pTableScanInfo = pOperator->info; + + SQueryTableDataCond* pQueryCond = taosArrayGet(pTableScanInfo->queryConds, readIdx); + + blockDataCleanup(pBlock); + + int64_t st = taosGetTimestampUs(); + + SArray* subTable = taosArrayInit(1, sizeof(STableKeyInfo)); + taosArrayPush(subTable, taosArrayGet(pInfo->tableListInfo->pTableList, readIdx + pInfo->tableStartIndex)); + SReadHandle* pHandle = &pInfo->readHandle; + tsdbReaderOpen(pHandle->vnode, pQueryCond, subTable, &pInfo->pReader, GET_TASKID(pTaskInfo)); + taosArrayDestroy(subTable); + + STsdbReader* reader = pInfo->pReader; + while (tsdbNextDataBlock(reader)) { + if (isTaskKilled(pOperator->pTaskInfo)) { + T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + } + + // process this data block based on the probabilities + bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample); + if (!processThisBlock) { + continue; + } + + blockDataCleanup(pBlock); + SDataBlockInfo binfo = pBlock->info; + tsdbRetrieveDataBlockInfo(reader, &binfo); + + blockDataEnsureCapacity(pBlock, binfo.rows); + pBlock->info.type = binfo.type; + pBlock->info.uid = binfo.uid; + pBlock->info.window = binfo.window; + pBlock->info.rows = binfo.rows; + + if (tsdbIsAscendingOrder(pInfo->pReader)) { + pQueryCond->twindows.skey = pBlock->info.window.ekey + 1; + } else { + pQueryCond->twindows.ekey = pBlock->info.window.skey - 1; + } + + uint32_t status = 0; + int32_t code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, readIdx, pBlock, &status); + if (code != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pOperator->pTaskInfo->env, code); + } + + // current block is filter out according to filter condition, continue load the next block + if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) { + continue; + } + + uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); + if (groupId) { + pBlock->info.groupId = *groupId; + } + + pOperator->resultInfo.totalRows += pBlock->info.rows; // pTableScanInfo->readRecorder.totalRows; + pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; + + tsdbReaderClose(pInfo->pReader); + pInfo->pReader = NULL; + return pBlock; + } + tsdbReaderClose(pInfo->pReader); + pInfo->pReader = NULL; + return NULL; +} +static SSDataBlock* getTableDataBlock2(void* param) { + STableMergeScanSortSourceParam* source = param; + SOperatorInfo* pOperator = source->pOperator; + int64_t uid = source->uid; + SSDataBlock* pBlock = source->inputBlock; + STableMergeScanInfo* pTableScanInfo = pOperator->info; + + int64_t st = taosGetTimestampUs(); + + blockDataCleanup(pBlock); + + STsdbReader* reader = pTableScanInfo->pReader; + while (tsdbTableNextDataBlock(reader, uid)) { + if (isTaskKilled(pOperator->pTaskInfo)) { + T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + } + + // process this data block based on the probabilities + bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample); + if (!processThisBlock) { + continue; + } + + blockDataCleanup(pBlock); + SDataBlockInfo binfo = pBlock->info; + tsdbRetrieveDataBlockInfo(reader, &binfo); + + blockDataEnsureCapacity(pBlock, binfo.rows); + pBlock->info.type = binfo.type; + pBlock->info.uid = binfo.uid; + pBlock->info.window = binfo.window; + pBlock->info.rows = binfo.rows; + + uint32_t status = 0; + int32_t code = loadDataBlockFromOneTable2(pOperator, pTableScanInfo, pBlock, &status); + if (code != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pOperator->pTaskInfo->env, code); + } + + // current block is filter out according to filter condition, continue load the next block + if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) { + continue; + } + + uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); + if (groupId) { + pBlock->info.groupId = *groupId; + } + + pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows; + pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; + + return pBlock; + } + return NULL; +} + static SSDataBlock* getTableDataBlock(void* param) { STableMergeScanSortSourceParam* source = param; SOperatorInfo* pOperator = source->pOperator; @@ -4332,7 +4657,6 @@ static SSDataBlock* getTableDataBlock(void* param) { uint32_t status = 0; int32_t code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, readerIdx, pBlock, &status); - // int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pOperator->pTaskInfo->env, code); } @@ -4375,6 +4699,14 @@ SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) { return pList; } +int32_t dumpSQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) { + memcpy((void*)dst, (void*)src, sizeof(SQueryTableDataCond)); + dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo)); + for (int i = 0; i < src->numOfCols; i++) { + dst->colList[i] = src->colList[i]; + } + return 0; +} int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { STableMergeScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -4395,10 +4727,9 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { int32_t tableEndIdx = pInfo->tableEndIndex; STableListInfo* tableListInfo = pInfo->tableListInfo; - pInfo->dataReaders = taosArrayInit(64, POINTER_BYTES); - createMultipleDataReaders(&pInfo->cond, &pInfo->readHandle, tableListInfo, tableStartIdx, tableEndIdx, - pInfo->dataReaders, GET_TASKID(pTaskInfo)); + // pInfo->dataReaders = taosArrayInit(64, POINTER_BYTES); + pInfo->pReader = NULL; // todo the total available buffer should be determined by total capacity of buffer of this task. // the additional one is reserved for merge result pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1); @@ -4406,18 +4737,27 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage, pInfo->pSortInputBlock, pTaskInfo->id.str); - tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlock, NULL, NULL); + tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockTemp, NULL, NULL); + + // one table has one data block + int32_t numOfTable = tableEndIdx - tableStartIdx + 1; + pInfo->queryConds = taosArrayInit(numOfTable, sizeof(SQueryTableDataCond)); + + for (int32_t i = 0; i < numOfTable; ++i) { + STableKeyInfo* tableKeyInfo = taosArrayGet(pInfo->tableListInfo->pTableList, i + tableStartIdx); - size_t numReaders = taosArrayGetSize(pInfo->dataReaders); - for (int32_t i = 0; i < numReaders; ++i) { STableMergeScanSortSourceParam param = {0}; param.readerIdx = i; param.pOperator = pOperator; param.inputBlock = createOneDataBlock(pInfo->pResBlock, false); taosArrayPush(pInfo->sortSourceParams, ¶m); + + SQueryTableDataCond cond; + dumpSQueryTableCond(&pInfo->cond, &cond); + taosArrayPush(pInfo->queryConds, &cond); } - for (int32_t i = 0; i < numReaders; ++i) { + for (int32_t i = 0; i < numOfTable; ++i) { SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i); ps->param = param; @@ -4437,7 +4777,7 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) { STableMergeScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - size_t numReaders = taosArrayGetSize(pInfo->dataReaders); + int32_t numOfTable = taosArrayGetSize(pInfo->queryConds); SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle); pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod; @@ -4446,7 +4786,7 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) { pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes; pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes; - for (int32_t i = 0; i < numReaders; ++i) { + for (int32_t i = 0; i < numOfTable; ++i) { STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i); blockDataDestroy(param->inputBlock); } @@ -4454,12 +4794,13 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) { tsortDestroySortHandle(pInfo->pSortHandle); - for (int32_t i = 0; i < numReaders; ++i) { - STsdbReader* reader = taosArrayGetP(pInfo->dataReaders, i); - tsdbReaderClose(reader); + for (int32_t i = 0; i < taosArrayGetSize(pInfo->queryConds); i++) { + SQueryTableDataCond* cond = taosArrayGet(pInfo->queryConds, i); + taosMemoryFree(cond->colList); } - taosArrayDestroy(pInfo->dataReaders); - pInfo->dataReaders = NULL; + taosArrayDestroy(pInfo->queryConds); + pInfo->queryConds = NULL; + return TSDB_CODE_SUCCESS; } @@ -4543,13 +4884,23 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { void destroyTableMergeScanOperatorInfo(void* param) { STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param; cleanupQueryTableDataCond(&pTableScanInfo->cond); + + int32_t numOfTable = taosArrayGetSize(pTableScanInfo->queryConds); + + for (int32_t i = 0; i < numOfTable; i++) { + STableMergeScanSortSourceParam* param = taosArrayGet(pTableScanInfo->sortSourceParams, i); + blockDataDestroy(param->inputBlock); + } taosArrayDestroy(pTableScanInfo->sortSourceParams); - for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->dataReaders); ++i) { - STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, i); - tsdbReaderClose(reader); + tsdbReaderClose(pTableScanInfo->pReader); + pTableScanInfo->pReader = NULL; + + for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) { + SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i); + taosMemoryFree(pCond->colList); } - taosArrayDestroy(pTableScanInfo->dataReaders); + taosArrayDestroy(pTableScanInfo->queryConds); if (pTableScanInfo->matchInfo.pList != NULL) { taosArrayDestroy(pTableScanInfo->matchInfo.pList); diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 16d853ac7e628a1562714f77c5e0b4713f5b9549..26f1932b1262c98c8cd6b42586b0e6dabc284ae2 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -38,7 +38,8 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* SExprInfo* pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols); int32_t numOfOutputCols = 0; - int32_t code = extractColMatchInfo(pSortNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo); + int32_t code = + extractColMatchInfo(pSortNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo); pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset); @@ -62,8 +63,8 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* // there are headers, so pageSize = rowSize + header pInfo->sortBufSize = pInfo->bufPageSize * 16; // TODO dynamic set the available sort buffer - pOperator->fpSet = createOperatorFpSet(doOpenSortOperator, doSort, NULL, NULL, destroyOrderOperatorInfo, - getExplainExecInfo); + pOperator->fpSet = + createOperatorFpSet(doOpenSortOperator, doSort, NULL, NULL, destroyOrderOperatorInfo, getExplainExecInfo); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -126,7 +127,7 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i int32_t numOfCols = taosArrayGetSize(pColMatchInfo); for (int32_t i = 0; i < numOfCols; ++i) { SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i); -// ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID); + // ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID); SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId); SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId); @@ -316,7 +317,7 @@ SSDataBlock* getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlo int32_t numOfCols = taosArrayGetSize(pColMatchInfo); for (int32_t i = 0; i < numOfCols; ++i) { SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i); -// ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID); + // ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID); SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId); SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId); @@ -592,7 +593,7 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData blockDataEnsureCapacity(p, capacity); - _retry: +_retry: while (1) { STupleHandle* pTupleHandle = NULL; if (pInfo->groupSort) { @@ -647,13 +648,13 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData int32_t numOfCols = taosArrayGetSize(pColMatchInfo); for (int32_t i = 0; i < numOfCols; ++i) { SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i); -// ASSERT(pColMatchInfo-> == COL_MATCH_FROM_SLOT_ID); + // ASSERT(pColMatchInfo-> == COL_MATCH_FROM_SLOT_ID); SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId); SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId); colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info); } - + pInfo->limitInfo.numOfOutputRows += p->info.rows; pDataBlock->info.rows = p->info.rows; pDataBlock->info.groupId = pInfo->groupId; } @@ -734,7 +735,8 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size SArray* pSortInfo = createSortInfo(pMergePhyNode->pMergeKeys); int32_t numOfOutputCols = 0; - code = extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo); + code = extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, + &pInfo->matchInfo); if (code != TSDB_CODE_SUCCESS) { goto _error; }