提交 53c3cbd6 编写于 作者: H Haojun Liao

enh(query): support event window.

上级 e7b1a7dc
......@@ -800,6 +800,8 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo);
// clang-format on
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
......@@ -853,6 +855,10 @@ int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult,
void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order);
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo);
void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo);
void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset) ;
#ifdef __cplusplus
}
#endif
......
......@@ -20,7 +20,6 @@
#include "tcommon.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "tfill.h"
#include "ttime.h"
typedef struct SEventWindowOperatorInfo {
......@@ -29,16 +28,20 @@ typedef struct SEventWindowOperatorInfo {
SExprSupp scalarSup;
SGroupResInfo groupResInfo;
SWindowRowsSup winSup;
SColumn stateCol; // start row index
bool hasKey;
SStateKeys stateKey;
int32_t tsSlotId; // primary timestamp column slot id
STimeWindowAggSupp twAggSup;
SFilterInfo* pStartCondInfo;
SFilterInfo* pEndCondInfo;
bool inWindow;
SResultRow* pRow;
} SEventWindowOperatorInfo;
static int32_t openEventWindowAggOptr(SOperatorInfo* pOperator);
static void destroyEWindowOperatorInfo(void* param);
static void doEventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock);
static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator);
static void destroyEWindowOperatorInfo(void* param);
static void eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock);
static SSDataBlock* doEventWindowAgg(SOperatorInfo* pOperator);
// todo : move to util
......@@ -67,7 +70,7 @@ static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, b
ts[4] = pWin->ekey + delta; // window end key
}
SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SEventWinodwPhysiNode* pStateNode,
SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode,
SExecTaskInfo* pTaskInfo) {
SEventWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SEventWindowOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
......@@ -75,27 +78,29 @@ SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SEventWi
goto _error;
}
int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStartCond)->pExpr;
SEventWinodwPhysiNode* pEventWindowNode = (SEventWinodwPhysiNode*)physiNode;
int32_t tsSlotId = ((SColumnNode*)pEventWindowNode->window.pTspk)->slotId;
int32_t code = filterInitFromNode((SNode*)pEventWindowNode->pStartCond, &pInfo->pStartCondInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
code = filterInitFromNode((SNode*)pEventWindowNode->pEndCond, &pInfo->pEndCondInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
if (pStateNode->window.pExprs != NULL) {
if (pEventWindowNode->window.pExprs != NULL) {
int32_t numOfScalarExpr = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalarExpr);
int32_t code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr);
SExprInfo* pScalarExprInfo = createExprInfo(pEventWindowNode->window.pExprs, NULL, &numOfScalarExpr);
code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
}
pInfo->stateCol = extractColumnFromColumnNode(pColNode);
pInfo->stateKey.type = pInfo->stateCol.type;
pInfo->stateKey.bytes = pInfo->stateCol.bytes;
pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes);
if (pInfo->stateKey.pData == NULL) {
goto _error;
}
int32_t code = filterInitFromNode((SNode*)pStateNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
code = filterInitFromNode((SNode*)pEventWindowNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......@@ -103,7 +108,7 @@ SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SEventWi
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num);
SExprInfo* pExprInfo = createExprInfo(pEventWindowNode->window.pFuncs, NULL, &num);
initResultSizeInfo(&pOperator->resultInfo, 4096);
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
......@@ -111,20 +116,22 @@ SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SEventWi
goto _error;
}
SSDataBlock* pResBlock = createDataBlockFromDescNode(pStateNode->window.node.pOutputDataBlockDesc);
SSDataBlock* pResBlock = createDataBlockFromDescNode(pEventWindowNode->window.node.pOutputDataBlockDesc);
blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity);
initBasicInfo(&pInfo->binfo, pResBlock);
initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->twAggSup =
(STimeWindowAggSupp){.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};
pInfo->twAggSup = (STimeWindowAggSupp){.waterMark = pEventWindowNode->window.watermark,
.calTrigger = pEventWindowNode->window.triggerType};
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->tsSlotId = tsSlotId;
setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
setOperatorInfo(pOperator, "EventWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->fpSet = createOperatorFpSet(openEventWindowAggOptr, doEventWindowAgg, NULL, destroyEWindowOperatorInfo,
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, eventWindowAggregate, NULL, destroyEWindowOperatorInfo,
optrDefaultBufFn, NULL);
code = appendDownstream(pOperator, &downstream, 1);
......@@ -158,17 +165,14 @@ void destroyEWindowOperatorInfo(void* param) {
taosMemoryFreeClear(param);
}
static int32_t openEventWindowAggOptr(SOperatorInfo* pOperator) {
if (OPTR_IS_OPENED(pOperator)) {
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator) {
SEventWindowOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SExprSupp* pSup = &pOperator->exprSupp;
int32_t order = TSDB_ORDER_ASC;
int64_t st = taosGetTimestampUs();
blockDataCleanup(pInfo->binfo.pRes);
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
......@@ -189,133 +193,113 @@ static int32_t openEventWindowAggOptr(SOperatorInfo* pOperator) {
}
}
doEventWindowAggImpl(pOperator, pInfo, pBlock);
eventWindowAggImpl(pOperator, pInfo, pBlock);
}
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
pOperator->status = OP_RES_TO_RETURN;
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
}
static int32_t setSingleOutputTupleBufv1(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult,
SExprSupp* pExprSup, SAggSupporter* pAggSup) {
if (*pResult == NULL) {
SResultRow* p = taosMemoryCalloc(1, pAggSup->resultRowSize);
pResultRowInfo->cur = (SResultRowPosition){.pageId = p->pageId, .offset = p->offset};
*pResult = p;
}
(*pResult)->win = *win;
setResultRowInitCtx(*pResult, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
return TSDB_CODE_SUCCESS;
}
void doEventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock) {
static void doEventWindowAggImpl(SEventWindowOperatorInfo* pInfo, SExprSupp* pSup, int32_t startIndex, int32_t endIndex,
const SSDataBlock* pBlock, int64_t* tsList, SExecTaskInfo* pTaskInfo) {
SWindowRowsSup* pRowSup = &pInfo->winSup;
int32_t numOfOutput = pSup->numOfExprs;
int32_t numOfRows = endIndex - startIndex + 1;
doKeepTuple(pRowSup, tsList[endIndex], pBlock->info.id.groupId);
int32_t ret =
setSingleOutputTupleBufv1(&pInfo->binfo.resultRowInfo, &pRowSup->win, &pInfo->pRow, pSup, &pInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startIndex, numOfRows,
pBlock->info.rows, numOfOutput);
}
void eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SExprSupp* pSup = &pOperator->exprSupp;
SColumnInfoData* pStateColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->stateCol.slotId);
int64_t gid = pBlock->info.id.groupId;
bool masterScan = true;
int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
int16_t bytes = pStateColInfoData->info.bytes;
int64_t gid = pBlock->info.id.groupId;
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
TSKEY* tsList = (TSKEY*)pColInfoData->pData;
SColumnInfoData *ps = NULL, *pe = NULL;
SWindowRowsSup* pRowSup = &pInfo->winSup;
pRowSup->numOfRows = 0;
struct SColumnDataAgg* pAgg = NULL;
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
pAgg = (pBlock->pBlockAgg != NULL) ? pBlock->pBlockAgg[pInfo->stateCol.slotId] : NULL;
if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pAgg)) {
continue;
}
SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
int32_t code = filterSetDataFromSlotId(pInfo->pStartCondInfo, &param1);
char* val = colDataGetData(pStateColInfoData, j);
int32_t status1 = 0;
bool keep1 = filterExecute(pInfo->pStartCondInfo, pBlock, &ps, NULL, param1.numOfCols, &status1);
if (gid != pRowSup->groupId || !pInfo->hasKey) {
// todo extract method
if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) {
varDataCopy(pInfo->stateKey.pData, val);
} else {
memcpy(pInfo->stateKey.pData, val, bytes);
}
SFilterColumnParam param2 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
code = filterSetDataFromSlotId(pInfo->pEndCondInfo, &param2);
int32_t status2 = 0;
bool keep2 = filterExecute(pInfo->pEndCondInfo, pBlock, &pe, NULL, param2.numOfCols, &status2);
pInfo->hasKey = true;
int32_t rowIndex = 0;
int32_t startIndex = pInfo->inWindow ? 0 : -1;
doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
doKeepTuple(pRowSup, tsList[j], gid);
} else if (compareVal(val, &pInfo->stateKey)) {
doKeepTuple(pRowSup, tsList[j], gid);
if (j == 0 && pRowSup->startRowIndex != 0) {
pRowSup->startRowIndex = 0;
while (rowIndex < pBlock->info.rows) {
if (pInfo->inWindow) { // let's find the first end value
for (rowIndex = startIndex; rowIndex < pBlock->info.rows; ++rowIndex) {
if (((bool*)pe->pData)[rowIndex]) {
break;
}
}
} else { // a new state window started
SResultRow* pResult = NULL;
// keep the time window for the closed time window.
STimeWindow window = pRowSup->win;
if (rowIndex < pBlock->info.rows) {
doEventWindowAggImpl(pInfo, pSup, startIndex, rowIndex, pBlock, tsList, pTaskInfo);
pRowSup->win.ekey = pRowSup->win.skey;
int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
}
// todo check if pblock buffer is enough
doUpdateNumOfRows(pSup->pCtx, pInfo->pRow, pSup->numOfExprs, pSup->rowEntryInfoOffset);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
copyResultrowToDataBlock(pSup->pExprInfo, pSup->numOfExprs, pInfo->pRow, pSup->pCtx, pInfo->binfo.pRes,
pSup->rowEntryInfoOffset, pTaskInfo);
// here we start a new session window
doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
doKeepTuple(pRowSup, tsList[j], gid);
pInfo->binfo.pRes->info.rows += pInfo->pRow->numOfRows;
// todo extract method
if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) {
varDataCopy(pInfo->stateKey.pData, val);
pInfo->inWindow = false;
rowIndex += 1;
} else {
memcpy(pInfo->stateKey.pData, val, bytes);
doEventWindowAggImpl(pInfo, pSup, startIndex, pBlock->info.rows - 1, pBlock, tsList, pTaskInfo);
}
} else { // find the first start value that is fulfill for the start condition
for (; rowIndex < pBlock->info.rows; ++rowIndex) {
if (((bool*)ps->pData)[rowIndex]) {
doKeepNewWindowStartInfo(pRowSup, tsList, rowIndex, gid);
pInfo->inWindow = true;
startIndex = rowIndex;
break;
}
}
}
}
SResultRow* pResult = NULL;
pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows,
pBlock->info.rows, numOfOutput);
}
SSDataBlock* doEventWindowAgg(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SEventWindowOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SOptrBasicInfo* pBInfo = &pInfo->binfo;
pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
setOperatorCompleted(pOperator);
return NULL;
}
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
while (1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) {
setOperatorCompleted(pOperator);
break;
}
if (pBInfo->pRes->info.rows > 0) {
break;
if (pInfo->inWindow) {
continue;
} else {
break;
}
}
}
pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
return (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
}
......@@ -936,8 +936,7 @@ static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, u
pAggInfo->groupId = groupId;
}
static void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs,
const int32_t* rowEntryOffset) {
void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset) {
bool returnNotNull = false;
for (int32_t j = 0; j < numOfExprs; ++j) {
SResultRowEntryInfo* pResInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
......@@ -960,8 +959,8 @@ static void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t nu
}
}
static void doCopyResultToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo) {
void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo) {
for (int32_t j = 0; j < numOfExprs; ++j) {
int32_t slotId = pExprInfo[j].base.resSchema.slotId;
......@@ -1022,7 +1021,7 @@ int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPos
T_LONG_JMP(pTaskInfo->env, code);
}
doCopyResultToDataBlock(pExprInfo, pSup->numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);
copyResultrowToDataBlock(pExprInfo, pSup->numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);
releaseBufPage(pBuf, page);
pBlock->info.rows += pRow->numOfRows;
......@@ -1070,7 +1069,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS
}
pGroupResInfo->index += 1;
doCopyResultToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);
copyResultrowToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);
releaseBufPage(pBuf, page);
pBlock->info.rows += pRow->numOfRows;
......@@ -2087,8 +2086,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOperator = createCacherowsScanOperator(pScanNode, pHandle, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
pOperator = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo);
} else {
ASSERT(0);
}
if (pOperator != NULL) {
......@@ -2179,8 +2176,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) {
pOptr = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else {
ASSERT(0);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT == type) {
pOptr = createEventwindowOperatorInfo(ops[0], pPhyNode, pTaskInfo);
}
taosMemoryFree(ops);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册