未验证 提交 1dfb97be 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #16283 from taosdata/feature/TD-18530

feat(stream): steam state window delete result
......@@ -142,6 +142,7 @@ typedef struct SqlFunctionCtx {
struct SSDataBlock *pDstBlock; // used by indifinite rows function to set selectivity
int32_t curBufPage;
bool increase;
bool isStream;
char udfName[TSDB_FUNC_NAME_LEN];
} SqlFunctionCtx;
......
......@@ -987,6 +987,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
pCtx->end.key = INT64_MIN;
pCtx->numOfParams = pExpr->base.numOfParams;
pCtx->increase = false;
pCtx->isStream = false;
pCtx->param = pFunct->pParam;
}
......
......@@ -1793,6 +1793,12 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSuppor
pScanInfo->sessionSup.pIntervalAggSup = pSup;
}
void initStreamFunciton(SqlFunctionCtx* pCtx, int32_t numOfExpr) {
for (int32_t i = 0; i < numOfExpr; i++) {
pCtx[i].isStream = true;
}
}
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp* pTwAggSupp, SIntervalPhysiNode* pPhyNode,
......@@ -1835,6 +1841,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
if (isStream) {
ASSERT(numOfCols > 0);
increaseTs(pSup->pCtx);
initStreamFunciton(pSup->pCtx, pSup->numOfExprs);
}
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
......@@ -3329,6 +3336,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
initStreamFunciton(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs);
initBasicInfo(&pInfo->binfo, pResBlock);
ASSERT(numOfCols > 0);
......@@ -3470,6 +3478,7 @@ int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo*
if (code != TSDB_CODE_SUCCESS) {
return code;
}
initStreamFunciton(pSup->pCtx, pSup->numOfExprs);
initBasicInfo(pBasicInfo, pResultBlock);
......@@ -4569,8 +4578,8 @@ SStateWindowInfo* getStateWindow(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_
return insertNewStateWindow(pWinInfos, ts, pKeyData, index + 1, pCol);
}
int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, SColumnInfoData* pKeyCol, int32_t rows,
int32_t start, bool* allEqual, SHashObj* pSeDelete) {
int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, uint64_t groupId,
SColumnInfoData* pKeyCol, int32_t rows, int32_t start, bool* allEqual, SHashObj* pSeDeleted) {
*allEqual = true;
SStateWindowInfo* pWinInfo = taosArrayGet(pWinInfos, winIndex);
for (int32_t i = start; i < rows; ++i) {
......@@ -4590,9 +4599,10 @@ int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, S
}
}
if (pWinInfo->winInfo.win.skey > pTs[i]) {
if (pSeDelete && pWinInfo->winInfo.isOutput) {
taosHashPut(pSeDelete, &pWinInfo->winInfo.pos, sizeof(SResultRowPosition), &pWinInfo->winInfo.win.skey,
sizeof(TSKEY));
if (pSeDeleted && pWinInfo->winInfo.isOutput) {
SWinRes res = {.ts = pWinInfo->winInfo.win.skey, .groupId = groupId};
taosHashPut(pSeDeleted, &pWinInfo->winInfo.pos, sizeof(SResultRowPosition), &res,
sizeof(SWinRes));
pWinInfo->winInfo.isOutput = false;
}
pWinInfo->winInfo.win.skey = pTs[i];
......@@ -4605,22 +4615,23 @@ int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, S
return rows - start;
}
static void doClearStateWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, int32_t tsIndex, SColumn* pCol,
int32_t keyIndex, SHashObj* pSeUpdated, SHashObj* pSeDeleted) {
static void doClearStateWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock,
int32_t tsIndex, SColumn* pCol, int32_t keyIndex, SHashObj* pSeUpdated, SHashObj* pSeDeleted) {
SColumnInfoData* pTsColInfo = taosArrayGet(pBlock->pDataBlock, tsIndex);
SColumnInfoData* pKeyColInfo = taosArrayGet(pBlock->pDataBlock, keyIndex);
TSKEY* tsCol = (TSKEY*)pTsColInfo->pData;
bool allEqual = false;
int32_t step = 1;
uint64_t groupId = pBlock->info.groupId;
for (int32_t i = 0; i < pBlock->info.rows; i += step) {
char* pKeyData = colDataGetData(pKeyColInfo, i);
int32_t winIndex = 0;
SStateWindowInfo* pCurWin = getStateWindowByTs(pAggSup, tsCol[i], pBlock->info.groupId, &winIndex);
SStateWindowInfo* pCurWin = getStateWindowByTs(pAggSup, tsCol[i], groupId, &winIndex);
if (!pCurWin) {
continue;
}
step = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCol, pKeyColInfo, pBlock->info.rows, i, &allEqual,
pSeDeleted);
step = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCol, groupId, pKeyColInfo,
pBlock->info.rows, i, &allEqual, pSeDeleted);
ASSERT(isTsInWindow(pCurWin, tsCol[i]) || isEqualStateKey(pCurWin, pKeyData));
taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition));
deleteWindow(pAggSup->pCurWins, winIndex, destroyStateWinInfo);
......@@ -4659,12 +4670,12 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
int32_t winIndex = 0;
bool allEqual = true;
SStateWindowInfo* pCurWin =
getStateWindow(pAggSup, tsCols[i], pSDataBlock->info.groupId, pKeyData, &pInfo->stateCol, &winIndex);
winRows = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCols, pKeyColInfo, pSDataBlock->info.rows, i,
&allEqual, pInfo->pSeDeleted);
getStateWindow(pAggSup, tsCols[i], groupId, pKeyData, &pInfo->stateCol, &winIndex);
winRows = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCols, groupId, pKeyColInfo,
pSDataBlock->info.rows, i, &allEqual, pStDeleted);
if (!allEqual) {
appendOneRow(pAggSup->pScanBlock, &pCurWin->winInfo.win.skey, &pCurWin->winInfo.win.ekey,
&pSDataBlock->info.groupId);
&groupId);
taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition));
deleteWindow(pAggSup->pCurWins, winIndex, destroyStateWinInfo);
continue;
......@@ -4828,9 +4839,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pSeDeleted = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
pInfo->pDelIterator = NULL;
// pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
pInfo->pDelRes = createOneDataBlock(pInfo->binfo.pRes, false); // todo(liuyao) for delete
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; // todo(liuyao) for delete
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
pInfo->pChildren = NULL;
pInfo->ignoreExpiredData = pStateNode->window.igExpired;
......
......@@ -5,15 +5,15 @@ sleep 50
sql connect
print =============== create database
sql create database test vgroups 1
sql select * from information_schema.ins_databases
sql create database test vgroups 1;
sql select * from information_schema.ins_databases;
if $rows != 3 then
return -1
endi
print $data00 $data01 $data02
sql use test
sql use test;
sql create table t1(ts timestamp, a int, b int , c int, d double, id int);
sql create stream streams1 trigger at_once into streamt1 as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(a) c4, min(c) c5, max(id) c from t1 state_window(a);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册