diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index db0dd1ffbb8e099f0743dc2e5ab55dd37a2cec02..363824f2e2e6a0bce5b988bec2e3511a0433c5ac 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -399,8 +399,8 @@ typedef struct SOptrBasicInfo { // TODO move the resultrowsiz together with SOptrBasicInfo:rowCellInfoOffset typedef struct SAggSupporter { SHashObj* pResultRowHashTable; // quick locate the window object for each result - SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not - SArray* pResultRowArrayList; // The array list that contains the Result rows +// SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not +// SArray* pResultRowArrayList; // The array list that contains the Result rows char* keyBuf; // window key buffer SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 6a5f5b2dc348545f4c350932b55a0b143b201696..321e613f8273673434fe14ba40161caee9eb2d6e 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -324,11 +324,16 @@ static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, jmp_buf env) newCapacity = (int64_t)(pResultRowInfo->capacity * 1.5); } - if (newCapacity == pResultRowInfo->capacity) { + if (newCapacity <= pResultRowInfo->capacity) { newCapacity += 4; } - pResultRowInfo->pPosition = taosMemoryRealloc(pResultRowInfo->pPosition, newCapacity * sizeof(SResultRowPosition)); + char* p = taosMemoryRealloc(pResultRowInfo->pPosition, newCapacity * sizeof(SResultRowPosition)); + if (p == NULL) { + longjmp(env, TSDB_CODE_OUT_OF_MEMORY); + } + + pResultRowInfo->pPosition = (SResultRowPosition*)p; int32_t inc = (int32_t)newCapacity - pResultRowInfo->capacity; memset(&pResultRowInfo->pPosition[pResultRowInfo->capacity], 0, sizeof(SResultRowPosition) * inc); @@ -419,88 +424,56 @@ SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, * | 8 bytes | actual length | * +----------+---------------+ */ -static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, int64_t uid, +static SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, int64_t uid, char* pData, int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup) { - bool existInCurrentResusltRowInfo = false; SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId); SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); + SResultRow* pResult = NULL; + // in case of repeat scan/reverse scan, no new time window added. if (isIntervalQuery) { - if (!masterscan) { // the *p1 may be NULL in case of sliding+offset exists. - if (p1 != NULL) { - return getResultRowByPos(pResultBuf, p1); - } else { - return NULL; - } - } - - if (p1 != NULL) { - if (pResultRowInfo->size == 0) { - existInCurrentResusltRowInfo = - false; // this time window created by other timestamp that does not belongs to current table. - } else if (pResultRowInfo->size == 1) { - SResultRowPosition* p = &pResultRowInfo->pPosition[0]; - existInCurrentResusltRowInfo = (p->pageId == p1->pageId && p->offset == p1->offset); - } else { // check if current pResultRowInfo contains the existInCurrentResusltRowInfo pResultRow - SET_RES_EXT_WINDOW_KEY(pSup->keyBuf, pData, bytes, uid, pResultRowInfo); - int64_t* index = taosHashGet(pSup->pResultRowListSet, pSup->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes)); - if (index != NULL) { - // TODO check the scan order for current opened time window - existInCurrentResusltRowInfo = true; - } else { - existInCurrentResusltRowInfo = false; - } - } + if (masterscan && p1 != NULL) { // the *p1 may be NULL in case of sliding+offset exists. + pResult = getResultRowByPos(pResultBuf, p1); } } else { // In case of group by column query, the required SResultRow object must be existInCurrentResusltRowInfo in the // pResultRowInfo object. if (p1 != NULL) { - return getResultRowByPos(pResultBuf, p1); + pResult = getResultRowByPos(pResultBuf, p1); } } - SResultRow* pResult = NULL; - if (!existInCurrentResusltRowInfo) { // 1. close current opened time window - if (pResultRowInfo->cur.pageId != -1) { // todo extract function - SResultRowPosition pos = pResultRowInfo->cur; - SFilePage* pPage = getBufPage(pResultBuf, pos.pageId); - SResultRow* pRow = (SResultRow*)((char*)pPage + pos.offset); - closeResultRow(pRow); - releaseBufPage(pResultBuf, pPage); - } - - prepareResultListBuffer(pResultRowInfo, pTaskInfo->env); - if (p1 == NULL) { - pResult = getNewResultRow_rv(pResultBuf, groupId, pSup->resultRowSize); - initResultRow(pResult); - - // add a new result set for a new group - SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset}; - taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos, - sizeof(SResultRowPosition)); - SResultRowCell cell = {.groupId = groupId, .pos = pos}; - taosArrayPush(pSup->pResultRowArrayList, &cell); - } else { - pResult = getResultRowByPos(pResultBuf, p1); - } + if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId && + pResult->offset != pResultRowInfo->cur.offset))) { + // todo extract function + SResultRowPosition pos = pResultRowInfo->cur; + SFilePage* pPage = getBufPage(pResultBuf, pos.pageId); + SResultRow* pRow = (SResultRow*)((char*)pPage + pos.offset); + closeResultRow(pRow); + releaseBufPage(pResultBuf, pPage); + } + + // allocate a new buffer page + prepareResultListBuffer(pResultRowInfo, pTaskInfo->env); + if (pResult == NULL) { + pResult = getNewResultRow_rv(pResultBuf, groupId, pSup->resultRowSize); + initResultRow(pResult); - // 2. set the new time window to be the new active time window - pResultRowInfo->pPosition[pResultRowInfo->size++] = - (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; - pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; - SET_RES_EXT_WINDOW_KEY(pSup->keyBuf, pData, bytes, uid, pResultRowInfo); - taosHashPut(pSup->pResultRowListSet, pSup->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes), &pResultRowInfo->cur, - POINTER_BYTES); - } else { - pResult = getResultRowByPos(pResultBuf, p1); + // add a new result set for a new group + SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset}; + taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos, sizeof(SResultRowPosition)); } + // 2. set the new time window to be the new active time window + pResultRowInfo->pPosition[pResultRowInfo->size++] = + (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; + pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; + // too many time window in query if (pResultRowInfo->size > MAX_INTERVAL_TIME_WINDOW) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW); @@ -656,7 +629,7 @@ static int32_t setResultOutputBufByKey_rv(SResultRowInfo* pResultRowInfo, int64_ int32_t numOfOutput, int32_t* rowCellInfoOffset, SAggSupporter* pAggSup, SExecTaskInfo* pTaskInfo) { assert(win->skey <= win->ekey); - SResultRow* pResultRow = doSetResultOutBufByKey_rv(pAggSup->pResultBuf, pResultRowInfo, id, (char*)&win->skey, + SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, id, (char*)&win->skey, TSDB_KEYSIZE, masterscan, tableGroupId, pTaskInfo, true, pAggSup); if (pResultRow == NULL) { @@ -1707,7 +1680,7 @@ int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* SResultRowInfo* pResultRowInfo = &binfo->resultRowInfo; SqlFunctionCtx* pCtx = binfo->pCtx; - SResultRow* pResultRow = doSetResultOutBufByKey_rv(pBuf, pResultRowInfo, groupId, (char*)pData, bytes, true, groupId, + SResultRow* pResultRow = doSetResultOutBufByKey(pBuf, pResultRowInfo, groupId, (char*)pData, bytes, true, groupId, pTaskInfo, false, pAggSup); assert(pResultRow != NULL); @@ -2713,7 +2686,7 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t int64_t tid = 0; int64_t groupId = 0; - SResultRow* pRow = doSetResultOutBufByKey_rv(pSup->pResultBuf, pResultRowInfo, tid, (char*)&tid, sizeof(tid), true, + SResultRow* pRow = doSetResultOutBufByKey(pSup->pResultBuf, pResultRowInfo, tid, (char*)&tid, sizeof(tid), true, groupId, pTaskInfo, false, pSup); for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { @@ -3001,7 +2974,7 @@ void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, u int32_t* rowCellInfoOffset = pAggInfo->binfo.rowCellInfoOffset; SResultRow* pResultRow = - doSetResultOutBufByKey_rv(pAggInfo->aggSup.pResultBuf, pResultRowInfo, uid, (char*)&groupId, sizeof(groupId), + doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, uid, (char*)&groupId, sizeof(groupId), true, groupId, pTaskInfo, false, &pAggInfo->aggSup); assert(pResultRow != NULL); @@ -4895,8 +4868,8 @@ bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasi initResultRow(resultRow); prepareResultListBuffer(&pInfo->resultRowInfo, pOperator->pTaskInfo->env); // pInfo->resultRowInfo.cur = pInfo->resultRowInfo.size; - pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.size++] = - (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset}; +// pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.size++] = +// (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset}; pInfo->resultRowInfo.cur = (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset}; } @@ -5609,10 +5582,10 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput); pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t)); pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK); - pAggSup->pResultRowListSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK); - pAggSup->pResultRowArrayList = taosArrayInit(10, sizeof(SResultRowCell)); +// pAggSup->pResultRowListSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK); +// pAggSup->pResultRowArrayList = taosArrayInit(10, sizeof(SResultRowCell)); - if (pAggSup->keyBuf == NULL || pAggSup->pResultRowArrayList == NULL || pAggSup->pResultRowListSet == NULL || + if (pAggSup->keyBuf == NULL /*|| pAggSup->pResultRowArrayList == NULL || pAggSup->pResultRowListSet == NULL*/ || pAggSup->pResultRowHashTable == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -5628,8 +5601,8 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n static void cleanupAggSup(SAggSupporter* pAggSup) { taosMemoryFreeClear(pAggSup->keyBuf); taosHashCleanup(pAggSup->pResultRowHashTable); - taosHashCleanup(pAggSup->pResultRowListSet); - taosArrayDestroy(pAggSup->pResultRowArrayList); +// taosHashCleanup(pAggSup->pResultRowListSet); +// taosArrayDestroy(pAggSup->pResultRowArrayList); destroyDiskbasedBuf(pAggSup->pResultBuf); } @@ -6352,7 +6325,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo int32_t numOfCols = 0; tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId); - if (pDataReader == NULL) { + if (pDataReader == NULL && terrno != 0) { return NULL; }