提交 dc4052bd 编写于 作者: H Haojun Liao

fix(query): handle the grouped fill.

上级 5250d78c
......@@ -561,6 +561,7 @@ typedef struct SFillOperatorInfo {
SNode* pCondition;
SArray* pColMatchColInfo;
int32_t primaryTsCol;
uint64_t curGroupId; // current handled group id
} SFillOperatorInfo;
typedef struct SGroupbyOperatorInfo {
......
......@@ -1637,8 +1637,6 @@ static int32_t compressQueryColData(SColumnInfoData* pColRes, int32_t numOfRows,
int32_t doFillTimeIntervalGapsInResults(struct SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t capacity) {
int32_t numOfRows = (int32_t)taosFillResultDataBlock(pFillInfo, pBlock, capacity - pBlock->info.rows);
pBlock->info.rows += numOfRows;
return pBlock->info.rows;
}
......@@ -3344,14 +3342,15 @@ static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResult
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pResultInfo->capacity);
pInfo->curGroupId = pInfo->existNewGroupBlock->info.groupId;
pInfo->existNewGroupBlock = NULL;
*newgroup = true;
// *newgroup = true;
}
static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, bool* newgroup,
SExecTaskInfo* pTaskInfo) {
if (taosFillHasMoreResults(pInfo->pFillInfo)) {
*newgroup = false;
// *newgroup = false;
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pResultInfo->capacity);
if (pInfo->pRes->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult)) {
return;
......@@ -3373,10 +3372,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
blockDataCleanup(pResBlock);
// todo handle different group data interpolation
bool n = false;
bool* newgroup = &n;
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, newgroup, pTaskInfo);
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, NULL, pTaskInfo);
if (pResBlock->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult && pResBlock->info.rows > 0)) {
return pResBlock;
}
......@@ -3384,31 +3380,29 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
SOperatorInfo* pDownstream = pOperator->pDownstream[0];
while (1) {
SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream);
if (*newgroup) {
assert(pBlock != NULL);
}
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsCol);
if (*newgroup && pInfo->totalInputRows > 0) { // there are already processed current group data block
pInfo->existNewGroupBlock = pBlock;
*newgroup = false;
if (pBlock == NULL) {
if (pInfo->totalInputRows == 0) {
pOperator->status = OP_EXEC_DONE;
return NULL;
}
// Fill the previous group data block, before handle the data block of new group.
// Close the fill operation for previous group data block
taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
} else {
if (pBlock == NULL) {
if (pInfo->totalInputRows == 0) {
pOperator->status = OP_EXEC_DONE;
return NULL;
}
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsCol);
if (pInfo->curGroupId == 0 || pInfo->curGroupId == pBlock->info.groupId) {
pInfo->curGroupId = pBlock->info.groupId; // the first data block
taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
} else {
pInfo->totalInputRows += pBlock->info.rows;
taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, pBlock->info.window.ekey);
taosFillSetInputDataBlock(pInfo->pFillInfo, pBlock);
} else if (pInfo->curGroupId != pBlock->info.groupId) { // the new group data block
pInfo->existNewGroupBlock = pBlock;
// Fill the previous group data block, before handle the data block of new group.
// Close the fill operation for previous group data block
taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
}
}
......@@ -3419,17 +3413,17 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
if (pResBlock->info.rows > 0) {
// 1. The result in current group not reach the threshold of output result, continue
// 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately
if (pResBlock->info.rows > pResultInfo->threshold || pBlock == NULL || (!pInfo->multigroupResult)) {
if (pResBlock->info.rows > pResultInfo->threshold || pBlock == NULL || pInfo->existNewGroupBlock != NULL) {
return pResBlock;
}
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, newgroup, pTaskInfo);
if (pResBlock->info.rows > pOperator->resultInfo.threshold || pBlock == NULL) {
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, NULL, pTaskInfo);
if (pResBlock->info.rows >= pOperator->resultInfo.threshold || pBlock == NULL) {
return pResBlock;
}
} else if (pInfo->existNewGroupBlock) { // try next group
assert(pBlock != NULL);
doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, newgroup, pTaskInfo);
doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, NULL, pTaskInfo);
if (pResBlock->info.rows > pResultInfo->threshold) {
return pResBlock;
}
......
......@@ -72,7 +72,7 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock*
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
// set the primary timestamp column value
int32_t index = pFillInfo->numOfCurrent;
int32_t index = pBlock->info.rows;
// set the other values
if (pFillInfo->type == TSDB_FILL_PREV) {
......@@ -191,6 +191,7 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock*
SInterval* pInterval = &pFillInfo->interval;
pFillInfo->currentKey =
taosTimeAdd(pFillInfo->currentKey, pInterval->sliding * step, pInterval->slidingUnit, pInterval->precision);
pBlock->info.rows += 1;
pFillInfo->numOfCurrent++;
}
......@@ -273,6 +274,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
}
} else {
assert(pFillInfo->currentKey == ts);
int32_t index = pBlock->info.rows;
if (pFillInfo->type == TSDB_FILL_NEXT && (pFillInfo->index + 1) < pFillInfo->numOfRows) {
int32_t nextRowIndex = pFillInfo->index + 1;
......@@ -296,24 +298,24 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
if (i == 0 || (/*pCol->functionId != FUNCTION_COUNT &&*/ !colDataIsNull_s(pSrc, pFillInfo->index)) /*||
(pCol->functionId == FUNCTION_COUNT && GET_INT64_VAL(src) != 0)*/) {
bool isNull = colDataIsNull_s(pSrc, pFillInfo->index);
colDataAppend(pDst, pFillInfo->numOfCurrent, src, isNull);
colDataAppend(pDst, index, src, isNull);
saveColData(pFillInfo->prev, i, src, isNull);
} else { // i > 0 and data is null , do interpolation
if (pFillInfo->type == TSDB_FILL_PREV) {
SGroupKeys* pKey = taosArrayGet(pFillInfo->prev, i);
doSetVal(pDst, pFillInfo->numOfCurrent, pKey);
doSetVal(pDst, index, pKey);
} else if (pFillInfo->type == TSDB_FILL_LINEAR) {
bool isNull = colDataIsNull_s(pSrc, pFillInfo->index);
colDataAppend(pDst, pFillInfo->numOfCurrent, src, isNull);
colDataAppend(pDst, index, src, isNull);
saveColData(pFillInfo->prev, i, src, isNull);
} else if (pFillInfo->type == TSDB_FILL_NULL) {
colDataAppendNULL(pDst, pFillInfo->numOfCurrent);
colDataAppendNULL(pDst, index);
} else if (pFillInfo->type == TSDB_FILL_NEXT) {
SGroupKeys* pKey = taosArrayGet(pFillInfo->next, i);
doSetVal(pDst, pFillInfo->numOfCurrent, pKey);
doSetVal(pDst, index, pKey);
} else {
SVariant* pVar = &pFillInfo->pFillCol[i].fillVal;
colDataAppend(pDst, pFillInfo->numOfCurrent, (char*)&pVar->i, false);
colDataAppend(pDst, index, (char*)&pVar->i, false);
}
}
}
......@@ -324,6 +326,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
pFillInfo->currentKey =
taosTimeAdd(pFillInfo->currentKey, pInterval->sliding * step, pInterval->slidingUnit, pInterval->precision);
pBlock->info.rows += 1;
pFillInfo->index += 1;
pFillInfo->numOfCurrent += 1;
}
......
......@@ -6032,9 +6032,6 @@ int32_t lastrowFunction(SqlFunctionCtx* pCtx) {
}
pInfo->ts = cts;
pInfo->hasResult = true;
pResInfo->numOfRes = 1;
if (pCtx->subsidiaries.num > 0) {
STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY));
if (!pInfo->hasResult) {
......@@ -6043,6 +6040,8 @@ int32_t lastrowFunction(SqlFunctionCtx* pCtx) {
copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
}
}
pInfo->hasResult = true;
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册