提交 cebc6b66 编写于 作者: L Liu Jicong

fix(stream): state and session agg partition tbname

上级 d56dc950
......@@ -3843,7 +3843,7 @@ static int32_t copyUpdateResult(SSHashObj* pStUpdated, SArray* pUpdated) {
return TSDB_CODE_SUCCESS;
}
void doBuildDeleteDataBlock(SSHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite) {
void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite) {
blockDataCleanup(pBlock);
int32_t size = tSimpleHashGetSize(pStDeleted);
if (size == 0) {
......@@ -3869,6 +3869,26 @@ void doBuildDeleteDataBlock(SSHashObj* pStDeleted, SSDataBlock* pBlock, void** I
colDataAppendNULL(pCalStCol, pBlock->info.rows);
SColumnInfoData* pCalEdCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
colDataAppendNULL(pCalEdCol, pBlock->info.rows);
SHashObj* pGroupIdTbNameMap;
if (pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
SStreamSessionAggOperatorInfo* pInfo = pOp->info;
pGroupIdTbNameMap = pInfo->pGroupIdTbNameMap;
} else if (pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
SStreamStateAggOperatorInfo* pInfo = pOp->info;
pGroupIdTbNameMap = pInfo->pGroupIdTbNameMap;
}
char* tbname = taosHashGet(pGroupIdTbNameMap, &res->groupId, sizeof(int64_t));
SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
if (tbname == NULL) {
colDataAppendNULL(pTableCol, pBlock->info.rows);
} else {
char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN];
STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
colDataAppend(pTableCol, pBlock->info.rows, (const char*)parTbName, false);
}
pBlock->info.rows += 1;
}
if ((*Ite) == NULL) {
......@@ -4013,7 +4033,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
} else if (pOperator->status == OP_RES_TO_RETURN) {
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
return pInfo->pDelRes;
......@@ -4112,7 +4132,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
return pInfo->pDelRes;
......@@ -4238,7 +4258,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
return pBInfo->pRes;
}
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "semi session delete");
return pInfo->pDelRes;
......@@ -4312,7 +4332,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
return pBInfo->pRes;
}
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "semi session delete");
return pInfo->pDelRes;
......@@ -4563,7 +4583,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
SOptrBasicInfo* pBInfo = &pInfo->binfo;
int64_t maxTs = INT64_MIN;
if (pOperator->status == OP_RES_TO_RETURN) {
doBuildDeleteDataBlock(pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "single state delete");
return pInfo->pDelRes;
......@@ -4630,7 +4650,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
doBuildDeleteDataBlock(pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "single state delete");
return pInfo->pDelRes;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册