提交 110b1d83 编写于 作者: H hjxilinx

refactor code for sliding query processing

上级 3b334b0a
......@@ -198,7 +198,7 @@ typedef struct SQueryInfo {
char intervalTimeUnit;
int64_t etime, stime;
int64_t nAggTimeInterval; // aggregation time interval
int64_t intervalTime; // aggregation time interval
int64_t nSlidingTime; // sliding window in mseconds
SSqlGroupbyExpr groupbyExpr; // group by tags info
......
......@@ -100,7 +100,7 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSuppor
* in case of stable query, limit/offset is not applied here. the limit/offset is applied to the
* final results which is acquired after the secondry merge of in the client.
*/
if (pLimit->offset == 0 || pQueryInfo->nAggTimeInterval > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) {
if (pLimit->offset == 0 || pQueryInfo->intervalTime > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) {
if (*st > elem1.ts) {
*st = elem1.ts;
}
......@@ -165,7 +165,7 @@ SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pS
pSupporter->subqueryIndex = index;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
pSupporter->interval = pQueryInfo->nAggTimeInterval;
pSupporter->interval = pQueryInfo->intervalTime;
pSupporter->limit = pQueryInfo->limit;
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, pSql->cmd.clauseIndex, index);
......@@ -293,7 +293,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
// set the second stage sub query for join process
pQueryInfo->type |= TSDB_QUERY_TYPE_JOIN_SEC_STAGE;
pQueryInfo->nAggTimeInterval = pSupporter->interval;
pQueryInfo->intervalTime = pSupporter->interval;
pQueryInfo->groupbyExpr = pSupporter->groupbyExpr;
tscColumnBaseInfoCopy(&pQueryInfo->colList, &pSupporter->colList, 0);
......
......@@ -590,20 +590,20 @@ int32_t parseIntervalClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
// interval is not null
SSQLToken* t = &pQuerySql->interval;
if (getTimestampInUsFromStr(t->z, t->n, &pQueryInfo->nAggTimeInterval) != TSDB_CODE_SUCCESS) {
if (getTimestampInUsFromStr(t->z, t->n, &pQueryInfo->intervalTime) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_INVALID_SQL;
}
// if the unit of time window value is millisecond, change the value from microsecond
if (pMeterMetaInfo->pMeterMeta->precision == TSDB_TIME_PRECISION_MILLI) {
pQueryInfo->nAggTimeInterval = pQueryInfo->nAggTimeInterval / 1000;
pQueryInfo->intervalTime = pQueryInfo->intervalTime / 1000;
}
/* parser has filter the illegal type, no need to check here */
pQueryInfo->intervalTimeUnit = pQuerySql->interval.z[pQuerySql->interval.n - 1];
// interval cannot be less than 10 milliseconds
if (pQueryInfo->nAggTimeInterval < tsMinIntervalTime) {
if (pQueryInfo->intervalTime < tsMinIntervalTime) {
return invalidSqlErrMsg(pQueryInfo->msg, msg2);
}
......@@ -627,7 +627,7 @@ int32_t parseIntervalClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
* check invalid SQL:
* select tbname, tags_fields from super_table_name interval(1s)
*/
if (tscQueryMetricTags(pQueryInfo) && pQueryInfo->nAggTimeInterval > 0) {
if (tscQueryMetricTags(pQueryInfo) && pQueryInfo->intervalTime > 0) {
return invalidSqlErrMsg(pQueryInfo->msg, msg1);
}
......@@ -681,11 +681,11 @@ int32_t parseSlidingClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
return invalidSqlErrMsg(pQueryInfo->msg, msg0);
}
if (pQueryInfo->nSlidingTime > pQueryInfo->nAggTimeInterval) {
if (pQueryInfo->nSlidingTime > pQueryInfo->intervalTime) {
return invalidSqlErrMsg(pQueryInfo->msg, msg1);
}
} else {
pQueryInfo->nSlidingTime = -1;
pQueryInfo->nSlidingTime = pQueryInfo->intervalTime;
}
return TSDB_CODE_SUCCESS;
......@@ -4252,7 +4252,7 @@ int32_t validateSqlFunctionInStreamSql(SQueryInfo* pQueryInfo) {
const char* msg0 = "sample interval can not be less than 10ms.";
const char* msg1 = "functions not allowed in select clause";
if (pQueryInfo->nAggTimeInterval != 0 && pQueryInfo->nAggTimeInterval < 10) {
if (pQueryInfo->intervalTime != 0 && pQueryInfo->intervalTime < 10) {
return invalidSqlErrMsg(pQueryInfo->msg, msg0);
}
......@@ -4950,7 +4950,7 @@ static int32_t doAddGroupbyColumnsOnDemand(SQueryInfo* pQueryInfo) {
insertResultField(pQueryInfo, pQueryInfo->fieldsInfo.numOfOutputCols, &ids, bytes, type, name);
} else {
// if this query is "group by" normal column, interval is not allowed
if (pQueryInfo->nAggTimeInterval > 0) {
if (pQueryInfo->intervalTime > 0) {
return invalidSqlErrMsg(pQueryInfo->msg, msg2);
}
......@@ -4983,7 +4983,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
// only retrieve tags, group by is not supportted
if (pCmd->command == TSDB_SQL_RETRIEVE_TAGS) {
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0 || pQueryInfo->nAggTimeInterval > 0) {
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0 || pQueryInfo->intervalTime > 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
} else {
return TSDB_CODE_SUCCESS;
......@@ -5399,7 +5399,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
if (parseIntervalClause(pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_INVALID_SQL;
} else {
if ((pQueryInfo->nAggTimeInterval > 0) &&
if ((pQueryInfo->intervalTime > 0) &&
(validateFunctionsInIntervalOrGroupbyQuery(pQueryInfo) != TSDB_CODE_SUCCESS)) {
return TSDB_CODE_INVALID_SQL;
}
......@@ -5429,7 +5429,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
* not here.
*/
if (pQuerySql->fillType != NULL) {
if (pQueryInfo->nAggTimeInterval == 0) {
if (pQueryInfo->intervalTime == 0) {
return invalidSqlErrMsg(pQueryInfo->msg, msg3);
}
......@@ -5541,7 +5541,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
if (parseIntervalClause(pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_INVALID_SQL;
} else {
if ((pQueryInfo->nAggTimeInterval > 0) &&
if ((pQueryInfo->intervalTime > 0) &&
(validateFunctionsInIntervalOrGroupbyQuery(pQueryInfo) != TSDB_CODE_SUCCESS)) {
return TSDB_CODE_INVALID_SQL;
}
......@@ -5614,14 +5614,14 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
* the columns may be increased due to group by operation
*/
if (pQuerySql->fillType != NULL) {
if (pQueryInfo->nAggTimeInterval == 0 && (!tscIsPointInterpQuery(pQueryInfo))) {
if (pQueryInfo->intervalTime == 0 && (!tscIsPointInterpQuery(pQueryInfo))) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5);
}
if (pQueryInfo->nAggTimeInterval > 0) {
if (pQueryInfo->intervalTime > 0) {
int64_t timeRange = labs(pQueryInfo->stime - pQueryInfo->etime);
// number of result is not greater than 10,000,000
if ((timeRange == 0) || (timeRange / pQueryInfo->nAggTimeInterval) > MAX_RETRIEVE_ROWS_IN_INTERVAL_QUERY) {
if ((timeRange == 0) || (timeRange / pQueryInfo->intervalTime) > MAX_RETRIEVE_ROWS_IN_INTERVAL_QUERY) {
return invalidSqlErrMsg(pQueryInfo->msg, msg6);
}
}
......
......@@ -325,7 +325,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
int16_t prec = pMeterMetaInfo->pMeterMeta->precision;
int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime;
int64_t revisedSTime = taosGetIntervalStartTimestamp(stime, pQueryInfo->nAggTimeInterval, pQueryInfo->intervalTimeUnit, prec);
int64_t revisedSTime = taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, prec);
SInterpolationInfo *pInterpoInfo = &pReducer->interpolationInfo;
taosInitInterpoInfo(pInterpoInfo, pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols,
......@@ -522,7 +522,7 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm
}
// primary timestamp column is involved in final result
if (pQueryInfo->nAggTimeInterval != 0 || tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
if (pQueryInfo->intervalTime != 0 || tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
numOfGroupByCols++;
}
......@@ -539,7 +539,7 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm
orderIdx[i] = startCols++;
}
if (pQueryInfo->nAggTimeInterval != 0) {
if (pQueryInfo->intervalTime != 0) {
// the first column is the timestamp, handles queries like "interval(10m) group by tags"
orderIdx[numOfGroupByCols - 1] = PRIMARYKEY_TIMESTAMP_COL_INDEX;
}
......@@ -581,10 +581,10 @@ bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage
if (pOrderDesc->orderIdx.pData[numOfCols - 1] == PRIMARYKEY_TIMESTAMP_COL_INDEX) { //<= 0
// super table interval query
assert(pQueryInfo->nAggTimeInterval > 0);
assert(pQueryInfo->intervalTime > 0);
pOrderDesc->orderIdx.numOfCols -= 1;
} else { // simple group by query
assert(pQueryInfo->nAggTimeInterval == 0);
assert(pQueryInfo->intervalTime == 0);
}
// only one row exists
......@@ -752,7 +752,7 @@ void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo*
int16_t prec = pMeterMetaInfo->pMeterMeta->precision;
int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime;
int64_t revisedSTime = taosGetIntervalStartTimestamp(stime, pQueryInfo->nAggTimeInterval, pQueryInfo->intervalTimeUnit, prec);
int64_t revisedSTime = taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, prec);
taosInitInterpoInfo(pInterpoInfo, pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols,
pLocalReducer->rowSize);
......@@ -820,7 +820,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
assert(pRes->pLocalReducer == NULL);
}
if (pQueryInfo->nAggTimeInterval == 0 || pQueryInfo->interpoType == TSDB_INTERPO_NONE) {
if (pQueryInfo->intervalTime == 0 || pQueryInfo->interpoType == TSDB_INTERPO_NONE) {
// no interval query, no interpolation
pRes->data = pLocalReducer->pFinalRes;
pRes->numOfRows = pFinalDataPage->numOfElems;
......@@ -898,13 +898,13 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
while (1) {
int32_t remains = taosNumOfRemainPoints(pInterpoInfo);
TSKEY etime = taosGetRevisedEndKey(actualETime, pQueryInfo->order.order, pQueryInfo->nAggTimeInterval, pQueryInfo->intervalTimeUnit,
TSKEY etime = taosGetRevisedEndKey(actualETime, pQueryInfo->order.order, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit,
precision);
int32_t nrows = taosGetNumOfResultWithInterpo(pInterpoInfo, pPrimaryKeys, remains, pQueryInfo->nAggTimeInterval, etime,
int32_t nrows = taosGetNumOfResultWithInterpo(pInterpoInfo, pPrimaryKeys, remains, pQueryInfo->intervalTime, etime,
pLocalReducer->resColModel->capacity);
int32_t newRows = taosDoInterpoResult(pInterpoInfo, pQueryInfo->interpoType, pResPages, remains, nrows,
pQueryInfo->nAggTimeInterval, pPrimaryKeys, pLocalReducer->resColModel, srcData,
pQueryInfo->intervalTime, pPrimaryKeys, pLocalReducer->resColModel, srcData,
pQueryInfo->defaultVal, functions, pLocalReducer->resColModel->capacity);
assert(newRows <= nrows);
......@@ -937,7 +937,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
/* all output for current group are completed */
int32_t totalRemainRows =
taosGetNumOfResWithoutLimit(pInterpoInfo, pPrimaryKeys, rpoints, pQueryInfo->nAggTimeInterval, actualETime);
taosGetNumOfResWithoutLimit(pInterpoInfo, pPrimaryKeys, rpoints, pQueryInfo->intervalTime, actualETime);
if (totalRemainRows <= 0) {
break;
}
......@@ -1244,7 +1244,7 @@ static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer
// for group result interpolation, do not return if not data is generated
if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) {
int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime;
int64_t newTime = taosGetIntervalStartTimestamp(stime, pQueryInfo->nAggTimeInterval, pQueryInfo->intervalTimeUnit, precision);
int64_t newTime = taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, precision);
taosInitInterpoInfo(&pLocalReducer->interpolationInfo, pQueryInfo->order.order, newTime, pQueryInfo->groupbyExpr.numOfGroupCols,
pLocalReducer->rowSize);
......@@ -1273,9 +1273,9 @@ static bool doInterpolationForCurrentGroup(SSqlObj *pSql) {
int64_t etime = *(int64_t *)(pFinalDataBuf->data + TSDB_KEYSIZE * (pInterpoInfo->numOfRawDataInRows - 1));
int32_t remain = taosNumOfRemainPoints(pInterpoInfo);
TSKEY ekey = taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->nAggTimeInterval, pQueryInfo->intervalTimeUnit, p);
TSKEY ekey = taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, p);
int32_t rows = taosGetNumOfResultWithInterpo(pInterpoInfo, (TSKEY *)pLocalReducer->pBufForInterpo, remain,
pQueryInfo->nAggTimeInterval, ekey, pLocalReducer->resColModel->capacity);
pQueryInfo->intervalTime, ekey, pLocalReducer->resColModel->capacity);
if (rows > 0) { // do interpo
doInterpolateResult(pSql, pLocalReducer, false);
}
......@@ -1305,8 +1305,8 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) {
int64_t etime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->etime : pQueryInfo->stime;
etime = taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->nAggTimeInterval, pQueryInfo->intervalTimeUnit, precision);
int32_t rows = taosGetNumOfResultWithInterpo(pInterpoInfo, NULL, 0, pQueryInfo->nAggTimeInterval, etime,
etime = taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, precision);
int32_t rows = taosGetNumOfResultWithInterpo(pInterpoInfo, NULL, 0, pQueryInfo->intervalTime, etime,
pLocalReducer->resColModel->capacity);
if (rows > 0) { // do interpo
doInterpolateResult(pSql, pLocalReducer, true);
......
......@@ -649,7 +649,7 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu
tscTagCondCopy(&pSupporter->tagCond, &pNewQueryInfo->tagCond);
pNew->cmd.numOfCols = 0;
pNewQueryInfo->nAggTimeInterval = 0;
pNewQueryInfo->intervalTime = 0;
memset(&pNewQueryInfo->limit, 0, sizeof(SLimitVal));
// backup the data and clear it in the sqlcmd object
......@@ -1683,12 +1683,12 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return -1;
}
pQueryMsg->nAggTimeInterval = htobe64(pQueryInfo->nAggTimeInterval);
pQueryMsg->intervalTime = htobe64(pQueryInfo->intervalTime);
pQueryMsg->intervalTimeUnit = pQueryInfo->intervalTimeUnit;
pQueryMsg->slidingTime = htobe64(pQueryInfo->nSlidingTime);
if (pQueryInfo->nAggTimeInterval < 0) {
tscError("%p illegal value of aggregation time interval in query msg: %ld", pSql, pQueryInfo->nAggTimeInterval);
if (pQueryInfo->intervalTime < 0) {
tscError("%p illegal value of aggregation time interval in query msg: %ld", pSql, pQueryInfo->intervalTime);
return -1;
}
......
......@@ -395,7 +395,7 @@ static void **doSetResultRowData(SSqlObj *pSql) {
pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i, pQueryInfo->order) + pRes->bytes[i] * pRes->row;
// primary key column cannot be null in interval query, no need to check
if (i == 0 && pQueryInfo->nAggTimeInterval > 0) {
if (i == 0 && pQueryInfo->intervalTime > 0) {
continue;
}
......
......@@ -381,23 +381,23 @@ static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
if (pQueryInfo->nAggTimeInterval < minIntervalTime) {
if (pQueryInfo->intervalTime < minIntervalTime) {
tscWarn("%p stream:%p, original sample interval:%ld too small, reset to:%" PRId64 "", pSql, pStream,
pQueryInfo->nAggTimeInterval, minIntervalTime);
pQueryInfo->nAggTimeInterval = minIntervalTime;
pQueryInfo->intervalTime, minIntervalTime);
pQueryInfo->intervalTime = minIntervalTime;
}
pStream->interval = pQueryInfo->nAggTimeInterval; // it shall be derived from sql string
pStream->interval = pQueryInfo->intervalTime; // it shall be derived from sql string
if (pQueryInfo->nSlidingTime == 0) {
pQueryInfo->nSlidingTime = pQueryInfo->nAggTimeInterval;
pQueryInfo->nSlidingTime = pQueryInfo->intervalTime;
}
int64_t minSlidingTime =
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinSlidingTime * 1000L : tsMinSlidingTime;
if (pQueryInfo->nSlidingTime == -1) {
pQueryInfo->nSlidingTime = pQueryInfo->nAggTimeInterval;
pQueryInfo->nSlidingTime = pQueryInfo->intervalTime;
} else if (pQueryInfo->nSlidingTime < minSlidingTime) {
tscWarn("%p stream:%p, original sliding value:%" PRId64 " too small, reset to:%" PRId64 "", pSql, pStream,
pQueryInfo->nSlidingTime, minSlidingTime);
......@@ -405,16 +405,16 @@ static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
pQueryInfo->nSlidingTime = minSlidingTime;
}
if (pQueryInfo->nSlidingTime > pQueryInfo->nAggTimeInterval) {
if (pQueryInfo->nSlidingTime > pQueryInfo->intervalTime) {
tscWarn("%p stream:%p, sliding value:%" PRId64 " can not be larger than interval range, reset to:%" PRId64 "", pSql, pStream,
pQueryInfo->nSlidingTime, pQueryInfo->nAggTimeInterval);
pQueryInfo->nSlidingTime, pQueryInfo->intervalTime);
pQueryInfo->nSlidingTime = pQueryInfo->nAggTimeInterval;
pQueryInfo->nSlidingTime = pQueryInfo->intervalTime;
}
pStream->slidingTime = pQueryInfo->nSlidingTime;
pQueryInfo->nAggTimeInterval = 0; // clear the interval value to avoid the force time window split by query processor
pQueryInfo->intervalTime = 0; // clear the interval value to avoid the force time window split by query processor
pQueryInfo->nSlidingTime = 0;
}
......
......@@ -514,7 +514,7 @@ typedef struct {
int16_t numOfCols; // the number of columns will be load from vnode
char intervalTimeUnit; // time interval type, for revisement of interval(1d)
int64_t nAggTimeInterval; // time interval for aggregation, in million second
int64_t intervalTime; // time interval for aggregation, in million second
int64_t slidingTime; // value for sliding window
// tag schema, used to parse tag information in pSidExtInfo
......
......@@ -14,7 +14,7 @@ typedef struct SIDList {
int32_t* pData;
} SIDList;
typedef struct SQueryResultBuf {
typedef struct SQueryDiskbasedResultBuf {
int32_t numOfRowsPerPage;
int32_t numOfPages;
int64_t totalBufSize;
......@@ -27,7 +27,7 @@ typedef struct SQueryResultBuf {
uint32_t numOfAllocGroupIds; // number of allocated id list
void* idsTable; // id hash table
SIDList* list; // for each id, there is a page id list
} SQueryResultBuf;
} SQueryDiskbasedResultBuf;
/**
* create disk-based result buffer
......@@ -36,7 +36,7 @@ typedef struct SQueryResultBuf {
* @param rowSize
* @return
*/
int32_t createResultBuf(SQueryResultBuf** pResultBuf, int32_t size, int32_t rowSize);
int32_t createDiskbasedResultBuffer(SQueryDiskbasedResultBuf** pResultBuf, int32_t size, int32_t rowSize);
/**
*
......@@ -45,14 +45,14 @@ int32_t createResultBuf(SQueryResultBuf** pResultBuf, int32_t size, int32_t rowS
* @param pageId
* @return
*/
tFilePage* getNewDataBuf(SQueryResultBuf* pResultBuf, int32_t groupId, int32_t* pageId);
tFilePage* getNewDataBuf(SQueryDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId);
/**
*
* @param pResultBuf
* @return
*/
int32_t getNumOfRowsPerPage(SQueryResultBuf* pResultBuf);
int32_t getNumOfRowsPerPage(SQueryDiskbasedResultBuf* pResultBuf);
/**
*
......@@ -60,7 +60,7 @@ int32_t getNumOfRowsPerPage(SQueryResultBuf* pResultBuf);
* @param groupId
* @return
*/
SIDList getDataBufPagesIdList(SQueryResultBuf* pResultBuf, int32_t groupId);
SIDList getDataBufPagesIdList(SQueryDiskbasedResultBuf* pResultBuf, int32_t groupId);
/**
* get the specified buffer page by id
......@@ -68,27 +68,27 @@ SIDList getDataBufPagesIdList(SQueryResultBuf* pResultBuf, int32_t groupId);
* @param id
* @return
*/
tFilePage* getResultBufferPageById(SQueryResultBuf* pResultBuf, int32_t id);
tFilePage* getResultBufferPageById(SQueryDiskbasedResultBuf* pResultBuf, int32_t id);
/**
* get the total buffer size in the format of disk file
* @param pResultBuf
* @return
*/
int32_t getResBufSize(SQueryResultBuf* pResultBuf);
int32_t getResBufSize(SQueryDiskbasedResultBuf* pResultBuf);
/**
* get the number of groups in the result buffer
* @param pResultBuf
* @return
*/
int32_t getNumOfResultBufGroupId(SQueryResultBuf* pResultBuf);
int32_t getNumOfResultBufGroupId(SQueryDiskbasedResultBuf* pResultBuf);
/**
* destroy result buffer
* @param pResultBuf
*/
void destroyResultBuf(SQueryResultBuf* pResultBuf);
void destroyResultBuf(SQueryDiskbasedResultBuf* pResultBuf);
/**
*
......
......@@ -259,7 +259,7 @@ typedef struct SQuery {
int64_t blockId;
TSKEY skey;
TSKEY ekey;
int64_t nAggTimeInterval;
int64_t intervalTime;
int64_t slidingTime; // sliding time for sliding window query
char intervalTimeUnit; // interval data type, used for daytime revise
int8_t precision;
......
......@@ -170,7 +170,7 @@ void disableFunctForSuppleScan(SQueryRuntimeEnv* pRuntimeEnv, int32_t order);
void enableFunctForMasterScan(SQueryRuntimeEnv* pRuntimeEnv, int32_t order);
int32_t mergeMetersResultToOneGroups(SMeterQuerySupportObj* pSupporter);
void copyFromGroupBuf(SQInfo* pQInfo, SOutputRes* result);
void copyFromGroupBuf(SQInfo* pQInfo, SWindowResult* result);
SBlockInfo getBlockBasicInfo(SQueryRuntimeEnv* pRuntimeEnv, void* pBlock, int32_t blockType);
SCacheBlock* getCacheDataBlock(SMeterObj* pMeterObj, SQueryRuntimeEnv* pRuntimeEnv, int32_t slot);
......@@ -181,18 +181,20 @@ void queryOnBlock(SMeterQuerySupportObj* pSupporter, int64_t* primaryKeys, int32
int32_t vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, tSidSet *pSidSet, SMeterDataInfo *pMeterDataInfo,
int32_t *numOfMeters, SMeterDataInfo ***pReqMeterDataInfo);
int32_t vnodeGetVnodeHeaderFileIdx(int32_t* fid, SQueryRuntimeEnv* pRuntimeEnv, int32_t order);
int32_t vnodeGetVnodeHeaderFileIndex(int32_t* fid, SQueryRuntimeEnv* pRuntimeEnv, int32_t order);
int32_t createDataBlocksInfoEx(SMeterDataInfo** pMeterDataInfo, int32_t numOfMeters,
SMeterDataBlockInfoEx** pDataBlockInfoEx, int32_t numOfCompBlocks,
int32_t* nAllocBlocksInfoSize, int64_t addr);
void freeMeterBlockInfoEx(SMeterDataBlockInfoEx* pDataBlockInfoEx, int32_t len);
void setExecutionContext(SMeterQuerySupportObj* pSupporter, SOutputRes* outputRes, int32_t meterIdx, int32_t groupIdx,
void setExecutionContext(SMeterQuerySupportObj* pSupporter, SWindowResult* outputRes, int32_t meterIdx, int32_t groupIdx,
SMeterQueryInfo* sqinfo);
int32_t setIntervalQueryExecutionContext(SMeterQuerySupportObj* pSupporter, int32_t meterIdx, SMeterQueryInfo* sqinfo);
void doGetAlignedIntervalQueryRangeImpl(SQuery *pQuery, int64_t pKey, int64_t keyFirst, int64_t keyLast,
int64_t *actualSkey, int64_t *actualEkey, int64_t *skey, int64_t *ekey);
int64_t getQueryStartPositionInCache(SQueryRuntimeEnv* pRuntimeEnv, int32_t* slot, int32_t* pos, bool ignoreQueryRange);
int64_t getQueryStartPositionInCache(SQueryRuntimeEnv* pRuntimeEnv, int32_t* slot, int32_t* pos, bool ignoreQueryRange);
int64_t getNextAccessedKeyInData(SQuery* pQuery, int64_t* pPrimaryCol, SBlockInfo* pBlockInfo, int32_t blockStatus);
int32_t getDataBlocksForMeters(SMeterQuerySupportObj* pSupporter, SQuery* pQuery, int32_t numOfMeters,
......@@ -209,7 +211,7 @@ int32_t vnodeGetHeaderFile(SQueryRuntimeEnv *pRuntimeEnv, int32_t fileIndex);
* @param ekey
* @return
*/
SMeterQueryInfo* createMeterQueryInfo(SQuery* pQuery, int32_t sid, TSKEY skey, TSKEY ekey);
SMeterQueryInfo* createMeterQueryInfo(SMeterQuerySupportObj *pSupporter, int32_t sid, TSKEY skey, TSKEY ekey);
/**
* Destroy meter query info
......@@ -224,7 +226,7 @@ void destroyMeterQueryInfo(SMeterQueryInfo *pMeterQueryInfo, int32_t numOfCols);
* @param skey
* @param ekey
*/
void changeMeterQueryInfoForSuppleQuery(SQueryResultBuf* pResultBuf, SMeterQueryInfo *pMeterQueryInfo, TSKEY skey, TSKEY ekey);
void changeMeterQueryInfoForSuppleQuery(SQueryDiskbasedResultBuf* pResultBuf, SMeterQueryInfo *pMeterQueryInfo, TSKEY skey, TSKEY ekey);
/**
* add the new allocated disk page to meter query info
......@@ -276,14 +278,14 @@ void displayInterResult(SData** pdata, SQuery* pQuery, int32_t numOfRows);
void vnodePrintQueryStatistics(SMeterQuerySupportObj* pSupporter);
void clearGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SOutputRes *pOneOutputRes);
void copyGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SOutputRes* dst, const SOutputRes* src);
void clearGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pOneOutputRes);
void copyGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult* dst, const SWindowResult* src);
void resetSlidingWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SSlidingWindowInfo* pSlidingWindowInfo);
void resetSlidingWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo* pWindowResInfo);
void clearCompletedSlidingWindows(SQueryRuntimeEnv* pRuntimeEnv);
int32_t numOfClosedSlidingWindow(SSlidingWindowInfo* pSlidingWindowInfo);
void closeSlidingWindow(SSlidingWindowInfo* pSlidingWindowInfo, int32_t slot);
void closeAllSlidingWindow(SSlidingWindowInfo* pSlidingWindowInfo);
int32_t numOfClosedSlidingWindow(SWindowResInfo* pWindowResInfo);
void closeSlidingWindow(SWindowResInfo* pWindowResInfo, int32_t slot);
void closeAllSlidingWindow(SWindowResInfo* pWindowResInfo);
#ifdef __cplusplus
}
......
......@@ -86,16 +86,27 @@ typedef struct SQueryCostSummary {
} SQueryCostSummary;
typedef struct SPosInfo {
int64_t pageId;
int32_t rowId;
int16_t pageId;
int16_t rowId;
} SPosInfo;
typedef struct SOutputRes {
typedef struct STimeWindow {
TSKEY skey;
TSKEY ekey;
} STimeWindow;
typedef struct SWindowStatus {
bool closed;
} SWindowStatus;
typedef struct SWindowResult {
uint16_t numOfRows;
int32_t nAlloc;
SPosInfo pos;
SResultInfo* resultInfo;
} SOutputRes;
int16_t nAlloc;
SPosInfo pos; // Position of current result in disk-based output buffer
SResultInfo* resultInfo; // For each result column, there is a resultInfo
STimeWindow window; // The time window that current result covers.
SWindowStatus status;
} SWindowResult;
/*
* header files info, avoid to iterate the directory, the data is acquired
......@@ -118,19 +129,8 @@ typedef struct SQueryFilesInfo {
char dbFilePathPrefix[PATH_MAX];
} SQueryFilesInfo;
typedef struct STimeWindow {
TSKEY skey;
TSKEY ekey;
} STimeWindow;
typedef struct SWindowStatus {
STimeWindow window;
bool closed;
} SWindowStatus;
typedef struct SSlidingWindowInfo {
SOutputRes* pResult; // reference to SQuerySupporter->pResult
SWindowStatus* pStatus; // current query window closed or not?
typedef struct SWindowResInfo {
SWindowResult* pResult; // reference to SQuerySupporter->pResult
void* hashList; // hash list for quick access
int16_t type; // data type for hash key
int32_t capacity; // max capacity
......@@ -140,7 +140,7 @@ typedef struct SSlidingWindowInfo {
int64_t startTime; // start time of the first time window for sliding query
int64_t prevSKey; // previous (not completed) sliding window start key
int64_t threshold; // threshold for return completed results.
} SSlidingWindowInfo;
} SWindowResInfo;
typedef struct SQueryRuntimeEnv {
SPositionInfo startPos; /* the start position, used for secondary/third iteration */
......@@ -165,7 +165,7 @@ typedef struct SQueryRuntimeEnv {
SInterpolationInfo interpoInfo;
SData** pInterpoBuf;
SSlidingWindowInfo swindowResInfo;
SWindowResInfo windowResInfo;
STSBuf* pTSBuf;
STSCursor cur;
......@@ -181,7 +181,7 @@ typedef struct SQueryRuntimeEnv {
*/
SCacheBlock cacheBlock;
SQueryResultBuf* pResultBuf;
SQueryDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
bool stableQuery; // is super table query or not
} SQueryRuntimeEnv;
......@@ -197,8 +197,10 @@ typedef struct SMeterQueryInfo {
int16_t lastResRows;
int64_t tag;
STSCursor cur;
SResultInfo* resultInfo;
SWindowResult* pWindowRes;
int32_t sid; // for retrieve the page id list
SWindowResInfo windowResInfo;
} SMeterQueryInfo;
typedef struct SMeterDataInfo {
......@@ -225,7 +227,7 @@ typedef struct SMeterQuerySupportObj {
* rows may be generated by a specific subgroup. When query on all subgroups is executed,
* the result is copy to output buffer. This attribution is not used during single meter query processing.
*/
SOutputRes* pResult;
// SWindowResult* pResult;
SQueryRuntimeEnv runtimeEnv;
int64_t rawSKey;
int64_t rawEKey;
......
......@@ -132,7 +132,7 @@ static void queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMeterInfo) {
pRuntimeEnv->pMeterObj = pMeterObj;
if (pMeterInfo[k].pMeterQInfo == NULL) {
pMeterInfo[k].pMeterQInfo = createMeterQueryInfo(pQuery, pMeterObj->sid, pSupporter->rawSKey, pSupporter->rawEKey);
pMeterInfo[k].pMeterQInfo = createMeterQueryInfo(pSupporter, pMeterObj->sid, pSupporter->rawSKey, pSupporter->rawEKey);
}
if (pMeterInfo[k].pMeterObj == NULL) { // no data in disk for this meter, set its pointer
......@@ -154,7 +154,7 @@ static void queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMeterInfo) {
vnodeUpdateQueryColumnIndex(pQuery, pMeterObj);
vnodeUpdateFilterColumnIndex(pQuery);
if (pQuery->nAggTimeInterval == 0) {
if (pQuery->intervalTime == 0) {
if ((pQuery->lastKey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(pQuery->lastKey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
dTrace(
......@@ -166,7 +166,7 @@ static void queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMeterInfo) {
continue;
}
setExecutionContext(pSupporter, pSupporter->pResult, k, pMeterInfo[k].groupIdx, pMeterQueryInfo);
setExecutionContext(pSupporter, pRuntimeEnv->windowResInfo.pResult, k, pMeterInfo[k].groupIdx, pMeterQueryInfo);
} else {
int32_t ret = setIntervalQueryExecutionContext(pSupporter, k, pMeterQueryInfo);
if (ret != TSDB_CODE_SUCCESS) {
......@@ -291,7 +291,7 @@ static void queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo
break;
}
int32_t fileIdx = vnodeGetVnodeHeaderFileIdx(&fid, pRuntimeEnv, pQuery->order.order);
int32_t fileIdx = vnodeGetVnodeHeaderFileIndex(&fid, pRuntimeEnv, pQuery->order.order);
if (fileIdx < 0) { // no valid file, abort current search
break;
}
......@@ -398,7 +398,7 @@ static void queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo
restoreIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
if (pQuery->nAggTimeInterval == 0) { // normal query
if (pQuery->intervalTime == 0) { // normal query
if ((pQuery->lastKey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(pQuery->lastKey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
qTrace(
......@@ -410,9 +410,9 @@ static void queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo
continue;
}
setExecutionContext(pSupporter, pSupporter->pResult, pOneMeterDataInfo->meterOrderIdx,
setExecutionContext(pSupporter, pRuntimeEnv->windowResInfo.pResult, pOneMeterDataInfo->meterOrderIdx,
pOneMeterDataInfo->groupIdx, pMeterQueryInfo);
} else { // interval query
} else if (pQuery->intervalTime > 0 && pQuery->slidingTime == -1){ // interval query
ret = setIntervalQueryExecutionContext(pSupporter, pOneMeterDataInfo->meterOrderIdx, pMeterQueryInfo);
if (ret != TSDB_CODE_SUCCESS) {
tfree(pReqMeterDataInfo); // error code has been set
......@@ -431,12 +431,13 @@ static void queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo
}
SBlockInfo binfo = getBlockBasicInfo(pRuntimeEnv, pBlock, BLK_FILE_BLOCK);
int64_t nextKey = -1;
assert(pQuery->pos >= 0 && pQuery->pos < pBlock->numOfPoints);
TSKEY *primaryKeys = (TSKEY *)pRuntimeEnv->primaryColBuffer->data;
if (IS_DATA_BLOCK_LOADED(pRuntimeEnv->blockStatus) && needPrimaryTimestampCol(pQuery, &binfo)) {
TSKEY nextKey = primaryKeys[pQuery->pos];
nextKey = primaryKeys[pQuery->pos];
if (!doCheckWithPrevQueryRange(pQInfo, nextKey, pOneMeterDataInfo)) {
continue;
}
......@@ -446,6 +447,33 @@ static void queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo
(pBlock->keyFirst >= pQuery->ekey && pBlock->keyLast <= pQuery->lastKey && !QUERY_IS_ASC_QUERY(pQuery)));
}
if (pQuery->intervalTime > 0 && pQuery->slidingTime > 0) {
assert(pMeterQueryInfo->lastKey <= nextKey && QUERY_IS_ASC_QUERY(pQuery));
pMeterQueryInfo->lastKey = nextKey;
pQuery->lastKey = nextKey;
if (pMeterQueryInfo->windowResInfo.prevSKey == 0) {
// normalize the window prev time window
TSKEY skey1, ekey1;
TSKEY windowSKey = 0, windowEKey = 0;
TSKEY skey2 = MIN(pSupporter->rawSKey, pSupporter->rawEKey);
TSKEY ekey2 = MAX(pSupporter->rawSKey, pSupporter->rawEKey);
doGetAlignedIntervalQueryRangeImpl(pQuery, nextKey, skey2, ekey2, &skey1, &ekey1, &windowSKey, &windowEKey);
pMeterQueryInfo->windowResInfo.prevSKey = windowSKey;
}
ret = setIntervalQueryExecutionContext(pSupporter, pOneMeterDataInfo->meterOrderIdx, pMeterQueryInfo);
if (ret != TSDB_CODE_SUCCESS) {
tfree(pReqMeterDataInfo); // error code has been set
pQInfo->killed = 1;
return;
}
}
queryOnBlock(pSupporter, primaryKeys, pRuntimeEnv->blockStatus, &binfo, pOneMeterDataInfo, pInfoEx->pBlock.fields,
searchFn);
}
......@@ -670,7 +698,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
* we need to return it to client in the first place.
*/
if (pSupporter->subgroupIdx > 0) {
copyFromGroupBuf(pQInfo, pSupporter->pResult);
copyFromGroupBuf(pQInfo, pRuntimeEnv->windowResInfo.pResult);
pQInfo->pointsRead += pQuery->pointsRead;
if (pQuery->pointsRead > 0) {
......@@ -683,7 +711,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
}
resetCtxOutputBuf(pRuntimeEnv);
resetSlidingWindowInfo(pRuntimeEnv, &pRuntimeEnv->swindowResInfo);
resetSlidingWindowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo);
while (pSupporter->meterIdx < pSupporter->numOfMeters) {
int32_t k = pSupporter->meterIdx;
......@@ -808,20 +836,21 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
// todo refactor
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
SSlidingWindowInfo* pSlidingWindowInfo = &pRuntimeEnv->swindowResInfo;
SWindowResInfo* pWindowResInfo = &pRuntimeEnv->windowResInfo;
for (int32_t i = 0; i < pSlidingWindowInfo->size; ++i) {
SOutputRes *buf = &pSlidingWindowInfo->pResult[i];
pSlidingWindowInfo->pStatus[i].closed = true; // enable return all results for group by normal columns
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
SWindowStatus* pStatus = &pWindowResInfo->pResult[i].status;
pStatus->closed = true; // enable return all results for group by normal columns
SWindowResult *pResult = &pWindowResInfo->pResult[i];
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
buf->numOfRows = MAX(buf->numOfRows, buf->resultInfo[j].numOfRes);
pResult->numOfRows = MAX(pResult->numOfRows, pResult->resultInfo[j].numOfRes);
}
}
pQInfo->pMeterQuerySupporter->subgroupIdx = 0;
pQuery->pointsRead = 0;
copyFromGroupBuf(pQInfo, pSlidingWindowInfo->pResult);
copyFromGroupBuf(pQInfo, pWindowResInfo->pResult);
}
pQInfo->pointsRead += pQuery->pointsRead;
......@@ -858,7 +887,7 @@ static void doOrderedScan(SQInfo *pQInfo) {
static void setupMeterQueryInfoForSupplementQuery(SMeterQuerySupportObj *pSupporter) {
for (int32_t i = 0; i < pSupporter->numOfMeters; ++i) {
SMeterQueryInfo *pMeterQueryInfo = pSupporter->pMeterDataInfo[i].pMeterQInfo;
SQueryResultBuf* pResultBuf = pSupporter->runtimeEnv.pResultBuf;
SQueryDiskbasedResultBuf* pResultBuf = pSupporter->runtimeEnv.pResultBuf;
changeMeterQueryInfoForSuppleQuery(pResultBuf, pMeterQueryInfo, pSupporter->rawSKey, pSupporter->rawEKey);
}
......@@ -906,6 +935,7 @@ static void doMultiMeterSupplementaryScan(SQInfo *pQInfo) {
static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter;
SQueryRuntimeEnv* pRuntimeEnv = &pSupporter->runtimeEnv;
SQuery * pQuery = &pQInfo->query;
if (pSupporter->subgroupIdx > 0) {
......@@ -913,14 +943,14 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
* if the subgroupIdx > 0, the query process must be completed yet, we only need to
* copy the data into output buffer
*/
if (pQuery->nAggTimeInterval > 0) {
if (pQuery->intervalTime > 0) {
copyResToQueryResultBuf(pSupporter, pQuery);
#ifdef _DEBUG_VIEW
displayInterResult(pQuery->sdata, pQuery, pQuery->sdata[0]->len);
#endif
} else {
copyFromGroupBuf(pQInfo, pSupporter->pResult);
copyFromGroupBuf(pQInfo, pRuntimeEnv->windowResInfo.pResult);
}
pQInfo->pointsRead += pQuery->pointsRead;
......@@ -964,7 +994,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
return;
}
if (pQuery->nAggTimeInterval > 0) {
if (pQuery->intervalTime > 0) {
assert(pSupporter->subgroupIdx == 0 && pSupporter->numOfGroupResultPages == 0);
if (mergeMetersResultToOneGroups(pSupporter) == TSDB_CODE_SUCCESS) {
......@@ -975,7 +1005,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
#endif
}
} else { // not a interval query
copyFromGroupBuf(pQInfo, pSupporter->pResult);
copyFromGroupBuf(pQInfo, pRuntimeEnv->windowResInfo.pResult);
}
// handle the limitation of output buffer
......@@ -1178,10 +1208,10 @@ static void vnodeSingleTableIntervalProcessor(SQInfo *pQInfo) {
}
}
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->slidingTime > 0 && pQuery->nAggTimeInterval > 0)) {
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->slidingTime > 0 && pQuery->intervalTime > 0)) {
pQInfo->pMeterQuerySupporter->subgroupIdx = 0;
pQuery->pointsRead = 0;
copyFromGroupBuf(pQInfo, pRuntimeEnv->swindowResInfo.pResult);
copyFromGroupBuf(pQInfo, pRuntimeEnv->windowResInfo.pResult);
}
pQInfo->pointsRead += pQuery->pointsRead;
......@@ -1252,7 +1282,7 @@ void vnodeSingleTableQuery(SSchedMsg *pMsg) {
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
pQuery->pointsRead = 0;
if (pQInfo->pMeterQuerySupporter->subgroupIdx > 0) {
copyFromGroupBuf(pQInfo, pQInfo->pMeterQuerySupporter->pResult);
copyFromGroupBuf(pQInfo, pQInfo->pMeterQuerySupporter->runtimeEnv.windowResInfo.pResult);
pQInfo->pointsRead += pQuery->pointsRead;
if (pQuery->pointsRead > 0) {
......@@ -1286,7 +1316,7 @@ void vnodeSingleTableQuery(SSchedMsg *pMsg) {
int64_t st = taosGetTimestampUs();
// group by normal column, sliding window query, interval query are handled by interval query processor
if (pQuery->nAggTimeInterval != 0 || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // interval (down sampling operation)
if (pQuery->intervalTime != 0 || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // interval (down sampling operation)
assert(pQuery->checkBufferInLoop == 0 && pQuery->pointsOffset == pQuery->pointsToRead);
vnodeSingleTableIntervalProcessor(pQInfo);
} else {
......@@ -1334,12 +1364,12 @@ void vnodeMultiMeterQuery(SSchedMsg *pMsg) {
pQuery->pointsRead = 0;
int64_t st = taosGetTimestampUs();
if (pQuery->nAggTimeInterval > 0 ||
if (pQuery->intervalTime > 0 ||
(isFixedOutputQuery(pQuery) && (!isPointInterpoQuery(pQuery)) && !isGroupbyNormalCol(pQuery->pGroupbyExpr))) {
assert(pQuery->checkBufferInLoop == 0);
vnodeMultiMeterQueryProcessor(pQInfo);
} else {
assert((pQuery->checkBufferInLoop == 1 && pQuery->nAggTimeInterval == 0) || isPointInterpoQuery(pQuery) ||
assert((pQuery->checkBufferInLoop == 1 && pQuery->intervalTime == 0) || isPointInterpoQuery(pQuery) ||
isGroupbyNormalCol(pQuery->pGroupbyExpr));
vnodeSTableSeqProcessor(pQInfo);
......
......@@ -266,7 +266,7 @@ static SQInfo *vnodeAllocateQInfoEx(SQueryMeterMsg *pQueryMsg, SSqlGroupbyExpr *
}
pQuery->pGroupbyExpr = pGroupbyExpr;
pQuery->nAggTimeInterval = pQueryMsg->nAggTimeInterval;
pQuery->intervalTime = pQueryMsg->intervalTime;
pQuery->slidingTime = pQueryMsg->slidingTime;
pQuery->interpoType = pQueryMsg->interpoType;
pQuery->intervalTimeUnit = pQueryMsg->intervalTimeUnit;
......@@ -920,8 +920,8 @@ int vnodeSaveQueryResult(void *handle, char *data, int32_t *size) {
}
static int32_t validateQueryMeterMsg(SQueryMeterMsg *pQueryMsg) {
if (pQueryMsg->nAggTimeInterval < 0) {
dError("qmsg:%p illegal value of aggTimeInterval %" PRId64 "", pQueryMsg, pQueryMsg->nAggTimeInterval);
if (pQueryMsg->intervalTime < 0) {
dError("qmsg:%p illegal value of aggTimeInterval %" PRId64 "", pQueryMsg, pQueryMsg->intervalTime);
return -1;
}
......@@ -975,7 +975,7 @@ int32_t vnodeConvertQueryMeterMsg(SQueryMeterMsg *pQueryMsg) {
pQueryMsg->queryType = htons(pQueryMsg->queryType);
pQueryMsg->nAggTimeInterval = htobe64(pQueryMsg->nAggTimeInterval);
pQueryMsg->intervalTime = htobe64(pQueryMsg->intervalTime);
pQueryMsg->slidingTime = htobe64(pQueryMsg->slidingTime);
pQueryMsg->numOfTagsCols = htons(pQueryMsg->numOfTagsCols);
......@@ -1146,7 +1146,7 @@ int32_t vnodeConvertQueryMeterMsg(SQueryMeterMsg *pQueryMsg) {
"offset:%" PRId64,
pQueryMsg, pQueryMsg->numOfSids, pQueryMsg->skey, pQueryMsg->ekey, pQueryMsg->numOfGroupCols,
pQueryMsg->numOfTagsCols, pQueryMsg->order, pQueryMsg->orderType, pQueryMsg->orderByIdx,
pQueryMsg->numOfOutputCols, pQueryMsg->numOfCols, pQueryMsg->nAggTimeInterval, pQueryMsg->interpoType,
pQueryMsg->numOfOutputCols, pQueryMsg->numOfCols, pQueryMsg->intervalTime, pQueryMsg->interpoType,
pQueryMsg->tsLen, pQueryMsg->limit, pQueryMsg->offset);
return 0;
......
......@@ -7,8 +7,8 @@
#define DEFAULT_INTERN_BUF_SIZE 16384L
int32_t createResultBuf(SQueryResultBuf** pResultBuf, int32_t size, int32_t rowSize) {
SQueryResultBuf* pResBuf = calloc(1, sizeof(SQueryResultBuf));
int32_t createDiskbasedResultBuffer(SQueryDiskbasedResultBuf** pResultBuf, int32_t size, int32_t rowSize) {
SQueryDiskbasedResultBuf* pResBuf = calloc(1, sizeof(SQueryDiskbasedResultBuf));
pResBuf->numOfRowsPerPage = (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage)) / rowSize;
pResBuf->numOfPages = size;
......@@ -50,17 +50,17 @@ int32_t createResultBuf(SQueryResultBuf** pResultBuf, int32_t size, int32_t rowS
return TSDB_CODE_SUCCESS;
}
tFilePage* getResultBufferPageById(SQueryResultBuf* pResultBuf, int32_t id) {
tFilePage* getResultBufferPageById(SQueryDiskbasedResultBuf* pResultBuf, int32_t id) {
assert(id < pResultBuf->numOfPages && id >= 0);
return (tFilePage*)(pResultBuf->pBuf + DEFAULT_INTERN_BUF_SIZE * id);
}
int32_t getNumOfResultBufGroupId(SQueryResultBuf* pResultBuf) { return taosNumElemsInHashTable(pResultBuf->idsTable); }
int32_t getNumOfResultBufGroupId(SQueryDiskbasedResultBuf* pResultBuf) { return taosNumElemsInHashTable(pResultBuf->idsTable); }
int32_t getResBufSize(SQueryResultBuf* pResultBuf) { return pResultBuf->totalBufSize; }
int32_t getResBufSize(SQueryDiskbasedResultBuf* pResultBuf) { return pResultBuf->totalBufSize; }
static int32_t extendDiskFileSize(SQueryResultBuf* pResultBuf, int32_t numOfPages) {
static int32_t extendDiskFileSize(SQueryDiskbasedResultBuf* pResultBuf, int32_t numOfPages) {
assert(pResultBuf->numOfPages * DEFAULT_INTERN_BUF_SIZE == pResultBuf->totalBufSize);
int32_t ret = munmap(pResultBuf->pBuf, pResultBuf->totalBufSize);
......@@ -88,11 +88,11 @@ static int32_t extendDiskFileSize(SQueryResultBuf* pResultBuf, int32_t numOfPage
return TSDB_CODE_SUCCESS;
}
static bool noMoreAvailablePages(SQueryResultBuf* pResultBuf) {
static bool noMoreAvailablePages(SQueryDiskbasedResultBuf* pResultBuf) {
return (pResultBuf->allocateId == pResultBuf->numOfPages - 1);
}
static int32_t getGroupIndex(SQueryResultBuf* pResultBuf, int32_t groupId) {
static int32_t getGroupIndex(SQueryDiskbasedResultBuf* pResultBuf, int32_t groupId) {
assert(pResultBuf != NULL);
char* p = taosGetDataFromHashTable(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t));
......@@ -106,7 +106,7 @@ static int32_t getGroupIndex(SQueryResultBuf* pResultBuf, int32_t groupId) {
return slot;
}
static int32_t addNewGroupId(SQueryResultBuf* pResultBuf, int32_t groupId) {
static int32_t addNewGroupId(SQueryDiskbasedResultBuf* pResultBuf, int32_t groupId) {
int32_t num = getNumOfResultBufGroupId(pResultBuf); // the num is the newest allocated group id slot
if (pResultBuf->numOfAllocGroupIds <= num) {
......@@ -148,7 +148,7 @@ static int32_t doRegisterId(SIDList* pList, int32_t id) {
return 0;
}
static void registerPageId(SQueryResultBuf* pResultBuf, int32_t groupId, int32_t pageId) {
static void registerPageId(SQueryDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t pageId) {
int32_t slot = getGroupIndex(pResultBuf, groupId);
if (slot < 0) {
slot = addNewGroupId(pResultBuf, groupId);
......@@ -158,7 +158,7 @@ static void registerPageId(SQueryResultBuf* pResultBuf, int32_t groupId, int32_t
doRegisterId(pList, pageId);
}
tFilePage* getNewDataBuf(SQueryResultBuf* pResultBuf, int32_t groupId, int32_t* pageId) {
tFilePage* getNewDataBuf(SQueryDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId) {
if (noMoreAvailablePages(pResultBuf)) {
if (extendDiskFileSize(pResultBuf, pResultBuf->incStep) != TSDB_CODE_SUCCESS) {
return NULL;
......@@ -177,9 +177,9 @@ tFilePage* getNewDataBuf(SQueryResultBuf* pResultBuf, int32_t groupId, int32_t*
return page;
}
int32_t getNumOfRowsPerPage(SQueryResultBuf* pResultBuf) { return pResultBuf->numOfRowsPerPage; }
int32_t getNumOfRowsPerPage(SQueryDiskbasedResultBuf* pResultBuf) { return pResultBuf->numOfRowsPerPage; }
SIDList getDataBufPagesIdList(SQueryResultBuf* pResultBuf, int32_t groupId) {
SIDList getDataBufPagesIdList(SQueryDiskbasedResultBuf* pResultBuf, int32_t groupId) {
SIDList list = {0};
int32_t slot = getGroupIndex(pResultBuf, groupId);
if (slot < 0) {
......@@ -189,7 +189,7 @@ SIDList getDataBufPagesIdList(SQueryResultBuf* pResultBuf, int32_t groupId) {
}
}
void destroyResultBuf(SQueryResultBuf* pResultBuf) {
void destroyResultBuf(SQueryDiskbasedResultBuf* pResultBuf) {
if (pResultBuf == NULL) {
return;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册