提交 08854f86 编写于 作者: H hjxilinx

refactor codes of super table query in handling the sliding query.

上级 d6529943
......@@ -2162,7 +2162,7 @@ static STopBotInfo *getTopBotOutputInfo(SQLFunctionCtx *pCtx) {
// only the first_stage_merge is directly written data into final output buffer
if (pResInfo->superTableQ && pCtx->currentStage != SECONDARY_STAGE_MERGE) {
return (STopBotInfo*) pCtx->aOutputBuf;
} else { // for normal table query and super table at the secondary_stage, result is written to intermediate buffer
} else { // during normal table query and super table at the secondary_stage, result is written to intermediate buffer
return pResInfo->interResultBuf;
}
}
......
......@@ -175,8 +175,7 @@ void copyFromGroupBuf(SQInfo* pQInfo, SWindowResult* result);
SBlockInfo getBlockBasicInfo(SQueryRuntimeEnv* pRuntimeEnv, void* pBlock, int32_t blockType);
SCacheBlock* getCacheDataBlock(SMeterObj* pMeterObj, SQueryRuntimeEnv* pRuntimeEnv, int32_t slot);
void queryOnBlock(SMeterQuerySupportObj* pSupporter, int64_t* primaryKeys, int32_t blockStatus,
SBlockInfo* pBlockBasicInfo, SMeterDataInfo* pDataHeadInfoEx, SField* pFields,
void queryOnBlock(SMeterQuerySupportObj* pSupporter, int32_t blockStatus, SBlockInfo* pBlockBasicInfo, SMeterDataInfo* pDataHeadInfoEx, SField* pFields,
__block_search_fn_t searchFn);
int32_t vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, tSidSet *pSidSet, SMeterDataInfo *pMeterDataInfo,
......@@ -278,14 +277,17 @@ void displayInterResult(SData** pdata, SQuery* pQuery, int32_t numOfRows);
void vnodePrintQueryStatistics(SMeterQuerySupportObj* pSupporter);
void clearGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pOneOutputRes);
void copyGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult* dst, const SWindowResult* src);
void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pOneOutputRes);
void copyTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult* dst, const SWindowResult* src);
void resetSlidingWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo* pWindowResInfo);
void clearClosedSlidingWindows(SQueryRuntimeEnv* pRuntimeEnv);
int32_t numOfClosedSlidingWindow(SWindowResInfo* pWindowResInfo);
void closeSlidingWindow(SWindowResInfo* pWindowResInfo, int32_t slot);
void closeAllSlidingWindow(SWindowResInfo* pWindowResInfo);
int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRuntimeEnv, int32_t size, int32_t threshold, int16_t type);
void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRuntimeEnv);
void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo* pWindowResInfo);
void clearClosedTimeWindow(SQueryRuntimeEnv* pRuntimeEnv);
int32_t numOfClosedTimeWindow(SWindowResInfo* pWindowResInfo);
void closeTimeWindow(SWindowResInfo* pWindowResInfo, int32_t slot);
void closeAllTimeWindow(SWindowResInfo* pWindowResInfo);
#ifdef __cplusplus
}
......
......@@ -197,7 +197,6 @@ typedef struct SMeterQueryInfo {
int16_t lastResRows;
int64_t tag;
STSCursor cur;
SWindowResult* pWindowRes;
int32_t sid; // for retrieve the page id list
SWindowResInfo windowResInfo;
......@@ -279,7 +278,7 @@ typedef struct _qinfo {
int (*fp)(SMeterObj*, SQuery*);
} SQInfo;
int32_t vnodeQuerySingleMeterPrepare(SQInfo* pQInfo, SMeterObj* pMeterObj, SMeterQuerySupportObj* pSMultiMeterObj,
int32_t vnodeQuerySingleTablePrepare(SQInfo* pQInfo, SMeterObj* pMeterObj, SMeterQuerySupportObj* pSMultiMeterObj,
void* param);
void vnodeQueryFreeQInfoEx(SQInfo* pQInfo);
......
......@@ -65,7 +65,7 @@ static TSKEY getTimestampInDiskBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t inde
static void savePointPosition(SPositionInfo *position, int32_t fileId, int32_t slot, int32_t pos);
static int32_t getNextDataFileCompInfo(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj, int32_t step);
static void setGroupOutputBuffer(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult);
static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult);
static void getAlignedIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, TSKEY keyInData, TSKEY skey, TSKEY ekey);
......@@ -83,7 +83,7 @@ static TSKEY getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __
static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId);
void doGetAlignedIntervalQueryRangeImpl(SQuery *pQuery, int64_t pKey, int64_t keyFirst, int64_t keyLast,
int64_t *actualSkey, int64_t *actualEkey, int64_t *skey, int64_t *ekey);
static void getNextLogicalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *pTimeWindow);
static void getNextTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *pTimeWindow);
// check the offset value integrity
static FORCE_INLINE int32_t validateHeaderOffsetSegment(SQInfo *pQInfo, char *filePath, int32_t vid, char *data,
......@@ -588,7 +588,7 @@ static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, int64_t StartQue
void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo);
static void destroyGroupResultBuf(SWindowResult *pOneOutputRes, int32_t nOutputCols);
static void destroyTimeWindowRes(SWindowResult *pOneOutputRes, int32_t nOutputCols);
static int32_t binarySearchForBlockImpl(SCompBlock *pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
int32_t firstSlot = 0;
......@@ -1432,17 +1432,17 @@ static char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sa
return dataBlock;
}
static bool slidingWindowClosed(SWindowResInfo *pWindowResInfo, int32_t slot) {
static bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot) {
return (pWindowResInfo->pResult[slot].status.closed == true);
}
static int32_t curSlidingWindow(SWindowResInfo *pWindowResInfo) {
static int32_t curTimeWindow(SWindowResInfo *pWindowResInfo) {
assert(pWindowResInfo->curIndex >= 0 && pWindowResInfo->curIndex < pWindowResInfo->size);
return pWindowResInfo->curIndex;
}
static SWindowResult *doSetSlidingWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo,
char *pData, int16_t bytes) {
static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo, char *pData,
int16_t bytes) {
SQuery *pQuery = pRuntimeEnv->pQuery;
int32_t *p1 = (int32_t *)taosGetDataFromHashTable(pWindowResInfo->hashList, pData, bytes);
......@@ -1464,7 +1464,7 @@ static SWindowResult *doSetSlidingWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, S
SPosInfo pos = {-1, -1};
createQueryResultInfo(pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, &pos);
}
pWindowResInfo->capacity = newCap;
}
......@@ -1476,37 +1476,34 @@ static SWindowResult *doSetSlidingWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, S
return &pWindowResInfo->pResult[pWindowResInfo->curIndex];
}
// get the correct sliding window according to the handled timestamp
static STimeWindow getActiveSlidingWindow(SWindowResInfo *pWindowResInfo, int64_t ts, SQuery *pQuery) {
// get the correct time window according to the handled timestamp
static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t ts, SQuery *pQuery) {
STimeWindow w = {0};
if (pWindowResInfo->curIndex == -1) { // the first window, from the prevous stored value
if (pWindowResInfo->curIndex == -1) { // the first window, from the previous stored value
w.skey = pWindowResInfo->prevSKey;
w.ekey = w.skey + pQuery->intervalTime - 1;
} else {
int32_t slot = curSlidingWindow(pWindowResInfo);
int32_t slot = curTimeWindow(pWindowResInfo);
w = pWindowResInfo->pResult[slot].window;
}
// STimeWindow *window = &pWindowResInfo->pResult[slot].window;
if (w.skey > ts || w.ekey < ts) {
// if (w.skey <= ts && w.ekey >= ts) {
// w = *window; // belongs to current active window
// } else {
int64_t st = w.skey;
while (st > ts) {
st -= pQuery->slidingTime;
}
while ((st + pQuery->intervalTime - 1) < ts) {
st += pQuery->slidingTime;
}
if (w.skey > ts || w.ekey < ts) {
int64_t st = w.skey;
if (st > ts) {
st -= ((st - ts + pQuery->slidingTime - 1)/pQuery->slidingTime) * pQuery->slidingTime;
}
w.skey = st;
w.ekey = w.skey + pQuery->intervalTime - 1;
int64_t et = st + pQuery->intervalTime - 1;
if (et < ts) {
st += ((ts - et + pQuery->slidingTime - 1)/pQuery->slidingTime) * pQuery->slidingTime;
}
w.skey = st;
w.ekey = w.skey + pQuery->intervalTime - 1;
}
assert(ts >= w.skey && ts <= w.ekey);
return w;
}
......@@ -1555,7 +1552,7 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowRes
assert(win->skey < win->ekey);
SQueryDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
SWindowResult *pWindowRes = doSetSlidingWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&win->skey, TSDB_KEYSIZE);
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&win->skey, TSDB_KEYSIZE);
if (pWindowRes == NULL) {
return -1;
}
......@@ -1571,13 +1568,13 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowRes
// set time window for current result
pWindowRes->window = *win;
setGroupOutputBuffer(pRuntimeEnv, pWindowRes);
setWindowResOutputBuf(pRuntimeEnv, pWindowRes);
initCtxOutputBuf(pRuntimeEnv);
return TSDB_CODE_SUCCESS;
}
static SWindowStatus *getSlidingWindowStatus(SWindowResInfo *pWindowResInfo, int32_t slot) {
static SWindowStatus *getTimeWindowResStatus(SWindowResInfo *pWindowResInfo, int32_t slot) {
assert(slot >= 0 && slot < pWindowResInfo->size);
return &pWindowResInfo->pResult[slot].status;
}
......@@ -1606,11 +1603,11 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey,
if (pQuery->slidingTime > 0 && pQuery->intervalTime > 0 && IS_MASTER_SCAN(pRuntimeEnv)) { // query completed
if ((lastKey >= pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(lastKey <= pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
closeAllSlidingWindow(pWindowResInfo);
closeAllTimeWindow(pWindowResInfo);
pWindowResInfo->curIndex = pWindowResInfo->size - 1;
setQueryStatus(pQuery, QUERY_COMPLETED | QUERY_RESBUF_FULL);
} else {
} else { // set the current index to be the last unclosed window
int32_t i = 0;
int64_t skey = 0;
......@@ -1622,7 +1619,7 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey,
if ((pResult->window.ekey <= lastKey && QUERY_IS_ASC_QUERY(pQuery)) ||
(pResult->window.skey >= lastKey && !QUERY_IS_ASC_QUERY(pQuery))) {
closeSlidingWindow(pWindowResInfo, i);
closeTimeWindow(pWindowResInfo, i);
} else {
skey = pResult->window.skey;
break;
......@@ -1631,24 +1628,109 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey,
// all windows are closed, set the last one to be the skey
if (skey == 0) {
skey = pWindowResInfo->pResult[pWindowResInfo->size-1].window.skey;
assert(i == pWindowResInfo->size);
pWindowResInfo->curIndex = pWindowResInfo->size - 1;
} else {
pWindowResInfo->curIndex = i;
}
pWindowResInfo->prevSKey = skey;
pWindowResInfo->prevSKey = pWindowResInfo->pResult[pWindowResInfo->curIndex].window.skey;
// the number of completed slots are larger than the threshold, dump to client immediately.
int32_t v = numOfClosedSlidingWindow(pWindowResInfo);
if (v > pWindowResInfo->threshold) {
int32_t n = numOfClosedTimeWindow(pWindowResInfo);
if (n > pWindowResInfo->threshold) {
setQueryStatus(pQuery, QUERY_RESBUF_FULL);
}
dTrace("QInfo:%p total window:%d, closed:%d", GET_QINFO_ADDR(pQuery), pWindowResInfo->size, v);
dTrace("QInfo:%p total window:%d, closed:%d", GET_QINFO_ADDR(pQuery), pWindowResInfo->size, n);
}
assert(pWindowResInfo->prevSKey != 0);
}
}
static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SBlockInfo *pBlockInfo, TSKEY *pPrimaryColumn, int32_t startPos,
TSKEY ekey, __block_search_fn_t searchFn, bool updateLastKey) {
assert(startPos >= 0 && startPos < pBlockInfo->size);
int32_t forwardStep = -1;
int32_t order = pQuery->order.order;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
if (QUERY_IS_ASC_QUERY(pQuery)) {
if (ekey < pBlockInfo->keyLast) {
forwardStep = getForwardStepsInBlock(pBlockInfo->size, searchFn, ekey, startPos, order, pPrimaryColumn);
if (forwardStep == 0) { // no qualified data in current block, do not update the lastKey value
assert(ekey < pPrimaryColumn[startPos]);
} else {
if (updateLastKey) {
pQuery->lastKey = MAX(ekey, pPrimaryColumn[startPos + (forwardStep - 1)]) + step;
}
}
} else {
forwardStep = pBlockInfo->size - startPos;
if (updateLastKey) {
pQuery->lastKey = pBlockInfo->keyLast + step;
}
}
} else { // desc
if (ekey > pBlockInfo->keyFirst) {
forwardStep = getForwardStepsInBlock(pBlockInfo->size, searchFn, ekey, startPos, order, pPrimaryColumn);
if (forwardStep == 0) { // no qualified data in current block, do not update the lastKey value
assert(ekey > pPrimaryColumn[startPos]);
} else {
if (updateLastKey) {
pQuery->lastKey = MIN(ekey, pPrimaryColumn[startPos - (forwardStep - 1)]) + step;
}
}
} else {
forwardStep = startPos + 1;
if (updateLastKey) {
pQuery->lastKey = pBlockInfo->keyFirst + step;
}
}
}
assert(forwardStep >= 0);
return forwardStep;
}
static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus *pStatus, STimeWindow *pWin,
int32_t startPos, int32_t forwardStep) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
if (IS_MASTER_SCAN(pRuntimeEnv) || pStatus->closed) {
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
pCtx[k].nStartQueryTimestamp = pWin->skey;
pCtx[k].size = forwardStep;
pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? startPos : startPos - (forwardStep - 1);
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunction(&pCtx[k]);
}
}
}
}
static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus *pStatus, STimeWindow *pWin) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
if (IS_MASTER_SCAN(pRuntimeEnv) || pStatus->closed) {
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
pCtx[k].nStartQueryTimestamp = pWin->skey;
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunction(&pCtx[k]);
}
}
}
}
/**
*
* @param pRuntimeEnv
......@@ -1659,11 +1741,12 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey,
* @return the incremental number of output value, so it maybe 0 for fixed number of query,
* such as count/min/max etc.
*/
static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t forwardStep, TSKEY *primaryKeyCol,
SField *pFields, SBlockInfo *pBlockInfo, SWindowResInfo *pWindowResInfo,
static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t forwardStep, SField *pFields,
SBlockInfo *pBlockInfo, SWindowResInfo *pWindowResInfo,
__block_search_fn_t searchFn) {
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
SQuery * pQuery = pRuntimeEnv->pQuery;
TSKEY * primaryKeyCol = (TSKEY *)pRuntimeEnv->primaryColBuffer->data;
bool isDiskFileBlock = IS_FILE_BLOCK(pRuntimeEnv->blockStatus);
int64_t prevNumOfRes = getNumOfResult(pRuntimeEnv);
......@@ -1702,50 +1785,25 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
int32_t offset = GET_COL_DATA_POS(pQuery, 0, step);
TSKEY ts = primaryKeyCol[offset];
STimeWindow win = getActiveSlidingWindow(pWindowResInfo, ts, pQuery);
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &win) != TSDB_CODE_SUCCESS) {
return 0;
}
if (QUERY_IS_ASC_QUERY(pQuery)) { //todo refactor
if (win.ekey < pBlockInfo->keyLast) {
forwardStep =
getForwardStepsInBlock(pBlockInfo->size, searchFn, win.ekey, pQuery->pos, pQuery->order.order, primaryKeyCol);
} else {
forwardStep = pBlockInfo->size - pQuery->pos;
}
} else {
if (win.skey > pBlockInfo->keyFirst) {
forwardStep = getForwardStepsInBlock(pBlockInfo->size, searchFn, win.skey, pQuery->pos, pQuery->order.order, primaryKeyCol);
} else {
forwardStep = pQuery->pos + 1;
}
}
SWindowStatus* pStatus = getSlidingWindowStatus(pWindowResInfo, curSlidingWindow(pWindowResInfo));
if (IS_MASTER_SCAN(pRuntimeEnv) || pStatus->closed) {
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
pCtx[k].nStartQueryTimestamp = win.skey;
pCtx[k].size = forwardStep;
pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? pQuery->pos : pQuery->pos - (forwardStep - 1);
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunction(&pCtx[k]);
}
}
}
TSKEY ekey = QUERY_IS_ASC_QUERY(pQuery) ? win.ekey : win.skey;
forwardStep = getNumOfRowsInTimeWindow(pQuery, pBlockInfo, primaryKeyCol, pQuery->pos, ekey, searchFn, false);
int32_t index = pWindowResInfo->curIndex;
SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &win, pQuery->pos, forwardStep);
int32_t index = pWindowResInfo->curIndex;
STimeWindow nextWin = win;
while (1) {
getNextLogicalQueryRange(pRuntimeEnv, &nextWin);
getNextTimeWindow(pRuntimeEnv, &nextWin);
if (pWindowResInfo->startTime > nextWin.skey || (nextWin.skey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(nextWin.ekey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
pWindowResInfo->curIndex = index;
break;
}
......@@ -1755,62 +1813,29 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
break;
}
// if (pBlockInfo->keyLast >= nextWin.skey && pBlockInfo->keyFirst <= nextWin.ekey) {
int32_t startPos = -1;
if (QUERY_IS_ASC_QUERY(pQuery)) {
startPos = searchFn((char *)primaryKeyCol, pBlockInfo->size, nextWin.skey, TSQL_SO_ASC);
} else {
startPos = searchFn((char *)primaryKeyCol, pBlockInfo->size, nextWin.ekey, TSQL_SO_DESC);
}
TSKEY startKey = QUERY_IS_ASC_QUERY(pQuery) ? nextWin.skey : nextWin.ekey;
int32_t startPos = searchFn((char *)primaryKeyCol, pBlockInfo->size, startKey, pQuery->order.order);
/*
* This time window does not cover any data, try next time window
* when the time window is too small, this case may happen
*/
if ((primaryKeyCol[startPos] > nextWin.ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(primaryKeyCol[startPos] < nextWin.skey && !QUERY_IS_ASC_QUERY(pQuery))) {
(primaryKeyCol[startPos] < nextWin.skey && !QUERY_IS_ASC_QUERY(pQuery))) {
continue;
}
// null data, failed to allocate more memory buffer
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &nextWin) !=
TSDB_CODE_SUCCESS) {
pRuntimeEnv->windowResInfo.curIndex = index;
int32_t sid = pRuntimeEnv->pMeterObj->sid;
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, sid, &nextWin) != TSDB_CODE_SUCCESS) {
break;
}
if (QUERY_IS_ASC_QUERY(pQuery)) { //todo refactor
if (nextWin.ekey < pBlockInfo->keyLast) {
forwardStep = getForwardStepsInBlock(pBlockInfo->size, searchFn, nextWin.ekey, startPos, pQuery->order.order, primaryKeyCol);
} else {
forwardStep = pBlockInfo->size - startPos;
}
} else {
if (nextWin.skey > pBlockInfo->keyFirst) {
forwardStep = getForwardStepsInBlock(pBlockInfo->size, searchFn, nextWin.skey, startPos, pQuery->order.order, primaryKeyCol);
} else {
forwardStep = startPos + 1;
}
}
pStatus = getSlidingWindowStatus(pWindowResInfo, curSlidingWindow(pWindowResInfo));
if (IS_MASTER_SCAN(pRuntimeEnv) || pStatus->closed) {
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
pCtx[k].nStartQueryTimestamp = nextWin.skey;
pCtx[k].size = forwardStep;
pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery))? startPos : startPos - (forwardStep - 1);
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunction(&pCtx[k]);
}
}
}
// } else {
// pWindowResInfo->curIndex = index;
// break;
// }
ekey = QUERY_IS_ASC_QUERY(pQuery) ? nextWin.ekey : nextWin.skey;
forwardStep = getNumOfRowsInTimeWindow(pQuery, pBlockInfo, primaryKeyCol, startPos, ekey, searchFn, false);
pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &nextWin, startPos, forwardStep);
}
pWindowResInfo->curIndex = index;
......@@ -1828,9 +1853,6 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
}
}
TSKEY lastKey = (QUERY_IS_ASC_QUERY(pQuery)) ? pBlockInfo->keyLast : pBlockInfo->keyFirst;
doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo);
/*
* No need to calculate the number of output results for group-by normal columns, interval query
* because the results of group by normal column is put into intermediate buffer.
......@@ -1901,19 +1923,24 @@ static bool needToLoadDataBlock(SQuery *pQuery, SField *pField, SQLFunctionCtx *
}
}
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
int32_t functId = pQuery->pSelectExpr[i].pBase.functionId;
if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) {
return top_bot_datablock_filter(&pCtx[i], functId, (char *)&pField[i].min, (char *)&pField[i].max);
}
}
// todo disable this opt code block temporarily
// for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
// int32_t functId = pQuery->pSelectExpr[i].pBase.functionId;
// if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) {
// return top_bot_datablock_filter(&pCtx[i], functId, (char *)&pField[i].min, (char *)&pField[i].max);
// }
// }
return true;
}
static int32_t initSlidingWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo, int32_t threshold,
int16_t type) {
pWindowResInfo->capacity = threshold;
int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRuntimeEnv, int32_t size,
int32_t threshold, int16_t type) {
if (size < threshold) {
size = threshold;
}
pWindowResInfo->capacity = size;
pWindowResInfo->threshold = threshold;
pWindowResInfo->type = type;
......@@ -1934,23 +1961,29 @@ static int32_t initSlidingWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResIn
return TSDB_CODE_SUCCESS;
}
static void destroySlidingWindowInfo(SWindowResInfo *pWindowResInfo) {
void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRuntimeEnv) {
if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0) {
assert(pWindowResInfo->hashList == NULL && pWindowResInfo->pResult == NULL);
return;
}
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
SWindowResult *pResult = &pWindowResInfo->pResult[i];
destroyTimeWindowRes(pResult, pRuntimeEnv->pQuery->numOfOutputCols);
}
taosCleanUpHashTable(pWindowResInfo->hashList);
tfree(pWindowResInfo->pResult);
}
void resetSlidingWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo) {
void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo) {
if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0) {
return;
}
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
SWindowResult *pWindowRes = &pWindowResInfo->pResult[i];
clearGroupResultBuf(pRuntimeEnv, pWindowRes);
clearTimeWindowResBuf(pRuntimeEnv, pWindowRes);
}
pWindowResInfo->curIndex = -1;
......@@ -1964,7 +1997,7 @@ void resetSlidingWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWind
pWindowResInfo->prevSKey = 0;
}
void clearClosedSlidingWindows(SQueryRuntimeEnv *pRuntimeEnv) {
void clearClosedTimeWindow(SQueryRuntimeEnv *pRuntimeEnv) {
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0 || pWindowResInfo->size == 0) {
return;
......@@ -1989,13 +2022,13 @@ void clearClosedSlidingWindows(SQueryRuntimeEnv *pRuntimeEnv) {
// clear all the closed windows from the window list
for (int32_t k = 0; k < unclosed; ++k) {
copyGroupResultBuf(pRuntimeEnv, &pWindowResInfo->pResult[k], &pWindowResInfo->pResult[i + k]);
copyTimeWindowResBuf(pRuntimeEnv, &pWindowResInfo->pResult[k], &pWindowResInfo->pResult[i + k]);
}
// move the unclosed window in the front of the window list
for (int32_t k = unclosed; k < pWindowResInfo->size; ++k) {
SWindowResult *pWindowRes = &pWindowResInfo->pResult[k];
clearGroupResultBuf(pRuntimeEnv, pWindowRes);
clearTimeWindowResBuf(pRuntimeEnv, pWindowRes);
}
pWindowResInfo->size = unclosed;
......@@ -2004,9 +2037,9 @@ void clearClosedSlidingWindows(SQueryRuntimeEnv *pRuntimeEnv) {
SWindowResult *pResult = &pWindowResInfo->pResult[k];
int32_t *p = (int32_t *)taosGetDataFromHashTable(pWindowResInfo->hashList, (const char *)&pResult->window.skey,
TSDB_KEYSIZE);
int32_t v = (*p - i);
//todo add the update function for hash table
int32_t v = (*p - i);
// todo add the update function for hash table
taosDeleteFromHashTable(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE);
taosAddToHashTable(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE, (char *)&v,
sizeof(int32_t));
......@@ -2015,7 +2048,7 @@ void clearClosedSlidingWindows(SQueryRuntimeEnv *pRuntimeEnv) {
pWindowResInfo->curIndex = -1;
}
int32_t numOfClosedSlidingWindow(SWindowResInfo *pWindowResInfo) {
int32_t numOfClosedTimeWindow(SWindowResInfo *pWindowResInfo) {
int32_t i = 0;
while (i < pWindowResInfo->size && pWindowResInfo->pResult[i].status.closed) {
++i;
......@@ -2024,12 +2057,12 @@ int32_t numOfClosedSlidingWindow(SWindowResInfo *pWindowResInfo) {
return i;
}
void closeSlidingWindow(SWindowResInfo *pWindowResInfo, int32_t slot) {
void closeTimeWindow(SWindowResInfo *pWindowResInfo, int32_t slot) {
assert(slot >= 0 && slot < pWindowResInfo->size);
pWindowResInfo->pResult[slot].status.closed = true;
}
void closeAllSlidingWindow(SWindowResInfo *pWindowResInfo) {
void closeAllTimeWindow(SWindowResInfo *pWindowResInfo) {
assert(pWindowResInfo->size >= 0 && pWindowResInfo->capacity >= pWindowResInfo->size);
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
......@@ -2042,12 +2075,12 @@ static int32_t setGroupResultFromKey(SQueryRuntimeEnv *pRuntimeEnv, char *pData,
return -1;
}
SWindowResult *pWindowRes = doSetSlidingWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pData, bytes);
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pData, bytes);
if (pWindowRes == NULL) {
return -1;
}
setGroupOutputBuffer(pRuntimeEnv, pWindowRes);
setWindowResOutputBuf(pRuntimeEnv, pWindowRes);
initCtxOutputBuf(pRuntimeEnv);
return TSDB_CODE_SUCCESS;
......@@ -2139,10 +2172,11 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
return true;
}
static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *forwardStep, TSKEY *primaryKeyCol,
SField *pFields, SBlockInfo *pBlockInfo, SWindowResInfo *pWindowResInfo) {
static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *forwardStep, SField *pFields,
SBlockInfo *pBlockInfo, SWindowResInfo *pWindowResInfo) {
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
SQuery * pQuery = pRuntimeEnv->pQuery;
TSKEY * primaryKeyCol = (TSKEY *)pRuntimeEnv->primaryColBuffer->data;
bool isDiskFileBlock = IS_FILE_BLOCK(pRuntimeEnv->blockStatus);
SData **data = pRuntimeEnv->colDataBuffer;
......@@ -2221,8 +2255,8 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
if (pQuery->slidingTime > 0 && pQuery->intervalTime > 0) {
// decide the time window according to the primary timestamp
int64_t ts = primaryKeyCol[offset];
STimeWindow win = getActiveSlidingWindow(pWindowResInfo, ts, pQuery);
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &win);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
continue;
......@@ -2230,26 +2264,16 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
// all startOffset are identical
offset -= pCtx[0].startOffset;
SWindowStatus *pStatus = getSlidingWindowStatus(pWindowResInfo, curSlidingWindow(pWindowResInfo));
if (IS_MASTER_SCAN(pRuntimeEnv) || pStatus->closed) {
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
pCtx[k].nStartQueryTimestamp = win.skey;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunctionF(&pCtx[k], offset);
}
}
}
SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
doRowwiseApplyFunctions(pRuntimeEnv, pStatus, &win);
lastKey = ts;
int32_t prev = pWindowResInfo->curIndex;
int32_t prev = pWindowResInfo->curIndex;
STimeWindow nextWin = win;
while (1) {
getNextLogicalQueryRange(pRuntimeEnv, &nextWin);
getNextTimeWindow(pRuntimeEnv, &nextWin);
if (pWindowResInfo->startTime > nextWin.skey || (nextWin.skey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(nextWin.skey > pQuery->skey && !QUERY_IS_ASC_QUERY(pQuery))) {
pWindowResInfo->curIndex = prev;
......@@ -2263,18 +2287,9 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
pWindowResInfo->curIndex = prev;
break;
}
pStatus = getSlidingWindowStatus(pWindowResInfo, curSlidingWindow(pWindowResInfo));
if (IS_MASTER_SCAN(pRuntimeEnv) || pStatus->closed) {
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
pCtx[k].nStartQueryTimestamp = nextWin.skey;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunctionF(&pCtx[k], offset);
}
}
}
pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
doRowwiseApplyFunctions(pRuntimeEnv, pStatus, &nextWin);
} else {
pWindowResInfo->curIndex = prev;
break;
......@@ -2383,40 +2398,13 @@ static void validateQueryRangeAndData(SQueryRuntimeEnv *pRuntimeEnv, const TSKEY
static int32_t applyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pBlockInfo, int64_t *pPrimaryColumn,
SField *pFields, __block_search_fn_t searchFn, int32_t *numOfRes,
SWindowResInfo *pWindowResInfo) {
int32_t forwardStep = 0;
SQuery *pQuery = pRuntimeEnv->pQuery;
validateQueryRangeAndData(pRuntimeEnv, pPrimaryColumn, pBlockInfo);
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
validateQueryRangeAndData(pRuntimeEnv, pPrimaryColumn, pBlockInfo);
int32_t forwardStep =
getNumOfRowsInTimeWindow(pQuery, pBlockInfo, pPrimaryColumn, pQuery->pos, pQuery->ekey, searchFn, true);
if (QUERY_IS_ASC_QUERY(pQuery)) {
if (pQuery->ekey < pBlockInfo->keyLast) {
forwardStep = getForwardStepsInBlock(pBlockInfo->size, searchFn, pQuery->ekey, pQuery->pos, pQuery->order.order,
pPrimaryColumn);
if (forwardStep == 0) { // no qualified data in current block, do not update the lastKey value
assert(pQuery->ekey < pPrimaryColumn[pQuery->pos]);
} else {
pQuery->lastKey = MAX(pQuery->ekey, pPrimaryColumn[pQuery->pos + (forwardStep - 1)]) + step;
}
} else {
forwardStep = pBlockInfo->size - pQuery->pos;
pQuery->lastKey = pBlockInfo->keyLast + step;
}
} else { // desc
if (pQuery->ekey > pBlockInfo->keyFirst) {
forwardStep = getForwardStepsInBlock(pBlockInfo->size, searchFn, pQuery->ekey, pQuery->pos, pQuery->order.order,
pPrimaryColumn);
if (forwardStep == 0) { // no qualified data in current block, do not update the lastKey value
assert(pQuery->ekey > pPrimaryColumn[pQuery->pos]);
} else {
pQuery->lastKey = MIN(pQuery->ekey, pPrimaryColumn[pQuery->pos - (forwardStep - 1)]) + step;
}
} else {
forwardStep = pQuery->pos + 1;
pQuery->lastKey = pBlockInfo->keyFirst + step;
}
}
assert(forwardStep >= 0);
int32_t newForwardStep = reviseForwardSteps(pRuntimeEnv, forwardStep);
......@@ -2427,18 +2415,15 @@ static int32_t applyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *
pQuery->lastKey = pPrimaryColumn[pQuery->pos + (newForwardStep - 1) * step] + step;
}
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || isGroupbyNormalCol(pQuery->pGroupbyExpr) /*||
(pQuery->slidingTime != -1 && pQuery->intervalTime > 0)*/) {
*numOfRes =
rowwiseApplyAllFunctions(pRuntimeEnv, &newForwardStep, pPrimaryColumn, pFields, pBlockInfo, pWindowResInfo);
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
*numOfRes = rowwiseApplyAllFunctions(pRuntimeEnv, &newForwardStep, pFields, pBlockInfo, pWindowResInfo);
} else {
*numOfRes = blockwiseApplyAllFunctions(pRuntimeEnv, newForwardStep, pPrimaryColumn, pFields, pBlockInfo,
pWindowResInfo, searchFn);
*numOfRes = blockwiseApplyAllFunctions(pRuntimeEnv, newForwardStep, pFields, pBlockInfo, pWindowResInfo, searchFn);
}
TSKEY lastKey = (QUERY_IS_ASC_QUERY(pQuery)) ? pBlockInfo->keyLast : pBlockInfo->keyFirst;
doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo);
assert(*numOfRes >= 0);
// check if buffer is large enough for accommodating all qualified points
......@@ -2761,7 +2746,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
}
tfree(pRuntimeEnv->secondaryUnzipBuffer);
destroySlidingWindowInfo(&pRuntimeEnv->windowResInfo);
cleanupTimeWindowInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv);
if (pRuntimeEnv->pCtx != NULL) {
for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutputCols; ++i) {
......@@ -4360,44 +4345,6 @@ static int32_t initialNumOfRows(SMeterQuerySupportObj *pSupporter) {
return numOfRows;
}
static int32_t createQueryResultBuffer(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfRows, bool isSTableQuery) {
SQuery *pQuery = pRuntimeEnv->pQuery;
createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, numOfRows, pQuery->rowSize);
// int32_t pageId = -1;
// tFilePage* page = NULL;
//
// pRuntimeEnv->windowResInfo.pResult = calloc(numOfRows, sizeof(SWindowResult));
//
// for (int32_t k = 0; k < numOfRows; ++k) {
// SWindowResult *pWindowRes = &pRuntimeEnv->windowResInfo.pResult[k];
// pWindowRes->nAlloc = 1;
//
// /*
// * for single table top/bottom query, the output for group by normal column, the output rows is
// * equals to the maximum rows, instead of 1.
// */
// if (!isSTableQuery && isTopBottomQuery(pQuery)) {
// assert(pQuery->numOfOutputCols > 1);
//
// SSqlFunctionExpr *pExpr = &pQuery->pSelectExpr[1];
// pWindowRes->nAlloc = pExpr->pBase.arg[0].argValue.i64;
// }
//
// if (page == NULL || page->numOfElems >= pRuntimeEnv->numOfRowsPerPage) {
// page = getNewDataBuf(pRuntimeEnv->pResultBuf, 0, &pageId);
// }
//
// assert(pageId >= 0);
//
// SPosInfo posInfo = {.pageId = pageId, .rowId = page->numOfElems};
// createQueryResultInfo(pQuery, pWindowRes, isSTableQuery, &posInfo);
// page->numOfElems += 1; // next row is available
// }
return TSDB_CODE_SUCCESS;
}
static int32_t allocateRuntimeEnvBuf(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj) {
SQuery *pQuery = pRuntimeEnv->pQuery;
......@@ -4481,7 +4428,7 @@ static char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnInd
pQuery->pSelectExpr[columnIndex].resBytes * realRowId;
}
int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMeterQuerySupportObj *pSupporter,
int32_t vnodeQuerySingleTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMeterQuerySupportObj *pSupporter,
void *param) {
SQuery *pQuery = &pQInfo->query;
int32_t code = TSDB_CODE_SUCCESS;
......@@ -4554,7 +4501,7 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete
pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, false);
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->intervalTime > 0 && pQuery->slidingTime > 0)) {
int32_t rows = initialNumOfRows(pSupporter);
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize);
if (code != TSDB_CODE_SUCCESS) {
return code;
......@@ -4567,7 +4514,7 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete
type = TSDB_DATA_TYPE_TIMESTAMP;
}
initSlidingWindowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo, rows, type);
initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, rows, 4096, type);
}
pSupporter->rawSKey = pQuery->skey;
......@@ -4597,6 +4544,13 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete
if (pQuery->slidingTime > 0 && pQuery->intervalTime > 0) {
STimeWindow win = {0};
getActualRange(pSupporter, &win);
// there is no qualified data with respect to the primary timestamp
if (win.skey > win.ekey) {
sem_post(&pQInfo->dataReady);
pQInfo->over = 1;
return TSDB_CODE_SUCCESS;
}
TSKEY skey1, ekey1;
TSKEY windowSKey = 0, windowEKey = 0;
......@@ -4680,7 +4634,7 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) {
}
for (int32_t i = 0; i < size; ++i) {
// destroyGroupResultBuf(&pSupporter->pResult[i], pQInfo->query.numOfOutputCols);
// destroyTimeWindowRes(&pSupporter->pResult[i], pQInfo->query.numOfOutputCols);
}
}
......@@ -4770,7 +4724,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // group by columns not tags;
int16_t type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr);
initSlidingWindowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo, 128, type);
initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, 128, 4096, type);
}
} else {
// one page for each table at least
......@@ -5133,7 +5087,7 @@ static void doHandleDataBlockImpl(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pbl
}
}
static void getNextLogicalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *pTimeWindow) {
static void getNextTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *pTimeWindow) {
SQuery *pQuery = pRuntimeEnv->pQuery;
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
......@@ -5194,33 +5148,15 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
* 1. interval query.
* 2. multi-output query that may cause buffer overflow.
*/
// if (pQuery->intervalTime > 0 || Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) {
if (nextPos >= blockInfo.size || nextPos < 0) {
moveToNextBlock(pRuntimeEnv, step, searchFn, !LOAD_DATA);
// if (!Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK|QUERY_COMPLETED)) {
// slot/pos/fileId is updated in moveToNextBlock function
savePointPosition(&pRuntimeEnv->nextPos, pQuery->fileId, pQuery->slot, pQuery->pos);
// // check next block
// void *pNextBlock = getGenericDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot);
//
// int32_t blockType = (IS_DISK_DATA_BLOCK(pQuery)) ? BLK_FILE_BLOCK : BLK_CACHE_BLOCK;
// blockInfo = getBlockBasicInfo(pRuntimeEnv, pNextBlock, blockType);
//
// // check if need to close window result or not
// if (pQuery->intervalTime > 0 && pQuery->slidingTime > 0) {
// TSKEY t = (QUERY_IS_ASC_QUERY(pQuery))? blockInfo.keyFirst:blockInfo.keyLast;
// doCheckQueryCompleted(pRuntimeEnv, t, &pRuntimeEnv->windowResInfo);
// }
// }
} else {
savePointPosition(&pRuntimeEnv->nextPos, pQuery->fileId, pQuery->slot, accessPos + step);
}
// } else {
// assert(0);
// }
// if (pQuery->intervalTime > 0 || Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) {
if (nextPos >= blockInfo.size || nextPos < 0) {
moveToNextBlock(pRuntimeEnv, step, searchFn, !LOAD_DATA);
// slot/pos/fileId is updated in moveToNextBlock function
savePointPosition(&pRuntimeEnv->nextPos, pQuery->fileId, pQuery->slot, pQuery->pos);
} else {
savePointPosition(&pRuntimeEnv->nextPos, pQuery->fileId, pQuery->slot, accessPos + step);
}
break;
} else { // query not completed, move to next block
blockLoadStatus = moveToNextBlock(pRuntimeEnv, step, searchFn, LOAD_DATA);
......@@ -5235,36 +5171,30 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
int32_t blockType = (IS_DISK_DATA_BLOCK(pQuery)) ? BLK_FILE_BLOCK : BLK_CACHE_BLOCK;
blockInfo = getBlockBasicInfo(pRuntimeEnv, pNextBlock, blockType);
if ((QUERY_IS_ASC_QUERY(pQuery) && blockInfo.keyFirst > pQuery->ekey) ||
(!QUERY_IS_ASC_QUERY(pQuery) && blockInfo.keyLast < pQuery->ekey)) {
setQueryStatus(pQuery, QUERY_COMPLETED);
break;
}
// // check if need to close window result or not
// if (pQuery->intervalTime > 0 && pQuery->slidingTime > 0) {
// TSKEY t = (QUERY_IS_ASC_QUERY(pQuery))? blockInfo.keyFirst:blockInfo.keyLast;
// doCheckQueryCompleted(pRuntimeEnv, t, &pRuntimeEnv->windowResInfo);
// }
if(Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) {
if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) {
break;
}
} // while(1)
if (pQuery->intervalTime > 0) {
if (Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED|QUERY_NO_DATA_TO_CHECK)) {
closeAllSlidingWindow(&pRuntimeEnv->windowResInfo);
} else if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) { // check if window needs to be closed
if (Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) {
closeAllTimeWindow(&pRuntimeEnv->windowResInfo);
} else if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) { // check if window needs to be closed
void *pNextBlock = getGenericDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot);
int32_t blockType = (IS_DISK_DATA_BLOCK(pQuery)) ? BLK_FILE_BLOCK : BLK_CACHE_BLOCK;
int32_t blockType = (IS_DISK_DATA_BLOCK(pQuery)) ? BLK_FILE_BLOCK : BLK_CACHE_BLOCK;
SBlockInfo blockInfo = getBlockBasicInfo(pRuntimeEnv, pNextBlock, blockType);
// check if need to close window result or not
TSKEY t = (QUERY_IS_ASC_QUERY(pQuery))? blockInfo.keyFirst:blockInfo.keyLast;
TSKEY t = (QUERY_IS_ASC_QUERY(pQuery)) ? blockInfo.keyFirst : blockInfo.keyLast;
doCheckQueryCompleted(pRuntimeEnv, t, &pRuntimeEnv->windowResInfo);
}
}
......@@ -5274,9 +5204,8 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
static void updatelastkey(SQuery *pQuery, SMeterQueryInfo *pMeterQInfo) { pMeterQInfo->lastKey = pQuery->lastKey; }
void queryOnBlock(SMeterQuerySupportObj *pSupporter, int64_t *primaryKeys, int32_t blockStatus,
SBlockInfo *pBlockBasicInfo, SMeterDataInfo *pMeterDataInfo, SField *pFields,
__block_search_fn_t searchFn) {
void queryOnBlock(SMeterQuerySupportObj *pSupporter, int32_t blockStatus, SBlockInfo *pBlockBasicInfo,
SMeterDataInfo *pMeterDataInfo, SField *pFields, __block_search_fn_t searchFn) {
/* cache blocks may be assign to other meter, abort */
if (pBlockBasicInfo->size <= 0) {
return;
......@@ -5284,15 +5213,19 @@ void queryOnBlock(SMeterQuerySupportObj *pSupporter, int64_t *primaryKeys, int32
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
TSKEY* primaryKeys = (TSKEY*) pRuntimeEnv->primaryColBuffer->data;
if (pQuery->intervalTime == 0) { // not interval query
assert(0);
int32_t numOfRes = 0;
applyFunctionsOnBlock(pRuntimeEnv, pBlockBasicInfo, primaryKeys, pFields, searchFn, &numOfRes,
&pMeterDataInfo->pMeterQInfo->windowResInfo);
&pMeterDataInfo->pMeterQInfo->windowResInfo);///????bug
// note: only fixed number of output for each group by operation
if (numOfRes > 0) {
pRuntimeEnv->windowResInfo.pResult[pMeterDataInfo->groupIdx].numOfRows = numOfRes;
if (numOfRes > 0) {//???
pRuntimeEnv->windowResInfo.pResult[pMeterDataInfo->groupIdx].numOfRows = numOfRes;////????bug
}
// used to decide the correct start position in cache after check all data in files
......@@ -5879,7 +5812,7 @@ void disableFunctForSuppleScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) {
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
SWindowStatus *pStatus = getSlidingWindowStatus(pWindowResInfo, i);
SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, i);
if (!pStatus->closed) {
continue;
}
......@@ -5941,7 +5874,7 @@ void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTa
}
}
void clearGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindowRes) {
void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindowRes) {
if (pWindowRes == NULL) {
return;
}
......@@ -5955,12 +5888,12 @@ void clearGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindowRe
resetResultInfo(pResultInfo);
}
pWindowRes->numOfRows = 0;
pWindowRes->nAlloc = 0;
pWindowRes->pos = (SPosInfo){-1, -1};
pWindowRes->status.closed = false;
pWindowRes->window = (STimeWindow) {0, 0};
pWindowRes->window = (STimeWindow){0, 0};
}
/**
......@@ -5968,12 +5901,12 @@ void clearGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindowRe
* since the attribute of "Pos" is bound to each window result when the window result is created in the
* disk-based result buffer.
*/
void copyGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *dst, const SWindowResult *src) {
void copyTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *dst, const SWindowResult *src) {
dst->numOfRows = src->numOfRows;
dst->nAlloc = src->nAlloc;
dst->window = src->window;
dst->status = src->status;
int32_t nOutputCols = pRuntimeEnv->pQuery->numOfOutputCols;
for (int32_t i = 0; i < nOutputCols; ++i) {
......@@ -5996,7 +5929,7 @@ void copyGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *dst, const
}
}
void destroyGroupResultBuf(SWindowResult *pWindowRes, int32_t nOutputCols) {
void destroyTimeWindowRes(SWindowResult *pWindowRes, int32_t nOutputCols) {
if (pWindowRes == NULL) {
return;
}
......@@ -6237,7 +6170,7 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) {
continue;
}
setGroupOutputBuffer(pRuntimeEnv, pResult);
setWindowResOutputBuf(pRuntimeEnv, pResult);
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
aAggs[pQuery->pSelectExpr[j].pBase.functionId].xNextStep(&pRuntimeEnv->pCtx[j]);
......@@ -6279,7 +6212,7 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) {
doSingleMeterSupplementScan(pRuntimeEnv);
// update the pQuery->skey/pQuery->ekey to limit the scan scope of sliding query during supplementary scan
// update the pQuery->skey/pQuery->ekey to limit the scan scope of sliding query during supplementary scan
pQuery->skey = newSkey;
}
......@@ -6290,16 +6223,16 @@ void doFinalizeResult(SQueryRuntimeEnv *pRuntimeEnv) {
// for each group result, call the finalize function for each column
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
closeAllSlidingWindow(pWindowResInfo);
closeAllTimeWindow(pWindowResInfo);
}
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
SWindowResult *buf = &pWindowResInfo->pResult[i];
if (!slidingWindowClosed(pWindowResInfo, i)) {
if (!isWindowResClosed(pWindowResInfo, i)) {
continue;
}
setGroupOutputBuffer(pRuntimeEnv, buf);
setWindowResOutputBuf(pRuntimeEnv, buf);
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
aAggs[pQuery->pSelectExpr[j].pBase.functionId].xFinalize(&pRuntimeEnv->pCtx[j]);
......@@ -6414,30 +6347,30 @@ void forwardIntervalQueryRange(SMeterQuerySupportObj *pSupporter, SQueryRuntimeE
return;
}
// int32_t r = getNextIntervalQueryRange(pSupporter, pRuntimeEnv, &pQuery->skey, &pQuery->ekey);
// if (r == QUERY_COMPLETED) {
// setQueryStatus(pQuery, QUERY_COMPLETED);
// return;
// }
//
// getNextLogicalQueryRange(pRuntimeEnv, &pRuntimeEnv->intervalWindow);
//
// /* ensure the search in cache will return right position */
// pQuery->lastKey = pQuery->skey;
//
// TSKEY nextTimestamp = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos);
// if ((nextTimestamp > pSupporter->rawEKey && QUERY_IS_ASC_QUERY(pQuery)) ||
// (nextTimestamp < pSupporter->rawEKey && !QUERY_IS_ASC_QUERY(pQuery)) ||
// Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)) {
// setQueryStatus(pQuery, QUERY_COMPLETED);
// return;
// }
//
// // bridge the gap in group by time function
// if ((nextTimestamp > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
// (nextTimestamp < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
// getAlignedIntervalQueryRange(pRuntimeEnv, nextTimestamp, pSupporter->rawSKey, pSupporter->rawEKey);
// }
// int32_t r = getNextIntervalQueryRange(pSupporter, pRuntimeEnv, &pQuery->skey, &pQuery->ekey);
// if (r == QUERY_COMPLETED) {
// setQueryStatus(pQuery, QUERY_COMPLETED);
// return;
// }
//
// getNextTimeWindow(pRuntimeEnv, &pRuntimeEnv->intervalWindow);
//
// /* ensure the search in cache will return right position */
// pQuery->lastKey = pQuery->skey;
//
// TSKEY nextTimestamp = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos);
// if ((nextTimestamp > pSupporter->rawEKey && QUERY_IS_ASC_QUERY(pQuery)) ||
// (nextTimestamp < pSupporter->rawEKey && !QUERY_IS_ASC_QUERY(pQuery)) ||
// Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)) {
// setQueryStatus(pQuery, QUERY_COMPLETED);
// return;
// }
//
// // bridge the gap in group by time function
// if ((nextTimestamp > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
// (nextTimestamp < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
// getAlignedIntervalQueryRange(pRuntimeEnv, nextTimestamp, pSupporter->rawSKey, pSupporter->rawEKey);
// }
}
static int32_t offsetComparator(const void *pLeft, const void *pRight) {
......@@ -6594,7 +6527,7 @@ SMeterQueryInfo *createMeterQueryInfo(SMeterQuerySupportObj *pSupporter, int32_t
pMeterQueryInfo->sid = sid;
pMeterQueryInfo->cur.vnodeIndex = -1;
initSlidingWindowInfo(pRuntimeEnv, &pMeterQueryInfo->windowResInfo, 100, TSDB_DATA_TYPE_INT);
initWindowResInfo(&pMeterQueryInfo->windowResInfo, pRuntimeEnv, 100, 100, TSDB_DATA_TYPE_INT);
return pMeterQueryInfo;
}
......@@ -6618,11 +6551,13 @@ void changeMeterQueryInfoForSuppleQuery(SQueryDiskbasedResultBuf *pResultBuf, SM
return;
}
pMeterQueryInfo->skey = skey;
pMeterQueryInfo->ekey = ekey;
// pMeterQueryInfo->skey = skey;
// pMeterQueryInfo->ekey = ekey;
SWAP(pMeterQueryInfo->skey, pMeterQueryInfo->ekey, TSKEY);
pMeterQueryInfo->lastKey = pMeterQueryInfo->skey;
pMeterQueryInfo->queryRangeSet = 0;
// pMeterQueryInfo->queryRangeSet = 0;
pMeterQueryInfo->cur.order = pMeterQueryInfo->cur.order ^ 1;
pMeterQueryInfo->cur.vnodeIndex = -1;
......@@ -7029,7 +6964,7 @@ void setExecutionContext(SMeterQuerySupportObj *pSupporter, SWindowResult *outpu
int32_t groupIdx, SMeterQueryInfo *pMeterQueryInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
setGroupOutputBuffer(pRuntimeEnv, &outputRes[groupIdx]);
setWindowResOutputBuf(pRuntimeEnv, &outputRes[groupIdx]);
initCtxOutputBuf(pRuntimeEnv);
vnodeSetTagValueInParam(pSupporter->pSidSet, pRuntimeEnv, pSupporter->pMeterSidExtInfo[meterIdx]);
......@@ -7046,7 +6981,7 @@ void setExecutionContext(SMeterQuerySupportObj *pSupporter, SWindowResult *outpu
}
}
static void setGroupOutputBuffer(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult) {
static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult) {
SQuery *pQuery = pRuntimeEnv->pQuery;
// Note: pResult->pos[i]->numOfElems == 0, there is only fixed number of results for each group
......@@ -7067,9 +7002,9 @@ static void setGroupOutputBuffer(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *p
// set super table query flag
SResultInfo *pResInfo = GET_RES_INFO(pCtx);
if (!isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
pResInfo->superTableQ = true;
}
// if (!isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
pResInfo->superTableQ = pRuntimeEnv->stableQuery;
// }
}
}
......@@ -7160,9 +7095,9 @@ int32_t setOutputBufferForIntervalQuery(SQueryRuntimeEnv *pRuntimeEnv, SMeterQue
SQueryDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
SWindowResInfo * pWindowResInfo = &pMeterQueryInfo->windowResInfo;
STimeWindow win = getActiveSlidingWindow(pWindowResInfo, pMeterQueryInfo->lastKey, pRuntimeEnv->pQuery);
STimeWindow win = getActiveTimeWindow(pWindowResInfo, pMeterQueryInfo->lastKey, pRuntimeEnv->pQuery);
SWindowResult *pWindowRes = doSetSlidingWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&win.skey, TSDB_KEYSIZE);
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&win.skey, TSDB_KEYSIZE);
if (pWindowRes == NULL) {
return -1;
}
......@@ -7190,8 +7125,8 @@ int32_t setIntervalQueryExecutionContext(SMeterQuerySupportObj *pSupporter, int3
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
if (IS_MASTER_SCAN(pRuntimeEnv)) {
// not enough disk space or memory buffer for intermediate results
if (setOutputBufferForIntervalQuery(pRuntimeEnv, pMeterQueryInfo) != TSDB_CODE_SUCCESS) {
// not enough disk space or memory buffer for intermediate results
return -1;
}
......@@ -7234,172 +7169,6 @@ int32_t setIntervalQueryExecutionContext(SMeterQuerySupportObj *pSupporter, int3
return 0;
}
// static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo,
// SBlockInfo *pBlockInfo, int64_t *pPrimaryCol, SField *pFields,
// __block_search_fn_t searchFn) {
// SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
// SQuery * pQuery = pRuntimeEnv->pQuery;
// int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
//
// int64_t nextKey = -1;
// bool queryCompleted = false;
//
// while (1) {
// int32_t numOfRes = 0;
// int32_t steps = applyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, pPrimaryCol, pFields, searchFn, &numOfRes);
// assert(steps > 0);
//
// // NOTE: in case of stable query, only ONE(or ZERO) row of pos generated for each query range
// if (pMeterQueryInfo->lastResRows == 0) {
// pMeterQueryInfo->lastResRows = numOfRes;
// } else {
// assert(pMeterQueryInfo->lastResRows == 1);
// }
//
// int32_t pos = pQuery->pos + steps * factor;
//
// // query does not reach the end of current block
// if ((pos < pBlockInfo->size && QUERY_IS_ASC_QUERY(pQuery)) || (pos >= 0 && !QUERY_IS_ASC_QUERY(pQuery))) {
// nextKey = pPrimaryCol[pos];
// } else {
// assert((pQuery->lastKey > pBlockInfo->keyLast && QUERY_IS_ASC_QUERY(pQuery)) ||
// (pQuery->lastKey < pBlockInfo->keyFirst && !QUERY_IS_ASC_QUERY(pQuery)));
// }
//
// // all data satisfy current query are checked, query completed
// if (QUERY_IS_ASC_QUERY(pQuery)) {
// queryCompleted = (nextKey > pQuery->ekey || pQuery->ekey <= pBlockInfo->keyLast);
// } else {
// queryCompleted = (nextKey < pQuery->ekey || pQuery->ekey >= pBlockInfo->keyFirst);
// }
//
// /*
// * 1. there may be more date that satisfy current query interval, other than
// * current block, we need to try next data blocks
// * 2. query completed, since reaches the upper bound of the main query range
// */
// if (QUERY_IS_ASC_QUERY(pQuery)) {
// if (pQuery->lastKey > pBlockInfo->keyLast || pQuery->lastKey > pSupporter->rawEKey ||
// nextKey > pSupporter->rawEKey) {
// /*
// * current interval query is completed, set query pos flag closed and
// * try next data block if pQuery->ekey == pSupporter->rawEKey, whole query is completed
// */
// if (pQuery->lastKey > pBlockInfo->keyLast) {
// assert(pQuery->ekey >= pBlockInfo->keyLast);
// }
//
// if (pQuery->lastKey > pSupporter->rawEKey || nextKey > pSupporter->rawEKey) {
// /* whole query completed, save pos and abort */
// assert(queryCompleted);
// saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows);
//
// // save the pQuery->lastKey for retrieve data in cache, actually, there will be no qualified data in cache.
// saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
// } else if (pQuery->ekey == pBlockInfo->keyLast) {
// /* current interval query is completed, set the next query range on other data blocks if exist */
// int64_t prevEKey = pQuery->ekey;
//
// getAlignedIntervalQueryRange(pRuntimeEnv, pQuery->lastKey, pSupporter->rawSKey, pSupporter->rawEKey);
// saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
//
// assert(queryCompleted && prevEKey < pQuery->skey);
// if (pMeterQueryInfo->lastResRows > 0) {
// saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows);
// }
// } else {
// /*
// * Data that satisfy current query range may locate in current block and blocks that are directly right
// * next to current block. Therefore, we need to keep the query range(interval) unchanged until reaching
// * the direct next data block, while only forwards the pQuery->lastKey.
// *
// * With the information of the directly next data block, whether locates in cache or disk,
// * current interval query being completed or not can be decided.
// */
// saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
// assert(pQuery->lastKey > pBlockInfo->keyLast && pQuery->lastKey <= pQuery->ekey);
//
// /*
// * if current block is the last block of current file, we still close the pos flag, and
// * merge with other meters in the same group
// */
// if (queryCompleted) {
// saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows);
// }
// }
//
// break;
// }
// } else {
// if (pQuery->lastKey < pBlockInfo->keyFirst || pQuery->lastKey < pSupporter->rawEKey ||
// nextKey < pSupporter->rawEKey) {
// if (pQuery->lastKey < pBlockInfo->keyFirst) {
// assert(pQuery->ekey <= pBlockInfo->keyFirst);
// }
//
// if (pQuery->lastKey < pSupporter->rawEKey || (nextKey < pSupporter->rawEKey && nextKey != -1)) {
// /* whole query completed, save pos and abort */
// assert(queryCompleted);
// saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows);
//
// /*
// * save the pQuery->lastKey for retrieve data in cache, actually,
// * there will be no qualified data in cache.
// */
// saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
// } else if (pQuery->ekey == pBlockInfo->keyFirst) {
// // current interval query is completed, set the next query range on other data blocks if exist
// int64_t prevEKey = pQuery->ekey;
//
// getAlignedIntervalQueryRange(pRuntimeEnv, pQuery->lastKey, pSupporter->rawSKey, pSupporter->rawEKey);
// saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
//
// assert(queryCompleted && prevEKey > pQuery->skey);
// if (pMeterQueryInfo->lastResRows > 0) {
// saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows);
// }
// } else {
// /*
// * Data that satisfy current query range may locate in current block and blocks that are
// * directly right next to current block. Therefore, we need to keep the query range(interval)
// * unchanged until reaching the direct next data block, while only forwards the pQuery->lastKey.
// *
// * With the information of the directly next data block, whether locates in cache or disk,
// * current interval query being completed or not can be decided.
// */
// saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
// assert(pQuery->lastKey < pBlockInfo->keyFirst && pQuery->lastKey >= pQuery->ekey);
//
// /*
// * if current block is the last block of current file, we still close the pos
// * flag, and merge with other meters in the same group
// */
// if (queryCompleted) {
// saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows);
// }
// }
//
// break;
// }
// }
//
// assert(queryCompleted);
// saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows);
//
// assert((nextKey >= pQuery->lastKey && QUERY_IS_ASC_QUERY(pQuery)) ||
// (nextKey <= pQuery->lastKey && !QUERY_IS_ASC_QUERY(pQuery)));
//
// /* still in the same block to query */
// getAlignedIntervalQueryRange(pRuntimeEnv, nextKey, pSupporter->rawSKey, pSupporter->rawEKey);
// saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
//
// int32_t newPos = searchFn((char *)pPrimaryCol, pBlockInfo->size, pQuery->skey, pQuery->order.order);
// assert(newPos == pQuery->pos + steps * factor);
//
// pQuery->pos = newPos;
// }
//}
static void doApplyIntervalQueryOnBlock_rv(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo,
SBlockInfo *pBlockInfo, int64_t *pPrimaryCol, SField *pFields,
__block_search_fn_t searchFn) {
......@@ -7443,7 +7212,7 @@ static void doApplyIntervalQueryOnBlock_rv(SMeterQuerySupportObj *pSupporter, SM
/*
* 1. there may be more date that satisfy current query interval, other than
* current block, we need to try next data blocks
* 2. query completed, since reaches the upper bound of the main query range
* 2. query completed, since it reaches the upper bound of the main query range
*/
if (!completed) {
/*
......@@ -7501,13 +7270,14 @@ static void doApplyIntervalQueryOnBlock_rv(SMeterQuerySupportObj *pSupporter, SM
pQuery->pos = newPos;
}
}
int64_t getNextAccessedKeyInData(SQuery *pQuery, int64_t *pPrimaryCol, SBlockInfo *pBlockInfo, int32_t blockStatus) {
assert(pQuery->pos >= 0 && pQuery->pos <= pBlockInfo->size - 1);
TSKEY key = -1;
if (IS_DATA_BLOCK_LOADED(blockStatus)) {
key = pPrimaryCol[pQuery->pos];
} else {
} else {// while the data block is not loaded, the position must be the first or last position
assert(pQuery->pos == pBlockInfo->size - 1 || pQuery->pos == 0);
key = QUERY_IS_ASC_QUERY(pQuery) ? pBlockInfo->keyFirst : pBlockInfo->keyLast;
}
......@@ -7530,55 +7300,66 @@ void setIntervalQueryRange(SMeterQueryInfo *pMeterQueryInfo, SMeterQuerySupportO
SQuery * pQuery = pRuntimeEnv->pQuery;
if (pMeterQueryInfo->queryRangeSet) {
assert((QUERY_IS_ASC_QUERY(pQuery) && pQuery->lastKey >= pQuery->skey) ||
(!QUERY_IS_ASC_QUERY(pQuery) && pQuery->lastKey <= pQuery->skey));
if ((pQuery->ekey < key && QUERY_IS_ASC_QUERY(pQuery)) || (pQuery->ekey > key && !QUERY_IS_ASC_QUERY(pQuery))) {
/*
* last query on this block of the meter is done, start next interval on this block
* otherwise, keep the previous query range and proceed
*/
getAlignedIntervalQueryRange(pRuntimeEnv, key, pSupporter->rawSKey, pSupporter->rawEKey);
saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
// previous query does not be closed, save the results and close it
if (pMeterQueryInfo->lastResRows > 0) {
saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows);
}
} else {
/* current query not completed, continue. do nothing with respect to query range, */
}
// assert((QUERY_IS_ASC_QUERY(pQuery) && pQuery->lastKey >= pQuery->skey) ||
// (!QUERY_IS_ASC_QUERY(pQuery) && pQuery->lastKey <= pQuery->skey));
//
// if ((pQuery->ekey < key && QUERY_IS_ASC_QUERY(pQuery)) || (pQuery->ekey > key && !QUERY_IS_ASC_QUERY(pQuery))) {
// /*
// * last query on this block of the meter is done, start next interval on this block
// * otherwise, keep the previous query range and proceed
// */
// getAlignedIntervalQueryRange(pRuntimeEnv, key, pSupporter->rawSKey, pSupporter->rawEKey);
// saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
//
// // previous query does not be closed, save the results and close it
// if (pMeterQueryInfo->lastResRows > 0) {
// saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows);
// }
// } else {
// /* current query not completed, continue. do nothing with respect to query range, */
// }
} else {
pQuery->skey = key;
assert(pMeterQueryInfo->lastResRows == 0);
// for too small query range, no data in this interval.
if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->ekey < pQuery->skey)) ||
(!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->skey < pQuery->ekey))) {
// for too small query range, no data in this interval.
return;
}
// todo
if (pMeterQueryInfo->windowResInfo.prevSKey == 0) {
pMeterQueryInfo->windowResInfo.prevSKey = key;
/**
* In handling the both ascending and descending order super table query, we need to find the first qualified
* timestamp of this table, and then set the first qualified start timestamp.
* In ascending query, key is the first qualified timestamp. However, in the descending order query, additional
* operations involve.
*/
if (!QUERY_IS_ASC_QUERY(pQuery)) {
}
TSKEY skey1, ekey1;
TSKEY windowSKey = 0, windowEKey = 0;
STimeWindow win = {.skey = key, pSupporter->rawEKey};
SWindowResInfo* pWindowResInfo = &pMeterQueryInfo->windowResInfo;
doGetAlignedIntervalQueryRangeImpl(pQuery, win.skey, win.skey, win.ekey, &skey1, &ekey1, &windowSKey, &windowEKey);
pWindowResInfo->startTime = windowSKey;
if (pWindowResInfo->prevSKey == 0) {
pWindowResInfo->prevSKey = windowSKey;
}
STimeWindow win = getActiveSlidingWindow(&pMeterQueryInfo->windowResInfo, key, pQuery);
int64_t st = win.skey;
SWindowResult *pWindowRes =
doSetSlidingWindowFromKey(pRuntimeEnv, &pMeterQueryInfo->windowResInfo, (char *)&st, TSDB_KEYSIZE);
win = getActiveTimeWindow(pWindowResInfo, key, pQuery);
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&win.skey, TSDB_KEYSIZE);
if (pWindowRes == NULL) {
return;
}
pWindowRes->window = win;
// setGroupOutputBuffer(pRuntimeEnv, pWindowRes);
// initCtxOutputBuf(pRuntimeEnv);
// getAlignedIntervalQueryRange(pRuntimeEnv, pQuery->skey, pSupporter->rawSKey, pSupporter->rawEKey);
// saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
pMeterQueryInfo->queryRangeSet = 1;
pMeterQueryInfo->lastKey = win.skey;
pMeterQueryInfo->skey = win.skey;
}
}
......@@ -7844,7 +7625,7 @@ static int32_t getNumOfSubset(SMeterQuerySupportObj *pSupporter) {
int32_t totalSubset = 0;
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->intervalTime > 0 && pQuery->slidingTime > 0)) {
totalSubset = numOfClosedSlidingWindow(&pSupporter->runtimeEnv.windowResInfo);
totalSubset = numOfClosedTimeWindow(&pSupporter->runtimeEnv.windowResInfo);
} else {
totalSubset = pSupporter->pSidSet->numOfSubSet;
}
......@@ -7935,19 +7716,44 @@ static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterD
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
SMeterQueryInfo * pMeterQueryInfo = pMeterDataInfo->pMeterQInfo;
SWindowResInfo* pWindowResInfo = &pMeterQueryInfo->windowResInfo;
int64_t *pPrimaryKey = (int64_t *)pRuntimeEnv->primaryColBuffer->data;
/*
* for each block, we need to handle the previous query, since the determination of previous query being completed
* or not is based on the start key of current block.
*/
TSKEY key = getNextAccessedKeyInData(pQuery, pPrimaryKey, pBlockInfo, blockStatus);
setIntervalQueryRange(pMeterDataInfo->pMeterQInfo, pSupporter, key);
if (((pQuery->skey > pQuery->ekey) && QUERY_IS_ASC_QUERY(pQuery)) ||
((pQuery->skey < pQuery->ekey) && !QUERY_IS_ASC_QUERY(pQuery))) {
return;
setIntervalQueryRange(pMeterQueryInfo, pSupporter, key);
int32_t forwardStep =
getNumOfRowsInTimeWindow(pQuery, pBlockInfo, pPrimaryKey, pQuery->pos, pQuery->ekey, searchFn, true);
int32_t numOfRes = 0;
int64_t st = taosGetTimestampUs();
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL) {
numOfRes = rowwiseApplyAllFunctions(pRuntimeEnv, &forwardStep, pFields, pBlockInfo, pWindowResInfo);
} else {
numOfRes = blockwiseApplyAllFunctions(pRuntimeEnv, forwardStep, pFields, pBlockInfo, pWindowResInfo, searchFn);
}
int64_t e = taosGetTimestampUs() - st;
printf("-------------------------------%lld\n", e);
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
if ((pQuery->lastKey > pSupporter->rawEKey && QUERY_IS_ASC_QUERY(pQuery)) ||
(pQuery->lastKey < pSupporter->rawEKey && !QUERY_IS_ASC_QUERY(pQuery))) {
pMeterQueryInfo->ekey = pQuery->lastKey - step;
}
// doCheckQueryCompleted(pRuntimeEnv, pQuery->lastKey, pWindowResInfo);
//
// if (((pQuery->skey > pQuery->ekey) && QUERY_IS_ASC_QUERY(pQuery)) ||
// ((pQuery->skey < pQuery->ekey) && !QUERY_IS_ASC_QUERY(pQuery))) {
// return;
// }
// if (((pBlockInfo->keyLast < pQuery->ekey) && QUERY_IS_ASC_QUERY(pQuery)) ||
// ((pBlockInfo->keyFirst > pQuery->ekey) && !QUERY_IS_ASC_QUERY(pQuery))) {
......@@ -7963,7 +7769,7 @@ static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterD
// assert(pMeterQueryInfo->lastResRows == 1 || pMeterQueryInfo->lastResRows == 0);
// saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
// } else {
doApplyIntervalQueryOnBlock_rv(pSupporter, pMeterQueryInfo, pBlockInfo, pPrimaryKey, pFields, searchFn);
// doApplyIntervalQueryOnBlock_rv(pSupporter, pMeterQueryInfo, pBlockInfo, pPrimaryKey, pFields, searchFn);
// }
}
......
......@@ -241,7 +241,7 @@ static void queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMeterInfo) {
pRuntimeEnv->blockStatus);
totalBlocks++;
queryOnBlock(pSupporter, primaryKeys, pRuntimeEnv->blockStatus, &binfo, &pMeterInfo[k], NULL, searchFn);
queryOnBlock(pSupporter, pRuntimeEnv->blockStatus, &binfo, &pMeterInfo[k], NULL, searchFn);
if (ALL_CACHE_BLOCKS_CHECKED(pQuery)) {
break;
......@@ -447,24 +447,8 @@ static void queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo
(pBlock->keyFirst >= pQuery->ekey && pBlock->keyLast <= pQuery->lastKey && !QUERY_IS_ASC_QUERY(pQuery)));
}
if (pQuery->intervalTime > 0 && pQuery->slidingTime > 0) {
assert(pMeterQueryInfo->lastKey <= nextKey && QUERY_IS_ASC_QUERY(pQuery));
pMeterQueryInfo->lastKey = nextKey;
pQuery->lastKey = nextKey;
if (pMeterQueryInfo->windowResInfo.prevSKey == 0) {
// normalize the window prev time window
TSKEY skey1, ekey1;
TSKEY windowSKey = 0, windowEKey = 0;
TSKEY skey2 = MIN(pSupporter->rawSKey, pSupporter->rawEKey);
TSKEY ekey2 = MAX(pSupporter->rawSKey, pSupporter->rawEKey);
doGetAlignedIntervalQueryRangeImpl(pQuery, nextKey, skey2, ekey2, &skey1, &ekey1, &windowSKey, &windowEKey);
pMeterQueryInfo->windowResInfo.prevSKey = windowSKey;
}
setIntervalQueryRange(pMeterQueryInfo, pSupporter, nextKey);
ret = setIntervalQueryExecutionContext(pSupporter, pOneMeterDataInfo->meterOrderIdx, pMeterQueryInfo);
if (ret != TSDB_CODE_SUCCESS) {
......@@ -474,8 +458,7 @@ static void queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo
}
}
queryOnBlock(pSupporter, primaryKeys, pRuntimeEnv->blockStatus, &binfo, pOneMeterDataInfo, pInfoEx->pBlock.fields,
searchFn);
queryOnBlock(pSupporter, pRuntimeEnv->blockStatus, &binfo, pOneMeterDataInfo, pInfoEx->pBlock.fields, searchFn);
}
tfree(pReqMeterDataInfo);
......@@ -711,7 +694,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
}
resetCtxOutputBuf(pRuntimeEnv);
resetSlidingWindowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo);
resetTimeWindowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo);
while (pSupporter->meterIdx < pSupporter->numOfMeters) {
int32_t k = pSupporter->meterIdx;
......@@ -1115,7 +1098,7 @@ static void vnodeSingleMeterIntervalMainLooper(SMeterQuerySupportObj *pSupporter
while (1) {
initCtxOutputBuf(pRuntimeEnv);
clearClosedSlidingWindows(pRuntimeEnv);
clearClosedTimeWindow(pRuntimeEnv);
vnodeScanAllData(pRuntimeEnv);
if (isQueryKilled(pQuery)) {
......@@ -1124,8 +1107,6 @@ static void vnodeSingleMeterIntervalMainLooper(SMeterQuerySupportObj *pSupporter
assert(!Q_STATUS_EQUAL(pQuery->over, QUERY_NOT_COMPLETED));
// clear tag, used to decide if the whole interval query is completed or not
pQuery->over &= (~QUERY_COMPLETED);
doFinalizeResult(pRuntimeEnv);
int64_t maxOutput = getNumOfResult(pRuntimeEnv);
......@@ -1143,7 +1124,7 @@ static void vnodeSingleMeterIntervalMainLooper(SMeterQuerySupportObj *pSupporter
// forwardCtxOutputBuf(pRuntimeEnv, maxOutput);
}
if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)) {
if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK|QUERY_COMPLETED)) {
break;
}
......
......@@ -670,7 +670,7 @@ void *vnodeQueryOnSingleTable(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE
tsBufNextPos(pTSBuf);
}
if (((*code) = vnodeQuerySingleMeterPrepare(pQInfo, pQInfo->pObj, pSupporter, pTSBuf)) != TSDB_CODE_SUCCESS) {
if (((*code) = vnodeQuerySingleTablePrepare(pQInfo, pQInfo->pObj, pSupporter, pTSBuf)) != TSDB_CODE_SUCCESS) {
goto _error;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册