提交 ff5f1aa1 编写于 作者: L Liu Jicong

refactor(stream)

上级 5640036f
...@@ -94,7 +94,6 @@ int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffset ...@@ -94,7 +94,6 @@ int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffset
rowCnt += pDataBlock->info.rows; rowCnt += pDataBlock->info.rows;
if (rowCnt <= 4096) continue; if (rowCnt <= 4096) continue;
} }
continue;
} }
void* meta = qStreamExtractMetaMsg(task); void* meta = qStreamExtractMetaMsg(task);
......
...@@ -1234,9 +1234,6 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock ...@@ -1234,9 +1234,6 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
doFilter(pInfo->pCondition, pInfo->pRes); doFilter(pInfo->pCondition, pInfo->pRes);
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
if (pBlockInfo->rows > 0) {
return 0;
}
return 0; return 0;
} }
...@@ -1262,7 +1259,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1262,7 +1259,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
/*pTaskInfo->streamInfo.lastStatus = ret.offset;*/ /*pTaskInfo->streamInfo.lastStatus = ret.offset;*/
if (pInfo->pRes->info.rows > 0) { if (pInfo->pRes->info.rows > 0) {
return pInfo->pRes; return pInfo->pRes;
/*} else {*/ } else {
// data is filtered out, do clean
/*tDeleteSSDataBlock(&ret.data);*/ /*tDeleteSSDataBlock(&ret.data);*/
} }
} else if (ret.fetchType == FETCH_TYPE__META) { } else if (ret.fetchType == FETCH_TYPE__META) {
...@@ -1271,14 +1270,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1271,14 +1270,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pTaskInfo->streamInfo.metaBlk = ret.meta; pTaskInfo->streamInfo.metaBlk = ret.meta;
return NULL; return NULL;
} else if (ret.fetchType == FETCH_TYPE__NONE) { } else if (ret.fetchType == FETCH_TYPE__NONE) {
/*if (ret.offset.version == -1) {*/
/*pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;*/
/*pTaskInfo->streamInfo.lastStatus.version = pTaskInfo->streamInfo.prepareStatus.version - 1;*/
/*} else {*/
pTaskInfo->streamInfo.lastStatus = ret.offset; pTaskInfo->streamInfo.lastStatus = ret.offset;
/*pTaskInfo->streamInfo.lastScanUid */
ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 >= pTaskInfo->streamInfo.prepareStatus.version); ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 >= pTaskInfo->streamInfo.prepareStatus.version);
/*}*/
return NULL; return NULL;
} else { } else {
ASSERT(0); ASSERT(0);
...@@ -1398,72 +1391,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1398,72 +1391,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
pInfo->pRes->info.rows = block.info.rows; setBlockIntoRes(pInfo, &block);
pInfo->pRes->info.uid = block.info.uid;
pInfo->pRes->info.type = STREAM_NORMAL;
pInfo->pRes->info.capacity = block.info.rows;
uint64_t* groupIdPre = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &block.info.uid, sizeof(int64_t));
if (groupIdPre) {
pInfo->pRes->info.groupId = *groupIdPre;
} else {
pInfo->pRes->info.groupId = 0;
}
// for generating rollup SMA result, each time is an independent time serie.
// TODO temporarily used, when the statement of "partition by tbname" is ready, remove this
if (pInfo->assignBlockUid) {
pInfo->pRes->info.groupId = block.info.uid;
}
// todo extract method
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pColMatchInfo); ++i) {
SColMatchInfo* pColMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i);
if (!pColMatchInfo->output) {
continue;
}
bool colExists = false;
for (int32_t j = 0; j < blockDataGetNumOfCols(&block); ++j) {
SColumnInfoData* pResCol = bdGetColumnInfoData(&block, j);
if (pResCol->info.colId == pColMatchInfo->colId) {
taosArraySet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId, pResCol);
colExists = true;
break;
}
}
// the required column does not exists in submit block, let's set it to be all null value
if (!colExists) {
SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId);
colDataAppendNNULL(pDst, 0, pBlockInfo->rows);
}
}
taosArrayDestroy(block.pDataBlock);
ASSERT(pInfo->pRes->pDataBlock != NULL);
#if 0
if (pInfo->pRes->pDataBlock == NULL) {
// TODO add log
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
pOperator->status = OP_EXEC_DONE;
pTaskInfo->code = terrno;
return NULL;
}
#endif
// currently only the tbname pseudo column
if (pInfo->numOfPseudoExpr > 0) {
code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
}
doFilter(pInfo->pCondition, pInfo->pRes);
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
if (pBlockInfo->rows > 0) { if (pBlockInfo->rows > 0) {
break; break;
} }
...@@ -1493,12 +1422,14 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1493,12 +1422,14 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes; return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
#if 0
} else if (pInfo->blockType == STREAM_INPUT__TABLE_SCAN) { } else if (pInfo->blockType == STREAM_INPUT__TABLE_SCAN) {
ASSERT(0); ASSERT(0);
// check reader last status // check reader last status
// if not match, reset status // if not match, reset status
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
return pResult && pResult->info.rows > 0 ? pResult : NULL; return pResult && pResult->info.rows > 0 ? pResult : NULL;
#endif
} else { } else {
ASSERT(0); ASSERT(0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册