提交 0ff775c4 编写于 作者: H Haojun Liao

fix(query): keep order info in fill operator.

上级 547ab702
...@@ -523,8 +523,8 @@ typedef struct SIntervalAggOperatorInfo { ...@@ -523,8 +523,8 @@ typedef struct SIntervalAggOperatorInfo {
STimeWindow win; // query time range STimeWindow win; // query time range
bool timeWindowInterpo; // interpolation needed or not bool timeWindowInterpo; // interpolation needed or not
SArray* pInterpCols; // interpolation columns SArray* pInterpCols; // interpolation columns
int32_t order; // current SSDataBlock scan order
int32_t resultTsOrder; // result timestamp order int32_t resultTsOrder; // result timestamp order
int32_t inputOrder; // input data ts order
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
STimeWindowAggSupp twAggSup; STimeWindowAggSupp twAggSup;
bool invertible; bool invertible;
...@@ -534,8 +534,7 @@ typedef struct SIntervalAggOperatorInfo { ...@@ -534,8 +534,7 @@ typedef struct SIntervalAggOperatorInfo {
SArray* pDelWins; // SWinRes SArray* pDelWins; // SWinRes
int32_t delIndex; int32_t delIndex;
SSDataBlock* pDelRes; SSDataBlock* pDelRes;
SNode* pCondition;
SNode *pCondition;
} SIntervalAggOperatorInfo; } SIntervalAggOperatorInfo;
typedef struct SMergeAlignedIntervalAggOperatorInfo { typedef struct SMergeAlignedIntervalAggOperatorInfo {
...@@ -805,7 +804,7 @@ typedef struct STagFilterOperatorInfo { ...@@ -805,7 +804,7 @@ typedef struct STagFilterOperatorInfo {
typedef struct SJoinOperatorInfo { typedef struct SJoinOperatorInfo {
SSDataBlock *pRes; SSDataBlock *pRes;
int32_t joinType; int32_t joinType;
int32_t inputTsOrder; int32_t inputOrder;
SSDataBlock *pLeft; SSDataBlock *pLeft;
int32_t leftPos; int32_t leftPos;
......
...@@ -76,10 +76,9 @@ bool taosFillHasMoreResults(struct SFillInfo* pFillInfo); ...@@ -76,10 +76,9 @@ bool taosFillHasMoreResults(struct SFillInfo* pFillInfo);
SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, int32_t slotId, SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, int32_t slotId,
const char* id); int32_t order, const char* id);
void* taosDestroyFillInfo(struct SFillInfo *pFillInfo); void* taosDestroyFillInfo(struct SFillInfo *pFillInfo);
void taosFillSetDataOrderInfo(SFillInfo* pFillInfo, int32_t order);
int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, SSDataBlock* p, int32_t capacity); int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, SSDataBlock* p, int32_t capacity);
int64_t getFillInfoStart(struct SFillInfo *pFillInfo); int64_t getFillInfoStart(struct SFillInfo *pFillInfo);
......
...@@ -3234,8 +3234,6 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { ...@@ -3234,8 +3234,6 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
return pResBlock; return pResBlock;
} }
taosFillSetDataOrderInfo(pInfo->pFillInfo, TSDB_ORDER_ASC);
SOperatorInfo* pDownstream = pOperator->pDownstream[0]; SOperatorInfo* pDownstream = pOperator->pDownstream[0];
while (1) { while (1) {
SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream); SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream);
...@@ -3588,14 +3586,14 @@ void doDestroyExchangeOperatorInfo(void* param) { ...@@ -3588,14 +3586,14 @@ void doDestroyExchangeOperatorInfo(void* param) {
} }
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SNodeListNode* pValNode, static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SNodeListNode* pValNode,
STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType) { STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType, int32_t order) {
SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pValNode); SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pValNode);
STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey); STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey);
w = getFirstQualifiedTimeWindow(win.skey, &w, pInterval, TSDB_ORDER_ASC); w = getFirstQualifiedTimeWindow(win.skey, &w, pInterval, TSDB_ORDER_ASC);
pInfo->pFillInfo = pInfo->pFillInfo =
taosCreateFillInfo(w.skey, 0, capacity, numOfCols, pInterval, fillType, pColInfo, pInfo->primaryTsCol, id); taosCreateFillInfo(w.skey, 0, capacity, numOfCols, pInterval, fillType, pColInfo, pInfo->primaryTsCol, order, id);
pInfo->win = win; pInfo->win = win;
pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES); pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
...@@ -3625,6 +3623,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* ...@@ -3625,6 +3623,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
: &((SIntervalAggOperatorInfo*)downstream->info)->interval; : &((SIntervalAggOperatorInfo*)downstream->info)->interval;
int32_t order = (pPhyFillNode->inputTsOrder == ORDER_ASC)? TSDB_ORDER_ASC:TSDB_ORDER_DESC;
int32_t type = convertFillType(pPhyFillNode->mode); int32_t type = convertFillType(pPhyFillNode->mode);
SResultInfo* pResultInfo = &pOperator->resultInfo; SResultInfo* pResultInfo = &pOperator->resultInfo;
...@@ -3636,7 +3635,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* ...@@ -3636,7 +3635,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
&numOfOutputCols, COL_MATCH_FROM_SLOT_ID); &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
int32_t code = initFillInfo(pInfo, pExprInfo, num, (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, int32_t code = initFillInfo(pInfo, pExprInfo, num, (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange,
pResultInfo->capacity, pTaskInfo->id.str, pInterval, type); pResultInfo->capacity, pTaskInfo->id.str, pInterval, type, order);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
......
...@@ -77,11 +77,11 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t ...@@ -77,11 +77,11 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
pInfo->pCondAfterMerge = NULL; pInfo->pCondAfterMerge = NULL;
} }
pInfo->inputTsOrder = TSDB_ORDER_ASC; pInfo->inputOrder = TSDB_ORDER_ASC;
if (pJoinNode->inputTsOrder == ORDER_ASC) { if (pJoinNode->inputTsOrder == ORDER_ASC) {
pInfo->inputTsOrder = TSDB_ORDER_ASC; pInfo->inputOrder = TSDB_ORDER_ASC;
} else if (pJoinNode->inputTsOrder == ORDER_DESC) { } else if (pJoinNode->inputTsOrder == ORDER_DESC) {
pInfo->inputTsOrder = TSDB_ORDER_DESC; pInfo->inputOrder = TSDB_ORDER_DESC;
} }
pOperator->fpSet = pOperator->fpSet =
...@@ -312,7 +312,7 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) ...@@ -312,7 +312,7 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes)
int32_t nrows = pRes->info.rows; int32_t nrows = pRes->info.rows;
bool asc = (pJoinInfo->inputTsOrder == TSDB_ORDER_ASC) ? true : false; bool asc = (pJoinInfo->inputOrder == TSDB_ORDER_ASC) ? true : false;
while (1) { while (1) {
int64_t leftTs = 0; int64_t leftTs = 0;
......
...@@ -435,7 +435,7 @@ static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) { ...@@ -435,7 +435,7 @@ static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) {
struct SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, struct SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol,
int32_t primaryTsSlotId, const char* id) { int32_t primaryTsSlotId, int32_t order, const char* id) {
if (fillType == TSDB_FILL_NONE) { if (fillType == TSDB_FILL_NONE) {
return NULL; return NULL;
} }
...@@ -446,6 +446,7 @@ struct SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfTags, int32_t capa ...@@ -446,6 +446,7 @@ struct SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfTags, int32_t capa
return NULL; return NULL;
} }
pFillInfo->order = order;
pFillInfo->tsSlotId = primaryTsSlotId; pFillInfo->tsSlotId = primaryTsSlotId;
taosResetFillInfo(pFillInfo, skey); taosResetFillInfo(pFillInfo, skey);
......
...@@ -362,7 +362,7 @@ static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, in ...@@ -362,7 +362,7 @@ static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, in
static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, int32_t pos, SSDataBlock* pBlock, static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, int32_t pos, SSDataBlock* pBlock,
const TSKEY* tsCols, STimeWindow* win, SExprSupp* pSup) { const TSKEY* tsCols, STimeWindow* win, SExprSupp* pSup) {
bool ascQuery = (pInfo->order == TSDB_ORDER_ASC); bool ascQuery = (pInfo->inputOrder == TSDB_ORDER_ASC);
TSKEY curTs = tsCols[pos]; TSKEY curTs = tsCols[pos];
...@@ -392,7 +392,7 @@ static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, i ...@@ -392,7 +392,7 @@ static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, i
static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex, static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex,
SArray* pDataBlock, const TSKEY* tsCols, TSKEY blockEkey, SArray* pDataBlock, const TSKEY* tsCols, TSKEY blockEkey,
STimeWindow* win) { STimeWindow* win) {
int32_t order = pInfo->order; int32_t order = pInfo->inputOrder;
TSKEY actualEndKey = tsCols[endRowIndex]; TSKEY actualEndKey = tsCols[endRowIndex];
TSKEY key = (order == TSDB_ORDER_ASC) ? win->ekey : win->skey; TSKEY key = (order == TSDB_ORDER_ASC) ? win->ekey : win->skey;
...@@ -550,7 +550,7 @@ static void doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataB ...@@ -550,7 +550,7 @@ static void doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataB
if (!done) { if (!done) {
int32_t endRowIndex = startPos + forwardRows - 1; int32_t endRowIndex = startPos + forwardRows - 1;
TSKEY endKey = (pInfo->order == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey; TSKEY endKey = (pInfo->inputOrder == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey;
bool interp = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, pBlock->pDataBlock, tsCols, endKey, win); bool interp = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, pBlock->pDataBlock, tsCols, endKey, win);
if (interp) { if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
...@@ -639,7 +639,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num ...@@ -639,7 +639,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP); setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
doApplyFunctions(pTaskInfo, pSup->pCtx, &w, &pInfo->twAggSup.timeWindowData, startPos, 0, tsCols, pBlock->info.rows, doApplyFunctions(pTaskInfo, pSup->pCtx, &w, &pInfo->twAggSup.timeWindowData, startPos, 0, tsCols, pBlock->info.rows,
numOfExprs, pInfo->order); numOfExprs, pInfo->inputOrder);
if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) { if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
closeResultRow(pr); closeResultRow(pr);
...@@ -924,11 +924,11 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -924,11 +924,11 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
int32_t numOfOutput = pSup->numOfExprs; int32_t numOfOutput = pSup->numOfExprs;
int64_t* tsCols = extractTsCol(pBlock, pInfo); int64_t* tsCols = extractTsCol(pBlock, pInfo);
uint64_t tableGroupId = pBlock->info.groupId; uint64_t tableGroupId = pBlock->info.groupId;
bool ascScan = (pInfo->order == TSDB_ORDER_ASC); bool ascScan = (pInfo->inputOrder == TSDB_ORDER_ASC);
TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols); TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->order); STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->inputOrder);
int32_t ret = TSDB_CODE_SUCCESS; int32_t ret = TSDB_CODE_SUCCESS;
if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) && if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) &&
inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) { inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) {
...@@ -946,7 +946,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -946,7 +946,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
TSKEY ekey = ascScan ? win.ekey : win.skey; TSKEY ekey = ascScan ? win.ekey : win.skey;
int32_t forwardRows = int32_t forwardRows =
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order); getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->inputOrder);
ASSERT(forwardRows > 0); ASSERT(forwardRows > 0);
// prev time window not interpolation yet. // prev time window not interpolation yet.
...@@ -969,7 +969,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -969,7 +969,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) { inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) {
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, doApplyFunctions(pTaskInfo, pSup->pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols,
pBlock->info.rows, numOfOutput, pInfo->order); pBlock->info.rows, numOfOutput, pInfo->inputOrder);
} }
doCloseWindow(pResultRowInfo, pInfo, pResult); doCloseWindow(pResultRowInfo, pInfo, pResult);
...@@ -977,14 +977,14 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -977,14 +977,14 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
STimeWindow nextWin = win; STimeWindow nextWin = win;
while (1) { while (1) {
int32_t prevEndPos = forwardRows - 1 + startPos; int32_t prevEndPos = forwardRows - 1 + startPos;
startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, pInfo->order); startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, pInfo->inputOrder);
if (startPos < 0) { if (startPos < 0) {
break; break;
} }
if (pInfo->ignoreExpiredData && isCloseWindow(&nextWin, &pInfo->twAggSup)) { if (pInfo->ignoreExpiredData && isCloseWindow(&nextWin, &pInfo->twAggSup)) {
ekey = ascScan ? nextWin.ekey : nextWin.skey; ekey = ascScan ? nextWin.ekey : nextWin.skey;
forwardRows = forwardRows =
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order); getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->inputOrder);
continue; continue;
} }
...@@ -1002,14 +1002,14 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -1002,14 +1002,14 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
ekey = ascScan ? nextWin.ekey : nextWin.skey; ekey = ascScan ? nextWin.ekey : nextWin.skey;
forwardRows = forwardRows =
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order); getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->inputOrder);
// window start(end) key interpolation // window start(end) key interpolation
doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup); doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, doApplyFunctions(pTaskInfo, pSup->pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols,
pBlock->info.rows, numOfOutput, pInfo->order); pBlock->info.rows, numOfOutput, pInfo->inputOrder);
doCloseWindow(pResultRowInfo, pInfo, pResult); doCloseWindow(pResultRowInfo, pInfo, pResult);
} }
...@@ -1082,7 +1082,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { ...@@ -1082,7 +1082,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
break; break;
} }
getTableScanInfo(pOperator, &pInfo->order, &scanFlag); getTableScanInfo(pOperator, &pInfo->inputOrder, &scanFlag);
if (pInfo->scalarSupp.pExprInfo != NULL) { if (pInfo->scalarSupp.pExprInfo != NULL) {
SExprSupp* pExprSup = &pInfo->scalarSupp; SExprSupp* pExprSup = &pInfo->scalarSupp;
...@@ -1090,7 +1090,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { ...@@ -1090,7 +1090,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
} }
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->order, scanFlag, true); setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->inputOrder, scanFlag, true);
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex); blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag, NULL); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag, NULL);
...@@ -1549,7 +1549,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -1549,7 +1549,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SIntervalAggOperatorInfo* pInfo = pOperator->info; SIntervalAggOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
pInfo->order = TSDB_ORDER_ASC; pInfo->inputOrder = TSDB_ORDER_ASC;
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
...@@ -1609,7 +1609,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -1609,7 +1609,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
// The timewindow that overlaps the timestamps of the input pBlock need to be recalculated and return to the // The timewindow that overlaps the timestamps of the input pBlock need to be recalculated and return to the
// caller. Note that all the time window are not close till now. // caller. Note that all the time window are not close till now.
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->order, MAIN_SCAN, true); setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->inputOrder, MAIN_SCAN, true);
if (pInfo->invertible) { if (pInfo->invertible) {
setInverFunction(pSup->pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.type); setInverFunction(pSup->pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.type);
} }
...@@ -1789,8 +1789,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -1789,8 +1789,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
} }
pInfo->win = pTaskInfo->window; pInfo->win = pTaskInfo->window;
pInfo->order = TSDB_ORDER_ASC; pInfo->inputOrder = (pPhyNode->window.inputTsOrder == ORDER_ASC)? TSDB_ORDER_ASC:TSDB_ORDER_DESC;
pInfo->resultTsOrder = TSDB_ORDER_ASC; pInfo->resultTsOrder = (pPhyNode->window.outputTsOrder == ORDER_ASC)? TSDB_ORDER_ASC:TSDB_ORDER_DESC;
pInfo->interval = *pInterval; pInfo->interval = *pInterval;
pInfo->execModel = pTaskInfo->execModel; pInfo->execModel = pTaskInfo->execModel;
pInfo->twAggSup = *pTwAggSupp; pInfo->twAggSup = *pTwAggSupp;
...@@ -1878,7 +1878,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr ...@@ -1878,7 +1878,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr
goto _error; goto _error;
} }
pInfo->order = TSDB_ORDER_ASC; pInfo->inputOrder = TSDB_ORDER_ASC;
pInfo->interval = *pInterval; pInfo->interval = *pInterval;
pInfo->execModel = OPTR_EXEC_MODEL_STREAM; pInfo->execModel = OPTR_EXEC_MODEL_STREAM;
pInfo->win = pTaskInfo->window; pInfo->win = pTaskInfo->window;
...@@ -4563,7 +4563,7 @@ static int32_t outputMergeAlignedIntervalResult(SOperatorInfo* pOperatorInfo, ui ...@@ -4563,7 +4563,7 @@ static int32_t outputMergeAlignedIntervalResult(SOperatorInfo* pOperatorInfo, ui
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
SExprSupp* pSup = &pOperatorInfo->exprSupp; SExprSupp* pSup = &pOperatorInfo->exprSupp;
bool ascScan = (iaInfo->order == TSDB_ORDER_ASC); bool ascScan = (iaInfo->inputOrder == TSDB_ORDER_ASC);
SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &wstartTs, TSDB_KEYSIZE, tableGroupId); SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &wstartTs, TSDB_KEYSIZE, tableGroupId);
SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf,
...@@ -4617,7 +4617,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR ...@@ -4617,7 +4617,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
} else { } else {
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true); updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos,
tsCols, pBlock->info.rows, numOfOutput, iaInfo->order); tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder);
outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs); outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs);
...@@ -4636,7 +4636,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR ...@@ -4636,7 +4636,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
} }
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true); updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos,
tsCols, pBlock->info.rows, numOfOutput, iaInfo->order); tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder);
outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs); outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs);
} }
...@@ -4681,8 +4681,8 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { ...@@ -4681,8 +4681,8 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
break; break;
} }
getTableScanInfo(pOperator, &iaInfo->order, &scanFlag); getTableScanInfo(pOperator, &iaInfo->inputOrder, &scanFlag);
setInputDataBlock(pOperator, pSup->pCtx, pBlock, iaInfo->order, scanFlag, true); setInputDataBlock(pOperator, pSup->pCtx, pBlock, iaInfo->inputOrder, scanFlag, true);
doMergeAlignedIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes); doMergeAlignedIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes);
doFilter(miaInfo->pCondition, pRes, NULL); doFilter(miaInfo->pCondition, pRes, NULL);
if (pRes->info.rows >= pOperator->resultInfo.capacity) { if (pRes->info.rows >= pOperator->resultInfo.capacity) {
...@@ -4723,7 +4723,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -4723,7 +4723,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
miaInfo->pCondition = pCondition; miaInfo->pCondition = pCondition;
iaInfo->win = pTaskInfo->window; iaInfo->win = pTaskInfo->window;
iaInfo->order = TSDB_ORDER_ASC; iaInfo->inputOrder = TSDB_ORDER_ASC;
iaInfo->interval = *pInterval; iaInfo->interval = *pInterval;
iaInfo->execModel = pTaskInfo->execModel; iaInfo->execModel = pTaskInfo->execModel;
iaInfo->primaryTsIndex = primaryTsSlotId; iaInfo->primaryTsIndex = primaryTsSlotId;
...@@ -4805,7 +4805,7 @@ static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t table ...@@ -4805,7 +4805,7 @@ static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t table
SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info; SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
bool ascScan = (iaInfo->order == TSDB_ORDER_ASC); bool ascScan = (iaInfo->inputOrder == TSDB_ORDER_ASC);
SExprSupp* pExprSup = &pOperatorInfo->exprSupp; SExprSupp* pExprSup = &pOperatorInfo->exprSupp;
SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &win->skey, TSDB_KEYSIZE, tableGroupId); SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &win->skey, TSDB_KEYSIZE, tableGroupId);
...@@ -4823,7 +4823,7 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t ...@@ -4823,7 +4823,7 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t
SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info; SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
bool ascScan = (iaInfo->order == TSDB_ORDER_ASC); bool ascScan = (iaInfo->inputOrder == TSDB_ORDER_ASC);
SExprSupp* pExprSup = &pOperatorInfo->exprSupp; SExprSupp* pExprSup = &pOperatorInfo->exprSupp;
SGroupTimeWindow groupTimeWindow = {.groupId = tableGroupId, .window = *newWin}; SGroupTimeWindow groupTimeWindow = {.groupId = tableGroupId, .window = *newWin};
...@@ -4859,12 +4859,12 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* ...@@ -4859,12 +4859,12 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
int32_t numOfOutput = pExprSup->numOfExprs; int32_t numOfOutput = pExprSup->numOfExprs;
int64_t* tsCols = extractTsCol(pBlock, iaInfo); int64_t* tsCols = extractTsCol(pBlock, iaInfo);
uint64_t tableGroupId = pBlock->info.groupId; uint64_t tableGroupId = pBlock->info.groupId;
bool ascScan = (iaInfo->order == TSDB_ORDER_ASC); bool ascScan = (iaInfo->inputOrder == TSDB_ORDER_ASC);
TSKEY blockStartTs = getStartTsKey(&pBlock->info.window, tsCols); TSKEY blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
STimeWindow win = STimeWindow win =
getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval, iaInfo->order); getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval, iaInfo->inputOrder);
int32_t ret = int32_t ret =
setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx, setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
...@@ -4875,7 +4875,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* ...@@ -4875,7 +4875,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
TSKEY ekey = ascScan ? win.ekey : win.skey; TSKEY ekey = ascScan ? win.ekey : win.skey;
int32_t forwardRows = int32_t forwardRows =
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->order); getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->inputOrder);
ASSERT(forwardRows > 0); ASSERT(forwardRows > 0);
// prev time window not interpolation yet. // prev time window not interpolation yet.
...@@ -4896,7 +4896,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* ...@@ -4896,7 +4896,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, true); updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, true);
doApplyFunctions(pTaskInfo, pExprSup->pCtx, &win, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, doApplyFunctions(pTaskInfo, pExprSup->pCtx, &win, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols,
pBlock->info.rows, numOfOutput, iaInfo->order); pBlock->info.rows, numOfOutput, iaInfo->inputOrder);
doCloseWindow(pResultRowInfo, iaInfo, pResult); doCloseWindow(pResultRowInfo, iaInfo, pResult);
// output previous interval results after this interval (&win) is closed // output previous interval results after this interval (&win) is closed
...@@ -4905,7 +4905,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* ...@@ -4905,7 +4905,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
STimeWindow nextWin = win; STimeWindow nextWin = win;
while (1) { while (1) {
int32_t prevEndPos = forwardRows - 1 + startPos; int32_t prevEndPos = forwardRows - 1 + startPos;
startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, iaInfo->order); startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, iaInfo->inputOrder);
if (startPos < 0) { if (startPos < 0) {
break; break;
} }
...@@ -4920,14 +4920,14 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* ...@@ -4920,14 +4920,14 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
ekey = ascScan ? nextWin.ekey : nextWin.skey; ekey = ascScan ? nextWin.ekey : nextWin.skey;
forwardRows = forwardRows =
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->order); getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->inputOrder);
// window start(end) key interpolation // window start(end) key interpolation
doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup); doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup);
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, true); updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pTaskInfo, pExprSup->pCtx, &nextWin, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, doApplyFunctions(pTaskInfo, pExprSup->pCtx, &nextWin, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows,
tsCols, pBlock->info.rows, numOfOutput, iaInfo->order); tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder);
doCloseWindow(pResultRowInfo, iaInfo, pResult); doCloseWindow(pResultRowInfo, iaInfo, pResult);
// output previous interval results after this interval (&nextWin) is closed // output previous interval results after this interval (&nextWin) is closed
...@@ -4981,8 +4981,8 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { ...@@ -4981,8 +4981,8 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
break; break;
} }
getTableScanInfo(pOperator, &iaInfo->order, &scanFlag); getTableScanInfo(pOperator, &iaInfo->inputOrder, &scanFlag);
setInputDataBlock(pOperator, pExpSupp->pCtx, pBlock, iaInfo->order, scanFlag, true); setInputDataBlock(pOperator, pExpSupp->pCtx, pBlock, iaInfo->inputOrder, scanFlag, true);
doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes); doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes);
if (pRes->info.rows >= pOperator->resultInfo.threshold) { if (pRes->info.rows >= pOperator->resultInfo.threshold) {
...@@ -5024,9 +5024,8 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI ...@@ -5024,9 +5024,8 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI
miaInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow)); miaInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow));
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
iaInfo->win = pTaskInfo->window; iaInfo->win = pTaskInfo->window;
iaInfo->order = TSDB_ORDER_ASC; iaInfo->inputOrder = TSDB_ORDER_ASC;
iaInfo->interval = *pInterval; iaInfo->interval = *pInterval;
iaInfo->execModel = pTaskInfo->execModel; iaInfo->execModel = pTaskInfo->execModel;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册