提交 2305b058 编写于 作者: H Haojun Liao

refactor(query): do some internal refactor for executor.

上级 9d1a9d95
...@@ -404,25 +404,29 @@ typedef struct SAggSupporter { ...@@ -404,25 +404,29 @@ typedef struct SAggSupporter {
SArray* pResultRowArrayList; // The array list that contains the Result rows SArray* pResultRowArrayList; // The array list that contains the Result rows
char* keyBuf; // window key buffer char* keyBuf; // window key buffer
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
} SAggSupporter; } SAggSupporter;
typedef struct STableIntervalOperatorInfo { typedef struct STimeWindowSupp {
SOptrBasicInfo binfo; // basic info int8_t calTrigger;
SGroupResInfo groupResInfo; // multiple results build supporter int64_t waterMark;
SInterval interval; // interval info
int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator.
STimeWindow win; // query time range
bool timeWindowInterpo; // interpolation needed or not
char** pRow; // previous row/tuple of already processed datablock
SAggSupporter aggSup; // aggregate supporter
STableQueryInfo* pCurrent; // current tableQueryInfo struct
int32_t order; // current SSDataBlock scan order
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator.
SColumnInfoData timeWindowData; // query time window info for scalar function execution. SColumnInfoData timeWindowData; // query time window info for scalar function execution.
double watermark; // water mark } STimeWindowAggSupp;
int32_t trigger; // trigger model
typedef struct STableIntervalOperatorInfo {
SOptrBasicInfo binfo; // basic info
SGroupResInfo groupResInfo; // multiple results build supporter
SInterval interval; // interval info
int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator.
STimeWindow win; // query time range
bool timeWindowInterpo; // interpolation needed or not
char** pRow; // previous row/tuple of already processed datablock
SAggSupporter aggSup; // aggregate supporter
STableQueryInfo* pCurrent; // current tableQueryInfo struct
int32_t order; // current SSDataBlock scan order
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator.
STimeWindowAggSupp twAggSup;
} STableIntervalOperatorInfo; } STableIntervalOperatorInfo;
typedef struct SAggOperatorInfo { typedef struct SAggOperatorInfo {
...@@ -440,19 +444,19 @@ typedef struct SAggOperatorInfo { ...@@ -440,19 +444,19 @@ typedef struct SAggOperatorInfo {
} SAggOperatorInfo; } SAggOperatorInfo;
typedef struct SProjectOperatorInfo { typedef struct SProjectOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SAggSupporter aggSup; SAggSupporter aggSup;
SSDataBlock* existDataBlock; SSDataBlock* existDataBlock;
SArray* pPseudoColInfo; SArray* pPseudoColInfo;
SLimit limit; SLimit limit;
SLimit slimit; SLimit slimit;
uint64_t groupId; uint64_t groupId;
int64_t curSOffset; int64_t curSOffset;
int64_t curGroupOutput; int64_t curGroupOutput;
int64_t curOffset; int64_t curOffset;
int64_t curOutput; int64_t curOutput;
} SProjectOperatorInfo; } SProjectOperatorInfo;
typedef struct SFillOperatorInfo { typedef struct SFillOperatorInfo {
...@@ -467,10 +471,10 @@ typedef struct SFillOperatorInfo { ...@@ -467,10 +471,10 @@ typedef struct SFillOperatorInfo {
} SFillOperatorInfo; } SFillOperatorInfo;
typedef struct { typedef struct {
char* pData; char* pData;
bool isNull; bool isNull;
int16_t type; int16_t type;
int32_t bytes; int32_t bytes;
} SGroupKeys, SStateKeys; } SGroupKeys, SStateKeys;
typedef struct SGroupbyOperatorInfo { typedef struct SGroupbyOperatorInfo {
...@@ -489,9 +493,9 @@ typedef struct SGroupbyOperatorInfo { ...@@ -489,9 +493,9 @@ typedef struct SGroupbyOperatorInfo {
} SGroupbyOperatorInfo; } SGroupbyOperatorInfo;
typedef struct SDataGroupInfo { typedef struct SDataGroupInfo {
uint64_t groupId; uint64_t groupId;
int64_t numOfRows; int64_t numOfRows;
SArray* pPageList; SArray* pPageList;
} SDataGroupInfo; } SDataGroupInfo;
// The sort in partition may be needed later. // The sort in partition may be needed later.
...@@ -506,9 +510,8 @@ typedef struct SPartitionOperatorInfo { ...@@ -506,9 +510,8 @@ typedef struct SPartitionOperatorInfo {
SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file
int32_t rowCapacity; // maximum number of rows for each buffer page int32_t rowCapacity; // maximum number of rows for each buffer page
int32_t* columnOffset; // start position for each column data int32_t* columnOffset; // start position for each column data
void* pGroupIter; // group iterator
void* pGroupIter; // group iterator int32_t pageIndex; // page index of current group
int32_t pageIndex; // page index of current group
} SPartitionOperatorInfo; } SPartitionOperatorInfo;
typedef struct SWindowRowsSup { typedef struct SWindowRowsSup {
...@@ -519,13 +522,13 @@ typedef struct SWindowRowsSup { ...@@ -519,13 +522,13 @@ typedef struct SWindowRowsSup {
} SWindowRowsSup; } SWindowRowsSup;
typedef struct SSessionAggOperatorInfo { typedef struct SSessionAggOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SAggSupporter aggSup; SAggSupporter aggSup;
SGroupResInfo groupResInfo; SGroupResInfo groupResInfo;
SWindowRowsSup winSup; SWindowRowsSup winSup;
bool reptScan; // next round scan bool reptScan; // next round scan
int64_t gap; // session window gap int64_t gap; // session window gap
SColumnInfoData timeWindowData; // query time window info for scalar function execution. STimeWindowAggSupp twAggSup;
} SSessionAggOperatorInfo; } SSessionAggOperatorInfo;
typedef struct STimeSliceOperatorInfo { typedef struct STimeSliceOperatorInfo {
...@@ -535,14 +538,14 @@ typedef struct STimeSliceOperatorInfo { ...@@ -535,14 +538,14 @@ typedef struct STimeSliceOperatorInfo {
} STimeSliceOperatorInfo; } STimeSliceOperatorInfo;
typedef struct SStateWindowOperatorInfo { typedef struct SStateWindowOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SAggSupporter aggSup; SAggSupporter aggSup;
SGroupResInfo groupResInfo; SGroupResInfo groupResInfo;
SWindowRowsSup winSup; SWindowRowsSup winSup;
int32_t colIndex; // start row index int32_t colIndex; // start row index
bool hasKey; bool hasKey;
SStateKeys stateKey; SStateKeys stateKey;
SColumnInfoData timeWindowData; // query time window info for scalar function execution. STimeWindowAggSupp twAggSup;
// bool reptScan; // bool reptScan;
} SStateWindowOperatorInfo; } SStateWindowOperatorInfo;
...@@ -640,10 +643,11 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB ...@@ -640,10 +643,11 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB
SNode* pCondition, SEpSet epset, SArray* colList, SNode* pCondition, SEpSet epset, SArray* colList,
SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId); SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlot, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo); SSDataBlock* pResBlock, int64_t gap, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition,
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo,
...@@ -656,7 +660,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp ...@@ -656,7 +660,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
SInterval* pInterval, SSDataBlock* pResBlock, int32_t fillType, char* fillVal, SInterval* pInterval, SSDataBlock* pResBlock, int32_t fillType, char* fillVal,
bool multigroupResult, SExecTaskInfo* pTaskInfo); bool multigroupResult, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo, SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo,
......
...@@ -1097,6 +1097,8 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt ...@@ -1097,6 +1097,8 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
pInput->totalRows = pBlock->info.rows; pInput->totalRows = pBlock->info.rows;
pInput->numOfRows = pBlock->info.rows; pInput->numOfRows = pBlock->info.rows;
pInput->startRowIndex = 0; pInput->startRowIndex = 0;
pInput->pPTS = taosArrayGet(pBlock->pDataBlock, 0); // todo set the correct timestamp column
ASSERT(pInput->pData[j] != NULL); ASSERT(pInput->pData[j] != NULL);
} else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) { } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
if (createDummyCol) { if (createDummyCol) {
...@@ -1516,8 +1518,8 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe ...@@ -1516,8 +1518,8 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
// window start key interpolation // window start key interpolation
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &win, startPos, forwardStep, pInfo->order, false); doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &win, startPos, forwardStep, pInfo->order, false);
updateTimeWindowInfo(&pInfo->timeWindowData, &win, true); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true);
doApplyFunctions(pInfo->binfo.pCtx, &win, &pInfo->timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); doApplyFunctions(pInfo->binfo.pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
STimeWindow nextWin = win; STimeWindow nextWin = win;
while (1) { while (1) {
...@@ -1548,8 +1550,8 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe ...@@ -1548,8 +1550,8 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep, doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep,
pInfo->order, false); pInfo->order, false);
updateTimeWindowInfo(&pInfo->timeWindowData, &nextWin, true); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pInfo->binfo.pCtx, &nextWin, &pInfo->timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); doApplyFunctions(pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
} }
if (pInfo->timeWindowInterpo) { if (pInfo->timeWindowInterpo) {
...@@ -1620,8 +1622,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator ...@@ -1620,8 +1622,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
} }
// pInfo->numOfRows data belong to the current session window // pInfo->numOfRows data belong to the current session window
updateTimeWindowInfo(&pInfo->timeWindowData, &window, false); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false);
doApplyFunctions(pInfo->binfo.pCtx, &window, &pInfo->timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); doApplyFunctions(pInfo->binfo.pCtx, &window, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
// here we start a new session window // here we start a new session window
doKeepNewWindowStartInfo(pRowSup, tsList, j); doKeepNewWindowStartInfo(pRowSup, tsList, j);
...@@ -1637,8 +1639,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator ...@@ -1637,8 +1639,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
} }
updateTimeWindowInfo(&pInfo->timeWindowData, &pRowSup->win, false); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false);
doApplyFunctions(pInfo->binfo.pCtx, &pRowSup->win, &pInfo->timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); doApplyFunctions(pInfo->binfo.pCtx, &pRowSup->win, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
} }
static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
...@@ -5338,8 +5340,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI ...@@ -5338,8 +5340,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
} }
updateTimeWindowInfo(&pInfo->timeWindowData, &window, false); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false);
doApplyFunctions(pInfo->binfo.pCtx, &window, &pInfo->timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); doApplyFunctions(pInfo->binfo.pCtx, &window, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
// here we start a new session window // here we start a new session window
doKeepNewWindowStartInfo(pRowSup, tsList, j); doKeepNewWindowStartInfo(pRowSup, tsList, j);
...@@ -5355,8 +5357,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI ...@@ -5355,8 +5357,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
} }
updateTimeWindowInfo(&pInfo->timeWindowData, &pRowSup->win, false); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false);
doApplyFunctions(pInfo->binfo.pCtx, &pRowSup->win, &pInfo->timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); doApplyFunctions(pInfo->binfo.pCtx, &pRowSup->win, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
} }
static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) {
...@@ -5863,30 +5865,30 @@ _error: ...@@ -5863,30 +5865,30 @@ _error:
} }
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlot, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) { STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo,
SExecTaskInfo* pTaskInfo) {
STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo)); STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
goto _error; goto _error;
} }
pInfo->order = TSDB_ORDER_ASC; pInfo->order = TSDB_ORDER_ASC;
pInfo->interval = *pInterval; pInfo->interval = *pInterval;
pInfo->execModel = pTaskInfo->execModel; pInfo->execModel = pTaskInfo->execModel;
pInfo->win = pTaskInfo->window;
pInfo->win = pTaskInfo->window; pInfo->win.skey = 0;
pInfo->win.skey = 0; pInfo->win.ekey = INT64_MAX;
pInfo->win.ekey = INT64_MAX; pInfo->primaryTsIndex = primaryTsSlotId;
pInfo->twAggSup = *pTwAggSupp;
pInfo->primaryTsIndex = primaryTsSlot;
int32_t numOfRows = 4096; int32_t numOfRows = 4096;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(pOperator, numOfRows); initResultSizeInfo(pOperator, numOfRows);
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
initExecTimeWindowInfo(&pInfo->timeWindowData, &pInfo->win); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
// pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); // pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) { if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) {
...@@ -5955,7 +5957,8 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -5955,7 +5957,8 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo*
return NULL; return NULL;
} }
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSup,
SExecTaskInfo* pTaskInfo) {
SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo)); SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
...@@ -5969,6 +5972,9 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf ...@@ -5969,6 +5972,9 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExpr, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExpr, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
pInfo->twAggSup = *pTwAggSup;
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pOperator->name = "StateWindowOperator"; pOperator->name = "StateWindowOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
...@@ -5992,7 +5998,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf ...@@ -5992,7 +5998,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
} }
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo) { SSDataBlock* pResBlock, int64_t gap, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo) {
SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo)); SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
...@@ -6008,8 +6014,9 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo ...@@ -6008,8 +6014,9 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
goto _error; goto _error;
} }
pInfo->twAggSup = *pTwAggSupp;
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
initExecTimeWindowInfo(&pInfo->timeWindowData, &pTaskInfo->window); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->gap = gap; pInfo->gap = gap;
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;
...@@ -6559,8 +6566,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -6559,8 +6566,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision
}; };
STimeWindowAggSupp as = {.waterMark = pIntervalPhyNode->window.watermark, .calTrigger = pIntervalPhyNode->window.triggerType};
int32_t primaryTsSlotId = ((SColumnNode*) pIntervalPhyNode->window.pTspk)->slotId; int32_t primaryTsSlotId = ((SColumnNode*) pIntervalPhyNode->window.pTspk)->slotId;
pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, primaryTsSlotId, pTableGroupInfo, pTaskInfo); pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, primaryTsSlotId, &as, pTableGroupInfo, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode; SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode;
...@@ -6571,9 +6580,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -6571,9 +6580,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) {
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark, .calTrigger = pSessionNode->window.triggerType};
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num); SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, pTaskInfo); pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, &as, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*) pPhyNode; SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*) pPhyNode;
SArray* pColList = extractPartitionColInfo(pPartNode->pPartitionKeys); SArray* pColList = extractPartitionColInfo(pPartNode->pPartitionKeys);
...@@ -6584,9 +6595,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -6584,9 +6595,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW == type) {
SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*) pPhyNode; SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*) pPhyNode;
STimeWindowAggSupp as = {.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};
SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num); SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, pTaskInfo); pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_JOIN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_JOIN == type) {
SJoinPhysiNode* pJoinNode = (SJoinPhysiNode*) pPhyNode; SJoinPhysiNode* pJoinNode = (SJoinPhysiNode*) pPhyNode;
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
......
...@@ -771,13 +771,9 @@ bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { ...@@ -771,13 +771,9 @@ bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
return true; return true;
} }
// TODO fix this // This ordinary first function does not care if current scan is ascending order or descending order scan
// This ordinary first function only handle the data block in ascending order // the OPTIMIZED version of first function will only handle the ascending order scan
int32_t firstFunction(SqlFunctionCtx *pCtx) { int32_t firstFunction(SqlFunctionCtx *pCtx) {
if (pCtx->order == TSDB_ORDER_DESC) {
return 0;
}
int32_t numOfElems = 0; int32_t numOfElems = 0;
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
...@@ -786,29 +782,72 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) { ...@@ -786,29 +782,72 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) {
SInputColumnInfoData* pInput = &pCtx->input; SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0]; SColumnInfoData* pInputCol = pInput->pData[0];
int32_t bytes = pInputCol->info.bytes;
// All null data column, return directly. // All null data column, return directly.
if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) { if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) {
ASSERT(pInputCol->hasNull == true); ASSERT(pInputCol->hasNull == true);
return 0; return 0;
} }
// Check for the first not null data SColumnDataAgg* pColAgg = (pInput->colDataAggIsSet)? pInput->pColumnDataAgg[0]:NULL;
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, NULL)) { TSKEY startKey = *(int64_t*)(pInput->pPTS ? colDataGetData(pInput->pPTS, 0) : 0);
continue; TSKEY endKey = *(int64_t*)(pInput->pPTS ? colDataGetData(pInput->pPTS, pInput->totalRows - 1) : 0);
int32_t blockDataOrder = (startKey <= endKey)? TSDB_ORDER_ASC:TSDB_ORDER_DESC;
if (blockDataOrder == TSDB_ORDER_ASC) {
// filter according to current result firstly
if (pResInfo->numOfRes > 0) {
TSKEY ts = *(TSKEY*)(buf + bytes);
if (ts < startKey) {
return TSDB_CODE_SUCCESS;
}
} }
char* data = colDataGetData(pInputCol, i); for (int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) {
memcpy(buf, data, pInputCol->info.bytes); if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
// TODO handle the subsidary value continue;
// if (pCtx->ptsList != NULL) { }
// TSKEY k = GET_TS_DATA(pCtx, i);
// DO_UPDATE_TAG_COLUMNS(pCtx, k); char* data = colDataGetData(pInputCol, i);
// }
TSKEY cts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0;
if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) < cts) {
memcpy(buf, data, bytes);
*(TSKEY*)(buf + bytes) = cts;
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
}
pResInfo->complete = true; numOfElems++;
numOfElems++; }
break; } else {
// in case of descending order time stamp serial, which usually happens as the results of the nest query,
// all data needs to be check.
if (pResInfo->numOfRes > 0) {
TSKEY ts = *(TSKEY*)(buf + bytes);
if (ts < endKey) {
return TSDB_CODE_SUCCESS;
}
}
for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) {
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
continue;
}
char* data = colDataGetData(pInputCol, i);
TSKEY cts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0;
if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) < cts) {
memcpy(buf, data, bytes);
*(TSKEY*)(buf + bytes) = cts;
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
}
numOfElems++;
}
} }
SET_VAL(pResInfo, numOfElems, 1); SET_VAL(pResInfo, numOfElems, 1);
...@@ -829,7 +868,7 @@ int32_t lastFunction(SqlFunctionCtx *pCtx) { ...@@ -829,7 +868,7 @@ int32_t lastFunction(SqlFunctionCtx *pCtx) {
SColumnInfoData* pInputCol = pInput->pData[0]; SColumnInfoData* pInputCol = pInput->pData[0];
// All null data column, return directly. // All null data column, return directly.
if (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) { if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) {
ASSERT(pInputCol->hasNull == true); ASSERT(pInputCol->hasNull == true);
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册