未验证 提交 87586431 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #17465 from taosdata/feature/stream

fix(stream): stream state and session support partition tbname
...@@ -1095,7 +1095,7 @@ void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput); ...@@ -1095,7 +1095,7 @@ void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput);
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
SGroupResInfo* pGroupResInfo); SGroupResInfo* pGroupResInfo);
int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size); int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size);
int32_t buildSessionResultDataBlock(SExecTaskInfo* pTaskInfo, SStreamState* pState, SSDataBlock* pBlock, int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock,
SExprSupp* pSup, SGroupResInfo* pGroupResInfo); SExprSupp* pSup, SGroupResInfo* pGroupResInfo);
int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx, int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup); int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup);
......
...@@ -4312,8 +4312,9 @@ int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, in ...@@ -4312,8 +4312,9 @@ int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, in
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t buildSessionResultDataBlock(SExecTaskInfo* pTaskInfo, SStreamState* pState, SSDataBlock* pBlock, int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock,
SExprSupp* pSup, SGroupResInfo* pGroupResInfo) { SExprSupp* pSup, SGroupResInfo* pGroupResInfo) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SExprInfo* pExprInfo = pSup->pExprInfo; SExprInfo* pExprInfo = pSup->pExprInfo;
int32_t numOfExprs = pSup->numOfExprs; int32_t numOfExprs = pSup->numOfExprs;
int32_t* rowEntryOffset = pSup->rowEntryInfoOffset; int32_t* rowEntryOffset = pSup->rowEntryInfoOffset;
...@@ -4338,6 +4339,31 @@ int32_t buildSessionResultDataBlock(SExecTaskInfo* pTaskInfo, SStreamState* pSta ...@@ -4338,6 +4339,31 @@ int32_t buildSessionResultDataBlock(SExecTaskInfo* pTaskInfo, SStreamState* pSta
if (pBlock->info.groupId == 0) { if (pBlock->info.groupId == 0) {
pBlock->info.groupId = pKey->groupId; pBlock->info.groupId = pKey->groupId;
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t));
if (tbname != NULL) {
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
} else {
pBlock->info.parTbName[0] = 0;
}
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t));
if (tbname != NULL) {
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
} else {
pBlock->info.parTbName[0] = 0;
}
} else {
ASSERT(0);
}
} else { } else {
// current value belongs to different group, it can't be packed into one datablock // current value belongs to different group, it can't be packed into one datablock
if (pBlock->info.groupId != pKey->groupId) { if (pBlock->info.groupId != pKey->groupId) {
...@@ -4383,4 +4409,4 @@ int32_t buildSessionResultDataBlock(SExecTaskInfo* pTaskInfo, SStreamState* pSta ...@@ -4383,4 +4409,4 @@ int32_t buildSessionResultDataBlock(SExecTaskInfo* pTaskInfo, SStreamState* pSta
} }
blockDataUpdateTsWindow(pBlock, 0); blockDataUpdateTsWindow(pBlock, 0);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
\ No newline at end of file
...@@ -1492,6 +1492,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { ...@@ -1492,6 +1492,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
case STREAM_NORMAL: case STREAM_NORMAL:
case STREAM_INVALID: { case STREAM_INVALID: {
doApplyStreamScalarCalculation(pOperator, pBlock, pInfo->pSrcBlock); doApplyStreamScalarCalculation(pOperator, pBlock, pInfo->pSrcBlock);
memcpy(pInfo->pSrcBlock->info.parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
pInfo->srcRowIndex = 0; pInfo->srcRowIndex = 0;
} break; } break;
default: default:
...@@ -1502,6 +1503,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { ...@@ -1502,6 +1503,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
doStreamFillImpl(pOperator); doStreamFillImpl(pOperator);
doFilter(pInfo->pCondition, pInfo->pRes, pInfo->pColMatchColInfo, NULL); doFilter(pInfo->pCondition, pInfo->pRes, pInfo->pColMatchColInfo, NULL);
memcpy(pInfo->pRes->info.parTbName, pInfo->pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
pOperator->resultInfo.totalRows += pInfo->pRes->info.rows; pOperator->resultInfo.totalRows += pInfo->pRes->info.rows;
if (pInfo->pRes->info.rows > 0) { if (pInfo->pRes->info.rows > 0) {
break; break;
......
...@@ -4044,7 +4044,7 @@ void doBuildSessionResult(SOperatorInfo* pOperator, SStreamState* pState, SGroup ...@@ -4044,7 +4044,7 @@ void doBuildSessionResult(SOperatorInfo* pOperator, SStreamState* pState, SGroup
// clear the existed group id // clear the existed group id
pBlock->info.groupId = 0; pBlock->info.groupId = 0;
buildSessionResultDataBlock(pTaskInfo, pState, pBlock, &pOperator->exprSupp, pGroupResInfo); buildSessionResultDataBlock(pOperator, pState, pBlock, &pOperator->exprSupp, pGroupResInfo);
} }
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
...@@ -4088,6 +4088,12 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -4088,6 +4088,12 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
/*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/ /*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/
} }
if (pBlock->info.parTbName[0]) {
taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName,
TSDB_TABLE_NAME_LEN);
/*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/
}
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) { pBlock->info.type == STREAM_CLEAR) {
SArray* pWins = taosArrayInit(16, sizeof(SSessionKey)); SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
...@@ -4617,6 +4623,12 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { ...@@ -4617,6 +4623,12 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
} }
printDataBlock(pBlock, "single state recv"); printDataBlock(pBlock, "single state recv");
if (pBlock->info.parTbName[0]) {
taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName,
TSDB_TABLE_NAME_LEN);
/*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/
}
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) { pBlock->info.type == STREAM_CLEAR) {
SArray* pWins = taosArrayInit(16, sizeof(SSessionKey)); SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册