提交 7d699ae5 编写于 作者: L liuyao

fix sliding window issue

上级 0d2fa6f1
...@@ -1883,7 +1883,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1883,7 +1883,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
if (pInfo->pRecoverRes != NULL) { if (pInfo->pRecoverRes != NULL) {
pInfo->blockRecoverContiCnt++; pInfo->blockRecoverContiCnt++;
calBlockTbName(pInfo, pInfo->pRecoverRes); calBlockTbName(pInfo, pInfo->pRecoverRes);
if (pInfo->pUpdateInfo) { if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) {
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1) { if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1) {
TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex); TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
......
...@@ -1623,7 +1623,7 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamInt ...@@ -1623,7 +1623,7 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamInt
SStreamScanInfo* pScanInfo = downstream->info; SStreamScanInfo* pScanInfo = downstream->info;
pScanInfo->windowSup.parentType = type; pScanInfo->windowSup.parentType = type;
pScanInfo->windowSup.pIntervalAggSup = &pInfo->aggSup; pScanInfo->windowSup.pIntervalAggSup = &pInfo->aggSup;
if (!pScanInfo->igCheckUpdate && !pScanInfo->pUpdateInfo) { if (!pScanInfo->pUpdateInfo) {
pScanInfo->pUpdateInfo = pAPI->updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark); pScanInfo->pUpdateInfo = pAPI->updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark);
} }
...@@ -2150,28 +2150,29 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB ...@@ -2150,28 +2150,29 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB
} }
void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SInterval* pInterval) { void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SInterval* pInterval) {
SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
TSKEY* tsData = (TSKEY*)pStartCol->pData; TSKEY* tsData = (TSKEY*)pStartCol->pData;
SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
TSKEY* tsEndData = (TSKEY*)pEndCol->pData; TSKEY* tsEndData = (TSKEY*)pEndCol->pData;
SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
uint64_t* groupIdData = (uint64_t*)pGroupCol->pData; uint64_t* groupIdData = (uint64_t*)pGroupCol->pData;
int32_t chId = getChildIndex(pBlock); int32_t chId = getChildIndex(pBlock);
for (int32_t i = 0; i < pBlock->info.rows; i++) { for (int32_t i = 0; i < pBlock->info.rows; i++) {
TSKEY winTs = tsData[i]; TSKEY winTs = tsData[i];
while (winTs < tsEndData[i]) { while (winTs <= tsEndData[i]) {
SWinKey winRes = {.ts = winTs, .groupId = groupIdData[i]}; SWinKey winRes = {.ts = winTs, .groupId = groupIdData[i]};
void* chIds = taosHashGet(pMap, &winRes, sizeof(SWinKey)); void* chIds = taosHashGet(pMap, &winRes, sizeof(SWinKey));
if (chIds) { if (chIds) {
SArray* chArray = *(SArray**)chIds; SArray* chArray = *(SArray**)chIds;
int32_t index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ); int32_t index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
if (index != -1) { if (index != -1) {
qDebug("===stream===window %" PRId64 " delete child id %d", winRes.ts, chId); qDebug("===stream===retrive window %" PRId64 " delete child id %d", winRes.ts, chId);
taosArrayRemove(chArray, index); taosArrayRemove(chArray, index);
if (taosArrayGetSize(chArray) == 0) { if (taosArrayGetSize(chArray) == 0) {
// pull data is over // pull data is over
taosArrayDestroy(chArray); taosArrayDestroy(chArray);
taosHashRemove(pMap, &winRes, sizeof(SWinKey)); taosHashRemove(pMap, &winRes, sizeof(SWinKey));
qDebug("===stream===retrive pull data over.window %" PRId64 , winRes.ts);
} }
} }
} }
......
...@@ -1062,7 +1062,7 @@ _end: ...@@ -1062,7 +1062,7 @@ _end:
} }
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) { int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
qWarn("try to write to cf parname"); qDebug("try to write to cf parname");
#ifdef USE_ROCKSDB #ifdef USE_ROCKSDB
if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) { if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) {
if (tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)) == NULL) { if (tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)) == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册