未验证 提交 6fc45a6f 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18019 from taosdata/feature/stream

fix(stream): state and session agg partition tbname
......@@ -1776,7 +1776,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
}
initResultRowInfo(&pInfo->binfo.resultRowInfo);
setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo);
setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->fpSet =
createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, destroyIntervalOperatorInfo, NULL);
......@@ -2551,9 +2552,9 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
pScanInfo->cond.twindows = pInfo->win;
pScanInfo->cond.type = TIMEWINDOW_RANGE_EXTERNAL;
setOperatorInfo(pOperator, "TimeSliceOperator", QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doTimeslice, NULL, destroyTimeSliceOperatorInfo, NULL);
setOperatorInfo(pOperator, "TimeSliceOperator", QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC, false, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTimeslice, NULL, destroyTimeSliceOperatorInfo, NULL);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
......@@ -2622,7 +2623,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
pInfo->tsSlotId = tsSlotId;
setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo);
setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->fpSet =
createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, destroyStateWindowOperatorInfo, NULL);
......@@ -2694,7 +2696,8 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
goto _error;
}
setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED, pInfo, pTaskInfo);
setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, destroySWindowOperatorInfo, NULL);
pOperator->pTaskInfo = pTaskInfo;
......@@ -3822,7 +3825,7 @@ static int32_t copyUpdateResult(SSHashObj* pStUpdated, SArray* pUpdated) {
return TSDB_CODE_SUCCESS;
}
void doBuildDeleteDataBlock(SSHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite) {
void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite) {
blockDataCleanup(pBlock);
int32_t size = tSimpleHashGetSize(pStDeleted);
if (size == 0) {
......@@ -3848,6 +3851,26 @@ void doBuildDeleteDataBlock(SSHashObj* pStDeleted, SSDataBlock* pBlock, void** I
colDataAppendNULL(pCalStCol, pBlock->info.rows);
SColumnInfoData* pCalEdCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
colDataAppendNULL(pCalEdCol, pBlock->info.rows);
SHashObj* pGroupIdTbNameMap = NULL;
if (pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
SStreamSessionAggOperatorInfo* pInfo = pOp->info;
pGroupIdTbNameMap = pInfo->pGroupIdTbNameMap;
} else if (pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
SStreamStateAggOperatorInfo* pInfo = pOp->info;
pGroupIdTbNameMap = pInfo->pGroupIdTbNameMap;
}
char* tbname = taosHashGet(pGroupIdTbNameMap, &res->groupId, sizeof(int64_t));
SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
if (tbname == NULL) {
colDataAppendNULL(pTableCol, pBlock->info.rows);
} else {
char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN];
STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
colDataAppend(pTableCol, pBlock->info.rows, (const char*)parTbName, false);
}
pBlock->info.rows += 1;
}
if ((*Ite) == NULL) {
......@@ -3994,7 +4017,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
} else if (pOperator->status == OP_RES_TO_RETURN) {
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
return pInfo->pDelRes;
......@@ -4099,7 +4122,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
taosMemoryFree(pBuf);
#endif
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
return pInfo->pDelRes;
......@@ -4223,7 +4246,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
return pBInfo->pRes;
}
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "semi session delete");
return pInfo->pDelRes;
......@@ -4303,7 +4326,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
return pBInfo->pRes;
}
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "semi session delete");
return pInfo->pDelRes;
......@@ -4327,7 +4350,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
pInfo->isFinal = (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION);
char* name = (pInfo->isFinal)? "StreamSessionFinalAggOperator":"StreamSessionSemiAggOperator";
char* name = (pInfo->isFinal) ? "StreamSessionFinalAggOperator" : "StreamSessionSemiAggOperator";
if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
......@@ -4336,7 +4359,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
destroyStreamSessionAggOperatorInfo, NULL);
}
setOperatorInfo(pOperator, name, pPhyNode->type , false, OP_NOT_OPENED, pInfo, pTaskInfo);
setOperatorInfo(pOperator, name, pPhyNode->type, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pInfo->pGroupIdTbNameMap =
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
......@@ -4555,7 +4578,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
SOptrBasicInfo* pBInfo = &pInfo->binfo;
int64_t maxTs = INT64_MIN;
if (pOperator->status == OP_RES_TO_RETURN) {
doBuildDeleteDataBlock(pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "single state delete");
return pInfo->pDelRes;
......@@ -4622,7 +4645,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
doBuildDeleteDataBlock(pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "single state delete");
return pInfo->pDelRes;
......@@ -4698,7 +4721,8 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pGroupIdTbNameMap =
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo);
setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo, NULL);
initDownStream(downstream, &pInfo->streamAggSup, pInfo->twAggSup.waterMark, pOperator->operatorType,
......@@ -4973,7 +4997,8 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
initResultRowInfo(&iaInfo->binfo.resultRowInfo);
blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity);
setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL, false, OP_NOT_OPENED, miaInfo, pTaskInfo);
setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL,
false, OP_NOT_OPENED, miaInfo, pTaskInfo);
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, mergeAlignedIntervalAgg, NULL, destroyMAIOperatorInfo, NULL);
......@@ -5279,7 +5304,8 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
}
initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo);
setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false, OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo);
setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false,
OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo);
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, destroyMergeIntervalOperatorInfo, NULL);
......@@ -5509,9 +5535,10 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pGroupIdTbNameMap =
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamIntervalAgg, NULL,
destroyStreamFinalIntervalOperatorInfo, NULL);
setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doStreamIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo, NULL);
initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup);
code = appendDownstream(pOperator, &downstream, 1);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册