提交 c3d702fe 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 4a9e1e37
...@@ -902,7 +902,7 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result); ...@@ -902,7 +902,7 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result);
int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length); int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length);
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval, STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
int32_t precision, int32_t order); int32_t order);
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey, int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
__block_search_fn_t searchFn, STableQueryInfo* item, int32_t order); __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order);
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
......
...@@ -938,7 +938,7 @@ static bool prepareDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t t ...@@ -938,7 +938,7 @@ static bool prepareDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t t
setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex); setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex);
(*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, *pRowIndex, gap, NULL); (*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, *pRowIndex, gap, NULL);
} else { } else {
win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, pInfo->interval.precision, TSDB_ORDER_ASC); win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, TSDB_ORDER_ASC);
setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex); setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex);
(*pRowIndex) += (*pRowIndex) +=
getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
......
...@@ -93,25 +93,12 @@ static STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, ...@@ -93,25 +93,12 @@ static STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow,
return save; return save;
} }
// todo do refactor static STimeWindow doCalculateTimeWindow(int64_t ts, SInterval* pInterval) {
// get the correct time window according to the handled timestamp
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
int32_t precision, int32_t order) {
STimeWindow w = {0}; STimeWindow w = {0};
if (pResultRowInfo->cur.pageId == -1) { // the first window, from the previous stored value
getInitialStartTimeWindow(pInterval, precision, ts, &w, true);
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
return w;
}
w = getResultRowByPos(pBuf, &pResultRowInfo->cur)->win;
if (pInterval->interval == pInterval->sliding) {
if (w.skey > ts || w.ekey < ts) {
if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') { if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
w.skey = taosTimeTruncate(ts, pInterval, precision); w.skey = taosTimeTruncate(ts, pInterval, pInterval->precision);
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
} else { } else {
int64_t st = w.skey; int64_t st = w.skey;
...@@ -125,67 +112,36 @@ STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowI ...@@ -125,67 +112,36 @@ STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowI
} }
w.skey = st; w.skey = st;
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
}
}
} else { // it is an sliding window query, in which sliding value is not equalled to
// interval value, and we need to find the first qualified time window for asc/desc traverse respectively.
if (order == TSDB_ORDER_ASC) {
if (w.skey <= ts && w.ekey >= ts) {
// ts is resident in current time window, but we need to find the first
//qualified time window that cover this timestamp
w = getFirstQualifiedTimeWindow(ts, &w, pInterval, order);
} else {
// todo refactor:
if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
w.skey = taosTimeTruncate(ts, pInterval, precision);
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
} else {
int64_t st = w.skey;
if (st > ts) {
st -= ((st - ts + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
}
int64_t et = st + pInterval->interval - 1;
if (et < ts) {
st += ((ts - et + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
} }
w.skey = st; return w;
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; }
w = getFirstQualifiedTimeWindow(ts, &w, pInterval, order); // todo do refactor
} // get the correct time window according to the handled timestamp
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
int32_t order) {
STimeWindow w = {0};
if (pResultRowInfo->cur.pageId == -1) { // the first window, from the previous stored value
getInitialStartTimeWindow(pInterval, pInterval->precision, ts, &w, (order == TSDB_ORDER_ASC));
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
return w;
} }
} else {
if (w.skey <= ts && w.ekey >= ts) {
w = getFirstQualifiedTimeWindow(ts, &w, pInterval, order);
} else {
// todo refactor:
if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
w.skey = taosTimeTruncate(ts, pInterval, precision);
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
} else {
int64_t st = w.skey;
if (st > ts) { w = getResultRowByPos(pBuf, &pResultRowInfo->cur)->win;
st -= ((st - ts + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
}
int64_t et = st + pInterval->interval - 1; // in case of typical time window, we can calculate time window directly.
if (et < ts) { if (w.skey > ts || w.ekey < ts) {
st += ((ts - et + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding; w = doCalculateTimeWindow(ts, pInterval);
} }
w.skey = st; if (pInterval->interval != pInterval->sliding) {
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; // it is an sliding window query, in which sliding value is not equalled to
// interval value, and we need to find the first qualified time window.
w = getFirstQualifiedTimeWindow(ts, &w, pInterval, order); w = getFirstQualifiedTimeWindow(ts, &w, pInterval, order);
} }
}
}
}
return w; return w;
} }
...@@ -930,8 +886,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -930,8 +886,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols); TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->order);
pInfo->interval.precision, pInfo->order);
int32_t ret = TSDB_CODE_SUCCESS; int32_t ret = TSDB_CODE_SUCCESS;
if (!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) { if (!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) {
ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx, ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
...@@ -1390,7 +1345,7 @@ void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock, ...@@ -1390,7 +1345,7 @@ void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock,
for (int32_t i = 0; i < pBlock->info.rows; i++) { for (int32_t i = 0; i < pBlock->info.rows; i++) {
SResultRowInfo dumyInfo; SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1; dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsStarts[i], pInterval, pInterval->precision, TSDB_ORDER_ASC); STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsStarts[i], pInterval, TSDB_ORDER_ASC);
doDeleteIntervalWindow(pAggSup, win.skey, groupIds[i]); doDeleteIntervalWindow(pAggSup, win.skey, groupIds[i]);
if (pUpWins) { if (pUpWins) {
SWinRes winRes = {.ts = win.skey, .groupId = groupIds[i]}; SWinRes winRes = {.ts = win.skey, .groupId = groupIds[i]};
...@@ -1412,7 +1367,7 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* ...@@ -1412,7 +1367,7 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval*
for (int32_t i = 0; i < pBlock->info.rows; i += step) { for (int32_t i = 0; i < pBlock->info.rows; i += step) {
SResultRowInfo dumyInfo; SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1; dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[i], pInterval, pInterval->precision, TSDB_ORDER_ASC); STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[i], pInterval, TSDB_ORDER_ASC);
step = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, i, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); step = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, i, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
uint64_t winGpId = pGpDatas ? pGpDatas[i] : pBlock->info.groupId; uint64_t winGpId = pGpDatas ? pGpDatas[i] : pBlock->info.groupId;
bool res = doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TKEY), winGpId, numOfOutput); bool res = doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TKEY), winGpId, numOfOutput);
...@@ -1452,7 +1407,7 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, ...@@ -1452,7 +1407,7 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t)); TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t));
SResultRowInfo dumyInfo; SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1; dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, ts, pInterval, pInterval->precision, TSDB_ORDER_ASC); STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, ts, pInterval, TSDB_ORDER_ASC);
SWinRes winRe = { SWinRes winRe = {
.ts = win.skey, .ts = win.skey,
.groupId = groupId, .groupId = groupId,
...@@ -2547,8 +2502,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc ...@@ -2547,8 +2502,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1); int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1);
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols); TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols);
STimeWindow nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, STimeWindow nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->order);
pInfo->interval.precision, pInfo->order);
while (1) { while (1) {
bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup); bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup);
if (pInfo->ignoreExpiredData && isClosed) { if (pInfo->ignoreExpiredData && isClosed) {
...@@ -4751,7 +4705,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* ...@@ -4751,7 +4705,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval, STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval,
iaInfo->interval.precision, iaInfo->order); iaInfo->order);
int32_t ret = int32_t ret =
setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx, setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册