提交 2b3ff125 编写于 作者: L Liu Jicong

fix bad merge

上级 eafe8666
...@@ -478,6 +478,7 @@ typedef struct SStreamScanInfo { ...@@ -478,6 +478,7 @@ typedef struct SStreamScanInfo {
uint64_t tableUid; // queried super table uid uint64_t tableUid; // queried super table uid
SExprInfo* pPseudoExpr; SExprInfo* pPseudoExpr;
int32_t numOfPseudoExpr; int32_t numOfPseudoExpr;
SExprSupp tbnameCalSup;
int32_t primaryTsIndex; // primary time stamp slot id int32_t primaryTsIndex; // primary time stamp slot id
SReadHandle readHandle; SReadHandle readHandle;
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
......
...@@ -4249,6 +4249,8 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat ...@@ -4249,6 +4249,8 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat
char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t)); char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t));
if (tbname != NULL) { if (tbname != NULL) {
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
} else {
pBlock->info.parTbName[0] = 0;
} }
} else { } else {
// current value belongs to different group, it can't be packed into one datablock // current value belongs to different group, it can't be packed into one datablock
......
...@@ -873,7 +873,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { ...@@ -873,7 +873,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
colDataAppend(pDestCol, pDest->info.rows, pSrcData, isNull); colDataAppend(pDestCol, pDest->info.rows, pSrcData, isNull);
} }
pDest->info.rows++; pDest->info.rows++;
if (i == 0) { if (pInfo->tbnameCalSup.numOfExprs > 0 && i == 0) {
SSDataBlock* pTmpBlock = blockCopyOneRow(pSrc, rowIndex); SSDataBlock* pTmpBlock = blockCopyOneRow(pSrc, rowIndex);
SSDataBlock* pResBlock = createDataBlock(); SSDataBlock* pResBlock = createDataBlock();
pResBlock->info.rowSize = TSDB_TABLE_NAME_LEN; pResBlock->info.rowSize = TSDB_TABLE_NAME_LEN;
...@@ -892,6 +892,9 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { ...@@ -892,6 +892,9 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
} else { } else {
pDest->info.parTbName[0] = 0; pDest->info.parTbName[0] = 0;
} }
/*printf("\n\n set name %s\n\n", pDest->info.parTbName);*/
blockDataDestroy(pTmpBlock);
blockDataDestroy(pResBlock);
} }
} }
blockDataUpdateTsWindow(pDest, pInfo->tsColIndex); blockDataUpdateTsWindow(pDest, pInfo->tsColIndex);
...@@ -915,7 +918,6 @@ static void doStreamHashPartitionImpl(SStreamPartitionOperatorInfo* pInfo, SSDat ...@@ -915,7 +918,6 @@ static void doStreamHashPartitionImpl(SStreamPartitionOperatorInfo* pInfo, SSDat
} else { } else {
SPartitionDataInfo newParData = {0}; SPartitionDataInfo newParData = {0};
newParData.groupId = calcGroupId(pInfo->partitionSup.keyBuf, keyLen); newParData.groupId = calcGroupId(pInfo->partitionSup.keyBuf, keyLen);
/*newParData.tbname = */
newParData.rowIds = taosArrayInit(64, sizeof(int32_t)); newParData.rowIds = taosArrayInit(64, sizeof(int32_t));
taosArrayPush(newParData.rowIds, &i); taosArrayPush(newParData.rowIds, &i);
taosHashPut(pInfo->pPartitions, pInfo->partitionSup.keyBuf, keyLen, &newParData, sizeof(SPartitionDataInfo)); taosHashPut(pInfo->pPartitions, pInfo->partitionSup.keyBuf, keyLen, &newParData, sizeof(SPartitionDataInfo));
......
...@@ -1415,6 +1415,30 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock ...@@ -1415,6 +1415,30 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
} }
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
blockDataFreeRes((SSDataBlock*)pBlock); blockDataFreeRes((SSDataBlock*)pBlock);
if (pInfo->tbnameCalSup.numOfExprs > 0 && pInfo->pRes->info.rows > 0) {
SSDataBlock* pTmpBlock = blockCopyOneRow(pInfo->pRes, 0);
SSDataBlock* pResBlock = createDataBlock();
pResBlock->info.rowSize = TSDB_TABLE_NAME_LEN;
SColumnInfoData data = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_TABLE_NAME_LEN, 0);
taosArrayPush(pResBlock->pDataBlock, &data);
blockDataEnsureCapacity(pResBlock, 1);
projectApplyFunctions(pInfo->tbnameCalSup.pExprInfo, pResBlock, pTmpBlock, pInfo->tbnameCalSup.pCtx, 1, NULL);
ASSERT(pResBlock->info.rows == 1);
ASSERT(taosArrayGetSize(pResBlock->pDataBlock) == 1);
SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, 0);
ASSERT(pCol->info.type == TSDB_DATA_TYPE_VARCHAR);
void* pData = colDataGetData(pCol, 0);
// TODO check tbname validation
if (pData != (void*)-1) {
memcpy(pInfo->pRes->info.parTbName, varDataVal(pData), varDataLen(pData));
} else {
pInfo->pRes->info.parTbName[0] = 0;
}
blockDataDestroy(pTmpBlock);
blockDataDestroy(pResBlock);
}
return 0; return 0;
} }
...@@ -2089,6 +2113,19 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -2089,6 +2113,19 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
} }
} }
if (pTableScanNode->pSubtable != NULL) {
SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo));
if (pSubTableExpr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
pInfo->tbnameCalSup.pExprInfo = pSubTableExpr;
createExprFromOneNode(pSubTableExpr, pTableScanNode->pSubtable, 0);
if (initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1) != 0) {
goto _error;
}
}
pInfo->pBlockLists = taosArrayInit(4, POINTER_BYTES); pInfo->pBlockLists = taosArrayInit(4, POINTER_BYTES);
if (pInfo->pBlockLists == NULL) { if (pInfo->pBlockLists == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
......
...@@ -1761,6 +1761,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) { ...@@ -1761,6 +1761,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
nodesDestroyNode((SNode*)pInfo->pPhyNode); nodesDestroyNode((SNode*)pInfo->pPhyNode);
colDataDestroy(&pInfo->twAggSup.timeWindowData); colDataDestroy(&pInfo->twAggSup.timeWindowData);
cleanupGroupResInfo(&pInfo->groupResInfo); cleanupGroupResInfo(&pInfo->groupResInfo);
taosHashCleanup(pInfo->pGroupIdTbNameMap);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
...@@ -3291,6 +3292,11 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -3291,6 +3292,11 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
} }
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval final recv" : "interval semi recv"); printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval final recv" : "interval semi recv");
if (pBlock->info.parTbName[0]) {
taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName,
TSDB_TABLE_NAME_LEN);
}
ASSERT(pBlock->info.type != STREAM_INVERT); ASSERT(pBlock->info.type != STREAM_INVERT);
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) { if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) {
pInfo->binfo.pRes->info.type = pBlock->info.type; pInfo->binfo.pRes->info.type = pBlock->info.type;
...@@ -3501,6 +3507,9 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -3501,6 +3507,9 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->delKey.ts = INT64_MAX; pInfo->delKey.ts = INT64_MAX;
pInfo->delKey.groupId = 0; pInfo->delKey.groupId = 0;
pInfo->pGroupIdTbNameMap =
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
pOperator->operatorType = pPhyNode->type; pOperator->operatorType = pPhyNode->type;
pOperator->blocking = true; pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
...@@ -5666,6 +5675,12 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5666,6 +5675,12 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
} }
printDataBlock(pBlock, "single interval recv"); printDataBlock(pBlock, "single interval recv");
if (pBlock->info.parTbName[0]) {
taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName,
TSDB_TABLE_NAME_LEN);
/*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/
}
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) { pBlock->info.type == STREAM_CLEAR) {
doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pUpdatedMap); doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pUpdatedMap);
...@@ -5807,6 +5822,9 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -5807,6 +5822,9 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->delKey.ts = INT64_MAX; pInfo->delKey.ts = INT64_MAX;
pInfo->delKey.groupId = 0; pInfo->delKey.groupId = 0;
pInfo->pGroupIdTbNameMap =
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
pOperator->name = "StreamIntervalOperator"; pOperator->name = "StreamIntervalOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL;
pOperator->blocking = true; pOperator->blocking = true;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册