提交 95e4a481 编写于 作者: L Liu Jicong

fix(stream): fill combine with subtable

上级 bdfef853
...@@ -1282,7 +1282,9 @@ static void doStreamFillImpl(SOperatorInfo* pOperator) { ...@@ -1282,7 +1282,9 @@ static void doStreamFillImpl(SOperatorInfo* pOperator) {
blockDataCleanup(pInfo->pSrcBlock); blockDataCleanup(pInfo->pSrcBlock);
} }
static void buildDeleteRange(TSKEY start, TSKEY end, uint64_t groupId, SSDataBlock* delRes) { static void buildDeleteRange(SOperatorInfo* pOp, TSKEY start, TSKEY end, uint64_t groupId, SSDataBlock* delRes) {
SStreamState* pState = pOp->pTaskInfo->streamInfo.pState;
SSDataBlock* pBlock = delRes; SSDataBlock* pBlock = delRes;
SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
...@@ -1290,25 +1292,42 @@ static void buildDeleteRange(TSKEY start, TSKEY end, uint64_t groupId, SSDataBlo ...@@ -1290,25 +1292,42 @@ static void buildDeleteRange(TSKEY start, TSKEY end, uint64_t groupId, SSDataBlo
SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
SColumnInfoData* pCalStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); SColumnInfoData* pCalStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
SColumnInfoData* pCalEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); SColumnInfoData* pCalEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
SColumnInfoData* pTbNameCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
colDataAppend(pStartCol, pBlock->info.rows, (const char*)&start, false); colDataAppend(pStartCol, pBlock->info.rows, (const char*)&start, false);
colDataAppend(pEndCol, pBlock->info.rows, (const char*)&end, false); colDataAppend(pEndCol, pBlock->info.rows, (const char*)&end, false);
colDataAppendNULL(pUidCol, pBlock->info.rows); colDataAppendNULL(pUidCol, pBlock->info.rows);
colDataAppend(pGroupCol, pBlock->info.rows, (const char*)&groupId, false); colDataAppend(pGroupCol, pBlock->info.rows, (const char*)&groupId, false);
colDataAppendNULL(pCalStartCol, pBlock->info.rows); colDataAppendNULL(pCalStartCol, pBlock->info.rows);
colDataAppendNULL(pCalEndCol, pBlock->info.rows); colDataAppendNULL(pCalEndCol, pBlock->info.rows);
SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
void* tbname = NULL;
streamStateGetParName(pOp->pTaskInfo->streamInfo.pState, groupId, &tbname);
if (tbname == NULL) {
colDataAppendNULL(pTableCol, pBlock->info.rows);
} else {
char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN];
STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
colDataAppend(pTableCol, pBlock->info.rows, (const char*)parTbName, false);
}
tdbFree(tbname);
pBlock->info.rows++; pBlock->info.rows++;
} }
static void buildDeleteResult(SStreamFillSupporter* pFillSup, TSKEY startTs, TSKEY endTs, uint64_t groupId, static void buildDeleteResult(SOperatorInfo* pOperator, TSKEY startTs, TSKEY endTs, uint64_t groupId,
SSDataBlock* delRes) { SSDataBlock* delRes) {
SStreamFillOperatorInfo* pInfo = pOperator->info;
SStreamFillSupporter* pFillSup = pInfo->pFillSup;
if (hasPrevWindow(pFillSup)) { if (hasPrevWindow(pFillSup)) {
TSKEY start = getNextWindowTs(pFillSup->prev.key, &pFillSup->interval); TSKEY start = getNextWindowTs(pFillSup->prev.key, &pFillSup->interval);
buildDeleteRange(start, endTs, groupId, delRes); buildDeleteRange(pOperator, start, endTs, groupId, delRes);
} else if (hasNextWindow(pFillSup)) { } else if (hasNextWindow(pFillSup)) {
TSKEY end = getPrevWindowTs(pFillSup->next.key, &pFillSup->interval); TSKEY end = getPrevWindowTs(pFillSup->next.key, &pFillSup->interval);
buildDeleteRange(startTs, end, groupId, delRes); buildDeleteRange(pOperator, startTs, end, groupId, delRes);
} else { } else {
buildDeleteRange(startTs, endTs, groupId, delRes); buildDeleteRange(pOperator, startTs, endTs, groupId, delRes);
} }
} }
...@@ -1319,7 +1338,7 @@ static void doDeleteFillResultImpl(SOperatorInfo* pOperator, TSKEY startTs, TSKE ...@@ -1319,7 +1338,7 @@ static void doDeleteFillResultImpl(SOperatorInfo* pOperator, TSKEY startTs, TSKE
SWinKey key = {.ts = startTs, .groupId = groupId}; SWinKey key = {.ts = startTs, .groupId = groupId};
if (!pInfo->pFillInfo->needFill) { if (!pInfo->pFillInfo->needFill) {
streamStateFillDel(pOperator->pTaskInfo->streamInfo.pState, &key); streamStateFillDel(pOperator->pTaskInfo->streamInfo.pState, &key);
buildDeleteResult(pInfo->pFillSup, startTs, endTs, groupId, pInfo->pDelRes); buildDeleteResult(pOperator, startTs, endTs, groupId, pInfo->pDelRes);
} else { } else {
STimeRange tw = { STimeRange tw = {
.skey = startTs, .skey = startTs,
...@@ -1578,7 +1597,7 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod ...@@ -1578,7 +1597,7 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod
destroyStreamFillSupporter(pFillSup); destroyStreamFillSupporter(pFillSup);
return NULL; return NULL;
} }
SExprInfo* noFillExpr = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &numOfNotFillCols); SExprInfo* noFillExpr = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &numOfNotFillCols);
code = initExprSupp(&pFillSup->notFillExprSup, noFillExpr, numOfNotFillCols); code = initExprSupp(&pFillSup->notFillExprSup, noFillExpr, numOfNotFillCols);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -1715,9 +1734,9 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi ...@@ -1715,9 +1734,9 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
} }
pInfo->srcRowIndex = 0; pInfo->srcRowIndex = 0;
setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo,
pOperator->fpSet = pTaskInfo);
createOperatorFpSet(operatorDummyOpenFn, doStreamFill, NULL, destroyStreamFillOperatorInfo, NULL); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamFill, NULL, destroyStreamFillOperatorInfo, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册