未验证 提交 53686de9 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #15606 from taosdata/feature/3_liaohj

fix(query): remove invalid time window close ops.
......@@ -549,7 +549,8 @@ typedef struct SMergeAlignedIntervalAggOperatorInfo {
SIntervalAggOperatorInfo *intervalAggOperatorInfo;
bool hasGroupId;
uint64_t groupId;
uint64_t groupId; // current groupId
int64_t curTs; // current ts
SSDataBlock* prefetchedBlock;
bool inputBlocksFinished;
......
......@@ -1445,6 +1445,7 @@ static void doUpdateNumOfRows(SResultRow* pRow, int32_t numOfExprs, const int32_
}
}
// todo extract method with copytoSSDataBlock
int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs,
const int32_t* rowCellOffset, SSDataBlock* pBlock,
......
......@@ -507,6 +507,7 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
colDataSetNull_f(bitmap, (*rows));
} else {
memcpy(data + (*columnLen), colDataGetData(pColInfoData, j), bytes);
ASSERT((data + (*columnLen) + bytes - (char*)pPage) <= getBufPageSize(pInfo->pBuf));
}
contentLen = bytes;
}
......
......@@ -4589,11 +4589,10 @@ void destroyMergeAlignedIntervalOperatorInfo(void* param, int32_t numOfOutput) {
static int32_t outputMergeAlignedIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId,
SSDataBlock* pResultBlock, TSKEY wstartTs) {
SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
SExprSupp* pSup = &pOperatorInfo->exprSupp;
bool ascScan = (iaInfo->inputOrder == TSDB_ORDER_ASC);
SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
SExprSupp* pSup = &pOperatorInfo->exprSupp;
SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &wstartTs, TSDB_KEYSIZE, tableGroupId);
SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf,
......@@ -4603,8 +4602,9 @@ static int32_t outputMergeAlignedIntervalResult(SOperatorInfo* pOperatorInfo, ui
finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, pSup->pCtx, pSup->pExprInfo, pSup->numOfExprs,
pSup->rowEntryInfoOffset, pResultBlock, pTaskInfo);
taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
ASSERT(taosHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 0);
return 0;
return TSDB_CODE_SUCCESS;
}
static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,
......@@ -4619,11 +4619,20 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
int32_t numOfOutput = pSup->numOfExprs;
int64_t* tsCols = extractTsCol(pBlock, iaInfo);
uint64_t tableGroupId = pBlock->info.groupId;
TSKEY blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
TSKEY currTs = getStartTsKey(&pBlock->info.window, tsCols);
SResultRow* pResult = NULL;
STimeWindow win;
win.skey = blockStartTs;
// there is an result exists
if (miaInfo->curTs != INT64_MIN) {
ASSERT(taosHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1);
if (currTs != miaInfo->curTs) {
outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, miaInfo->curTs);
miaInfo->curTs = INT64_MIN;
}
}
STimeWindow win = {0};
win.skey = currTs;
win.ekey =
taosTimeAdd(win.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit, iaInfo->interval.precision) - 1;
......@@ -4634,41 +4643,48 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
TSKEY currTs = blockStartTs;
TSKEY currPos = startPos;
miaInfo->curTs = win.skey;
int32_t currPos = startPos;
STimeWindow currWin = win;
while (1) {
++currPos;
if (currPos >= pBlock->info.rows) {
break;
}
while (++currPos < pBlock->info.rows) {
if (tsCols[currPos] == currTs) {
continue;
} else {
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos,
tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder);
outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs);
currTs = tsCols[currPos];
currWin.skey = currTs;
currWin.ekey = taosTimeAdd(currWin.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit,
iaInfo->interval.precision) -
1;
startPos = currPos;
ret = setTimeWindowOutputBuf(pResultRowInfo, &currWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos,
tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder);
outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs);
miaInfo->curTs = INT64_MIN;
currTs = tsCols[currPos];
currWin.skey = currTs;
currWin.ekey = taosTimeAdd(currWin.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit,
iaInfo->interval.precision) - 1;
startPos = currPos;
ret = setTimeWindowOutputBuf(pResultRowInfo, &currWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
numOfOutput, pSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
miaInfo->curTs = currWin.skey;
}
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos,
tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder);
outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs);
if (currPos >= pBlock->info.rows) {
// we need to see next block if exists
} else {
ASSERT(0);
outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs);
miaInfo->curTs = INT64_MIN;
}
}
static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
......@@ -4682,12 +4698,14 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
SExprSupp* pSup = &pOperator->exprSupp;
SSDataBlock* pRes = iaInfo->binfo.pRes;
blockDataCleanup(pRes);
blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
if (!miaInfo->inputBlocksFinished) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
int32_t scanFlag = MAIN_SCAN;
while (1) {
SSDataBlock* pBlock = NULL;
if (miaInfo->prefetchedBlock == NULL) {
......@@ -4699,6 +4717,14 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
}
if (pBlock == NULL) {
// close last unfinalized time window
if (miaInfo->curTs != INT64_MIN) {
ASSERT(taosHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1);
outputMergeAlignedIntervalResult(pOperator, miaInfo->groupId, pRes, miaInfo->curTs);
miaInfo->curTs = INT64_MIN;
}
doSetOperatorCompleted(pOperator);
miaInfo->inputBlocksFinished = true;
break;
}
......@@ -4707,7 +4733,11 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
miaInfo->hasGroupId = true;
miaInfo->groupId = pBlock->info.groupId;
} else if (miaInfo->groupId != pBlock->info.groupId) {
// if there are unclosed time window, close it firstly.
ASSERT(miaInfo->curTs != INT64_MIN);
outputMergeAlignedIntervalResult(pOperator, miaInfo->groupId, pRes, miaInfo->curTs);
miaInfo->prefetchedBlock = pBlock;
miaInfo->curTs = INT64_MIN;
break;
}
......@@ -4722,11 +4752,8 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
pRes->info.groupId = miaInfo->groupId;
}
miaInfo->hasGroupId = false;
if (miaInfo->inputBlocksFinished) {
doSetOperatorCompleted(pOperator);
}
miaInfo->hasGroupId = false;
size_t rows = pRes->info.rows;
pOperator->resultInfo.totalRows += rows;
......@@ -4752,6 +4779,8 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
SExprSupp* pSup = &pOperator->exprSupp;
miaInfo->pCondition = pCondition;
miaInfo->curTs = INT64_MIN;
iaInfo->win = pTaskInfo->window;
iaInfo->inputOrder = TSDB_ORDER_ASC;
iaInfo->interval = *pInterval;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册