提交 8caca975 编写于 作者: 5 54liuyao

feat:add delete mark for sma

上级 2d6fc970
......@@ -1946,9 +1946,9 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
int32_t len = 0;
len += snprintf(dumpBuf + len, size - len,
"===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64
"|rows:%d|version:%" PRIu64 "\n",
"|rows:%d|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "\n",
flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.id.groupId,
pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version);
pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version, pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey);
if (len >= size - 1) return dumpBuf;
for (int32_t j = 0; j < rows; j++) {
......
......@@ -23,6 +23,7 @@
#include "ttime.h"
#define IS_FINAL_OP(op) ((op)->isFinal)
#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL);
typedef struct SSessionAggOperatorInfo {
SOptrBasicInfo binfo;
......@@ -56,6 +57,7 @@ typedef enum SResultTsInterpType {
typedef struct SPullWindowInfo {
STimeWindow window;
uint64_t groupId;
STimeWindow calWin;
} SPullWindowInfo;
typedef struct SOpenWindowInfo {
......@@ -793,17 +795,18 @@ int32_t comparePullWinKey(void* pKey, void* data, int32_t index) {
SArray* res = (SArray*)data;
SPullWindowInfo* pos = taosArrayGet(res, index);
SPullWindowInfo* pData = (SPullWindowInfo*)pKey;
if (pData->window.skey == pos->window.skey) {
if (pData->groupId > pos->groupId) {
return 1;
} else if (pData->groupId < pos->groupId) {
return -1;
}
return 0;
} else if (pData->window.skey > pos->window.skey) {
if (pData->groupId > pos->groupId) {
return 1;
} else if (pData->groupId < pos->groupId) {
return -1;
}
if (pData->window.skey > pos->window.ekey) {
return 1;
} else if (pData->window.ekey < pos->window.skey) {
return -1;
}
return -1;
return 0;
}
static int32_t savePullWindow(SPullWindowInfo* pPullInfo, SArray* pPullWins) {
......@@ -812,10 +815,16 @@ static int32_t savePullWindow(SPullWindowInfo* pPullInfo, SArray* pPullWins) {
if (index == -1) {
index = 0;
} else {
if (comparePullWinKey(pPullInfo, pPullWins, index) > 0) {
index++;
} else {
int32_t code = comparePullWinKey(pPullInfo, pPullWins, index);
if (code == 0) {
SPullWindowInfo* pos = taosArrayGet(pPullWins ,index);
pos->window.skey = TMIN(pos->window.skey, pPullInfo->window.skey);
pos->window.ekey = TMAX(pos->window.ekey, pPullInfo->window.ekey);
pos->calWin.skey = TMIN(pos->calWin.skey, pPullInfo->calWin.skey);
pos->calWin.ekey = TMAX(pos->calWin.ekey, pPullInfo->calWin.ekey);
return TSDB_CODE_SUCCESS;
} else if (code > 0 ){
index++;
}
}
if (taosArrayInsert(pPullWins, index, pPullInfo) == NULL) {
......@@ -2255,8 +2264,8 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB
colDataAppend(pStartTs, pBlock->info.rows, (const char*)&pWin->window.skey, false);
colDataAppend(pEndTs, pBlock->info.rows, (const char*)&pWin->window.ekey, false);
colDataAppend(pGroupId, pBlock->info.rows, (const char*)&pWin->groupId, false);
colDataAppend(pCalStartTs, pBlock->info.rows, (const char*)&pWin->window.skey, false);
colDataAppend(pCalEndTs, pBlock->info.rows, (const char*)&pWin->window.ekey, false);
colDataAppend(pCalStartTs, pBlock->info.rows, (const char*)&pWin->calWin.skey, false);
colDataAppend(pCalEndTs, pBlock->info.rows, (const char*)&pWin->calWin.ekey, false);
pBlock->info.rows++;
}
if ((*pIndex) == size) {
......@@ -2266,27 +2275,33 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB
blockDataUpdateTsWindow(pBlock, 0);
}
void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) {
void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SInterval* pInterval) {
SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* tsData = (TSKEY*)pStartCol->pData;
SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
TSKEY* tsEndData = (TSKEY*)pEndCol->pData;
SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
uint64_t* groupIdData = (uint64_t*)pGroupCol->pData;
int32_t chId = getChildIndex(pBlock);
for (int32_t i = 0; i < pBlock->info.rows; i++) {
SWinKey winRes = {.ts = tsData[i], .groupId = groupIdData[i]};
void* chIds = taosHashGet(pMap, &winRes, sizeof(SWinKey));
if (chIds) {
SArray* chArray = *(SArray**)chIds;
int32_t index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
if (index != -1) {
qDebug("===stream===window %" PRId64 " delete child id %d", winRes.ts, chId);
taosArrayRemove(chArray, index);
if (taosArrayGetSize(chArray) == 0) {
// pull data is over
taosArrayDestroy(chArray);
taosHashRemove(pMap, &winRes, sizeof(SWinKey));
TSKEY winTs = tsData[i];
while (winTs < tsEndData[i]) {
SWinKey winRes = {.ts = winTs, .groupId = groupIdData[i]};
void* chIds = taosHashGet(pMap, &winRes, sizeof(SWinKey));
if (chIds) {
SArray* chArray = *(SArray**)chIds;
int32_t index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
if (index != -1) {
qDebug("===stream===window %" PRId64 " delete child id %d", winRes.ts, chId);
taosArrayRemove(chArray, index);
if (taosArrayGetSize(chArray) == 0) {
// pull data is over
taosArrayDestroy(chArray);
taosHashRemove(pMap, &winRes, sizeof(SWinKey));
}
}
}
winTs = taosTimeAdd(winTs, pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
}
}
}
......@@ -2299,12 +2314,13 @@ static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo) {
if (needDeleteWindowBuf(&nextWin, &pInfo->twAggSup) && !pInfo->ignoreExpiredData) {
void* chIds = taosHashGet(pInfo->pPullDataMap, winKey, sizeof(SWinKey));
if (!chIds) {
SPullWindowInfo pull = {.window = nextWin, .groupId = winKey->groupId};
SPullWindowInfo pull = {.window = nextWin, .groupId = winKey->groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey};
// add pull data request
savePullWindow(&pull, pInfo->pPullWins);
int32_t size1 = taosArrayGetSize(pInfo->pChildren);
addPullWindow(pInfo->pPullDataMap, winKey, size1);
qDebug("===stream===prepare retrive for delete %" PRId64 ", size:%d", winKey->ts, size1);
if (savePullWindow(&pull, pInfo->pPullWins) == TSDB_CODE_SUCCESS) {
int32_t size1 = taosArrayGetSize(pInfo->pChildren);
addPullWindow(pInfo->pPullDataMap, winKey, size1);
qDebug("===stream===prepare retrive for delete %" PRId64 ", size:%d", winKey->ts, size1);
}
}
}
}
......@@ -2374,12 +2390,13 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
};
void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey));
if (isDeletedStreamWindow(&nextWin, groupId, pInfo->pState, &pInfo->twAggSup) && !chIds) {
SPullWindowInfo pull = {.window = nextWin, .groupId = groupId};
SPullWindowInfo pull = {.window = nextWin, .groupId = groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey};
// add pull data request
savePullWindow(&pull, pInfo->pPullWins);
int32_t size = taosArrayGetSize(pInfo->pChildren);
addPullWindow(pInfo->pPullDataMap, &winRes, size);
qDebug("===stream===prepare retrive %" PRId64 ", size:%d", winRes.ts, size);
if (savePullWindow(&pull, pInfo->pPullWins) == TSDB_CODE_SUCCESS) {
int32_t size = taosArrayGetSize(pInfo->pChildren);
addPullWindow(pInfo->pPullDataMap, &winRes, size);
qDebug("===stream===prepare retrive %" PRId64 ", size:%d", winRes.ts, size);
}
} else {
int32_t index = -1;
SArray* chArray = NULL;
......@@ -2560,7 +2577,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
}
continue;
} else if (pBlock->info.type == STREAM_PULL_OVER && IS_FINAL_OP(pInfo)) {
processPullOver(pBlock, pInfo->pPullDataMap);
processPullOver(pBlock, pInfo->pPullDataMap, &pInfo->interval);
continue;
}
......@@ -2638,6 +2655,15 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
return NULL;
}
int64_t getDeleteMark(SIntervalPhysiNode* pIntervalPhyNode) {
if (pIntervalPhyNode->window.deleteMark <= 0) {
return DEAULT_DELETE_MARK;
}
int64_t deleteMark = TMAX(pIntervalPhyNode->window.deleteMark,pIntervalPhyNode->window.watermark);
deleteMark = TMAX(deleteMark, pIntervalPhyNode->interval);
return deleteMark;
}
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, int32_t numOfChild) {
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
......@@ -2659,9 +2685,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
.calTrigger = pIntervalPhyNode->window.triggerType,
.maxTs = INT64_MIN,
.minTs = INT64_MAX,
// for test 315360000000
.deleteMark = 1000LL * 60LL * 60LL * 24LL * 365LL * 10LL,
// .deleteMark = INT64_MAX,
.deleteMark = getDeleteMark(pIntervalPhyNode),
.deleteMarkSaved = 0,
.calTriggerSaved = 0,
};
......@@ -4805,7 +4829,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
.calTrigger = pIntervalPhyNode->window.triggerType,
.maxTs = INT64_MIN,
.minTs = INT64_MAX,
.deleteMark = INT64_MAX,
.deleteMark = getDeleteMark(pIntervalPhyNode),
};
ASSERT(twAggSupp.calTrigger != STREAM_TRIGGER_MAX_DELAY);
......
......@@ -461,7 +461,7 @@ sql insert into t2 values(1648791213004,4,10,10,4.1);
$loop_count = 0
loop2:
sleep 100
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
......@@ -519,7 +519,7 @@ print step 6
$loop_count = 0
loop3:
# sleep 300
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
......@@ -618,6 +618,60 @@ if $data41 != 2 then
goto loop4
endi
sql insert into t1 values(1648791343003,4,4,4,3.1);
sql insert into t1 values(1648791213004,4,5,5,4.1);
loop5:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from streamt3;
# row 0
if $rows != 7 then
print =====rows=$rows
goto loop5
endi
if $data01 != 4 then
print =====data01=$data01
goto loop5
endi
if $data11 != 6 then
print =====data11=$data11
goto loop5
endi
if $data21 != 4 then
print =====data21=$data21
goto loop5
endi
if $data31 != 4 then
print =====data31=$data31
goto loop5
endi
if $data41 != 2 then
print =====data41=$data41
goto loop5
endi
if $data51 != 1 then
print =====data51=$data51
goto loop5
endi
if $data61 != 1 then
print =====data61=$data61
goto loop5
endi
$loop_all = $loop_all + 1
print ============loop_all=$loop_all
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册