提交 327e13c3 编写于 作者: H Haojun Liao

fix(stream): fix the syntax error.

上级 d6d63ec5
...@@ -1754,7 +1754,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1754,7 +1754,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows, qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows,
pTaskInfo->streamInfo.currentOffset.version); pTaskInfo->streamInfo.currentOffset.version);
blockDataCleanup(pInfo->pRes); blockDataCleanup(pInfo->pRes);
setBlockIntoRes(pInfo, pRes, true); STimeWindow defaultWindow = {.skey = INT64_MIN, .ekey = INT64_MAX};
setBlockIntoRes(pInfo, pRes, &defaultWindow, true);
if (pInfo->pRes->info.rows > 0) { if (pInfo->pRes->info.rows > 0) {
return pInfo->pRes; return pInfo->pRes;
} }
...@@ -1863,80 +1864,6 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo) ...@@ -1863,80 +1864,6 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo)
} }
} }
static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeWindow* pWindow, const char* id) {
if (pWindow->skey != INT64_MIN || pWindow->ekey != INT64_MAX) {
bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool));
bool hasUnqualified = false;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsIndex);
if (pWindow->skey != INT64_MIN) {
qDebug("%s filter for additional history window, skey:%" PRId64, id, pWindow->skey);
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
int64_t* ts = (int64_t*)colDataGetData(pCol, i);
p[i] = (*ts >= pWindow->skey);
if (!p[i]) {
hasUnqualified = true;
}
}
} else if (pWindow->ekey != INT64_MAX) {
qDebug("%s filter for additional history window, ekey:%" PRId64, id, pWindow->ekey);
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
int64_t* ts = (int64_t*)colDataGetData(pCol, i);
p[i] = (*ts <= pWindow->ekey);
if (!p[i]) {
hasUnqualified = true;
}
}
}
if (hasUnqualified) {
trimDataBlock(pBlock, pBlock->info.rows, p);
}
taosMemoryFree(p);
}
}
// re-build the delete block, ONLY according to the split timestamp
static void rebuildDeleteBlockData(SSDataBlock* pBlock, int64_t skey, const char* id) {
if (skey == INT64_MIN) {
return;
}
int32_t numOfRows = pBlock->info.rows;
bool* p = taosMemoryCalloc(numOfRows, sizeof(bool));
bool hasUnqualified = false;
SColumnInfoData* pSrcStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
uint64_t* tsStartCol = (uint64_t*)pSrcStartCol->pData;
SColumnInfoData* pSrcEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
uint64_t* tsEndCol = (uint64_t*)pSrcEndCol->pData;
for (int32_t i = 0; i < numOfRows; i++) {
if (tsStartCol[i] < skey) {
tsStartCol[i] = skey;
}
if (tsEndCol[i] >= skey) {
p[i] = true;
} else { // this row should be removed, since it is not in this query time window, which is [skey, INT64_MAX]
hasUnqualified = true;
}
}
if (hasUnqualified) {
trimDataBlock(pBlock, pBlock->info.rows, p);
}
qDebug("%s re-build delete datablock, start key revised to:%"PRId64", rows:%"PRId64, id, skey, pBlock->info.rows);
taosMemoryFree(p);
}
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
// NOTE: this operator does never check if current status is done or not // NOTE: this operator does never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
...@@ -2109,7 +2036,7 @@ FETCH_NEXT_BLOCK: ...@@ -2109,7 +2036,7 @@ FETCH_NEXT_BLOCK:
} }
setBlockGroupIdByUid(pInfo, pDelBlock); setBlockGroupIdByUid(pInfo, pDelBlock);
rebuildDeleteBlockData(pDelBlock, pStreamInfo->fillHistoryWindow.skey, id); rebuildDeleteBlockData(pDelBlock, &pStreamInfo->fillHistoryWindow, id);
printDataBlock(pDelBlock, "stream scan delete recv filtered"); printDataBlock(pDelBlock, "stream scan delete recv filtered");
if (pDelBlock->info.rows == 0) { if (pDelBlock->info.rows == 0) {
if (pInfo->tqReader) { if (pInfo->tqReader) {
...@@ -2254,7 +2181,7 @@ FETCH_NEXT_BLOCK: ...@@ -2254,7 +2181,7 @@ FETCH_NEXT_BLOCK:
continue; continue;
} }
setBlockIntoRes(pInfo, pRes, false); setBlockIntoRes(pInfo, pRes, &pStreamInfo->fillHistoryWindow, false);
if (pInfo->pCreateTbRes->info.rows > 0) { if (pInfo->pCreateTbRes->info.rows > 0) {
pInfo->scanMode = STREAM_SCAN_FROM_RES; pInfo->scanMode = STREAM_SCAN_FROM_RES;
qDebug("create table res exists, rows:%"PRId64" return from stream scan, %s", pInfo->pCreateTbRes->info.rows, id); qDebug("create table res exists, rows:%"PRId64" return from stream scan, %s", pInfo->pCreateTbRes->info.rows, id);
......
...@@ -492,12 +492,6 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory ...@@ -492,12 +492,6 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
return 0; return 0;
} }
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask) {
void* exec = pTask->exec.pExecutor;
qResetStreamInfoTimeWindow(exec);
return 0;
}
int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) { int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY); ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY);
SStreamMeta* pMeta = pTask->pMeta; SStreamMeta* pMeta = pTask->pMeta;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册