提交 8fb78fd7 编写于 作者: 5 54liuyao

feat(stream):add ci

上级 78ff5f75
...@@ -809,23 +809,6 @@ static int32_t savePullWindow(SPullWindowInfo* pPullInfo, SArray* pPullWins) { ...@@ -809,23 +809,6 @@ static int32_t savePullWindow(SPullWindowInfo* pPullInfo, SArray* pPullWins) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t compareResKey(void* pKey, void* data, int32_t index) {
SArray* res = (SArray*)data;
SResKeyPos* pos = taosArrayGetP(res, index);
SWinKey* pData = (SWinKey*)pKey;
if (pData->ts == *(int64_t*)pos->key) {
if (pData->groupId > pos->groupId) {
return 1;
} else if (pData->groupId < pos->groupId) {
return -1;
}
return 0;
} else if (pData->ts > *(int64_t*)pos->key) {
return 1;
}
return -1;
}
static int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated) { static int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated) {
winInfo.sessionWin.win.ekey = winInfo.sessionWin.win.skey; winInfo.sessionWin.win.ekey = winInfo.sessionWin.win.skey;
return tSimpleHashPut(pStUpdated, &winInfo.sessionWin, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo)); return tSimpleHashPut(pStUpdated, &winInfo.sessionWin, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo));
...@@ -863,12 +846,6 @@ static void removeResults(SArray* pWins, SHashObj* pUpdatedMap) { ...@@ -863,12 +846,6 @@ static void removeResults(SArray* pWins, SHashObj* pUpdatedMap) {
} }
} }
int64_t getWinReskey(void* data, int32_t index) {
SArray* res = (SArray*)data;
SWinKey* pos = taosArrayGet(res, index);
return pos->ts;
}
int32_t compareWinRes(void* pKey, void* data, int32_t index) { int32_t compareWinRes(void* pKey, void* data, int32_t index) {
SArray* res = (SArray*)data; SArray* res = (SArray*)data;
SWinKey* pos = taosArrayGet(res, index); SWinKey* pos = taosArrayGet(res, index);
...@@ -1307,27 +1284,6 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { ...@@ -1307,27 +1284,6 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
} }
} }
// todo merged with the build group result.
static void finalizeUpdatedResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SArray* pUpdateList,
int32_t* rowEntryInfoOffset) {
size_t num = taosArrayGetSize(pUpdateList);
for (int32_t i = 0; i < num; ++i) {
SResKeyPos* pPos = taosArrayGetP(pUpdateList, i);
SFilePage* bufPage = getBufPage(pBuf, pPos->pos.pageId);
SResultRow* pRow = (SResultRow*)((char*)bufPage + pPos->pos.offset);
for (int32_t j = 0; j < numOfOutput; ++j) {
SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, j, rowEntryInfoOffset);
if (pRow->numOfRows < pEntry->numOfRes) {
pRow->numOfRows = pEntry->numOfRes;
}
}
releaseBufPage(pBuf, bufPage);
}
}
static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type) { static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type) {
for (int i = 0; i < num; i++) { for (int i = 0; i < num; i++) {
if (type == STREAM_INVERT) { if (type == STREAM_INVERT) {
...@@ -1578,16 +1534,6 @@ static void closeChildIntervalWindow(SOperatorInfo* pOperator, SArray* pChildren ...@@ -1578,16 +1534,6 @@ static void closeChildIntervalWindow(SOperatorInfo* pOperator, SArray* pChildren
} }
} }
static void freeAllPages(SArray* pageIds, SDiskbasedBuf* pDiskBuf) {
int32_t size = taosArrayGetSize(pageIds);
for (int32_t i = 0; i < size; i++) {
int32_t pageId = *(int32_t*)taosArrayGet(pageIds, i);
// SFilePage* bufPage = getBufPage(pDiskBuf, pageId);
// dBufSetBufPageRecycled(pDiskBuf, bufPage);
}
taosArrayClear(pageIds);
}
static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWins, int32_t* index, static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWins, int32_t* index,
SSDataBlock* pBlock) { SSDataBlock* pBlock) {
blockDataCleanup(pBlock); blockDataCleanup(pBlock);
...@@ -3353,7 +3299,9 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -3353,7 +3299,9 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
.calTrigger = pIntervalPhyNode->window.triggerType, .calTrigger = pIntervalPhyNode->window.triggerType,
.maxTs = INT64_MIN, .maxTs = INT64_MIN,
.minTs = INT64_MAX, .minTs = INT64_MAX,
.deleteMark = INT64_MAX, // for test 315360000000
.deleteMark = 1000LL * 60LL * 60LL * 24LL * 365LL * 10LL,
// .deleteMark = INT64_MAX,
}; };
ASSERT(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY); ASSERT(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY);
pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
......
...@@ -236,7 +236,43 @@ endi ...@@ -236,7 +236,43 @@ endi
print loop3 over print loop3 over
sql drop stream if exists streams1;
sql drop database if exists test1;
sql create database test1 vgroups 4 keep 7000;
sql use test1;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams1 trigger at_once into streamt1 as select _wstart as c0, count(*) c1, count(a) c2 from st interval(10s) ;
sql insert into t1 values(1648791211000,1,2,3);
sql insert into t1 values(1262275200000,2,2,3);
sql insert into t2 values(1262275200000,1,2,3);
$loop_count = 0
loop4:
sleep 300
sql select * from streamt1 order by c0;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $rows != 2 then
print =====loop4=rows=$rows
goto loop4
endi
if $data01 != 2 then
print =====loop4=data11=$data11
goto loop4
endi
print loop4 over
#==system sh/exec.sh -n dnode1 -s stop -x SIGINT #==system sh/exec.sh -n dnode1 -s stop -x SIGINT
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册