提交 25b735f0 编写于 作者: 5 54liuyao

feat(stream): delete result

上级 cd22be63
...@@ -142,6 +142,7 @@ typedef struct SqlFunctionCtx { ...@@ -142,6 +142,7 @@ typedef struct SqlFunctionCtx {
struct SSDataBlock *pDstBlock; // used by indifinite rows function to set selectivity struct SSDataBlock *pDstBlock; // used by indifinite rows function to set selectivity
int32_t curBufPage; int32_t curBufPage;
bool increase; bool increase;
bool isStream;
char udfName[TSDB_FUNC_NAME_LEN]; char udfName[TSDB_FUNC_NAME_LEN];
} SqlFunctionCtx; } SqlFunctionCtx;
......
...@@ -987,6 +987,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, ...@@ -987,6 +987,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
pCtx->end.key = INT64_MIN; pCtx->end.key = INT64_MIN;
pCtx->numOfParams = pExpr->base.numOfParams; pCtx->numOfParams = pExpr->base.numOfParams;
pCtx->increase = false; pCtx->increase = false;
pCtx->isStream = false;
pCtx->param = pFunct->pParam; pCtx->param = pFunct->pParam;
} }
......
...@@ -1793,6 +1793,12 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSuppor ...@@ -1793,6 +1793,12 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSuppor
pScanInfo->sessionSup.pIntervalAggSup = pSup; pScanInfo->sessionSup.pIntervalAggSup = pSup;
} }
void initStreamFunciton(SqlFunctionCtx* pCtx, int32_t numOfExpr) {
for (int32_t i = 0; i < numOfExpr; i++) {
pCtx[i].isStream = true;
}
}
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp* pTwAggSupp, SIntervalPhysiNode* pPhyNode, STimeWindowAggSupp* pTwAggSupp, SIntervalPhysiNode* pPhyNode,
...@@ -1835,6 +1841,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -1835,6 +1841,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
if (isStream) { if (isStream) {
ASSERT(numOfCols > 0); ASSERT(numOfCols > 0);
increaseTs(pSup->pCtx); increaseTs(pSup->pCtx);
initStreamFunciton(pSup->pCtx, pSup->numOfExprs);
} }
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
...@@ -3329,6 +3336,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -3329,6 +3336,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
initStreamFunciton(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs);
initBasicInfo(&pInfo->binfo, pResBlock); initBasicInfo(&pInfo->binfo, pResBlock);
ASSERT(numOfCols > 0); ASSERT(numOfCols > 0);
...@@ -3470,6 +3478,7 @@ int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* ...@@ -3470,6 +3478,7 @@ int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo*
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
initStreamFunciton(pSup->pCtx, pSup->numOfExprs);
initBasicInfo(pBasicInfo, pResultBlock); initBasicInfo(pBasicInfo, pResultBlock);
...@@ -4569,8 +4578,8 @@ SStateWindowInfo* getStateWindow(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_ ...@@ -4569,8 +4578,8 @@ SStateWindowInfo* getStateWindow(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_
return insertNewStateWindow(pWinInfos, ts, pKeyData, index + 1, pCol); return insertNewStateWindow(pWinInfos, ts, pKeyData, index + 1, pCol);
} }
int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, SColumnInfoData* pKeyCol, int32_t rows, int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, uint64_t groupId,
int32_t start, bool* allEqual, SHashObj* pSeDelete) { SColumnInfoData* pKeyCol, int32_t rows, int32_t start, bool* allEqual, SHashObj* pSeDeleted) {
*allEqual = true; *allEqual = true;
SStateWindowInfo* pWinInfo = taosArrayGet(pWinInfos, winIndex); SStateWindowInfo* pWinInfo = taosArrayGet(pWinInfos, winIndex);
for (int32_t i = start; i < rows; ++i) { for (int32_t i = start; i < rows; ++i) {
...@@ -4590,9 +4599,10 @@ int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, S ...@@ -4590,9 +4599,10 @@ int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, S
} }
} }
if (pWinInfo->winInfo.win.skey > pTs[i]) { if (pWinInfo->winInfo.win.skey > pTs[i]) {
if (pSeDelete && pWinInfo->winInfo.isOutput) { if (pSeDeleted && pWinInfo->winInfo.isOutput) {
taosHashPut(pSeDelete, &pWinInfo->winInfo.pos, sizeof(SResultRowPosition), &pWinInfo->winInfo.win.skey, SWinRes res = {.ts = pWinInfo->winInfo.win.skey, .groupId = groupId};
sizeof(TSKEY)); taosHashPut(pSeDeleted, &pWinInfo->winInfo.pos, sizeof(SResultRowPosition), &res,
sizeof(SWinRes));
pWinInfo->winInfo.isOutput = false; pWinInfo->winInfo.isOutput = false;
} }
pWinInfo->winInfo.win.skey = pTs[i]; pWinInfo->winInfo.win.skey = pTs[i];
...@@ -4605,22 +4615,23 @@ int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, S ...@@ -4605,22 +4615,23 @@ int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, S
return rows - start; return rows - start;
} }
static void doClearStateWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, int32_t tsIndex, SColumn* pCol, static void doClearStateWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock,
int32_t keyIndex, SHashObj* pSeUpdated, SHashObj* pSeDeleted) { int32_t tsIndex, SColumn* pCol, int32_t keyIndex, SHashObj* pSeUpdated, SHashObj* pSeDeleted) {
SColumnInfoData* pTsColInfo = taosArrayGet(pBlock->pDataBlock, tsIndex); SColumnInfoData* pTsColInfo = taosArrayGet(pBlock->pDataBlock, tsIndex);
SColumnInfoData* pKeyColInfo = taosArrayGet(pBlock->pDataBlock, keyIndex); SColumnInfoData* pKeyColInfo = taosArrayGet(pBlock->pDataBlock, keyIndex);
TSKEY* tsCol = (TSKEY*)pTsColInfo->pData; TSKEY* tsCol = (TSKEY*)pTsColInfo->pData;
bool allEqual = false; bool allEqual = false;
int32_t step = 1; int32_t step = 1;
uint64_t groupId = pBlock->info.groupId;
for (int32_t i = 0; i < pBlock->info.rows; i += step) { for (int32_t i = 0; i < pBlock->info.rows; i += step) {
char* pKeyData = colDataGetData(pKeyColInfo, i); char* pKeyData = colDataGetData(pKeyColInfo, i);
int32_t winIndex = 0; int32_t winIndex = 0;
SStateWindowInfo* pCurWin = getStateWindowByTs(pAggSup, tsCol[i], pBlock->info.groupId, &winIndex); SStateWindowInfo* pCurWin = getStateWindowByTs(pAggSup, tsCol[i], groupId, &winIndex);
if (!pCurWin) { if (!pCurWin) {
continue; continue;
} }
step = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCol, pKeyColInfo, pBlock->info.rows, i, &allEqual, step = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCol, groupId, pKeyColInfo,
pSeDeleted); pBlock->info.rows, i, &allEqual, pSeDeleted);
ASSERT(isTsInWindow(pCurWin, tsCol[i]) || isEqualStateKey(pCurWin, pKeyData)); ASSERT(isTsInWindow(pCurWin, tsCol[i]) || isEqualStateKey(pCurWin, pKeyData));
taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition)); taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition));
deleteWindow(pAggSup->pCurWins, winIndex, destroyStateWinInfo); deleteWindow(pAggSup->pCurWins, winIndex, destroyStateWinInfo);
...@@ -4659,12 +4670,12 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl ...@@ -4659,12 +4670,12 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
int32_t winIndex = 0; int32_t winIndex = 0;
bool allEqual = true; bool allEqual = true;
SStateWindowInfo* pCurWin = SStateWindowInfo* pCurWin =
getStateWindow(pAggSup, tsCols[i], pSDataBlock->info.groupId, pKeyData, &pInfo->stateCol, &winIndex); getStateWindow(pAggSup, tsCols[i], groupId, pKeyData, &pInfo->stateCol, &winIndex);
winRows = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCols, pKeyColInfo, pSDataBlock->info.rows, i, winRows = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCols, groupId, pKeyColInfo,
&allEqual, pInfo->pSeDeleted); pSDataBlock->info.rows, i, &allEqual, pStDeleted);
if (!allEqual) { if (!allEqual) {
appendOneRow(pAggSup->pScanBlock, &pCurWin->winInfo.win.skey, &pCurWin->winInfo.win.ekey, appendOneRow(pAggSup->pScanBlock, &pCurWin->winInfo.win.skey, &pCurWin->winInfo.win.ekey,
&pSDataBlock->info.groupId); &groupId);
taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition)); taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition));
deleteWindow(pAggSup->pCurWins, winIndex, destroyStateWinInfo); deleteWindow(pAggSup->pCurWins, winIndex, destroyStateWinInfo);
continue; continue;
...@@ -4828,9 +4839,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -4828,9 +4839,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pSeDeleted = taosHashInit(64, hashFn, true, HASH_NO_LOCK); pInfo->pSeDeleted = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
pInfo->pDelIterator = NULL; pInfo->pDelIterator = NULL;
// pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
pInfo->pDelRes = createOneDataBlock(pInfo->binfo.pRes, false); // todo(liuyao) for delete
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; // todo(liuyao) for delete
pInfo->pChildren = NULL; pInfo->pChildren = NULL;
pInfo->ignoreExpiredData = pStateNode->window.igExpired; pInfo->ignoreExpiredData = pStateNode->window.igExpired;
......
...@@ -5,15 +5,15 @@ sleep 50 ...@@ -5,15 +5,15 @@ sleep 50
sql connect sql connect
print =============== create database print =============== create database
sql create database test vgroups 1 sql create database test vgroups 1;
sql select * from information_schema.ins_databases sql select * from information_schema.ins_databases;
if $rows != 3 then if $rows != 3 then
return -1 return -1
endi endi
print $data00 $data01 $data02 print $data00 $data01 $data02
sql use test sql use test;
sql create table t1(ts timestamp, a int, b int , c int, d double, id int); sql create table t1(ts timestamp, a int, b int , c int, d double, id int);
sql create stream streams1 trigger at_once into streamt1 as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(a) c4, min(c) c5, max(id) c from t1 state_window(a); sql create stream streams1 trigger at_once into streamt1 as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(a) c4, min(c) c5, max(id) c from t1 state_window(a);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册