提交 fbe202ce 编写于 作者: H hjxilinx

fix bugs in regression test.

上级 23b2c917
......@@ -72,7 +72,7 @@ static void setColumnOffsetValueInResultset(SQueryInfo* pQueryInfo);
static int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd* pCmd);
static int32_t parseIntervalClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql);
static int32_t setSlidingClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql);
static int32_t parseSlidingClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql);
static int32_t addProjectionExprAndResultField(SQueryInfo* pQueryInfo, tSQLExprItem* pItem);
......@@ -657,14 +657,14 @@ int32_t parseIntervalClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
return ret;
}
if (setSlidingClause(pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) {
if (parseSlidingClause(pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_INVALID_SQL;
}
return TSDB_CODE_SUCCESS;
}
int32_t setSlidingClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
int32_t parseSlidingClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
const char* msg0 = "sliding value too small";
const char* msg1 = "sliding value no larger than the interval value";
......
......@@ -396,7 +396,9 @@ static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
int64_t minSlidingTime =
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinSlidingTime * 1000L : tsMinSlidingTime;
if (pQueryInfo->nSlidingTime < minSlidingTime) {
if (pQueryInfo->nSlidingTime == -1) {
pQueryInfo->nSlidingTime = pQueryInfo->nAggTimeInterval;
} else if (pQueryInfo->nSlidingTime < minSlidingTime) {
tscWarn("%p stream:%p, original sliding value:%" PRId64 " too small, reset to:%" PRId64 "", pSql, pStream,
pQueryInfo->nSlidingTime, minSlidingTime);
......
......@@ -276,11 +276,11 @@ void displayInterResult(SData** pdata, SQuery* pQuery, int32_t numOfRows);
void vnodePrintQueryStatistics(SMeterQuerySupportObj* pSupporter);
void clearGroupResultBuf(SOutputRes* pOneOutputRes, int32_t nOutputCols);
void copyGroupResultBuf(SOutputRes* dst, const SOutputRes* src, int32_t nOutputCols);
void clearGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SOutputRes *pOneOutputRes);
void copyGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SOutputRes* dst, const SOutputRes* src);
void resetSlidingWindowInfo(SSlidingWindowInfo* pSlidingWindowInfo, int32_t numOfCols);
void clearCompletedSlidingWindows(SSlidingWindowInfo* pSlidingWindowInfo, int32_t numOfCols);
void resetSlidingWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SSlidingWindowInfo* pSlidingWindowInfo);
void clearCompletedSlidingWindows(SQueryRuntimeEnv* pRuntimeEnv);
int32_t numOfClosedSlidingWindow(SSlidingWindowInfo* pSlidingWindowInfo);
void closeSlidingWindow(SSlidingWindowInfo* pSlidingWindowInfo, int32_t slot);
void closeAllSlidingWindow(SSlidingWindowInfo* pSlidingWindowInfo);
......
......@@ -165,7 +165,7 @@ typedef struct SQueryRuntimeEnv {
SInterpolationInfo interpoInfo;
SData** pInterpoBuf;
SSlidingWindowInfo swindowResInfo;
SSlidingWindowInfo swindowResInfo;
STSBuf* pTSBuf;
STSCursor cur;
......
......@@ -68,9 +68,6 @@ static int32_t getNextDataFileCompInfo(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj
static void setGroupOutputBuffer(SQueryRuntimeEnv *pRuntimeEnv, SOutputRes *pResult);
static void getAlignedIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, TSKEY keyInData, TSKEY skey, TSKEY ekey);
static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo,
SBlockInfo *pBlockInfo, int64_t *pPrimaryCol, SField *pFields,
__block_search_fn_t searchFn);
static int32_t saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo, int32_t numOfResult);
static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterDataInfo *pMeterDataInfo,
......@@ -589,7 +586,8 @@ static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, int64_t StartQue
char *primaryColumnData, int32_t size, int32_t functionId, SField *pField, bool hasNull,
int32_t blockStatus, void *param, int32_t scanFlag);
void createGroupResultBuf(SQuery *pQuery, SOutputRes *pOneResult, bool isSTableQuery);
void createQueryResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SOutputRes *pResultRow, bool isSTableQuery, SPosInfo *posInfo);
static void destroyGroupResultBuf(SOutputRes *pOneOutputRes, int32_t nOutputCols);
static int32_t binarySearchForBlockImpl(SCompBlock *pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
......@@ -1614,7 +1612,7 @@ static SOutputRes *doSetSlidingWindowFromKey(SSlidingWindowInfo *pSlidingWindowI
return &pSlidingWindowInfo->pResult[p];
}
static int32_t initSlidingWindowInfo(SSlidingWindowInfo *pSlidingWindowInfo, int32_t threshold, int16_t type,
static int32_t initSlidingWindowInfo(SSlidingWindowInfo *pSlidingWindowInfo, int32_t threshold, int16_t type, int32_t rowSizes,
SOutputRes *pRes) {
pSlidingWindowInfo->capacity = threshold;
pSlidingWindowInfo->threshold = threshold;
......@@ -1627,8 +1625,18 @@ static int32_t initSlidingWindowInfo(SSlidingWindowInfo *pSlidingWindowInfo, int
pSlidingWindowInfo->curIndex = -1;
pSlidingWindowInfo->size = 0;
pSlidingWindowInfo->pResult = pRes;
pSlidingWindowInfo->pStatus = calloc(threshold, sizeof(SWindowStatus));
// createResultBuf(&pSlidingWindowInfo->pResultBuf, 10, rowSizes);
pSlidingWindowInfo->pStatus = calloc(threshold, sizeof(SWindowStatus));
// pSlidingWindowInfo->pResultInfo = calloc(threshold, POINTER_BYTES);
// for(int32_t i = 0; i < threshold; ++i) {
// pSlidingWindowInfo->pResultInfo[i] = calloc((size_t)numOfOutput, sizeof(SResultInfo));
// }
if (pSlidingWindowInfo->pStatus == NULL || pSlidingWindowInfo->hashList == NULL) {
return -1;
}
......@@ -1643,17 +1651,19 @@ static void destroySlidingWindowInfo(SSlidingWindowInfo *pSlidingWindowInfo) {
}
taosCleanUpHashTable(pSlidingWindowInfo->hashList);
// destroyResultBuf(pSlidingWindowInfo->pResultBuf);
tfree(pSlidingWindowInfo->pStatus);
}
void resetSlidingWindowInfo(SSlidingWindowInfo *pSlidingWindowInfo, int32_t numOfCols) {
void resetSlidingWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SSlidingWindowInfo *pSlidingWindowInfo) {
if (pSlidingWindowInfo == NULL || pSlidingWindowInfo->capacity == 0) {
return;
}
for (int32_t i = 0; i < pSlidingWindowInfo->size; ++i) {
SOutputRes *pOneRes = &pSlidingWindowInfo->pResult[i];
clearGroupResultBuf(pOneRes, numOfCols);
clearGroupResultBuf(pRuntimeEnv, pOneRes);
}
memset(pSlidingWindowInfo->pStatus, 0, sizeof(SWindowStatus) * pSlidingWindowInfo->capacity);
......@@ -1669,7 +1679,8 @@ void resetSlidingWindowInfo(SSlidingWindowInfo *pSlidingWindowInfo, int32_t numO
pSlidingWindowInfo->prevSKey = 0;
}
void clearCompletedSlidingWindows(SSlidingWindowInfo *pSlidingWindowInfo, int32_t numOfCols) {
void clearCompletedSlidingWindows(SQueryRuntimeEnv* pRuntimeEnv) {
SSlidingWindowInfo* pSlidingWindowInfo = &pRuntimeEnv->swindowResInfo;
if (pSlidingWindowInfo == NULL || pSlidingWindowInfo->capacity == 0 || pSlidingWindowInfo->size == 0) {
return;
}
......@@ -1689,17 +1700,18 @@ void clearCompletedSlidingWindows(SSlidingWindowInfo *pSlidingWindowInfo, int32_
}
int32_t remain = pSlidingWindowInfo->size - i;
//clear remain list
memmove(pSlidingWindowInfo->pStatus, &pSlidingWindowInfo->pStatus[i], remain * sizeof(SWindowStatus));
memset(&pSlidingWindowInfo->pStatus[remain], 0, (pSlidingWindowInfo->capacity - remain) * sizeof(SWindowStatus));
for(int32_t k = 0; k < remain; ++k) {
copyGroupResultBuf(&pSlidingWindowInfo->pResult[k], &pSlidingWindowInfo->pResult[i + k], numOfCols);
copyGroupResultBuf(pRuntimeEnv, &pSlidingWindowInfo->pResult[k], &pSlidingWindowInfo->pResult[i + k]);
}
for(int32_t k = remain; k < pSlidingWindowInfo->size; ++k) {
SOutputRes *pOneRes = &pSlidingWindowInfo->pResult[k];
clearGroupResultBuf(pOneRes, numOfCols);
clearGroupResultBuf(pRuntimeEnv, pOneRes);
}
pSlidingWindowInfo->size = remain;
......@@ -1720,14 +1732,12 @@ void clearCompletedSlidingWindows(SSlidingWindowInfo *pSlidingWindowInfo, int32_
}
int32_t numOfClosedSlidingWindow(SSlidingWindowInfo *pSlidingWindowInfo) {
for (int32_t i = 0; i < pSlidingWindowInfo->size; ++i) {
SWindowStatus *pStatus = &pSlidingWindowInfo->pStatus[i];
if (pStatus->closed == false) {
return i;
}
int32_t i = 0;
while(i < pSlidingWindowInfo->size && pSlidingWindowInfo->pStatus[i].closed) {
++i;
}
return 0;
return i;
}
void closeSlidingWindow(SSlidingWindowInfo* pSlidingWindowInfo, int32_t slot) {
......@@ -2469,20 +2479,21 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, int64_t startQueryTimes
}
// set the output buffer for the selectivity + tag query
static void setCtxTagColumnInfo(SQuery *pQuery, SQueryRuntimeEnv *pRuntimeEnv) {
static void setCtxTagColumnInfo(SQuery *pQuery, SQLFunctionCtx *pCtx) {
if (isSelectivityWithTagsQuery(pQuery)) {
int32_t num = 0;
SQLFunctionCtx *pCtx = NULL;
SQLFunctionCtx *p = NULL;
int16_t tagLen = 0;
SQLFunctionCtx **pTagCtx = calloc(pQuery->numOfOutputCols, POINTER_BYTES);
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
SSqlFuncExprMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].pBase;
if (pSqlFuncMsg->functionId == TSDB_FUNC_TAG_DUMMY || pSqlFuncMsg->functionId == TSDB_FUNC_TS_DUMMY) {
tagLen += pRuntimeEnv->pCtx[i].outputBytes;
pTagCtx[num++] = &pRuntimeEnv->pCtx[i];
tagLen += pCtx[i].outputBytes;
pTagCtx[num++] = &pCtx[i];
} else if ((aAggs[pSqlFuncMsg->functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
pCtx = &pRuntimeEnv->pCtx[i];
p = &pCtx[i];
} else if (pSqlFuncMsg->functionId == TSDB_FUNC_TS || pSqlFuncMsg->functionId == TSDB_FUNC_TAG) {
// tag function may be the group by tag column
// ts may be the required primary timestamp column
......@@ -2492,14 +2503,14 @@ static void setCtxTagColumnInfo(SQuery *pQuery, SQueryRuntimeEnv *pRuntimeEnv) {
}
}
pCtx->tagInfo.pTagCtxList = pTagCtx;
pCtx->tagInfo.numOfTagCols = num;
pCtx->tagInfo.tagsLen = tagLen;
p->tagInfo.pTagCtxList = pTagCtx;
p->tagInfo.numOfTagCols = num;
p->tagInfo.tagsLen = tagLen;
}
}
static int32_t setupQueryRuntimeEnv(SMeterObj *pMeterObj, SQuery *pQuery, SQueryRuntimeEnv *pRuntimeEnv,
tTagSchema *pTagsSchema, int16_t order, bool isSTableQuery) {
SColumnModel *pTagsSchema, int16_t order, bool isSTableQuery) {
dTrace("QInfo:%p setup runtime env", GET_QINFO_ADDR(pQuery));
pRuntimeEnv->pMeterObj = pMeterObj;
......@@ -2577,7 +2588,7 @@ static int32_t setupQueryRuntimeEnv(SMeterObj *pMeterObj, SQuery *pQuery, SQuery
resetCtxOutputBuf(pRuntimeEnv);
}
setCtxTagColumnInfo(pQuery, pRuntimeEnv);
setCtxTagColumnInfo(pQuery, pRuntimeEnv->pCtx);
// for loading block data in memory
assert(vnodeList[pMeterObj->vnode].cfg.rowsInFileBlock == pMeterObj->pointsPerFileBlock);
......@@ -4123,21 +4134,29 @@ static void allocMemForInterpo(SMeterQuerySupportObj *pSupporter, SQuery *pQuery
}
}
static int32_t allocateOutputBufForGroup(SMeterQuerySupportObj *pSupporter, SQuery *pQuery, bool isSTableQuery) {
int32_t slot = 0;
static int32_t createQueryOutputBuffer(SMeterQuerySupportObj *pSupporter, SQuery *pQuery, bool isSTableQuery) {
SQueryRuntimeEnv* pRuntimeEnv = &pSupporter->runtimeEnv;
int32_t numOfRows = 0;
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->nAggTimeInterval > 0 && pQuery->slidingTime > 0)) {
slot = 10000;
numOfRows = 10000;
} else {
slot = pSupporter->pSidSet->numOfSubSet;
numOfRows = pSupporter->pSidSet->numOfSubSet;
}
pSupporter->pResult = calloc(1, sizeof(SOutputRes) * slot);
createResultBuf(&pRuntimeEnv->pResultBuf, 100, pQuery->rowSize);
// total number of initial results
pSupporter->pResult = calloc(numOfRows, sizeof(SOutputRes));
if (pSupporter->pResult == NULL) {
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
for (int32_t k = 0; k < slot; ++k) {
int32_t pageId = -1;
tFilePage* page = NULL;
for (int32_t k = 0; k < numOfRows; ++k) {
SOutputRes *pOneRes = &pSupporter->pResult[k];
pOneRes->nAlloc = 1;
......@@ -4152,7 +4171,16 @@ static int32_t allocateOutputBufForGroup(SMeterQuerySupportObj *pSupporter, SQue
pOneRes->nAlloc = pExpr->pBase.arg[0].argValue.i64;
}
createGroupResultBuf(pQuery, pOneRes, isSTableQuery);
if (page == NULL || page->numOfElems >= pRuntimeEnv->numOfRowsPerPage) {
page = getNewDataBuf(pRuntimeEnv->pResultBuf, 0, &pageId);
}
assert(pageId >= 0);
SPosInfo posInfo = {.pageId = pageId, .rowId = page->numOfElems};
createQueryResultBuf(pRuntimeEnv, pOneRes, isSTableQuery, &posInfo);
page->numOfElems += 1; // next row is available
}
return TSDB_CODE_SUCCESS;
......@@ -4214,6 +4242,32 @@ _error_clean:
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
static int32_t getRowParamForMultiRowsOutput(SQuery* pQuery, bool isSTableQuery) {
int32_t rowparam = 1;
if (isTopBottomQuery(pQuery) && (!isSTableQuery)) {
rowparam = pQuery->pSelectExpr[1].pBase.arg->argValue.i64;
}
return rowparam;
}
static int32_t getNumOfRowsInResultPage(SQuery* pQuery, bool isSTableQuery) {
int32_t rowSize = pQuery->rowSize * getRowParamForMultiRowsOutput(pQuery, isSTableQuery);
return (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage)) / rowSize;
}
static char* getPosInResultPage(SQueryRuntimeEnv* pRuntimeEnv, int32_t columnIndex, SOutputRes* pResult) {
SQuery* pQuery = pRuntimeEnv->pQuery;
tFilePage* page = getResultBufferPageById(pRuntimeEnv->pResultBuf, pResult->pos.pageId);
int32_t numOfRows = getNumOfRowsInResultPage(pQuery, pRuntimeEnv->stableQuery);
int32_t realRowId = pResult->pos.rowId * getRowParamForMultiRowsOutput(pQuery, pRuntimeEnv->stableQuery);
return ((char*)page->data) + pRuntimeEnv->offset[columnIndex] * numOfRows +
pQuery->pSelectExpr[columnIndex].resBytes * realRowId;
}
int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMeterQuerySupportObj *pSupporter,
void *param) {
SQuery *pQuery = &pQInfo->query;
......@@ -4283,9 +4337,10 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete
}
vnodeRecordAllFiles(pQInfo, pMeterObj->vnode);
pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, false);
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->nAggTimeInterval > 0 && pQuery->slidingTime > 0)) {
if ((code = allocateOutputBufForGroup(pSupporter, pQuery, false)) != TSDB_CODE_SUCCESS) {
if ((code = createQueryOutputBuffer(pSupporter, pQuery, false)) != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -4296,8 +4351,7 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete
type = TSDB_DATA_TYPE_TIMESTAMP;
}
// todo bug!
initSlidingWindowInfo(&pRuntimeEnv->swindowResInfo, 3, type, pSupporter->pResult);
initSlidingWindowInfo(&pRuntimeEnv->swindowResInfo, 3, type, pQuery->rowSize, pSupporter->pResult);
}
pSupporter->rawSKey = pQuery->skey;
......@@ -4492,7 +4546,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
pQuery->lastKey = pQuery->skey;
// create runtime environment
tTagSchema *pTagSchemaInfo = pSupporter->pSidSet->pColumnModel;
SColumnModel *pTagSchemaInfo = pSupporter->pSidSet->pColumnModel;
// get one queried meter
SMeterObj *pMeter = getMeterObj(pSupporter->pMetersHashTable, pSupporter->pSidSet->pSids[0]->sid);
......@@ -4519,25 +4573,25 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
tSidSetSort(pSupporter->pSidSet);
vnodeRecordAllFiles(pQInfo, pMeter->vnode);
if ((ret = allocateOutputBufForGroup(pSupporter, pQuery, true)) != TSDB_CODE_SUCCESS) {
if ((ret = createQueryOutputBuffer(pSupporter, pQuery, true)) != TSDB_CODE_SUCCESS) {
return ret;
}
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // group by columns not tags;
int16_t type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr);
initSlidingWindowInfo(&pRuntimeEnv->swindowResInfo, 10039, type, pSupporter->pResult);
initSlidingWindowInfo(&pRuntimeEnv->swindowResInfo, 4096, type, pQuery->rowSize, pSupporter->pResult);
}
if (pQuery->nAggTimeInterval != 0) {
// one page for each table at least
ret = createResultBuf(&pSupporter->pResultBuf, pSupporter->numOfMeters, pQuery->rowSize);
ret = createResultBuf(&pRuntimeEnv->pResultBuf, pSupporter->numOfMeters, pQuery->rowSize);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
pRuntimeEnv->numOfRowsPerPage = (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage)) / pQuery->rowSize;
}
pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, true);
// metric query do not invoke interpolation, it will be done at the second-stage merge
if (!isPointInterpoQuery(pQuery)) {
pQuery->interpoType = TSDB_INTERPO_NONE;
......@@ -5030,7 +5084,7 @@ void queryOnBlock(SMeterQuerySupportObj *pSupporter, int64_t *primaryKeys, int32
* set tag value in SQLFunctionCtx
* e.g.,tag information into input buffer
*/
static void doSetTagValueInParam(tTagSchema *pTagSchema, int32_t tagColIdx, SMeterSidExtInfo *pMeterSidInfo,
static void doSetTagValueInParam(SColumnModel *pTagSchema, int32_t tagColIdx, SMeterSidExtInfo *pMeterSidInfo,
tVariant *param) {
assert(tagColIdx >= 0);
......@@ -5050,7 +5104,7 @@ static void doSetTagValueInParam(tTagSchema *pTagSchema, int32_t tagColIdx, SMet
void vnodeSetTagValueInParam(tSidSet *pSidSet, SQueryRuntimeEnv *pRuntimeEnv, SMeterSidExtInfo *pMeterSidInfo) {
SQuery * pQuery = pRuntimeEnv->pQuery;
tTagSchema *pTagSchema = pSidSet->pColumnModel;
SColumnModel *pTagSchema = pSidSet->pColumnModel;
SSqlFuncExprMsg *pFuncMsg = &pQuery->pSelectExpr[0].pBase;
if (pQuery->numOfOutputCols == 1 && pFuncMsg->functionId == TSDB_FUNC_TS_COMP) {
......@@ -5246,7 +5300,7 @@ typedef struct SCompSupporter {
int64_t getCurrentTimestamp(SCompSupporter *pSupportor, int32_t meterIdx) {
Position * pPos = &pSupportor->pPosition[meterIdx];
tFilePage *pPage = getMeterDataPage(pSupportor->pSupporter->pResultBuf,
tFilePage *pPage = getMeterDataPage(pSupportor->pSupporter->runtimeEnv.pResultBuf,
pSupportor->pMeterDataInfo[meterIdx]->pMeterQInfo, pPos->pageIdx);
return *(int64_t *)(pPage->data + TSDB_KEYSIZE * pPos->rowIdx);
......@@ -5257,7 +5311,7 @@ int32_t meterResultComparator(const void *pLeft, const void *pRight, void *param
int32_t right = *(int32_t *)pRight;
SCompSupporter *supporter = (SCompSupporter *)param;
SQueryResultBuf* pResultBuf = supporter->pSupporter->pResultBuf;
SQueryResultBuf* pResultBuf = supporter->pSupporter->runtimeEnv.pResultBuf;
Position leftPos = supporter->pPosition[left];
Position rightPos = supporter->pPosition[right];
......@@ -5335,7 +5389,7 @@ void copyResToQueryResultBuf(SMeterQuerySupportObj *pSupporter, SQuery *pQuery)
}
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
SQueryResultBuf* pResultBuf = pSupporter->pResultBuf;
SQueryResultBuf* pResultBuf = pRuntimeEnv->pResultBuf;
SIDList list = getDataBufPagesIdList(pResultBuf, 200000 + pSupporter->offset + (pSupporter->subgroupIdx - 1)* 10000);
......@@ -5383,7 +5437,7 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery
int32_t numOfMeters = 0;
for (int32_t i = start; i < end; ++i) {
int32_t sid = pMeterDataInfo[i].pMeterQInfo->sid;
SIDList list = getDataBufPagesIdList(pSupporter->pResultBuf, sid);
SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, sid);
if (list.size > 0 && pMeterDataInfo[i].pMeterQInfo->numOfRes > 0) {
pValidMeter[numOfMeters] = &pMeterDataInfo[i];
......@@ -5415,7 +5469,7 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery
while (1) {
int32_t pos = pTree->pNode[0].index;
Position * position = &cs.pPosition[pos];
SQueryResultBuf* pResultBuf = cs.pSupporter->pResultBuf;
SQueryResultBuf* pResultBuf = cs.pSupporter->runtimeEnv.pResultBuf;
tFilePage *pPage = getMeterDataPage(pResultBuf, pValidMeter[pos]->pMeterQInfo, position->pageIdx);
int64_t ts = getCurrentTimestamp(&cs, pos);
......@@ -5447,7 +5501,7 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery
cs.pPosition[pos].pageIdx += 1; // try next page
// check if current page is empty or not. if it is empty, ignore it and try next
SIDList list = getDataBufPagesIdList(pSupporter->pResultBuf, cs.pMeterDataInfo[pos]->pMeterQInfo->sid);
SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, cs.pMeterDataInfo[pos]->pMeterQInfo->sid);
if (cs.pPosition[pos].pageIdx <= list.size - 1) {
tFilePage *newPage = getMeterDataPage(pResultBuf, pValidMeter[pos]->pMeterQInfo, position->pageIdx);
......@@ -5505,7 +5559,7 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery
int32_t flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery,
const SQueryRuntimeEnv *pRuntimeEnv) {
SQueryResultBuf* pResultBuf = pSupporter->pResultBuf;
SQueryResultBuf* pResultBuf = pRuntimeEnv->pResultBuf;
int32_t capacity = (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage))/ pQuery->rowSize;
// the base value for group result, since the maximum number of table for each vnode will not exceed 100,000.
......@@ -5645,49 +5699,62 @@ void enableFunctForMasterScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) {
pQuery->order.order = (pQuery->order.order ^ 1);
}
void createGroupResultBuf(SQuery *pQuery, SOutputRes *pOneResult, bool isSTableQuery) {
int32_t numOfOutput = pQuery->numOfOutputCols;
pOneResult->resultInfo = calloc((size_t)numOfOutput, sizeof(SResultInfo));
pOneResult->result = malloc(POINTER_BYTES * numOfOutput);
for (int32_t i = 0; i < numOfOutput; ++i) {
size_t size = pQuery->pSelectExpr[i].interResBytes;
SResultInfo *pResInfo = &pOneResult->resultInfo[i];
pOneResult->result[i] = malloc(sizeof(tFilePage) + size * pOneResult->nAlloc);
pOneResult->result[i]->numOfElems = 0;
void createQueryResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SOutputRes *pResultRow, bool isSTableQuery, SPosInfo *posInfo) {
SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t numOfCols = pQuery->numOfOutputCols;
setResultInfoBuf(pResInfo, (int32_t)size, isSTableQuery);
pResultRow->resultInfo = calloc((size_t)numOfCols, sizeof(SResultInfo));
pResultRow->pos = *posInfo;//page->data + (pRuntimeEnv->offset[i] * pRuntimeEnv->numOfRowsPerPage) + page->numOfElems*s1;
for (int32_t i = 0; i < numOfCols; ++i) {
SResultInfo *pResultInfo = &pResultRow->resultInfo[i];
size_t size = pQuery->pSelectExpr[i].interResBytes;
setResultInfoBuf(pResultInfo, (int32_t)size, isSTableQuery);
}
}
void clearGroupResultBuf(SOutputRes *pOneOutputRes, int32_t nOutputCols) {
void clearGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SOutputRes *pOneOutputRes) {
if (pOneOutputRes == NULL) {
return;
}
for (int32_t i = 0; i < nOutputCols; ++i) {
SResultInfo *pResInfo = &pOneOutputRes->resultInfo[i];
int32_t size = sizeof(tFilePage) + pResInfo->bufLen * pOneOutputRes->nAlloc;
for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutputCols; ++i) {
SResultInfo *pResultInfo = &pOneOutputRes->resultInfo[i];
// int32_t size = sizeof(tFilePage) + pResultInfo->bufLen * pOneOutputRes->nAlloc;
memset(pOneOutputRes->result[i], 0, (size_t)size);
resetResultInfo(pResInfo);
// memset(pOneOutputRes->pos[i], 0, (size_t)size);
char* s = getPosInResultPage(pRuntimeEnv, i, pOneOutputRes);
size_t size = pRuntimeEnv->pQuery->pSelectExpr[i].resBytes;
memset(s, 0, size);
resetResultInfo(pResultInfo);
}
}
void copyGroupResultBuf(SOutputRes* dst, const SOutputRes* src, int32_t nOutputCols) {
void copyGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SOutputRes* dst, const SOutputRes* src) {
dst->numOfRows = src->numOfRows;
dst->nAlloc = src->nAlloc;
int32_t nOutputCols = pRuntimeEnv->pQuery->numOfOutputCols;
for(int32_t i = 0; i < nOutputCols; ++i) {
SResultInfo *pDst = &dst->resultInfo[i];
SResultInfo *pSrc = &src->resultInfo[i];
char* buf = pDst->interResultBuf;
memcpy(pDst, pSrc, sizeof(SResultInfo));
pDst->interResultBuf = buf;
pDst->interResultBuf = buf; // restore the allocated buffer
// copy the result info struct
memcpy(pDst->interResultBuf, pSrc->interResultBuf, pDst->bufLen);
int32_t size = sizeof(tFilePage) + pSrc->bufLen * src->nAlloc;
memcpy(dst->result[i], src->result[i], size);
// copy the output buffer data from src to dst, the position info keep unchanged
char* dstBuf = getPosInResultPage(pRuntimeEnv, i, dst);
char* srcBuf = getPosInResultPage(pRuntimeEnv, i, src);
size_t s = pRuntimeEnv->pQuery->pSelectExpr[i].resBytes;
memcpy(dstBuf, srcBuf, s);
}
}
......@@ -5697,12 +5764,12 @@ void destroyGroupResultBuf(SOutputRes *pOneOutputRes, int32_t nOutputCols) {
}
for (int32_t i = 0; i < nOutputCols; ++i) {
free(pOneOutputRes->result[i]);
// free(pOneOutputRes->pos[i]);
free(pOneOutputRes->resultInfo[i].interResultBuf);
}
free(pOneOutputRes->resultInfo);
free(pOneOutputRes->result);
// free(pOneOutputRes->result);
}
void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
......@@ -5766,6 +5833,7 @@ void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
int32_t functionId = pQuery->pSelectExpr[j].pBase.functionId;
pRuntimeEnv->pCtx[j].currentStage = 0;
aAggs[functionId].init(&pRuntimeEnv->pCtx[j]);
}
}
......@@ -6107,7 +6175,7 @@ void forwardIntervalQueryRange(SMeterQuerySupportObj *pSupporter, SQueryRuntimeE
(!QUERY_IS_ASC_QUERY(pQuery) && pQuery->lastKey <= pQuery->ekey)) {
setQueryStatus(pQuery, QUERY_COMPLETED);
} else {
TSKEY nextTimestamp = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos);
/*TSKEY nextTimestamp =*/ loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos);
}
return;
......@@ -6752,25 +6820,23 @@ void setExecutionContext(SMeterQuerySupportObj *pSupporter, SOutputRes *outputRe
static void setGroupOutputBuffer(SQueryRuntimeEnv *pRuntimeEnv, SOutputRes *pResult) {
SQuery *pQuery = pRuntimeEnv->pQuery;
// Note: pResult->result[i]->numOfElems == 0, there is only fixed number of results for each group
// Note: pResult->pos[i]->numOfElems == 0, there is only fixed number of results for each group
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
assert(pResult->result[i]->numOfElems == 0 || pResult->result[i]->numOfElems == 1);
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
pCtx->aOutputBuf = pResult->result[i]->data + pCtx->outputBytes * pResult->result[i]->numOfElems;
pCtx->aOutputBuf = getPosInResultPage(pRuntimeEnv, i, pResult);
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf;
}
/*
* set the output buffer information and intermediate buffer
* not all queries require the interResultBuf, such as COUNT
*/
pCtx->resultInfo = &pResult->resultInfo[i];
// set super table query flag
SResultInfo *pResInfo = GET_RES_INFO(pCtx);
if (!isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
......@@ -6795,7 +6861,7 @@ void setCtxOutputPointerForSupplementScan(SMeterQuerySupportObj *pSupporter, SMe
tFilePage *pData = NULL;
int32_t i = 0;
SQueryResultBuf* pResultBuf = pSupporter->pResultBuf;
SQueryResultBuf* pResultBuf = pRuntimeEnv->pResultBuf;
// find the position for this output result
SIDList list = getDataBufPagesIdList(pResultBuf, pMeterQueryInfo->sid);
......@@ -6866,7 +6932,7 @@ int32_t setOutputBufferForIntervalQuery(SMeterQuerySupportObj *pSupporter, SMete
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
tFilePage * pData = NULL;
SQueryResultBuf* pResultBuf = pSupporter->pResultBuf;
SQueryResultBuf* pResultBuf = pRuntimeEnv->pResultBuf;
// in the first scan, new space needed for results
SIDList list = getDataBufPagesIdList(pResultBuf, pMeterQueryInfo->sid);
......@@ -6945,171 +7011,171 @@ int32_t setIntervalQueryExecutionContext(SMeterQuerySupportObj *pSupporter, int3
return 0;
}
static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo,
SBlockInfo *pBlockInfo, int64_t *pPrimaryCol, SField *pFields,
__block_search_fn_t searchFn) {
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
int64_t nextKey = -1;
bool queryCompleted = false;
while (1) {
int32_t numOfRes = 0;
int32_t steps = applyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, pPrimaryCol, pFields, searchFn, &numOfRes);
assert(steps > 0);
// NOTE: in case of stable query, only ONE(or ZERO) row of result generated for each query range
if (pMeterQueryInfo->lastResRows == 0) {
pMeterQueryInfo->lastResRows = numOfRes;
} else {
assert(pMeterQueryInfo->lastResRows == 1);
}
int32_t pos = pQuery->pos + steps * factor;
// query does not reach the end of current block
if ((pos < pBlockInfo->size && QUERY_IS_ASC_QUERY(pQuery)) || (pos >= 0 && !QUERY_IS_ASC_QUERY(pQuery))) {
nextKey = pPrimaryCol[pos];
} else {
assert((pQuery->lastKey > pBlockInfo->keyLast && QUERY_IS_ASC_QUERY(pQuery)) ||
(pQuery->lastKey < pBlockInfo->keyFirst && !QUERY_IS_ASC_QUERY(pQuery)));
}
// all data satisfy current query are checked, query completed
if (QUERY_IS_ASC_QUERY(pQuery)) {
queryCompleted = (nextKey > pQuery->ekey || pQuery->ekey <= pBlockInfo->keyLast);
} else {
queryCompleted = (nextKey < pQuery->ekey || pQuery->ekey >= pBlockInfo->keyFirst);
}
/*
* 1. there may be more date that satisfy current query interval, other than
* current block, we need to try next data blocks
* 2. query completed, since reaches the upper bound of the main query range
*/
if (QUERY_IS_ASC_QUERY(pQuery)) {
if (pQuery->lastKey > pBlockInfo->keyLast || pQuery->lastKey > pSupporter->rawEKey ||
nextKey > pSupporter->rawEKey) {
/*
* current interval query is completed, set query result flag closed and
* try next data block if pQuery->ekey == pSupporter->rawEKey, whole query is completed
*/
if (pQuery->lastKey > pBlockInfo->keyLast) {
assert(pQuery->ekey >= pBlockInfo->keyLast);
}
if (pQuery->lastKey > pSupporter->rawEKey || nextKey > pSupporter->rawEKey) {
/* whole query completed, save result and abort */
assert(queryCompleted);
saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows);
// save the pQuery->lastKey for retrieve data in cache, actually, there will be no qualified data in cache.
saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
} else if (pQuery->ekey == pBlockInfo->keyLast) {
/* current interval query is completed, set the next query range on other data blocks if exist */
int64_t prevEKey = pQuery->ekey;
getAlignedIntervalQueryRange(pRuntimeEnv, pQuery->lastKey, pSupporter->rawSKey, pSupporter->rawEKey);
saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
assert(queryCompleted && prevEKey < pQuery->skey);
if (pMeterQueryInfo->lastResRows > 0) {
saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows);
}
} else {
/*
* Data that satisfy current query range may locate in current block and blocks that are directly right
* next to current block. Therefore, we need to keep the query range(interval) unchanged until reaching
* the direct next data block, while only forwards the pQuery->lastKey.
*
* With the information of the directly next data block, whether locates in cache or disk,
* current interval query being completed or not can be decided.
*/
saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
assert(pQuery->lastKey > pBlockInfo->keyLast && pQuery->lastKey <= pQuery->ekey);
/*
* if current block is the last block of current file, we still close the result flag, and
* merge with other meters in the same group
*/
if (queryCompleted) {
saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows);
}
}
break;
}
} else {
if (pQuery->lastKey < pBlockInfo->keyFirst || pQuery->lastKey < pSupporter->rawEKey ||
nextKey < pSupporter->rawEKey) {
if (pQuery->lastKey < pBlockInfo->keyFirst) {
assert(pQuery->ekey <= pBlockInfo->keyFirst);
}
if (pQuery->lastKey < pSupporter->rawEKey || (nextKey < pSupporter->rawEKey && nextKey != -1)) {
/* whole query completed, save result and abort */
assert(queryCompleted);
saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows);
/*
* save the pQuery->lastKey for retrieve data in cache, actually,
* there will be no qualified data in cache.
*/
saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
} else if (pQuery->ekey == pBlockInfo->keyFirst) {
// current interval query is completed, set the next query range on other data blocks if exist
int64_t prevEKey = pQuery->ekey;
getAlignedIntervalQueryRange(pRuntimeEnv, pQuery->lastKey, pSupporter->rawSKey, pSupporter->rawEKey);
saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
assert(queryCompleted && prevEKey > pQuery->skey);
if (pMeterQueryInfo->lastResRows > 0) {
saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows);
}
} else {
/*
* Data that satisfy current query range may locate in current block and blocks that are
* directly right next to current block. Therefore, we need to keep the query range(interval)
* unchanged until reaching the direct next data block, while only forwards the pQuery->lastKey.
*
* With the information of the directly next data block, whether locates in cache or disk,
* current interval query being completed or not can be decided.
*/
saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
assert(pQuery->lastKey < pBlockInfo->keyFirst && pQuery->lastKey >= pQuery->ekey);
/*
* if current block is the last block of current file, we still close the result
* flag, and merge with other meters in the same group
*/
if (queryCompleted) {
saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows);
}
}
break;
}
}
assert(queryCompleted);
saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows);
assert((nextKey >= pQuery->lastKey && QUERY_IS_ASC_QUERY(pQuery)) ||
(nextKey <= pQuery->lastKey && !QUERY_IS_ASC_QUERY(pQuery)));
/* still in the same block to query */
getAlignedIntervalQueryRange(pRuntimeEnv, nextKey, pSupporter->rawSKey, pSupporter->rawEKey);
saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
int32_t newPos = searchFn((char *)pPrimaryCol, pBlockInfo->size, pQuery->skey, pQuery->order.order);
assert(newPos == pQuery->pos + steps * factor);
pQuery->pos = newPos;
}
}
//static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo,
// SBlockInfo *pBlockInfo, int64_t *pPrimaryCol, SField *pFields,
// __block_search_fn_t searchFn) {
// SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
// SQuery * pQuery = pRuntimeEnv->pQuery;
// int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
//
// int64_t nextKey = -1;
// bool queryCompleted = false;
//
// while (1) {
// int32_t numOfRes = 0;
// int32_t steps = applyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, pPrimaryCol, pFields, searchFn, &numOfRes);
// assert(steps > 0);
//
// // NOTE: in case of stable query, only ONE(or ZERO) row of pos generated for each query range
// if (pMeterQueryInfo->lastResRows == 0) {
// pMeterQueryInfo->lastResRows = numOfRes;
// } else {
// assert(pMeterQueryInfo->lastResRows == 1);
// }
//
// int32_t pos = pQuery->pos + steps * factor;
//
// // query does not reach the end of current block
// if ((pos < pBlockInfo->size && QUERY_IS_ASC_QUERY(pQuery)) || (pos >= 0 && !QUERY_IS_ASC_QUERY(pQuery))) {
// nextKey = pPrimaryCol[pos];
// } else {
// assert((pQuery->lastKey > pBlockInfo->keyLast && QUERY_IS_ASC_QUERY(pQuery)) ||
// (pQuery->lastKey < pBlockInfo->keyFirst && !QUERY_IS_ASC_QUERY(pQuery)));
// }
//
// // all data satisfy current query are checked, query completed
// if (QUERY_IS_ASC_QUERY(pQuery)) {
// queryCompleted = (nextKey > pQuery->ekey || pQuery->ekey <= pBlockInfo->keyLast);
// } else {
// queryCompleted = (nextKey < pQuery->ekey || pQuery->ekey >= pBlockInfo->keyFirst);
// }
//
// /*
// * 1. there may be more date that satisfy current query interval, other than
// * current block, we need to try next data blocks
// * 2. query completed, since reaches the upper bound of the main query range
// */
// if (QUERY_IS_ASC_QUERY(pQuery)) {
// if (pQuery->lastKey > pBlockInfo->keyLast || pQuery->lastKey > pSupporter->rawEKey ||
// nextKey > pSupporter->rawEKey) {
// /*
// * current interval query is completed, set query pos flag closed and
// * try next data block if pQuery->ekey == pSupporter->rawEKey, whole query is completed
// */
// if (pQuery->lastKey > pBlockInfo->keyLast) {
// assert(pQuery->ekey >= pBlockInfo->keyLast);
// }
//
// if (pQuery->lastKey > pSupporter->rawEKey || nextKey > pSupporter->rawEKey) {
// /* whole query completed, save pos and abort */
// assert(queryCompleted);
// saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows);
//
// // save the pQuery->lastKey for retrieve data in cache, actually, there will be no qualified data in cache.
// saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
// } else if (pQuery->ekey == pBlockInfo->keyLast) {
// /* current interval query is completed, set the next query range on other data blocks if exist */
// int64_t prevEKey = pQuery->ekey;
//
// getAlignedIntervalQueryRange(pRuntimeEnv, pQuery->lastKey, pSupporter->rawSKey, pSupporter->rawEKey);
// saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
//
// assert(queryCompleted && prevEKey < pQuery->skey);
// if (pMeterQueryInfo->lastResRows > 0) {
// saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows);
// }
// } else {
// /*
// * Data that satisfy current query range may locate in current block and blocks that are directly right
// * next to current block. Therefore, we need to keep the query range(interval) unchanged until reaching
// * the direct next data block, while only forwards the pQuery->lastKey.
// *
// * With the information of the directly next data block, whether locates in cache or disk,
// * current interval query being completed or not can be decided.
// */
// saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
// assert(pQuery->lastKey > pBlockInfo->keyLast && pQuery->lastKey <= pQuery->ekey);
//
// /*
// * if current block is the last block of current file, we still close the pos flag, and
// * merge with other meters in the same group
// */
// if (queryCompleted) {
// saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows);
// }
// }
//
// break;
// }
// } else {
// if (pQuery->lastKey < pBlockInfo->keyFirst || pQuery->lastKey < pSupporter->rawEKey ||
// nextKey < pSupporter->rawEKey) {
// if (pQuery->lastKey < pBlockInfo->keyFirst) {
// assert(pQuery->ekey <= pBlockInfo->keyFirst);
// }
//
// if (pQuery->lastKey < pSupporter->rawEKey || (nextKey < pSupporter->rawEKey && nextKey != -1)) {
// /* whole query completed, save pos and abort */
// assert(queryCompleted);
// saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows);
//
// /*
// * save the pQuery->lastKey for retrieve data in cache, actually,
// * there will be no qualified data in cache.
// */
// saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
// } else if (pQuery->ekey == pBlockInfo->keyFirst) {
// // current interval query is completed, set the next query range on other data blocks if exist
// int64_t prevEKey = pQuery->ekey;
//
// getAlignedIntervalQueryRange(pRuntimeEnv, pQuery->lastKey, pSupporter->rawSKey, pSupporter->rawEKey);
// saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
//
// assert(queryCompleted && prevEKey > pQuery->skey);
// if (pMeterQueryInfo->lastResRows > 0) {
// saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows);
// }
// } else {
// /*
// * Data that satisfy current query range may locate in current block and blocks that are
// * directly right next to current block. Therefore, we need to keep the query range(interval)
// * unchanged until reaching the direct next data block, while only forwards the pQuery->lastKey.
// *
// * With the information of the directly next data block, whether locates in cache or disk,
// * current interval query being completed or not can be decided.
// */
// saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
// assert(pQuery->lastKey < pBlockInfo->keyFirst && pQuery->lastKey >= pQuery->ekey);
//
// /*
// * if current block is the last block of current file, we still close the pos
// * flag, and merge with other meters in the same group
// */
// if (queryCompleted) {
// saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows);
// }
// }
//
// break;
// }
// }
//
// assert(queryCompleted);
// saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows);
//
// assert((nextKey >= pQuery->lastKey && QUERY_IS_ASC_QUERY(pQuery)) ||
// (nextKey <= pQuery->lastKey && !QUERY_IS_ASC_QUERY(pQuery)));
//
// /* still in the same block to query */
// getAlignedIntervalQueryRange(pRuntimeEnv, nextKey, pSupporter->rawSKey, pSupporter->rawEKey);
// saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
//
// int32_t newPos = searchFn((char *)pPrimaryCol, pBlockInfo->size, pQuery->skey, pQuery->order.order);
// assert(newPos == pQuery->pos + steps * factor);
//
// pQuery->pos = newPos;
// }
//}
static void doApplyIntervalQueryOnBlock_rv(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo,
SBlockInfo *pBlockInfo, int64_t *pPrimaryCol, SField *pFields,
......@@ -7428,7 +7494,7 @@ bool onDemandLoadDatablock(SQuery *pQuery, int16_t queryRangeSet) {
static void validateResultBuf(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
SQuery * pQuery = pSupporter->runtimeEnv.pQuery;
SQueryResultBuf* pResultBuf = pSupporter->pResultBuf;
SQueryResultBuf* pResultBuf = pRuntimeEnv->pResultBuf;
SIDList list = getDataBufPagesIdList(pResultBuf, pMeterQueryInfo->sid);
int32_t id = getLastPageId(&list);
......@@ -7486,10 +7552,10 @@ int32_t saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQue
pMeterQueryInfo->reverseIndex -= 1;
setCtxOutputPointerForSupplementScan(pSupporter, pMeterQueryInfo);
} else {
SIDList list = getDataBufPagesIdList(pSupporter->pResultBuf, pMeterQueryInfo->sid);
SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, pMeterQueryInfo->sid);
int32_t pageId = getLastPageId(&list);
tFilePage* pData = getResultBufferPageById(pSupporter->pResultBuf, pageId);
tFilePage* pData = getResultBufferPageById(pRuntimeEnv->pResultBuf, pageId);
// in handling records occuring around '1970-01-01', the aligned start timestamp may be 0.
TSKEY ts = *(TSKEY *)getOutputResPos(pRuntimeEnv, pData, pData->numOfElems, 0);
......@@ -7572,8 +7638,6 @@ static int32_t doCopyFromGroupBuf(SMeterQuerySupportObj *pSupporter, SOutputRes
assert(result[i].numOfRows >= 0 && pSupporter->offset <= 1);
tFilePage **srcBuf = result[i].result;
int32_t numOfRowsToCopy = result[i].numOfRows - pSupporter->offset;
int32_t oldOffset = pSupporter->offset;
......@@ -7589,8 +7653,8 @@ static int32_t doCopyFromGroupBuf(SMeterQuerySupportObj *pSupporter, SOutputRes
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
int32_t elemSize = pRuntimeEnv->pCtx[j].outputBytes;
char * outputBuf = pQuery->sdata[j]->data + numOfResult * elemSize;
memcpy(outputBuf, srcBuf[j]->data + oldOffset * elemSize, elemSize * numOfRowsToCopy);
char* p = getPosInResultPage(pRuntimeEnv, j, &result[i]);
memcpy(outputBuf, p + oldOffset * elemSize, elemSize * numOfRowsToCopy);
}
numOfResult += numOfRowsToCopy;
......@@ -7871,10 +7935,10 @@ void vnodePrintQueryStatistics(SMeterQuerySupportObj *pSupporter) {
SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery);
SQueryCostSummary *pSummary = &pRuntimeEnv->summary;
if (pSupporter->pResultBuf == NULL) {
if (pRuntimeEnv->pResultBuf == NULL) {
pSummary->tmpBufferInDisk = 0;
} else {
pSummary->tmpBufferInDisk = getResBufSize(pSupporter->pResultBuf);
pSummary->tmpBufferInDisk = getResBufSize(pRuntimeEnv->pResultBuf);
}
dTrace("QInfo:%p statis: comp blocks:%d, size:%d Bytes, elapsed time:%.2f ms", pQInfo, pSummary->readCompInfo,
......
......@@ -683,7 +683,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
}
resetCtxOutputBuf(pRuntimeEnv);
resetSlidingWindowInfo(&pRuntimeEnv->swindowResInfo, pQuery->numOfOutputCols);
resetSlidingWindowInfo(pRuntimeEnv, &pRuntimeEnv->swindowResInfo);
while (pSupporter->meterIdx < pSupporter->numOfMeters) {
int32_t k = pSupporter->meterIdx;
......@@ -1088,7 +1088,7 @@ static void vnodeSingleMeterIntervalMainLooper(SMeterQuerySupportObj *pSupporter
(pQuery->skey >= pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery)));
initCtxOutputBuf(pRuntimeEnv);
clearCompletedSlidingWindows(&pRuntimeEnv->swindowResInfo, pQuery->numOfOutputCols);
clearCompletedSlidingWindows(pRuntimeEnv);
vnodeScanAllData(pRuntimeEnv);
if (isQueryKilled(pQuery)) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册