提交 706e3502 编写于 作者: H Haojun Liao

fix(query): fix error in fill and partition.

上级 5301c575
......@@ -120,6 +120,8 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t
void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
void taosResetFillInfo(struct SFillInfo* pFillInfo, TSKEY startTimestamp);
void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struct SSDataBlock* pInput);
void taosFillUpdateStartTimestampInfo(SFillInfo* pFillInfo, int64_t ts);
bool taosFillNotStarted(const SFillInfo* pFillInfo);
SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprInfo* pNotFillExpr,
int32_t numOfNotFillCols, const struct SNodeListNode* val);
bool taosFillHasMoreResults(struct SFillInfo* pFillInfo);
......
......@@ -139,6 +139,12 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
}
SOperatorInfo* pDownstream = pOperator->pDownstream[0];
// the scan order may be different from the output result order for agg interval operator.
if (pDownstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL) {
order = ((SIntervalAggOperatorInfo*) pDownstream->info)->resultTsOrder;
}
while (1) {
SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream);
if (pBlock == NULL) {
......@@ -162,11 +168,24 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
pInfo->curGroupId = pInfo->pRes->info.id.groupId; // the first data block
pInfo->totalInputRows += pInfo->pRes->info.rows;
if (order == pInfo->pFillInfo->order) {
if (order == TSDB_ORDER_ASC) {
int64_t skey = pBlock->info.window.skey;
if (skey < pInfo->pFillInfo->start) { // the start key may be smaller than the
ASSERT( taosFillNotStarted(pInfo->pFillInfo));
taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, skey);
}
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, pBlock->info.window.ekey);
} else {
int64_t ekey = pBlock->info.window.ekey;
if (ekey > pInfo->pFillInfo->start) {
ASSERT( taosFillNotStarted(pInfo->pFillInfo));
taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, ekey);
}
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, pBlock->info.window.skey);
}
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes);
} else if (pInfo->curGroupId != pBlock->info.id.groupId) { // the new group data block
pInfo->existNewGroupBlock = pBlock;
......@@ -256,11 +275,8 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
const char* id, SInterval* pInterval, int32_t fillType, int32_t order) {
SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols, pValNode);
STimeWindow w = {0};
int64_t startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey;
getInitialStartTimeWindow(pInterval, startKey, &w, order == TSDB_ORDER_ASC);
pInfo->pFillInfo = taosCreateFillInfo(w.skey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo,
int64_t startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey;
pInfo->pFillInfo = taosCreateFillInfo(startKey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo,
pInfo->primaryTsCol, order, id);
if (order == TSDB_ORDER_ASC) {
......
......@@ -517,11 +517,16 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey)
return;
}
// the endKey is now the aligned time window value. truncate time window isn't correct.
pFillInfo->end = endKey;
if (!FILL_IS_ASC_FILL(pFillInfo)) {
pFillInfo->end = taosTimeTruncate(endKey, &pFillInfo->interval);
pFillInfo->end = taosTimeAdd(pFillInfo->end, pFillInfo->interval.interval, pFillInfo->interval.intervalUnit,pFillInfo->interval.precision);
#if 0
if (pFillInfo->order == TSDB_ORDER_ASC) {
ASSERT(pFillInfo->start <= pFillInfo->end);
} else {
ASSERT(pFillInfo->start >= pFillInfo->end);
}
#endif
pFillInfo->index = 0;
pFillInfo->numOfRows = numOfRows;
......@@ -531,6 +536,13 @@ void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const SSDataBlock* pInput)
pFillInfo->pSrcBlock = (SSDataBlock*)pInput;
}
void taosFillUpdateStartTimestampInfo(SFillInfo* pFillInfo, int64_t ts) {
pFillInfo->start = ts;
pFillInfo->currentKey = ts;
}
bool taosFillNotStarted(const SFillInfo* pFillInfo) {return pFillInfo->start == pFillInfo->currentKey;}
bool taosFillHasMoreResults(SFillInfo* pFillInfo) {
int32_t remain = taosNumOfRemainRows(pFillInfo);
if (remain > 0) {
......@@ -565,6 +577,7 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t ma
(ekey1 > pFillInfo->currentKey && !FILL_IS_ASC_FILL(pFillInfo))) {
return 0;
}
numOfRes = taosTimeCountInterval(ekey1, pFillInfo->currentKey, pFillInfo->interval.sliding,
pFillInfo->interval.slidingUnit, pFillInfo->interval.precision);
numOfRes += 1;
......
......@@ -121,7 +121,7 @@ sql select count(*) from (select ts from $mt1 where ts is not null partition by
if $rows != 1 then
return -1
endi
if $data00 != 2 then
if $data00 != 4 then
return -1
endi
......@@ -129,7 +129,7 @@ sql select count(*) from (select ts from $mt1 where ts is not null partition by
if $rows != 1 then
return -1
endi
if $data00 != 8 then
if $data00 != 16 then
return -1
endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册