From fbe202ce410c3c32c2511a35934d6c09368f0ed1 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Mon, 10 Feb 2020 00:08:01 +0800 Subject: [PATCH] fix bugs in regression test. --- src/client/src/tscSQLParser.c | 6 +- src/client/src/tscStream.c | 4 +- src/system/detail/inc/vnodeQueryImpl.h | 8 +- src/system/detail/inc/vnodeRead.h | 2 +- src/system/detail/src/vnodeQueryImpl.c | 594 ++++++++++++---------- src/system/detail/src/vnodeQueryProcess.c | 4 +- 6 files changed, 342 insertions(+), 276 deletions(-) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 65b9af611a..da7e22fe1e 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -72,7 +72,7 @@ static void setColumnOffsetValueInResultset(SQueryInfo* pQueryInfo); static int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd* pCmd); static int32_t parseIntervalClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql); -static int32_t setSlidingClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql); +static int32_t parseSlidingClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql); static int32_t addProjectionExprAndResultField(SQueryInfo* pQueryInfo, tSQLExprItem* pItem); @@ -657,14 +657,14 @@ int32_t parseIntervalClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) { return ret; } - if (setSlidingClause(pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) { + if (parseSlidingClause(pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) { return TSDB_CODE_INVALID_SQL; } return TSDB_CODE_SUCCESS; } -int32_t setSlidingClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) { +int32_t parseSlidingClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) { const char* msg0 = "sliding value too small"; const char* msg1 = "sliding value no larger than the interval value"; diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 9fc9706dd9..1b5b55352e 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -396,7 +396,9 @@ static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) { int64_t minSlidingTime = (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinSlidingTime * 1000L : tsMinSlidingTime; - if (pQueryInfo->nSlidingTime < minSlidingTime) { + if (pQueryInfo->nSlidingTime == -1) { + pQueryInfo->nSlidingTime = pQueryInfo->nAggTimeInterval; + } else if (pQueryInfo->nSlidingTime < minSlidingTime) { tscWarn("%p stream:%p, original sliding value:%" PRId64 " too small, reset to:%" PRId64 "", pSql, pStream, pQueryInfo->nSlidingTime, minSlidingTime); diff --git a/src/system/detail/inc/vnodeQueryImpl.h b/src/system/detail/inc/vnodeQueryImpl.h index 2b89cae317..40b65aa163 100644 --- a/src/system/detail/inc/vnodeQueryImpl.h +++ b/src/system/detail/inc/vnodeQueryImpl.h @@ -276,11 +276,11 @@ void displayInterResult(SData** pdata, SQuery* pQuery, int32_t numOfRows); void vnodePrintQueryStatistics(SMeterQuerySupportObj* pSupporter); -void clearGroupResultBuf(SOutputRes* pOneOutputRes, int32_t nOutputCols); -void copyGroupResultBuf(SOutputRes* dst, const SOutputRes* src, int32_t nOutputCols); +void clearGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SOutputRes *pOneOutputRes); +void copyGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SOutputRes* dst, const SOutputRes* src); -void resetSlidingWindowInfo(SSlidingWindowInfo* pSlidingWindowInfo, int32_t numOfCols); -void clearCompletedSlidingWindows(SSlidingWindowInfo* pSlidingWindowInfo, int32_t numOfCols); +void resetSlidingWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SSlidingWindowInfo* pSlidingWindowInfo); +void clearCompletedSlidingWindows(SQueryRuntimeEnv* pRuntimeEnv); int32_t numOfClosedSlidingWindow(SSlidingWindowInfo* pSlidingWindowInfo); void closeSlidingWindow(SSlidingWindowInfo* pSlidingWindowInfo, int32_t slot); void closeAllSlidingWindow(SSlidingWindowInfo* pSlidingWindowInfo); diff --git a/src/system/detail/inc/vnodeRead.h b/src/system/detail/inc/vnodeRead.h index ed7b965acb..bda53cd3d8 100644 --- a/src/system/detail/inc/vnodeRead.h +++ b/src/system/detail/inc/vnodeRead.h @@ -165,7 +165,7 @@ typedef struct SQueryRuntimeEnv { SInterpolationInfo interpoInfo; SData** pInterpoBuf; - SSlidingWindowInfo swindowResInfo; + SSlidingWindowInfo swindowResInfo; STSBuf* pTSBuf; STSCursor cur; diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index 07799775bd..017ff02cfc 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -68,9 +68,6 @@ static int32_t getNextDataFileCompInfo(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj static void setGroupOutputBuffer(SQueryRuntimeEnv *pRuntimeEnv, SOutputRes *pResult); static void getAlignedIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, TSKEY keyInData, TSKEY skey, TSKEY ekey); -static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo, - SBlockInfo *pBlockInfo, int64_t *pPrimaryCol, SField *pFields, - __block_search_fn_t searchFn); static int32_t saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo, int32_t numOfResult); static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterDataInfo *pMeterDataInfo, @@ -589,7 +586,8 @@ static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, int64_t StartQue char *primaryColumnData, int32_t size, int32_t functionId, SField *pField, bool hasNull, int32_t blockStatus, void *param, int32_t scanFlag); -void createGroupResultBuf(SQuery *pQuery, SOutputRes *pOneResult, bool isSTableQuery); +void createQueryResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SOutputRes *pResultRow, bool isSTableQuery, SPosInfo *posInfo); + static void destroyGroupResultBuf(SOutputRes *pOneOutputRes, int32_t nOutputCols); static int32_t binarySearchForBlockImpl(SCompBlock *pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) { @@ -1614,7 +1612,7 @@ static SOutputRes *doSetSlidingWindowFromKey(SSlidingWindowInfo *pSlidingWindowI return &pSlidingWindowInfo->pResult[p]; } -static int32_t initSlidingWindowInfo(SSlidingWindowInfo *pSlidingWindowInfo, int32_t threshold, int16_t type, +static int32_t initSlidingWindowInfo(SSlidingWindowInfo *pSlidingWindowInfo, int32_t threshold, int16_t type, int32_t rowSizes, SOutputRes *pRes) { pSlidingWindowInfo->capacity = threshold; pSlidingWindowInfo->threshold = threshold; @@ -1627,8 +1625,18 @@ static int32_t initSlidingWindowInfo(SSlidingWindowInfo *pSlidingWindowInfo, int pSlidingWindowInfo->curIndex = -1; pSlidingWindowInfo->size = 0; pSlidingWindowInfo->pResult = pRes; - pSlidingWindowInfo->pStatus = calloc(threshold, sizeof(SWindowStatus)); +// createResultBuf(&pSlidingWindowInfo->pResultBuf, 10, rowSizes); + + pSlidingWindowInfo->pStatus = calloc(threshold, sizeof(SWindowStatus)); +// pSlidingWindowInfo->pResultInfo = calloc(threshold, POINTER_BYTES); + +// for(int32_t i = 0; i < threshold; ++i) { +// pSlidingWindowInfo->pResultInfo[i] = calloc((size_t)numOfOutput, sizeof(SResultInfo)); + + +// } + if (pSlidingWindowInfo->pStatus == NULL || pSlidingWindowInfo->hashList == NULL) { return -1; } @@ -1643,17 +1651,19 @@ static void destroySlidingWindowInfo(SSlidingWindowInfo *pSlidingWindowInfo) { } taosCleanUpHashTable(pSlidingWindowInfo->hashList); +// destroyResultBuf(pSlidingWindowInfo->pResultBuf); + tfree(pSlidingWindowInfo->pStatus); } -void resetSlidingWindowInfo(SSlidingWindowInfo *pSlidingWindowInfo, int32_t numOfCols) { +void resetSlidingWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SSlidingWindowInfo *pSlidingWindowInfo) { if (pSlidingWindowInfo == NULL || pSlidingWindowInfo->capacity == 0) { return; } for (int32_t i = 0; i < pSlidingWindowInfo->size; ++i) { SOutputRes *pOneRes = &pSlidingWindowInfo->pResult[i]; - clearGroupResultBuf(pOneRes, numOfCols); + clearGroupResultBuf(pRuntimeEnv, pOneRes); } memset(pSlidingWindowInfo->pStatus, 0, sizeof(SWindowStatus) * pSlidingWindowInfo->capacity); @@ -1669,7 +1679,8 @@ void resetSlidingWindowInfo(SSlidingWindowInfo *pSlidingWindowInfo, int32_t numO pSlidingWindowInfo->prevSKey = 0; } -void clearCompletedSlidingWindows(SSlidingWindowInfo *pSlidingWindowInfo, int32_t numOfCols) { +void clearCompletedSlidingWindows(SQueryRuntimeEnv* pRuntimeEnv) { + SSlidingWindowInfo* pSlidingWindowInfo = &pRuntimeEnv->swindowResInfo; if (pSlidingWindowInfo == NULL || pSlidingWindowInfo->capacity == 0 || pSlidingWindowInfo->size == 0) { return; } @@ -1689,17 +1700,18 @@ void clearCompletedSlidingWindows(SSlidingWindowInfo *pSlidingWindowInfo, int32_ } int32_t remain = pSlidingWindowInfo->size - i; + //clear remain list memmove(pSlidingWindowInfo->pStatus, &pSlidingWindowInfo->pStatus[i], remain * sizeof(SWindowStatus)); memset(&pSlidingWindowInfo->pStatus[remain], 0, (pSlidingWindowInfo->capacity - remain) * sizeof(SWindowStatus)); for(int32_t k = 0; k < remain; ++k) { - copyGroupResultBuf(&pSlidingWindowInfo->pResult[k], &pSlidingWindowInfo->pResult[i + k], numOfCols); + copyGroupResultBuf(pRuntimeEnv, &pSlidingWindowInfo->pResult[k], &pSlidingWindowInfo->pResult[i + k]); } for(int32_t k = remain; k < pSlidingWindowInfo->size; ++k) { SOutputRes *pOneRes = &pSlidingWindowInfo->pResult[k]; - clearGroupResultBuf(pOneRes, numOfCols); + clearGroupResultBuf(pRuntimeEnv, pOneRes); } pSlidingWindowInfo->size = remain; @@ -1720,14 +1732,12 @@ void clearCompletedSlidingWindows(SSlidingWindowInfo *pSlidingWindowInfo, int32_ } int32_t numOfClosedSlidingWindow(SSlidingWindowInfo *pSlidingWindowInfo) { - for (int32_t i = 0; i < pSlidingWindowInfo->size; ++i) { - SWindowStatus *pStatus = &pSlidingWindowInfo->pStatus[i]; - if (pStatus->closed == false) { - return i; - } + int32_t i = 0; + while(i < pSlidingWindowInfo->size && pSlidingWindowInfo->pStatus[i].closed) { + ++i; } - return 0; + return i; } void closeSlidingWindow(SSlidingWindowInfo* pSlidingWindowInfo, int32_t slot) { @@ -2469,20 +2479,21 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, int64_t startQueryTimes } // set the output buffer for the selectivity + tag query -static void setCtxTagColumnInfo(SQuery *pQuery, SQueryRuntimeEnv *pRuntimeEnv) { +static void setCtxTagColumnInfo(SQuery *pQuery, SQLFunctionCtx *pCtx) { if (isSelectivityWithTagsQuery(pQuery)) { int32_t num = 0; - SQLFunctionCtx *pCtx = NULL; + SQLFunctionCtx *p = NULL; + int16_t tagLen = 0; SQLFunctionCtx **pTagCtx = calloc(pQuery->numOfOutputCols, POINTER_BYTES); for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { SSqlFuncExprMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].pBase; if (pSqlFuncMsg->functionId == TSDB_FUNC_TAG_DUMMY || pSqlFuncMsg->functionId == TSDB_FUNC_TS_DUMMY) { - tagLen += pRuntimeEnv->pCtx[i].outputBytes; - pTagCtx[num++] = &pRuntimeEnv->pCtx[i]; + tagLen += pCtx[i].outputBytes; + pTagCtx[num++] = &pCtx[i]; } else if ((aAggs[pSqlFuncMsg->functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) { - pCtx = &pRuntimeEnv->pCtx[i]; + p = &pCtx[i]; } else if (pSqlFuncMsg->functionId == TSDB_FUNC_TS || pSqlFuncMsg->functionId == TSDB_FUNC_TAG) { // tag function may be the group by tag column // ts may be the required primary timestamp column @@ -2492,14 +2503,14 @@ static void setCtxTagColumnInfo(SQuery *pQuery, SQueryRuntimeEnv *pRuntimeEnv) { } } - pCtx->tagInfo.pTagCtxList = pTagCtx; - pCtx->tagInfo.numOfTagCols = num; - pCtx->tagInfo.tagsLen = tagLen; + p->tagInfo.pTagCtxList = pTagCtx; + p->tagInfo.numOfTagCols = num; + p->tagInfo.tagsLen = tagLen; } } static int32_t setupQueryRuntimeEnv(SMeterObj *pMeterObj, SQuery *pQuery, SQueryRuntimeEnv *pRuntimeEnv, - tTagSchema *pTagsSchema, int16_t order, bool isSTableQuery) { + SColumnModel *pTagsSchema, int16_t order, bool isSTableQuery) { dTrace("QInfo:%p setup runtime env", GET_QINFO_ADDR(pQuery)); pRuntimeEnv->pMeterObj = pMeterObj; @@ -2577,7 +2588,7 @@ static int32_t setupQueryRuntimeEnv(SMeterObj *pMeterObj, SQuery *pQuery, SQuery resetCtxOutputBuf(pRuntimeEnv); } - setCtxTagColumnInfo(pQuery, pRuntimeEnv); + setCtxTagColumnInfo(pQuery, pRuntimeEnv->pCtx); // for loading block data in memory assert(vnodeList[pMeterObj->vnode].cfg.rowsInFileBlock == pMeterObj->pointsPerFileBlock); @@ -4123,21 +4134,29 @@ static void allocMemForInterpo(SMeterQuerySupportObj *pSupporter, SQuery *pQuery } } -static int32_t allocateOutputBufForGroup(SMeterQuerySupportObj *pSupporter, SQuery *pQuery, bool isSTableQuery) { - int32_t slot = 0; +static int32_t createQueryOutputBuffer(SMeterQuerySupportObj *pSupporter, SQuery *pQuery, bool isSTableQuery) { + SQueryRuntimeEnv* pRuntimeEnv = &pSupporter->runtimeEnv; + + int32_t numOfRows = 0; if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->nAggTimeInterval > 0 && pQuery->slidingTime > 0)) { - slot = 10000; + numOfRows = 10000; } else { - slot = pSupporter->pSidSet->numOfSubSet; + numOfRows = pSupporter->pSidSet->numOfSubSet; } - pSupporter->pResult = calloc(1, sizeof(SOutputRes) * slot); + createResultBuf(&pRuntimeEnv->pResultBuf, 100, pQuery->rowSize); + + // total number of initial results + pSupporter->pResult = calloc(numOfRows, sizeof(SOutputRes)); if (pSupporter->pResult == NULL) { return TSDB_CODE_SERV_OUT_OF_MEMORY; } - for (int32_t k = 0; k < slot; ++k) { + int32_t pageId = -1; + tFilePage* page = NULL; + + for (int32_t k = 0; k < numOfRows; ++k) { SOutputRes *pOneRes = &pSupporter->pResult[k]; pOneRes->nAlloc = 1; @@ -4152,7 +4171,16 @@ static int32_t allocateOutputBufForGroup(SMeterQuerySupportObj *pSupporter, SQue pOneRes->nAlloc = pExpr->pBase.arg[0].argValue.i64; } - createGroupResultBuf(pQuery, pOneRes, isSTableQuery); + if (page == NULL || page->numOfElems >= pRuntimeEnv->numOfRowsPerPage) { + page = getNewDataBuf(pRuntimeEnv->pResultBuf, 0, &pageId); + } + + assert(pageId >= 0); + + SPosInfo posInfo = {.pageId = pageId, .rowId = page->numOfElems}; + + createQueryResultBuf(pRuntimeEnv, pOneRes, isSTableQuery, &posInfo); + page->numOfElems += 1; // next row is available } return TSDB_CODE_SUCCESS; @@ -4214,6 +4242,32 @@ _error_clean: return TSDB_CODE_SERV_OUT_OF_MEMORY; } +static int32_t getRowParamForMultiRowsOutput(SQuery* pQuery, bool isSTableQuery) { + int32_t rowparam = 1; + + if (isTopBottomQuery(pQuery) && (!isSTableQuery)) { + rowparam = pQuery->pSelectExpr[1].pBase.arg->argValue.i64; + } + + return rowparam; +} + +static int32_t getNumOfRowsInResultPage(SQuery* pQuery, bool isSTableQuery) { + int32_t rowSize = pQuery->rowSize * getRowParamForMultiRowsOutput(pQuery, isSTableQuery); + return (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage)) / rowSize; +} + +static char* getPosInResultPage(SQueryRuntimeEnv* pRuntimeEnv, int32_t columnIndex, SOutputRes* pResult) { + SQuery* pQuery = pRuntimeEnv->pQuery; + tFilePage* page = getResultBufferPageById(pRuntimeEnv->pResultBuf, pResult->pos.pageId); + + int32_t numOfRows = getNumOfRowsInResultPage(pQuery, pRuntimeEnv->stableQuery); + int32_t realRowId = pResult->pos.rowId * getRowParamForMultiRowsOutput(pQuery, pRuntimeEnv->stableQuery); + + return ((char*)page->data) + pRuntimeEnv->offset[columnIndex] * numOfRows + + pQuery->pSelectExpr[columnIndex].resBytes * realRowId; +} + int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMeterQuerySupportObj *pSupporter, void *param) { SQuery *pQuery = &pQInfo->query; @@ -4283,9 +4337,10 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete } vnodeRecordAllFiles(pQInfo, pMeterObj->vnode); - + + pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, false); if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->nAggTimeInterval > 0 && pQuery->slidingTime > 0)) { - if ((code = allocateOutputBufForGroup(pSupporter, pQuery, false)) != TSDB_CODE_SUCCESS) { + if ((code = createQueryOutputBuffer(pSupporter, pQuery, false)) != TSDB_CODE_SUCCESS) { return code; } @@ -4296,8 +4351,7 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete type = TSDB_DATA_TYPE_TIMESTAMP; } - // todo bug! - initSlidingWindowInfo(&pRuntimeEnv->swindowResInfo, 3, type, pSupporter->pResult); + initSlidingWindowInfo(&pRuntimeEnv->swindowResInfo, 3, type, pQuery->rowSize, pSupporter->pResult); } pSupporter->rawSKey = pQuery->skey; @@ -4492,7 +4546,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) pQuery->lastKey = pQuery->skey; // create runtime environment - tTagSchema *pTagSchemaInfo = pSupporter->pSidSet->pColumnModel; + SColumnModel *pTagSchemaInfo = pSupporter->pSidSet->pColumnModel; // get one queried meter SMeterObj *pMeter = getMeterObj(pSupporter->pMetersHashTable, pSupporter->pSidSet->pSids[0]->sid); @@ -4519,25 +4573,25 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) tSidSetSort(pSupporter->pSidSet); vnodeRecordAllFiles(pQInfo, pMeter->vnode); - if ((ret = allocateOutputBufForGroup(pSupporter, pQuery, true)) != TSDB_CODE_SUCCESS) { + if ((ret = createQueryOutputBuffer(pSupporter, pQuery, true)) != TSDB_CODE_SUCCESS) { return ret; } if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // group by columns not tags; int16_t type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr); - initSlidingWindowInfo(&pRuntimeEnv->swindowResInfo, 10039, type, pSupporter->pResult); + initSlidingWindowInfo(&pRuntimeEnv->swindowResInfo, 4096, type, pQuery->rowSize, pSupporter->pResult); } if (pQuery->nAggTimeInterval != 0) { // one page for each table at least - ret = createResultBuf(&pSupporter->pResultBuf, pSupporter->numOfMeters, pQuery->rowSize); + ret = createResultBuf(&pRuntimeEnv->pResultBuf, pSupporter->numOfMeters, pQuery->rowSize); if (ret != TSDB_CODE_SUCCESS) { return ret; } - - pRuntimeEnv->numOfRowsPerPage = (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage)) / pQuery->rowSize; } - + + pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, true); + // metric query do not invoke interpolation, it will be done at the second-stage merge if (!isPointInterpoQuery(pQuery)) { pQuery->interpoType = TSDB_INTERPO_NONE; @@ -5030,7 +5084,7 @@ void queryOnBlock(SMeterQuerySupportObj *pSupporter, int64_t *primaryKeys, int32 * set tag value in SQLFunctionCtx * e.g.,tag information into input buffer */ -static void doSetTagValueInParam(tTagSchema *pTagSchema, int32_t tagColIdx, SMeterSidExtInfo *pMeterSidInfo, +static void doSetTagValueInParam(SColumnModel *pTagSchema, int32_t tagColIdx, SMeterSidExtInfo *pMeterSidInfo, tVariant *param) { assert(tagColIdx >= 0); @@ -5050,7 +5104,7 @@ static void doSetTagValueInParam(tTagSchema *pTagSchema, int32_t tagColIdx, SMet void vnodeSetTagValueInParam(tSidSet *pSidSet, SQueryRuntimeEnv *pRuntimeEnv, SMeterSidExtInfo *pMeterSidInfo) { SQuery * pQuery = pRuntimeEnv->pQuery; - tTagSchema *pTagSchema = pSidSet->pColumnModel; + SColumnModel *pTagSchema = pSidSet->pColumnModel; SSqlFuncExprMsg *pFuncMsg = &pQuery->pSelectExpr[0].pBase; if (pQuery->numOfOutputCols == 1 && pFuncMsg->functionId == TSDB_FUNC_TS_COMP) { @@ -5246,7 +5300,7 @@ typedef struct SCompSupporter { int64_t getCurrentTimestamp(SCompSupporter *pSupportor, int32_t meterIdx) { Position * pPos = &pSupportor->pPosition[meterIdx]; - tFilePage *pPage = getMeterDataPage(pSupportor->pSupporter->pResultBuf, + tFilePage *pPage = getMeterDataPage(pSupportor->pSupporter->runtimeEnv.pResultBuf, pSupportor->pMeterDataInfo[meterIdx]->pMeterQInfo, pPos->pageIdx); return *(int64_t *)(pPage->data + TSDB_KEYSIZE * pPos->rowIdx); @@ -5257,7 +5311,7 @@ int32_t meterResultComparator(const void *pLeft, const void *pRight, void *param int32_t right = *(int32_t *)pRight; SCompSupporter *supporter = (SCompSupporter *)param; - SQueryResultBuf* pResultBuf = supporter->pSupporter->pResultBuf; + SQueryResultBuf* pResultBuf = supporter->pSupporter->runtimeEnv.pResultBuf; Position leftPos = supporter->pPosition[left]; Position rightPos = supporter->pPosition[right]; @@ -5335,7 +5389,7 @@ void copyResToQueryResultBuf(SMeterQuerySupportObj *pSupporter, SQuery *pQuery) } SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; - SQueryResultBuf* pResultBuf = pSupporter->pResultBuf; + SQueryResultBuf* pResultBuf = pRuntimeEnv->pResultBuf; SIDList list = getDataBufPagesIdList(pResultBuf, 200000 + pSupporter->offset + (pSupporter->subgroupIdx - 1)* 10000); @@ -5383,7 +5437,7 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery int32_t numOfMeters = 0; for (int32_t i = start; i < end; ++i) { int32_t sid = pMeterDataInfo[i].pMeterQInfo->sid; - SIDList list = getDataBufPagesIdList(pSupporter->pResultBuf, sid); + SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, sid); if (list.size > 0 && pMeterDataInfo[i].pMeterQInfo->numOfRes > 0) { pValidMeter[numOfMeters] = &pMeterDataInfo[i]; @@ -5415,7 +5469,7 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery while (1) { int32_t pos = pTree->pNode[0].index; Position * position = &cs.pPosition[pos]; - SQueryResultBuf* pResultBuf = cs.pSupporter->pResultBuf; + SQueryResultBuf* pResultBuf = cs.pSupporter->runtimeEnv.pResultBuf; tFilePage *pPage = getMeterDataPage(pResultBuf, pValidMeter[pos]->pMeterQInfo, position->pageIdx); int64_t ts = getCurrentTimestamp(&cs, pos); @@ -5447,7 +5501,7 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery cs.pPosition[pos].pageIdx += 1; // try next page // check if current page is empty or not. if it is empty, ignore it and try next - SIDList list = getDataBufPagesIdList(pSupporter->pResultBuf, cs.pMeterDataInfo[pos]->pMeterQInfo->sid); + SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, cs.pMeterDataInfo[pos]->pMeterQInfo->sid); if (cs.pPosition[pos].pageIdx <= list.size - 1) { tFilePage *newPage = getMeterDataPage(pResultBuf, pValidMeter[pos]->pMeterQInfo, position->pageIdx); @@ -5505,7 +5559,7 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery int32_t flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery, const SQueryRuntimeEnv *pRuntimeEnv) { - SQueryResultBuf* pResultBuf = pSupporter->pResultBuf; + SQueryResultBuf* pResultBuf = pRuntimeEnv->pResultBuf; int32_t capacity = (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage))/ pQuery->rowSize; // the base value for group result, since the maximum number of table for each vnode will not exceed 100,000. @@ -5645,49 +5699,62 @@ void enableFunctForMasterScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) { pQuery->order.order = (pQuery->order.order ^ 1); } -void createGroupResultBuf(SQuery *pQuery, SOutputRes *pOneResult, bool isSTableQuery) { - int32_t numOfOutput = pQuery->numOfOutputCols; - - pOneResult->resultInfo = calloc((size_t)numOfOutput, sizeof(SResultInfo)); - - pOneResult->result = malloc(POINTER_BYTES * numOfOutput); - for (int32_t i = 0; i < numOfOutput; ++i) { - size_t size = pQuery->pSelectExpr[i].interResBytes; - SResultInfo *pResInfo = &pOneResult->resultInfo[i]; - - pOneResult->result[i] = malloc(sizeof(tFilePage) + size * pOneResult->nAlloc); - pOneResult->result[i]->numOfElems = 0; +void createQueryResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SOutputRes *pResultRow, bool isSTableQuery, SPosInfo *posInfo) { + SQuery* pQuery = pRuntimeEnv->pQuery; + + int32_t numOfCols = pQuery->numOfOutputCols; - setResultInfoBuf(pResInfo, (int32_t)size, isSTableQuery); + pResultRow->resultInfo = calloc((size_t)numOfCols, sizeof(SResultInfo)); + pResultRow->pos = *posInfo;//page->data + (pRuntimeEnv->offset[i] * pRuntimeEnv->numOfRowsPerPage) + page->numOfElems*s1; + + for (int32_t i = 0; i < numOfCols; ++i) { + SResultInfo *pResultInfo = &pResultRow->resultInfo[i]; + size_t size = pQuery->pSelectExpr[i].interResBytes; + setResultInfoBuf(pResultInfo, (int32_t)size, isSTableQuery); } } -void clearGroupResultBuf(SOutputRes *pOneOutputRes, int32_t nOutputCols) { +void clearGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SOutputRes *pOneOutputRes) { if (pOneOutputRes == NULL) { return; } - for (int32_t i = 0; i < nOutputCols; ++i) { - SResultInfo *pResInfo = &pOneOutputRes->resultInfo[i]; - int32_t size = sizeof(tFilePage) + pResInfo->bufLen * pOneOutputRes->nAlloc; + for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutputCols; ++i) { + SResultInfo *pResultInfo = &pOneOutputRes->resultInfo[i]; +// int32_t size = sizeof(tFilePage) + pResultInfo->bufLen * pOneOutputRes->nAlloc; - memset(pOneOutputRes->result[i], 0, (size_t)size); - resetResultInfo(pResInfo); +// memset(pOneOutputRes->pos[i], 0, (size_t)size); + char* s = getPosInResultPage(pRuntimeEnv, i, pOneOutputRes); + size_t size = pRuntimeEnv->pQuery->pSelectExpr[i].resBytes; + memset(s, 0, size); + + resetResultInfo(pResultInfo); } } -void copyGroupResultBuf(SOutputRes* dst, const SOutputRes* src, int32_t nOutputCols) { +void copyGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SOutputRes* dst, const SOutputRes* src) { + dst->numOfRows = src->numOfRows; + dst->nAlloc = src->nAlloc; + + int32_t nOutputCols = pRuntimeEnv->pQuery->numOfOutputCols; + for(int32_t i = 0; i < nOutputCols; ++i) { SResultInfo *pDst = &dst->resultInfo[i]; SResultInfo *pSrc = &src->resultInfo[i]; char* buf = pDst->interResultBuf; memcpy(pDst, pSrc, sizeof(SResultInfo)); - pDst->interResultBuf = buf; + pDst->interResultBuf = buf; // restore the allocated buffer + + // copy the result info struct memcpy(pDst->interResultBuf, pSrc->interResultBuf, pDst->bufLen); - - int32_t size = sizeof(tFilePage) + pSrc->bufLen * src->nAlloc; - memcpy(dst->result[i], src->result[i], size); + + // copy the output buffer data from src to dst, the position info keep unchanged + char* dstBuf = getPosInResultPage(pRuntimeEnv, i, dst); + char* srcBuf = getPosInResultPage(pRuntimeEnv, i, src); + size_t s = pRuntimeEnv->pQuery->pSelectExpr[i].resBytes; + + memcpy(dstBuf, srcBuf, s); } } @@ -5697,12 +5764,12 @@ void destroyGroupResultBuf(SOutputRes *pOneOutputRes, int32_t nOutputCols) { } for (int32_t i = 0; i < nOutputCols; ++i) { - free(pOneOutputRes->result[i]); +// free(pOneOutputRes->pos[i]); free(pOneOutputRes->resultInfo[i].interResultBuf); } free(pOneOutputRes->resultInfo); - free(pOneOutputRes->result); +// free(pOneOutputRes->result); } void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { @@ -5766,6 +5833,7 @@ void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { int32_t functionId = pQuery->pSelectExpr[j].pBase.functionId; pRuntimeEnv->pCtx[j].currentStage = 0; + aAggs[functionId].init(&pRuntimeEnv->pCtx[j]); } } @@ -6107,7 +6175,7 @@ void forwardIntervalQueryRange(SMeterQuerySupportObj *pSupporter, SQueryRuntimeE (!QUERY_IS_ASC_QUERY(pQuery) && pQuery->lastKey <= pQuery->ekey)) { setQueryStatus(pQuery, QUERY_COMPLETED); } else { - TSKEY nextTimestamp = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos); + /*TSKEY nextTimestamp =*/ loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos); } return; @@ -6752,25 +6820,23 @@ void setExecutionContext(SMeterQuerySupportObj *pSupporter, SOutputRes *outputRe static void setGroupOutputBuffer(SQueryRuntimeEnv *pRuntimeEnv, SOutputRes *pResult) { SQuery *pQuery = pRuntimeEnv->pQuery; - - // Note: pResult->result[i]->numOfElems == 0, there is only fixed number of results for each group + + // Note: pResult->pos[i]->numOfElems == 0, there is only fixed number of results for each group for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - assert(pResult->result[i]->numOfElems == 0 || pResult->result[i]->numOfElems == 1); - SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; - pCtx->aOutputBuf = pResult->result[i]->data + pCtx->outputBytes * pResult->result[i]->numOfElems; - + pCtx->aOutputBuf = getPosInResultPage(pRuntimeEnv, i, pResult); + int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf; } - + /* * set the output buffer information and intermediate buffer * not all queries require the interResultBuf, such as COUNT */ pCtx->resultInfo = &pResult->resultInfo[i]; - + // set super table query flag SResultInfo *pResInfo = GET_RES_INFO(pCtx); if (!isGroupbyNormalCol(pQuery->pGroupbyExpr)) { @@ -6795,7 +6861,7 @@ void setCtxOutputPointerForSupplementScan(SMeterQuerySupportObj *pSupporter, SMe tFilePage *pData = NULL; int32_t i = 0; - SQueryResultBuf* pResultBuf = pSupporter->pResultBuf; + SQueryResultBuf* pResultBuf = pRuntimeEnv->pResultBuf; // find the position for this output result SIDList list = getDataBufPagesIdList(pResultBuf, pMeterQueryInfo->sid); @@ -6866,7 +6932,7 @@ int32_t setOutputBufferForIntervalQuery(SMeterQuerySupportObj *pSupporter, SMete SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; tFilePage * pData = NULL; - SQueryResultBuf* pResultBuf = pSupporter->pResultBuf; + SQueryResultBuf* pResultBuf = pRuntimeEnv->pResultBuf; // in the first scan, new space needed for results SIDList list = getDataBufPagesIdList(pResultBuf, pMeterQueryInfo->sid); @@ -6945,171 +7011,171 @@ int32_t setIntervalQueryExecutionContext(SMeterQuerySupportObj *pSupporter, int3 return 0; } -static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo, - SBlockInfo *pBlockInfo, int64_t *pPrimaryCol, SField *pFields, - __block_search_fn_t searchFn) { - SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; - SQuery * pQuery = pRuntimeEnv->pQuery; - int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); - - int64_t nextKey = -1; - bool queryCompleted = false; - - while (1) { - int32_t numOfRes = 0; - int32_t steps = applyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, pPrimaryCol, pFields, searchFn, &numOfRes); - assert(steps > 0); - - // NOTE: in case of stable query, only ONE(or ZERO) row of result generated for each query range - if (pMeterQueryInfo->lastResRows == 0) { - pMeterQueryInfo->lastResRows = numOfRes; - } else { - assert(pMeterQueryInfo->lastResRows == 1); - } - - int32_t pos = pQuery->pos + steps * factor; - - // query does not reach the end of current block - if ((pos < pBlockInfo->size && QUERY_IS_ASC_QUERY(pQuery)) || (pos >= 0 && !QUERY_IS_ASC_QUERY(pQuery))) { - nextKey = pPrimaryCol[pos]; - } else { - assert((pQuery->lastKey > pBlockInfo->keyLast && QUERY_IS_ASC_QUERY(pQuery)) || - (pQuery->lastKey < pBlockInfo->keyFirst && !QUERY_IS_ASC_QUERY(pQuery))); - } - - // all data satisfy current query are checked, query completed - if (QUERY_IS_ASC_QUERY(pQuery)) { - queryCompleted = (nextKey > pQuery->ekey || pQuery->ekey <= pBlockInfo->keyLast); - } else { - queryCompleted = (nextKey < pQuery->ekey || pQuery->ekey >= pBlockInfo->keyFirst); - } - - /* - * 1. there may be more date that satisfy current query interval, other than - * current block, we need to try next data blocks - * 2. query completed, since reaches the upper bound of the main query range - */ - if (QUERY_IS_ASC_QUERY(pQuery)) { - if (pQuery->lastKey > pBlockInfo->keyLast || pQuery->lastKey > pSupporter->rawEKey || - nextKey > pSupporter->rawEKey) { - /* - * current interval query is completed, set query result flag closed and - * try next data block if pQuery->ekey == pSupporter->rawEKey, whole query is completed - */ - if (pQuery->lastKey > pBlockInfo->keyLast) { - assert(pQuery->ekey >= pBlockInfo->keyLast); - } - - if (pQuery->lastKey > pSupporter->rawEKey || nextKey > pSupporter->rawEKey) { - /* whole query completed, save result and abort */ - assert(queryCompleted); - saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows); - - // save the pQuery->lastKey for retrieve data in cache, actually, there will be no qualified data in cache. - saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo); - } else if (pQuery->ekey == pBlockInfo->keyLast) { - /* current interval query is completed, set the next query range on other data blocks if exist */ - int64_t prevEKey = pQuery->ekey; - - getAlignedIntervalQueryRange(pRuntimeEnv, pQuery->lastKey, pSupporter->rawSKey, pSupporter->rawEKey); - saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo); - - assert(queryCompleted && prevEKey < pQuery->skey); - if (pMeterQueryInfo->lastResRows > 0) { - saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows); - } - } else { - /* - * Data that satisfy current query range may locate in current block and blocks that are directly right - * next to current block. Therefore, we need to keep the query range(interval) unchanged until reaching - * the direct next data block, while only forwards the pQuery->lastKey. - * - * With the information of the directly next data block, whether locates in cache or disk, - * current interval query being completed or not can be decided. - */ - saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo); - assert(pQuery->lastKey > pBlockInfo->keyLast && pQuery->lastKey <= pQuery->ekey); - - /* - * if current block is the last block of current file, we still close the result flag, and - * merge with other meters in the same group - */ - if (queryCompleted) { - saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows); - } - } - - break; - } - } else { - if (pQuery->lastKey < pBlockInfo->keyFirst || pQuery->lastKey < pSupporter->rawEKey || - nextKey < pSupporter->rawEKey) { - if (pQuery->lastKey < pBlockInfo->keyFirst) { - assert(pQuery->ekey <= pBlockInfo->keyFirst); - } - - if (pQuery->lastKey < pSupporter->rawEKey || (nextKey < pSupporter->rawEKey && nextKey != -1)) { - /* whole query completed, save result and abort */ - assert(queryCompleted); - saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows); - - /* - * save the pQuery->lastKey for retrieve data in cache, actually, - * there will be no qualified data in cache. - */ - saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo); - } else if (pQuery->ekey == pBlockInfo->keyFirst) { - // current interval query is completed, set the next query range on other data blocks if exist - int64_t prevEKey = pQuery->ekey; - - getAlignedIntervalQueryRange(pRuntimeEnv, pQuery->lastKey, pSupporter->rawSKey, pSupporter->rawEKey); - saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo); - - assert(queryCompleted && prevEKey > pQuery->skey); - if (pMeterQueryInfo->lastResRows > 0) { - saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows); - } - } else { - /* - * Data that satisfy current query range may locate in current block and blocks that are - * directly right next to current block. Therefore, we need to keep the query range(interval) - * unchanged until reaching the direct next data block, while only forwards the pQuery->lastKey. - * - * With the information of the directly next data block, whether locates in cache or disk, - * current interval query being completed or not can be decided. - */ - saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo); - assert(pQuery->lastKey < pBlockInfo->keyFirst && pQuery->lastKey >= pQuery->ekey); - - /* - * if current block is the last block of current file, we still close the result - * flag, and merge with other meters in the same group - */ - if (queryCompleted) { - saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows); - } - } - - break; - } - } - - assert(queryCompleted); - saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows); - - assert((nextKey >= pQuery->lastKey && QUERY_IS_ASC_QUERY(pQuery)) || - (nextKey <= pQuery->lastKey && !QUERY_IS_ASC_QUERY(pQuery))); - - /* still in the same block to query */ - getAlignedIntervalQueryRange(pRuntimeEnv, nextKey, pSupporter->rawSKey, pSupporter->rawEKey); - saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo); - - int32_t newPos = searchFn((char *)pPrimaryCol, pBlockInfo->size, pQuery->skey, pQuery->order.order); - assert(newPos == pQuery->pos + steps * factor); - - pQuery->pos = newPos; - } -} +//static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo, +// SBlockInfo *pBlockInfo, int64_t *pPrimaryCol, SField *pFields, +// __block_search_fn_t searchFn) { +// SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; +// SQuery * pQuery = pRuntimeEnv->pQuery; +// int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); +// +// int64_t nextKey = -1; +// bool queryCompleted = false; +// +// while (1) { +// int32_t numOfRes = 0; +// int32_t steps = applyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, pPrimaryCol, pFields, searchFn, &numOfRes); +// assert(steps > 0); +// +// // NOTE: in case of stable query, only ONE(or ZERO) row of pos generated for each query range +// if (pMeterQueryInfo->lastResRows == 0) { +// pMeterQueryInfo->lastResRows = numOfRes; +// } else { +// assert(pMeterQueryInfo->lastResRows == 1); +// } +// +// int32_t pos = pQuery->pos + steps * factor; +// +// // query does not reach the end of current block +// if ((pos < pBlockInfo->size && QUERY_IS_ASC_QUERY(pQuery)) || (pos >= 0 && !QUERY_IS_ASC_QUERY(pQuery))) { +// nextKey = pPrimaryCol[pos]; +// } else { +// assert((pQuery->lastKey > pBlockInfo->keyLast && QUERY_IS_ASC_QUERY(pQuery)) || +// (pQuery->lastKey < pBlockInfo->keyFirst && !QUERY_IS_ASC_QUERY(pQuery))); +// } +// +// // all data satisfy current query are checked, query completed +// if (QUERY_IS_ASC_QUERY(pQuery)) { +// queryCompleted = (nextKey > pQuery->ekey || pQuery->ekey <= pBlockInfo->keyLast); +// } else { +// queryCompleted = (nextKey < pQuery->ekey || pQuery->ekey >= pBlockInfo->keyFirst); +// } +// +// /* +// * 1. there may be more date that satisfy current query interval, other than +// * current block, we need to try next data blocks +// * 2. query completed, since reaches the upper bound of the main query range +// */ +// if (QUERY_IS_ASC_QUERY(pQuery)) { +// if (pQuery->lastKey > pBlockInfo->keyLast || pQuery->lastKey > pSupporter->rawEKey || +// nextKey > pSupporter->rawEKey) { +// /* +// * current interval query is completed, set query pos flag closed and +// * try next data block if pQuery->ekey == pSupporter->rawEKey, whole query is completed +// */ +// if (pQuery->lastKey > pBlockInfo->keyLast) { +// assert(pQuery->ekey >= pBlockInfo->keyLast); +// } +// +// if (pQuery->lastKey > pSupporter->rawEKey || nextKey > pSupporter->rawEKey) { +// /* whole query completed, save pos and abort */ +// assert(queryCompleted); +// saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows); +// +// // save the pQuery->lastKey for retrieve data in cache, actually, there will be no qualified data in cache. +// saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo); +// } else if (pQuery->ekey == pBlockInfo->keyLast) { +// /* current interval query is completed, set the next query range on other data blocks if exist */ +// int64_t prevEKey = pQuery->ekey; +// +// getAlignedIntervalQueryRange(pRuntimeEnv, pQuery->lastKey, pSupporter->rawSKey, pSupporter->rawEKey); +// saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo); +// +// assert(queryCompleted && prevEKey < pQuery->skey); +// if (pMeterQueryInfo->lastResRows > 0) { +// saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows); +// } +// } else { +// /* +// * Data that satisfy current query range may locate in current block and blocks that are directly right +// * next to current block. Therefore, we need to keep the query range(interval) unchanged until reaching +// * the direct next data block, while only forwards the pQuery->lastKey. +// * +// * With the information of the directly next data block, whether locates in cache or disk, +// * current interval query being completed or not can be decided. +// */ +// saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo); +// assert(pQuery->lastKey > pBlockInfo->keyLast && pQuery->lastKey <= pQuery->ekey); +// +// /* +// * if current block is the last block of current file, we still close the pos flag, and +// * merge with other meters in the same group +// */ +// if (queryCompleted) { +// saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows); +// } +// } +// +// break; +// } +// } else { +// if (pQuery->lastKey < pBlockInfo->keyFirst || pQuery->lastKey < pSupporter->rawEKey || +// nextKey < pSupporter->rawEKey) { +// if (pQuery->lastKey < pBlockInfo->keyFirst) { +// assert(pQuery->ekey <= pBlockInfo->keyFirst); +// } +// +// if (pQuery->lastKey < pSupporter->rawEKey || (nextKey < pSupporter->rawEKey && nextKey != -1)) { +// /* whole query completed, save pos and abort */ +// assert(queryCompleted); +// saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows); +// +// /* +// * save the pQuery->lastKey for retrieve data in cache, actually, +// * there will be no qualified data in cache. +// */ +// saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo); +// } else if (pQuery->ekey == pBlockInfo->keyFirst) { +// // current interval query is completed, set the next query range on other data blocks if exist +// int64_t prevEKey = pQuery->ekey; +// +// getAlignedIntervalQueryRange(pRuntimeEnv, pQuery->lastKey, pSupporter->rawSKey, pSupporter->rawEKey); +// saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo); +// +// assert(queryCompleted && prevEKey > pQuery->skey); +// if (pMeterQueryInfo->lastResRows > 0) { +// saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows); +// } +// } else { +// /* +// * Data that satisfy current query range may locate in current block and blocks that are +// * directly right next to current block. Therefore, we need to keep the query range(interval) +// * unchanged until reaching the direct next data block, while only forwards the pQuery->lastKey. +// * +// * With the information of the directly next data block, whether locates in cache or disk, +// * current interval query being completed or not can be decided. +// */ +// saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo); +// assert(pQuery->lastKey < pBlockInfo->keyFirst && pQuery->lastKey >= pQuery->ekey); +// +// /* +// * if current block is the last block of current file, we still close the pos +// * flag, and merge with other meters in the same group +// */ +// if (queryCompleted) { +// saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows); +// } +// } +// +// break; +// } +// } +// +// assert(queryCompleted); +// saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows); +// +// assert((nextKey >= pQuery->lastKey && QUERY_IS_ASC_QUERY(pQuery)) || +// (nextKey <= pQuery->lastKey && !QUERY_IS_ASC_QUERY(pQuery))); +// +// /* still in the same block to query */ +// getAlignedIntervalQueryRange(pRuntimeEnv, nextKey, pSupporter->rawSKey, pSupporter->rawEKey); +// saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo); +// +// int32_t newPos = searchFn((char *)pPrimaryCol, pBlockInfo->size, pQuery->skey, pQuery->order.order); +// assert(newPos == pQuery->pos + steps * factor); +// +// pQuery->pos = newPos; +// } +//} static void doApplyIntervalQueryOnBlock_rv(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo, SBlockInfo *pBlockInfo, int64_t *pPrimaryCol, SField *pFields, @@ -7428,7 +7494,7 @@ bool onDemandLoadDatablock(SQuery *pQuery, int16_t queryRangeSet) { static void validateResultBuf(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; SQuery * pQuery = pSupporter->runtimeEnv.pQuery; - SQueryResultBuf* pResultBuf = pSupporter->pResultBuf; + SQueryResultBuf* pResultBuf = pRuntimeEnv->pResultBuf; SIDList list = getDataBufPagesIdList(pResultBuf, pMeterQueryInfo->sid); int32_t id = getLastPageId(&list); @@ -7486,10 +7552,10 @@ int32_t saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQue pMeterQueryInfo->reverseIndex -= 1; setCtxOutputPointerForSupplementScan(pSupporter, pMeterQueryInfo); } else { - SIDList list = getDataBufPagesIdList(pSupporter->pResultBuf, pMeterQueryInfo->sid); + SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, pMeterQueryInfo->sid); int32_t pageId = getLastPageId(&list); - tFilePage* pData = getResultBufferPageById(pSupporter->pResultBuf, pageId); + tFilePage* pData = getResultBufferPageById(pRuntimeEnv->pResultBuf, pageId); // in handling records occuring around '1970-01-01', the aligned start timestamp may be 0. TSKEY ts = *(TSKEY *)getOutputResPos(pRuntimeEnv, pData, pData->numOfElems, 0); @@ -7572,8 +7638,6 @@ static int32_t doCopyFromGroupBuf(SMeterQuerySupportObj *pSupporter, SOutputRes assert(result[i].numOfRows >= 0 && pSupporter->offset <= 1); - tFilePage **srcBuf = result[i].result; - int32_t numOfRowsToCopy = result[i].numOfRows - pSupporter->offset; int32_t oldOffset = pSupporter->offset; @@ -7589,8 +7653,8 @@ static int32_t doCopyFromGroupBuf(SMeterQuerySupportObj *pSupporter, SOutputRes for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { int32_t elemSize = pRuntimeEnv->pCtx[j].outputBytes; char * outputBuf = pQuery->sdata[j]->data + numOfResult * elemSize; - - memcpy(outputBuf, srcBuf[j]->data + oldOffset * elemSize, elemSize * numOfRowsToCopy); + char* p = getPosInResultPage(pRuntimeEnv, j, &result[i]); + memcpy(outputBuf, p + oldOffset * elemSize, elemSize * numOfRowsToCopy); } numOfResult += numOfRowsToCopy; @@ -7871,10 +7935,10 @@ void vnodePrintQueryStatistics(SMeterQuerySupportObj *pSupporter) { SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); SQueryCostSummary *pSummary = &pRuntimeEnv->summary; - if (pSupporter->pResultBuf == NULL) { + if (pRuntimeEnv->pResultBuf == NULL) { pSummary->tmpBufferInDisk = 0; } else { - pSummary->tmpBufferInDisk = getResBufSize(pSupporter->pResultBuf); + pSummary->tmpBufferInDisk = getResBufSize(pRuntimeEnv->pResultBuf); } dTrace("QInfo:%p statis: comp blocks:%d, size:%d Bytes, elapsed time:%.2f ms", pQInfo, pSummary->readCompInfo, diff --git a/src/system/detail/src/vnodeQueryProcess.c b/src/system/detail/src/vnodeQueryProcess.c index 278d334c8e..1b04806f7c 100644 --- a/src/system/detail/src/vnodeQueryProcess.c +++ b/src/system/detail/src/vnodeQueryProcess.c @@ -683,7 +683,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { } resetCtxOutputBuf(pRuntimeEnv); - resetSlidingWindowInfo(&pRuntimeEnv->swindowResInfo, pQuery->numOfOutputCols); + resetSlidingWindowInfo(pRuntimeEnv, &pRuntimeEnv->swindowResInfo); while (pSupporter->meterIdx < pSupporter->numOfMeters) { int32_t k = pSupporter->meterIdx; @@ -1088,7 +1088,7 @@ static void vnodeSingleMeterIntervalMainLooper(SMeterQuerySupportObj *pSupporter (pQuery->skey >= pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))); initCtxOutputBuf(pRuntimeEnv); - clearCompletedSlidingWindows(&pRuntimeEnv->swindowResInfo, pQuery->numOfOutputCols); + clearCompletedSlidingWindows(pRuntimeEnv); vnodeScanAllData(pRuntimeEnv); if (isQueryKilled(pQuery)) { -- GitLab