未验证 提交 48c0358e 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18073 from taosdata/feature/stream

fix(query): set tbname for stream session delete
...@@ -3860,23 +3860,28 @@ void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlo ...@@ -3860,23 +3860,28 @@ void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlo
colDataAppendNULL(pCalEdCol, pBlock->info.rows); colDataAppendNULL(pCalEdCol, pBlock->info.rows);
SHashObj* pGroupIdTbNameMap = NULL; SHashObj* pGroupIdTbNameMap = NULL;
if (pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION || if (pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
SStreamSessionAggOperatorInfo* pInfo = pOp->info; SStreamSessionAggOperatorInfo* pInfo = pOp->info;
pGroupIdTbNameMap = pInfo->pGroupIdTbNameMap; pGroupIdTbNameMap = pInfo->pGroupIdTbNameMap;
} else if (pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) { } else if (pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
SStreamStateAggOperatorInfo* pInfo = pOp->info; SStreamStateAggOperatorInfo* pInfo = pOp->info;
pGroupIdTbNameMap = pInfo->pGroupIdTbNameMap; pGroupIdTbNameMap = pInfo->pGroupIdTbNameMap;
} else {
ASSERT(0);
} }
char* tbname = taosHashGet(pGroupIdTbNameMap, &res->groupId, sizeof(int64_t)); char* tbname = taosHashGet(pGroupIdTbNameMap, &res->groupId, sizeof(int64_t));
SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
if (tbname == NULL) { if (tbname == NULL) {
/*printf("\n\n no tbname for group id %" PRId64 "%p %p\n\n", res->groupId, pOp->info, pGroupIdTbNameMap);*/
colDataAppendNULL(pTableCol, pBlock->info.rows); colDataAppendNULL(pTableCol, pBlock->info.rows);
} else { } else {
char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN]; char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN];
STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName)); STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
colDataAppend(pTableCol, pBlock->info.rows, (const char*)parTbName, false); colDataAppend(pTableCol, pBlock->info.rows, (const char*)parTbName, false);
/*printf("\n\n get tbname %s group id %" PRId64 "\n\n", tbname, res->groupId);*/
} }
pBlock->info.rows += 1; pBlock->info.rows += 1;
} }
...@@ -4053,7 +4058,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -4053,7 +4058,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
if (pBlock->info.parTbName[0]) { if (pBlock->info.parTbName[0]) {
taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName, taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName,
TSDB_TABLE_NAME_LEN); TSDB_TABLE_NAME_LEN);
/*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/ /*printf("\n\n put tbname %s group id %" PRId64 "\n\n into %p %p", pBlock->info.parTbName, pBlock->info.groupId,*/
/*pInfo, pInfo->pGroupIdTbNameMap);*/
} }
if (pBlock->info.parTbName[0]) { if (pBlock->info.parTbName[0]) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册