提交 24656202 编写于 作者: 5 54liuyao

feat(stream): support expressions

上级 75ed474f
......@@ -1746,7 +1746,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
for (int32_t k = 0; k < colNum; k++) {
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
if (colDataIsNull(pColInfoData, rows, j, NULL)) {
if (colDataIsNull(pColInfoData, rows, j, NULL) || !var) {
len += snprintf(dumpBuf + len, size - len, " %15s |", "NULL");
if (len >= size -1) return dumpBuf;
continue;
......
......@@ -481,7 +481,7 @@ typedef struct SStreamFinalIntervalOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo; // basic info
SAggSupporter aggSup; // aggregate supporter
SExprSupp scalarSupp; // supporter for perform scalar function
SGroupResInfo groupResInfo; // multiple results build supporter
SInterval interval; // interval info
int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator.
......@@ -629,6 +629,7 @@ typedef struct SStateWindowInfo {
typedef struct SStreamSessionAggOperatorInfo {
SOptrBasicInfo binfo;
SStreamAggSupporter streamAggSup;
SExprSupp scalarSupp; // supporter for perform scalar function
SGroupResInfo groupResInfo;
int64_t gap; // session window gap
int32_t primaryTsIndex; // primary timestamp slot id
......@@ -679,11 +680,12 @@ typedef struct SStateWindowOperatorInfo {
typedef struct SStreamStateAggOperatorInfo {
SOptrBasicInfo binfo;
SStreamAggSupporter streamAggSup;
SExprSupp scalarSupp; // supporter for perform scalar function
SGroupResInfo groupResInfo;
int32_t primaryTsIndex; // primary timestamp slot id
int32_t order; // current SSDataBlock scan order
STimeWindowAggSupp twAggSup;
SColumn stateCol; // start row index
SColumn stateCol;
SqlFunctionCtx* pDummyCtx; // for combine
SSDataBlock* pDelRes;
SHashObj* pSeDeleted;
......
......@@ -2401,6 +2401,12 @@ void addPullWindow(SHashObj* pMap, SWinRes* pWinRes, int32_t size) {
static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; }
STimeWindow getFinalTimeWindow(int64_t ts, SInterval* pInterval) {
STimeWindow w = {.skey = ts, .ekey = INT64_MAX};
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
return w;
}
static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t tableGroupId,
SArray* pUpdated) {
SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info;
......@@ -2420,8 +2426,12 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1);
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols);
STimeWindow nextWin =
getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->order);
STimeWindow nextWin = {0};
if (IS_FINAL_OP(pInfo)) {
nextWin = getFinalTimeWindow(ts, &pInfo->interval);
} else {
nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->order);
}
while (1) {
bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup);
if (pInfo->ignoreExpiredData && isClosed) {
......@@ -2478,8 +2488,12 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL,
if (IS_FINAL_OP(pInfo)) {
forwardRows = 1;
} else {
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL,
TSDB_ORDER_ASC);
}
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) {
saveResultRow(pResult, tableGroupId, pUpdated);
}
......@@ -2645,7 +2659,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
clearSpecialDataBlock(pInfo->pUpdateRes);
removeDeleteResults(pUpdated, pInfo->pDelWins);
pOperator->status = OP_RES_TO_RETURN;
qInfo("Stream Final Interval return data");
qInfo("%s return data", IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
break;
}
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval Final recv" : "interval Semi recv");
......@@ -2705,6 +2719,10 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
continue;
}
if (pInfo->scalarSupp.pExprInfo != NULL) {
SExprSupp* pExprSup = &pInfo->scalarSupp;
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
}
setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->order, MAIN_SCAN, true);
doHashInterval(pOperator, pBlock, pBlock->info.groupId, pUpdated);
if (IS_FINAL_OP(pInfo)) {
......@@ -2822,6 +2840,15 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(pOperator, 4096);
if (pIntervalPhyNode->window.pExprs != NULL) {
int32_t numOfScalar = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar);
int32_t code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
}
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
......@@ -2988,6 +3015,14 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
}
initResultSizeInfo(pOperator, 4096);
if (pSessionNode->window.pExprs != NULL) {
int32_t numOfScalar = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pSessionNode->window.pExprs, NULL, &numOfScalar);
int32_t code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
}
SExprSupp* pSup = &pOperator->exprSupp;
code = initBasicInfoEx(&pInfo->binfo, pSup, pExprInfo, numOfCols, pResBlock);
......@@ -3656,6 +3691,10 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
continue;
}
if (pInfo->scalarSupp.pExprInfo != NULL) {
SExprSupp* pExprSup = &pInfo->scalarSupp;
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pSup->pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
doStreamSessionAggImpl(pOperator, pBlock, pStUpdated, pInfo->pStDeleted, IS_FINAL_OP(pInfo));
......@@ -3788,6 +3827,10 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
continue;
}
if (pInfo->scalarSupp.pExprInfo != NULL) {
SExprSupp* pExprSup = &pInfo->scalarSupp;
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pSup->pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
doStreamSessionAggImpl(pOperator, pBlock, pStUpdated, pInfo->pStDeleted, false);
......@@ -4184,6 +4227,10 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
continue;
}
if (pInfo->scalarSupp.pExprInfo != NULL) {
SExprSupp* pExprSup = &pInfo->scalarSupp;
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pSup->pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
doStreamStateAggImpl(pOperator, pBlock, pSeUpdated, pInfo->pSeDeleted);
......@@ -4237,6 +4284,15 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->stateCol = extractColumnFromColumnNode(pColNode);
initResultSizeInfo(pOperator, 4096);
if (pStateNode->window.pExprs != NULL) {
int32_t numOfScalar = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalar);
int32_t code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
}
initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->twAggSup = (STimeWindowAggSupp){
.waterMark = pStateNode->window.watermark,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册