提交 3c49b6a6 编写于 作者: H Haojun Liao

fix(query): set correct query time window.

上级 5ee4a196
...@@ -455,7 +455,6 @@ typedef struct SIntervalAggOperatorInfo { ...@@ -455,7 +455,6 @@ typedef struct SIntervalAggOperatorInfo {
int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator. int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator.
STimeWindow win; // query time range STimeWindow win; // query time range
bool timeWindowInterpo; // interpolation needed or not bool timeWindowInterpo; // interpolation needed or not
char** pRow; // previous row/tuple of already processed datablock
SArray* pInterpCols; // interpolation columns SArray* pInterpCols; // interpolation columns
int32_t order; // current SSDataBlock scan order int32_t order; // current SSDataBlock scan order
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
...@@ -903,7 +902,7 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result); ...@@ -903,7 +902,7 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result);
int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length); int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length);
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval, STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
int32_t precision, STimeWindow* win); int32_t precision, int32_t order);
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey, int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
__block_search_fn_t searchFn, STableQueryInfo* item, int32_t order); __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order);
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
......
...@@ -938,7 +938,7 @@ static bool prepareDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t t ...@@ -938,7 +938,7 @@ static bool prepareDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t t
setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex); setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex);
(*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, *pRowIndex, gap, NULL); (*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, *pRowIndex, gap, NULL);
} else { } else {
win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, pInfo->interval.precision, NULL); win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, pInfo->interval.precision, TSDB_ORDER_ASC);
setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex); setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex);
(*pRowIndex) += (*pRowIndex) +=
getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
......
...@@ -79,36 +79,110 @@ static void getInitialStartTimeWindow(SInterval* pInterval, int32_t precision, T ...@@ -79,36 +79,110 @@ static void getInitialStartTimeWindow(SInterval* pInterval, int32_t precision, T
} }
} }
static STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order) {
int32_t factor = (order == TSDB_ORDER_ASC)? -1:1;
STimeWindow win = *pWindow;
STimeWindow save = win;
while(win.skey <= ts && win.ekey >= ts) {
save = win;
win.skey = taosTimeAdd(win.skey, factor * pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
win.ekey = taosTimeAdd(win.ekey, factor * pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
}
return save;
}
// get the correct time window according to the handled timestamp // get the correct time window according to the handled timestamp
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval, STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
int32_t precision, STimeWindow* win) { int32_t precision, int32_t order) {
STimeWindow w = {0}; STimeWindow w = {0};
if (pResultRowInfo->cur.pageId == -1) { // the first window, from the previous stored value if (pResultRowInfo->cur.pageId == -1) { // the first window, from the previous stored value
getInitialStartTimeWindow(pInterval, precision, ts, &w, true); getInitialStartTimeWindow(pInterval, precision, ts, &w, true);
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
} else { return w;
w = getResultRowByPos(pBuf, &pResultRowInfo->cur)->win;
} }
if (w.skey > ts || w.ekey < ts) { w = getResultRowByPos(pBuf, &pResultRowInfo->cur)->win;
if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
w.skey = taosTimeTruncate(ts, pInterval, precision); if (pInterval->interval == pInterval->sliding) {
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; if (w.skey > ts || w.ekey < ts) {
} else { if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
int64_t st = w.skey; w.skey = taosTimeTruncate(ts, pInterval, precision);
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
} else {
int64_t st = w.skey;
if (st > ts) { if (st > ts) {
st -= ((st - ts + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding; st -= ((st - ts + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
}
int64_t et = st + pInterval->interval - 1;
if (et < ts) {
st += ((ts - et + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
}
w.skey = st;
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
} }
}
} else { // it is an sliding window query, in which sliding value is not equalled to
// interval value, and we need to find the first qualified time window for asc/desc traverse respectively.
if (order == TSDB_ORDER_ASC) {
if (w.skey <= ts && w.ekey >= ts) {
// ts is resident in current time window, but we need to find the first
//qualified time window that cover this timestamp
w = getFirstQualifiedTimeWindow(ts, &w, pInterval, order);
} else {
// todo refactor:
if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
w.skey = taosTimeTruncate(ts, pInterval, precision);
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
} else {
int64_t st = w.skey;
if (st > ts) {
st -= ((st - ts + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
}
int64_t et = st + pInterval->interval - 1; int64_t et = st + pInterval->interval - 1;
if (et < ts) { if (et < ts) {
st += ((ts - et + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding; st += ((ts - et + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
}
w.skey = st;
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
w = getFirstQualifiedTimeWindow(ts, &w, pInterval, order);
}
} }
} else {
if (w.skey <= ts && w.ekey >= ts) {
w = getFirstQualifiedTimeWindow(ts, &w, pInterval, order);
} else {
// todo refactor:
if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
w.skey = taosTimeTruncate(ts, pInterval, precision);
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
} else {
int64_t st = w.skey;
if (st > ts) {
st -= ((st - ts + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
}
w.skey = st; int64_t et = st + pInterval->interval - 1;
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; if (et < ts) {
st += ((ts - et + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
}
w.skey = st;
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
w = getFirstQualifiedTimeWindow(ts, &w, pInterval, order);
}
}
} }
} }
return w; return w;
...@@ -856,7 +930,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -856,7 +930,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval,
pInfo->interval.precision, &pInfo->win); pInfo->interval.precision, pInfo->order);
int32_t ret = TSDB_CODE_SUCCESS; int32_t ret = TSDB_CODE_SUCCESS;
if (!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) { if (!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) {
ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx, ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
...@@ -1010,21 +1084,6 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { ...@@ -1010,21 +1084,6 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->order, scanFlag, true); setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->order, scanFlag, true);
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag, NULL); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag, NULL);
#if 0 // test for encode/decode result info
if(pOperator->fpSet.encodeResultRow){
char *result = NULL;
int32_t length = 0;
SAggSupporter *pSup = &pInfo->aggSup;
pOperator->fpSet.encodeResultRow(pOperator, &result, &length);
taosHashClear(pSup->pResultRowHashTable);
pInfo->binfo.resultRowInfo.size = 0;
pOperator->fpSet.decodeResultRow(pOperator, result);
if(result){
taosMemoryFree(result);
}
}
#endif
} }
closeAllResultRows(&pInfo->binfo.resultRowInfo); closeAllResultRows(&pInfo->binfo.resultRowInfo);
...@@ -1330,7 +1389,7 @@ void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock, ...@@ -1330,7 +1389,7 @@ void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock,
for (int32_t i = 0; i < pBlock->info.rows; i++) { for (int32_t i = 0; i < pBlock->info.rows; i++) {
SResultRowInfo dumyInfo; SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1; dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsStarts[i], pInterval, pInterval->precision, NULL); STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsStarts[i], pInterval, pInterval->precision, TSDB_ORDER_ASC);
doDeleteIntervalWindow(pAggSup, win.skey, groupIds[i]); doDeleteIntervalWindow(pAggSup, win.skey, groupIds[i]);
if (pUpWins) { if (pUpWins) {
SWinRes winRes = {.ts = win.skey, .groupId = groupIds[i]}; SWinRes winRes = {.ts = win.skey, .groupId = groupIds[i]};
...@@ -1352,7 +1411,7 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* ...@@ -1352,7 +1411,7 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval*
for (int32_t i = 0; i < pBlock->info.rows; i += step) { for (int32_t i = 0; i < pBlock->info.rows; i += step) {
SResultRowInfo dumyInfo; SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1; dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[i], pInterval, pInterval->precision, NULL); STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[i], pInterval, pInterval->precision, TSDB_ORDER_ASC);
step = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, i, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); step = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, i, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
uint64_t winGpId = pGpDatas ? pGpDatas[i] : pBlock->info.groupId; uint64_t winGpId = pGpDatas ? pGpDatas[i] : pBlock->info.groupId;
bool res = doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TKEY), winGpId, numOfOutput); bool res = doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TKEY), winGpId, numOfOutput);
...@@ -1392,7 +1451,7 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, ...@@ -1392,7 +1451,7 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t)); TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t));
SResultRowInfo dumyInfo; SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1; dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, ts, pInterval, pInterval->precision, NULL); STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, ts, pInterval, pInterval->precision, TSDB_ORDER_ASC);
SWinRes winRe = { SWinRes winRe = {
.ts = win.skey, .ts = win.skey,
.groupId = groupId, .groupId = groupId,
...@@ -2488,7 +2547,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc ...@@ -2488,7 +2547,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1); int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1);
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols); TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols);
STimeWindow nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, STimeWindow nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval,
pInfo->interval.precision, NULL); pInfo->interval.precision, pInfo->order);
while (1) { while (1) {
bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup); bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup);
if (pInfo->ignoreExpiredData && isClosed) { if (pInfo->ignoreExpiredData && isClosed) {
...@@ -4454,8 +4513,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR ...@@ -4454,8 +4513,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
currTs = tsCols[currPos]; currTs = tsCols[currPos];
currWin.skey = currTs; currWin.skey = currTs;
currWin.ekey = taosTimeAdd(currWin.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit, currWin.ekey = taosTimeAdd(currWin.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit,
iaInfo->interval.precision) - iaInfo->interval.precision) - 1;
1;
startPos = currPos; startPos = currPos;
ret = setTimeWindowOutputBuf(pResultRowInfo, &currWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, ret = setTimeWindowOutputBuf(pResultRowInfo, &currWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo); pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
...@@ -4692,7 +4750,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* ...@@ -4692,7 +4750,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval, STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval,
iaInfo->interval.precision, &iaInfo->win); iaInfo->interval.precision, iaInfo->order);
int32_t ret = int32_t ret =
setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx, setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册