diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index f392644e6736f528fa75f21ee2857c570c0c22c9..b474bea98717034c68ca4b3beb5a3a61288a064b 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -33,15 +33,11 @@ struct SColumnFilterElem; typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, char* val1, char* val2); typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order); -typedef struct SPosInfo { - int32_t pageId:20; - int32_t rowId:12; -} SPosInfo; - typedef struct SGroupResInfo { int32_t groupId; int32_t numOfDataPages; - SPosInfo pos; + int32_t pageId; + int32_t rowId; } SGroupResInfo; typedef struct SSqlGroupbyExpr { @@ -53,9 +49,10 @@ typedef struct SSqlGroupbyExpr { } SSqlGroupbyExpr; typedef struct SWindowResult { - SPosInfo pos; // Position of current result in disk-based output buffer + int32_t pageId; // pageId & rowId is the position of current result in disk-based output buffer + int32_t rowId:15; + bool closed:1; // this result status: closed or opened uint16_t numOfRows; // number of rows of current time window - bool closed; // this result status: closed or opened SResultInfo* resultInfo; // For each result column, there is a resultInfo union {STimeWindow win; char* key;}; // start key of current time window } SWindowResult; diff --git a/src/query/inc/qUtil.h b/src/query/inc/qUtil.h index 5320e5622e70a93c06edb4a1e5a3fe568498ef21..32f26f66f5a46f98477db7e67c7a3a6d988fdfc8 100644 --- a/src/query/inc/qUtil.h +++ b/src/query/inc/qUtil.h @@ -51,7 +51,7 @@ static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int3 SQuery *pQuery = pRuntimeEnv->pQuery; - int32_t realRowId = (int32_t)(pResult->pos.rowId * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pRuntimeEnv->topBotQuery, pRuntimeEnv->stableQuery)); + int32_t realRowId = (int32_t)(pResult->rowId * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pRuntimeEnv->topBotQuery, pRuntimeEnv->stableQuery)); return ((char *)page->data) + pRuntimeEnv->offset[columnIndex] * pRuntimeEnv->numOfRowsPerPage + pQuery->pSelectExpr[columnIndex].bytes * realRowId; } diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index d46beab2cb88a275a15e3aa2c4de47cedf949175..dd8f83a6431b12ed2a48cb0eade4937634ca4c46 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -557,7 +557,7 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResultBuf *pResultBuf, int32_t sid, int32_t numOfRowsPerPage) { - if (pWindowRes->pos.pageId != -1) { + if (pWindowRes->pageId != -1) { return 0; } @@ -590,11 +590,11 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResult } // set the number of rows in current disk page - if (pWindowRes->pos.pageId == -1) { // not allocated yet, allocate new buffer - pWindowRes->pos.pageId = pageId; - pWindowRes->pos.rowId = (int32_t)(pData->num++); + if (pWindowRes->pageId == -1) { // not allocated yet, allocate new buffer + pWindowRes->pageId = pageId; + pWindowRes->rowId = (int32_t)(pData->num++); - assert(pWindowRes->pos.pageId >= 0); + assert(pWindowRes->pageId >= 0); } return 0; @@ -616,7 +616,7 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowRes *newWind = true; // not assign result buffer yet, add new result buffer - if (pWindowRes->pos.pageId == -1) { + if (pWindowRes->pageId == -1) { int32_t ret = addNewWindowResultBuf(pWindowRes, pResultBuf, sid, pRuntimeEnv->numOfRowsPerPage); if (ret != TSDB_CODE_SUCCESS) { return -1; @@ -1143,7 +1143,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat assert(pRuntimeEnv->windowResInfo.interval == 0); - if (pWindowRes->pos.pageId == -1) { + if (pWindowRes->pageId == -1) { int32_t ret = addNewWindowResultBuf(pWindowRes, pResultBuf, GROUPRESULTID, pRuntimeEnv->numOfRowsPerPage); if (ret != 0) { return -1; @@ -2652,7 +2652,7 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowRes SQuery * pQuery = pRuntimeEnv->pQuery; SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; - tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pos.pageId); + tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pageId); for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t functionId = pQuery->pSelectExpr[i].base.functionId; @@ -2823,14 +2823,14 @@ int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param) SWindowResInfo *pWindowResInfo1 = &supporter->pTableQueryInfo[left]->windowResInfo; SWindowResult * pWindowRes1 = getWindowResult(pWindowResInfo1, leftPos); - tFilePage *page1 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes1->pos.pageId); + tFilePage *page1 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes1->pageId); char *b1 = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes1, page1); TSKEY leftTimestamp = GET_INT64_VAL(b1); SWindowResInfo *pWindowResInfo2 = &supporter->pTableQueryInfo[right]->windowResInfo; SWindowResult * pWindowRes2 = getWindowResult(pWindowResInfo2, rightPos); - tFilePage *page2 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes2->pos.pageId); + tFilePage *page2 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes2->pageId); char *b2 = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes2, page2); TSKEY rightTimestamp = GET_INT64_VAL(b2); @@ -2867,7 +2867,7 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) { } SGroupResInfo* info = &pQInfo->groupResInfo; - if (pQInfo->groupIndex == numOfGroups && info->pos.pageId == info->numOfDataPages) { + if (pQInfo->groupIndex == numOfGroups && info->pageId == info->numOfDataPages) { SET_STABLE_QUERY_OVER(pQInfo); } @@ -2883,10 +2883,10 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { SGroupResInfo* pGroupResInfo = &pQInfo->groupResInfo; // all results have been return to client, try next group - if (pGroupResInfo->pos.pageId == pGroupResInfo->numOfDataPages) { + if (pGroupResInfo->pageId == pGroupResInfo->numOfDataPages) { pGroupResInfo->numOfDataPages = 0; - pGroupResInfo->pos.pageId = 0; - pGroupResInfo->pos.rowId = 0; + pGroupResInfo->pageId = 0; + pGroupResInfo->rowId = 0; // current results of group has been sent to client, try next group if (mergeIntoGroupResult(pQInfo) != TSDB_CODE_SUCCESS) { @@ -2914,22 +2914,22 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { assert(size == pGroupResInfo->numOfDataPages); bool done = false; - for (int32_t j = pGroupResInfo->pos.pageId; j < size; ++j) { + for (int32_t j = pGroupResInfo->pageId; j < size; ++j) { SPageInfo* pi = *(SPageInfo**) taosArrayGet(list, j); tFilePage* pData = getResBufPage(pResultBuf, pi->pageId); - assert(pData->num > 0 && pData->num <= pRuntimeEnv->numOfRowsPerPage && pGroupResInfo->pos.rowId < pData->num); - int32_t numOfRes = (int32_t)(pData->num - pGroupResInfo->pos.rowId); + assert(pData->num > 0 && pData->num <= pRuntimeEnv->numOfRowsPerPage && pGroupResInfo->rowId < pData->num); + int32_t numOfRes = (int32_t)(pData->num - pGroupResInfo->rowId); if (numOfRes > pQuery->rec.capacity - offset) { numOfCopiedRows = (int32_t)(pQuery->rec.capacity - offset); - pGroupResInfo->pos.rowId += numOfCopiedRows; + pGroupResInfo->rowId += numOfCopiedRows; done = true; } else { numOfCopiedRows = (int32_t)pData->num; - pGroupResInfo->pos.pageId += 1; - pGroupResInfo->pos.rowId = 0; + pGroupResInfo->pageId += 1; + pGroupResInfo->rowId = 0; } for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { @@ -3020,8 +3020,8 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { pGroupResInfo->numOfDataPages = (int32_t)taosArrayGetSize(pageList); pGroupResInfo->groupId = tid; - pGroupResInfo->pos.pageId = 0; - pGroupResInfo->pos.rowId = 0; + pGroupResInfo->pageId = 0; + pGroupResInfo->rowId = 0; return pGroupResInfo->numOfDataPages; } @@ -3067,7 +3067,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { SWindowResInfo *pWindowResInfo = &pTableList[pos]->windowResInfo; SWindowResult *pWindowRes = getWindowResult(pWindowResInfo, cs.position[pos]); - tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pos.pageId); + tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pageId); char *b = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes, page); TSKEY ts = GET_INT64_VAL(b); @@ -3104,7 +3104,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { lastTimestamp = ts; // move to the next element of current entry - int32_t currentPageId = pWindowRes->pos.pageId; + int32_t currentPageId = pWindowRes->pageId; cs.position[pos] += 1; if (cs.position[pos] >= pWindowResInfo->size) { @@ -3117,7 +3117,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { } else { // current page is not needed anymore SWindowResult *pNextWindowRes = getWindowResult(pWindowResInfo, cs.position[pos]); - if (pNextWindowRes->pos.pageId != currentPageId) { + if (pNextWindowRes->pageId != currentPageId) { releaseResBufPage(pRuntimeEnv->pResultBuf, page); } } @@ -3329,7 +3329,8 @@ int32_t createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool is return TSDB_CODE_QRY_OUT_OF_MEMORY; } - pResultRow->pos = (SPosInfo) {-1, -1}; + pResultRow->pageId = -1; + pResultRow->rowId = -1; char* buf = (char*) pResultRow->resultInfo + numOfCols * sizeof(SResultInfo); @@ -3796,7 +3797,7 @@ void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) { * not assign result buffer yet, add new result buffer * all group belong to one result set, and each group result has different group id so set the id to be one */ - if (pWindowRes->pos.pageId == -1) { + if (pWindowRes->pageId == -1) { if (addNewWindowResultBuf(pWindowRes, pRuntimeEnv->pResultBuf, groupIndex, pRuntimeEnv->numOfRowsPerPage) != TSDB_CODE_SUCCESS) { return; @@ -3813,7 +3814,7 @@ void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult SQuery *pQuery = pRuntimeEnv->pQuery; // Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group - tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pos.pageId); + tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pageId); for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; @@ -3840,7 +3841,7 @@ void setWindowResOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult * SQuery *pQuery = pRuntimeEnv->pQuery; // Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group - tFilePage* bufPage = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pos.pageId); + tFilePage* bufPage = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pageId); for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; @@ -4019,12 +4020,12 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo, int32_ for (int32_t i = startIdx; (i < totalSet) && (i >= 0); i += step) { if (result[i].numOfRows == 0) { pQInfo->groupIndex += 1; - pGroupResInfo->pos.rowId = 0; + pGroupResInfo->rowId = 0; continue; } - int32_t numOfRowsToCopy = result[i].numOfRows - pGroupResInfo->pos.rowId; - int32_t oldOffset = pGroupResInfo->pos.rowId; + int32_t numOfRowsToCopy = result[i].numOfRows - pGroupResInfo->rowId; + int32_t oldOffset = pGroupResInfo->rowId; /* * current output space is not enough to accommodate all data of this page, only partial results @@ -4032,13 +4033,13 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo, int32_ */ if (numOfRowsToCopy > pQuery->rec.capacity - numOfResult) { numOfRowsToCopy = (int32_t) pQuery->rec.capacity - numOfResult; - pGroupResInfo->pos.rowId += numOfRowsToCopy; + pGroupResInfo->rowId += numOfRowsToCopy; } else { - pGroupResInfo->pos.rowId = 0; + pGroupResInfo->rowId = 0; pQInfo->groupIndex += 1; } - tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, result[i].pos.pageId); + tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, result[i].pageId); for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { int32_t size = pRuntimeEnv->pCtx[j].outputBytes; diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index 2bd92c74a456c16fe946288968d25d797907a390..ac95afffb157847269122bf14f65a3308ef5d9a2 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -266,7 +266,7 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow return; } - tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pos.pageId); + tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pageId); for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutput; ++i) { SResultInfo *pResultInfo = &pWindowRes->resultInfo[i]; @@ -279,7 +279,8 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow } pWindowRes->numOfRows = 0; - pWindowRes->pos = (SPosInfo){-1, -1}; + pWindowRes->pageId = -1; + pWindowRes->rowId = -1; pWindowRes->closed = false; pWindowRes->win = TSWINDOW_INITIALIZER; } @@ -308,10 +309,10 @@ void copyTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *dst, con memcpy(pDst->interResultBuf, pSrc->interResultBuf, pDst->bufLen); // copy the output buffer data from src to dst, the position info keep unchanged - tFilePage *dstpage = getResBufPage(pRuntimeEnv->pResultBuf, dst->pos.pageId); + tFilePage *dstpage = getResBufPage(pRuntimeEnv->pResultBuf, dst->pageId); char * dstBuf = getPosInResultPage(pRuntimeEnv, i, dst, dstpage); - tFilePage *srcpage = getResBufPage(pRuntimeEnv->pResultBuf, src->pos.pageId); + tFilePage *srcpage = getResBufPage(pRuntimeEnv->pResultBuf, src->pageId); char * srcBuf = getPosInResultPage(pRuntimeEnv, i, (SWindowResult *)src, srcpage); size_t s = pRuntimeEnv->pQuery->pSelectExpr[i].bytes;