diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index ad4b59714cbcd83f80a4ad260004310e0740b94f..533bdeb651dcdcd107cd902cd84e3af5108ebff5 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -103,6 +103,7 @@ typedef struct SScanLogicNode { bool hasNormalCols; // neither tag column nor primary key tag column bool sortPrimaryKey; bool igLastNull; + char* stbFullTableName; } SScanLogicNode; typedef struct SJoinLogicNode { @@ -253,6 +254,7 @@ typedef struct SPartitionLogicNode { SNodeList* pPartitionKeys; SNodeList* pTags; SNode* pSubtable; + char* stbFullTableName; } SPartitionLogicNode; typedef enum ESubplanType { @@ -367,6 +369,7 @@ typedef struct STableScanPhysiNode { int8_t igExpired; bool assignBlockUid; int8_t igCheckUpdate; + char stbFullTableName[TSDB_TABLE_NAME_LEN]; } STableScanPhysiNode; typedef STableScanPhysiNode STableSeqScanPhysiNode; @@ -538,6 +541,7 @@ typedef struct SStreamPartitionPhysiNode { SPartitionPhysiNode part; SNodeList* pTags; SNode* pSubtable; + char stbFullTableName[TSDB_TABLE_NAME_LEN]; } SStreamPartitionPhysiNode; typedef struct SDataSinkNode { diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index 41c0e98084a95f34d3ef6fffb094d47c6a9976dc..b35d7d74a84f88a11d77e01880773478f9417dea 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -42,6 +42,7 @@ typedef struct SPlanContext { const char* pUser; bool sysInfo; int64_t allocatorId; + char* stbFullTableName; } SPlanContext; // Create the physical plan for the query, according to the AST. diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 8956801bd47aac88e383ae7257e018951bb9bc5b..f165917198f5415c8785de21598512b8d4c5f249 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -239,6 +239,7 @@ typedef struct { void* vnode; // not available to encoder and decoder FTbSink* tbSinkFunc; STSchema* pTSchema; + SSHashObj* pUidInfo; } STaskSinkTb; typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index df7955771d799b35030bc745df7f220e6920ed43..aa641c9834a791734015f9b196e202cf498a1475 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -392,6 +392,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, .igExpired = pObj->igExpired, .deleteMark = pObj->deleteMark, .igCheckUpdate = pObj->igCheckUpdate, + .stbFullTableName = pObj->targetSTbName, }; // using ast and param to build physical plan diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 21c2ab13ab088768d9277f5d8dd378c5fd50f78f..1c11890bbc86ea0155660e2ac205bd2d9d00335c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -646,6 +646,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { if (pTask->tbSink.pTSchema == NULL) { return -1; } + pTask->tbSink.pUidInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); } if (pTask->taskLevel == TASK_LEVEL__SOURCE) { diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 9373f23c0292e37a5f91739382997772399ff974..00beb12b92c56a32b04f432982345e050d22ff71 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -90,6 +90,15 @@ end: return ret; } +int32_t getTblUid(SSHashObj* tblInfo ,char* tbName, int64_t* uid) { + void* pVal = tSimpleHashGet(tblInfo, tbName, strlen(tbName)); + if (pVal) { + *uid = *(int64_t*)pVal; + return TSDB_CODE_SUCCESS; + } + return TSDB_CODE_FAILED; +} + int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) { void* buf = NULL; int32_t tlen = 0; @@ -261,99 +270,94 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d tbData.uid = 0; // uid is assigned by vnode tbData.sver = pTSchema->version; - char* ctbName = NULL; - tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), pDataBlock->info.parTbName); - if (pDataBlock->info.parTbName[0]) { - ctbName = taosStrdup(pDataBlock->info.parTbName); - } else { - ctbName = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId); + char* ctbName = pDataBlock->info.parTbName; + if (!pDataBlock->info.parTbName[0]) { + char* tmp = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId); + memcpy(ctbName, tmp, strlen(tmp)); + taosMemoryFree(tmp); + tqError("vgId:%d, gropuid:%" PRIu64 " datablock tabel name is null", TD_VID(pVnode), pDataBlock->info.id.groupId); } - SMetaReader mr = {0}; - metaReaderInit(&mr, pVnode->pMeta, 0); - if (metaGetTableEntryByName(&mr, ctbName) < 0) { - metaReaderClear(&mr); - tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), ctbName); + int32_t res = getTblUid(pTask->tbSink.pUidInfo, ctbName, &tbData.uid); + if (res != TSDB_CODE_SUCCESS) { + SMetaReader mr = {0}; + metaReaderInit(&mr, pVnode->pMeta, 0); + if (metaGetTableEntryByName(&mr, ctbName) < 0) { + metaReaderClear(&mr); + tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), ctbName); - SVCreateTbReq* pCreateTbReq = NULL; + SVCreateTbReq* pCreateTbReq = NULL; - if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) { - taosMemoryFree(ctbName); - goto _end; - }; + if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) { + goto _end; + }; - // set const - pCreateTbReq->flags = 0; - pCreateTbReq->type = TSDB_CHILD_TABLE; - pCreateTbReq->ctb.suid = suid; + // set const + pCreateTbReq->flags = 0; + pCreateTbReq->type = TSDB_CHILD_TABLE; + pCreateTbReq->ctb.suid = suid; - // set super table name - SName name = {0}; - tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - pCreateTbReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name)); // taosStrdup(stbFullName); + // set super table name + SName name = {0}; + tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + pCreateTbReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name)); // taosStrdup(stbFullName); - // set tag content - tagArray = taosArrayInit(1, sizeof(STagVal)); - if (!tagArray) { - taosMemoryFree(ctbName); - tdDestroySVCreateTbReq(pCreateTbReq); - goto _end; - } - STagVal tagVal = { - .cid = pTSchema->numOfCols + 1, - .type = TSDB_DATA_TYPE_UBIGINT, - .i64 = (int64_t)pDataBlock->info.id.groupId, - }; - taosArrayPush(tagArray, &tagVal); - pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray); + // set tag content + tagArray = taosArrayInit(1, sizeof(STagVal)); + if (!tagArray) { + tdDestroySVCreateTbReq(pCreateTbReq); + goto _end; + } + STagVal tagVal = { + .cid = pTSchema->numOfCols + 1, + .type = TSDB_DATA_TYPE_UBIGINT, + .i64 = (int64_t)pDataBlock->info.id.groupId, + }; + taosArrayPush(tagArray, &tagVal); + pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray); - STag* pTag = NULL; - tTagNew(tagArray, 1, false, &pTag); - tagArray = taosArrayDestroy(tagArray); - if (pTag == NULL) { - taosMemoryFree(ctbName); - tdDestroySVCreateTbReq(pCreateTbReq); - terrno = TSDB_CODE_OUT_OF_MEMORY; - taosMemoryFree(ctbName); - tdDestroySVCreateTbReq(pCreateTbReq); - goto _end; - } - pCreateTbReq->ctb.pTag = (uint8_t*)pTag; + STag* pTag = NULL; + tTagNew(tagArray, 1, false, &pTag); + tagArray = taosArrayDestroy(tagArray); + if (pTag == NULL) { + tdDestroySVCreateTbReq(pCreateTbReq); + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _end; + } + pCreateTbReq->ctb.pTag = (uint8_t*)pTag; - // set tag name - SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); - char tagNameStr[TSDB_COL_NAME_LEN] = {0}; - strcpy(tagNameStr, "group_id"); - taosArrayPush(tagName, tagNameStr); - pCreateTbReq->ctb.tagName = tagName; + // set tag name + SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); + char tagNameStr[TSDB_COL_NAME_LEN] = {0}; + strcpy(tagNameStr, "group_id"); + taosArrayPush(tagName, tagNameStr); + pCreateTbReq->ctb.tagName = tagName; - // set table name - pCreateTbReq->name = ctbName; - ctbName = NULL; - - tbData.pCreateTbReq = pCreateTbReq; - tbData.flags = SUBMIT_REQ_AUTO_CREATE_TABLE; - } else { - if (mr.me.type != TSDB_CHILD_TABLE) { - tqError("vgId:%d, failed to write into %s, since table type incorrect, type %d", TD_VID(pVnode), ctbName, - mr.me.type); - metaReaderClear(&mr); - taosMemoryFree(ctbName); - continue; - } + // set table name + pCreateTbReq->name = taosStrdup(ctbName); + + tbData.pCreateTbReq = pCreateTbReq; + tbData.flags = SUBMIT_REQ_AUTO_CREATE_TABLE; + } else { + if (mr.me.type != TSDB_CHILD_TABLE) { + tqError("vgId:%d, failed to write into %s, since table type incorrect, type %d", TD_VID(pVnode), ctbName, + mr.me.type); + metaReaderClear(&mr); + continue; + } - if (mr.me.ctbEntry.suid != suid) { - tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid: %" PRId64 - ", actual suid %" PRId64 "", - TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry.suid); + if (mr.me.ctbEntry.suid != suid) { + tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid: %" PRId64 + ", actual suid %" PRId64 "", + TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry.suid); + metaReaderClear(&mr); + continue; + } + + tbData.uid = mr.me.uid; + tSimpleHashPut(pTask->tbSink.pUidInfo, ctbName, strlen(ctbName), &tbData.uid, sizeof(int64_t)); metaReaderClear(&mr); - taosMemoryFree(ctbName); - continue; } - - tbData.uid = mr.me.uid; - metaReaderClear(&mr); - taosMemoryFreeClear(ctbName); } // rows diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index c9d0eebe2bf3308eaf374691f5a9923c1035cbb9..fdeae13a444c271f91b320df3a77474f6ba0f4d3 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -366,6 +366,7 @@ typedef struct SStreamScanInfo { int8_t igCheckUpdate; int8_t igExpired; SStreamState* pState; + char stbFullName[TSDB_TABLE_FNAME_LEN]; } SStreamScanInfo; typedef struct { @@ -525,6 +526,7 @@ typedef struct SStreamPartitionOperatorInfo { int32_t tsColIndex; SSDataBlock* pDelRes; SSDataBlock* pCreateTbRes; + char stbFullName[TSDB_TABLE_FNAME_LEN]; } SStreamPartitionOperatorInfo; typedef struct SStreamFillSupporter { @@ -657,7 +659,7 @@ void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t ord int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos, int32_t order, int64_t* pData); void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, uint64_t groupId, - SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock); + SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock, char* stbFullName); SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag); SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index c51dc39b5b06bfa7eda2481905a8a8427714fe68..3160925abcadbba18e6b3a80acd50b2da0acefd3 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1486,7 +1486,7 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu for (int32_t i = 0; i < numOfOutput; ++i) { const char* pName = pCtx[i].pExpr->pExpr->_function.functionName; - if ((strcmp(pName, "_select_value") == 0) || (strcmp(pName, "_group_key") == 0)) { + if (strcmp(pName, "_select_value") == 0) { pValCtx[num++] = &pCtx[i]; } else if (fmIsSelectFunc(pCtx[i].functionId)) { p = &pCtx[i]; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 280edf8b53d89ef05c7cd4f08060a51312754ca7..b6b8d4b5aeb6593d3f93d4bf8a812b86b900b842 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -991,46 +991,48 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { } void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, uint64_t groupId, - SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock) { + SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock, char* stbFullName) { void* pValue = NULL; if (streamStateGetParName(pState, groupId, &pValue) != 0) { SSDataBlock* pTmpBlock = blockCopyOneRow(pSrcBlock, rowId); memset(pTmpBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN); pTmpBlock->info.id.groupId = groupId; char* tbName = pSrcBlock->info.parTbName; + int32_t len = 0; if (pTableSup->numOfExprs > 0) { projectApplyFunctions(pTableSup->pExprInfo, pDestBlock, pTmpBlock, pTableSup->pCtx, pTableSup->numOfExprs, NULL); SColumnInfoData* pTbCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX); memset(tbName, 0, TSDB_TABLE_NAME_LEN); - int32_t len = 0; if (colDataIsNull_s(pTbCol, pDestBlock->info.rows - 1)) { - len = 1; - tbName[0] = 0; + char* pTbName = buildCtbNameByGroupId(stbFullName, groupId); + len = TMIN(strlen(pTbName), TSDB_TABLE_NAME_LEN - 1); + memcpy(tbName, pTbName, len); } else { void* pData = colDataGetData(pTbCol, pDestBlock->info.rows - 1); len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1); memcpy(tbName, varDataVal(pData), len); - streamStatePutParName(pState, groupId, tbName); } - memcpy(pTmpBlock->info.parTbName, tbName, len); pDestBlock->info.rows--; } else { void* pTbNameCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX); colDataSetNULL(pTbNameCol, pDestBlock->info.rows); - tbName[0] = 0; + char* pTbName = buildCtbNameByGroupId(stbFullName, groupId); + len = TMIN(strlen(pTbName), TSDB_TABLE_NAME_LEN - 1); + memcpy(tbName, pTbName, len); } if (pTagSup->numOfExprs > 0) { projectApplyFunctions(pTagSup->pExprInfo, pDestBlock, pTmpBlock, pTagSup->pCtx, pTagSup->numOfExprs, NULL); pDestBlock->info.rows--; - } else { - memcpy(pDestBlock->info.parTbName, pTmpBlock->info.parTbName, TSDB_TABLE_NAME_LEN); } void* pGpIdCol = taosArrayGet(pDestBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX); colDataAppend(pGpIdCol, pDestBlock->info.rows, (const char*)&groupId, false); pDestBlock->info.rows++; blockDataDestroy(pTmpBlock); + pDestBlock->info.id.groupId = groupId; + memcpy(pDestBlock->info.parTbName, tbName, len); + streamStatePutParName(pState, groupId, tbName); } else { memcpy(pSrcBlock->info.parTbName, pValue, TSDB_TABLE_NAME_LEN); } @@ -1039,8 +1041,7 @@ void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* static SSDataBlock* buildStreamCreateTableResult(SOperatorInfo* pOperator) { SStreamPartitionOperatorInfo* pInfo = pOperator->info; - if ((pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) || - taosHashGetSize(pInfo->pPartitions) == 0) { + if (taosHashGetSize(pInfo->pPartitions) == 0) { return NULL; } blockDataCleanup(pInfo->pCreateTbRes); @@ -1051,7 +1052,7 @@ static SSDataBlock* buildStreamCreateTableResult(SOperatorInfo* pOperator) { SPartitionDataInfo* pParInfo = (SPartitionDataInfo*)pInfo->pTbNameIte; int32_t rowId = *(int32_t*)taosArrayGet(pParInfo->rowIds, 0); appendCreateTableRow(pOperator->pTaskInfo->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup, - pParInfo->groupId, pSrc, rowId, pInfo->pCreateTbRes); + pParInfo->groupId, pSrc, rowId, pInfo->pCreateTbRes, pInfo->stbFullName); pInfo->pTbNameIte = taosHashIterate(pInfo->pPartitions, pInfo->pTbNameIte); } return pInfo->pCreateTbRes->info.rows > 0 ? pInfo->pCreateTbRes : NULL; @@ -1120,6 +1121,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) { } break; default: ASSERTS(pBlock->info.type == STREAM_CREATE_CHILD_TABLE || pBlock->info.type == STREAM_RETRIEVE, "invalid SSDataBlock type"); + printDataBlock(pBlock, "stream partitionby"); return pBlock; } @@ -1265,11 +1267,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr } } - if (pInfo->tbnameCalSup.numOfExprs != 0 || pInfo->tagCalSup.numOfExprs != 0) { - pInfo->pCreateTbRes = buildCreateTableBlock(&pInfo->tbnameCalSup, &pInfo->tagCalSup); - } else { - pInfo->pCreateTbRes = NULL; - } + pInfo->pCreateTbRes = buildCreateTableBlock(&pInfo->tbnameCalSup, &pInfo->tagCalSup); int32_t keyLen = 0; code = initGroupOptrInfo(&pInfo->partitionSup.pGroupColVals, &keyLen, &pInfo->partitionSup.keyBuf, @@ -1295,6 +1293,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr pInfo->pPartitions = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); pInfo->tsColIndex = 0; pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); + memcpy(pInfo->stbFullName, pPartNode->stbFullTableName, strlen(pPartNode->stbFullTableName)); int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pPartNode->part.pTargets, NULL, &numOfCols); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 5a539106b3060f0372163c2706123f6db493385f..ffd2f5c086046dcc5c8c50ec7484b96dd04dd7f3 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1465,11 +1465,9 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, static void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { SExprSupp* pTbNameCalSup = &pInfo->tbnameCalSup; blockDataCleanup(pInfo->pCreateTbRes); - if (pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) { - pBlock->info.parTbName[0] = 0; - } else { + if (!pInfo->partitionSup.needCalc) { appendCreateTableRow(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup, - pBlock->info.id.groupId, pBlock, 0, pInfo->pCreateTbRes); + pBlock->info.id.groupId, pBlock, 0, pInfo->pCreateTbRes, pInfo->stbFullName); } } @@ -1583,7 +1581,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock // currently only the tbname pseudo column if (pInfo->numOfPseudoExpr > 0) { int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes, - pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), NULL); + pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), &pTableScanInfo->base.metaCache); // ignore the table not exists error, since this table may have been dropped during the scan procedure. if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) { blockDataFreeRes((SSDataBlock*)pBlock); @@ -2486,6 +2484,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->igExpired = pTableScanNode->igExpired; pInfo->twAggSup.maxTs = INT64_MIN; pInfo->pState = NULL; + memcpy(pInfo->stbFullName, pTableScanNode->stbFullTableName, strlen(pTableScanNode->stbFullTableName)); // for stream if (pTaskInfo->streamInfo.pState) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index ca3165bf92bd6ce6faa6905a8c9eb541bb3a77b2..dc58ee499ff8b492d64f0fbd5ce63690d11fb0c3 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3457,6 +3457,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { getAllSessionWindow(pAggSup->pResultRows, pInfo->pStUpdated); continue; } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { + printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "final session" : "single session"); return pBlock; } else { ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index d9a4c5178fab59c5fb481fb8a67206a54fb54a07..5e409d68df5dd87a6d7807b6419df71ca2c32ec7 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -393,6 +393,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) { CLONE_NODE_LIST_FIELD(pTags); CLONE_NODE_FIELD(pSubtable); COPY_SCALAR_FIELD(igLastNull); + COPY_SCALAR_FIELD(stbFullTableName); return TSDB_CODE_SUCCESS; } @@ -508,6 +509,7 @@ static int32_t logicPartitionCopy(const SPartitionLogicNode* pSrc, SPartitionLog CLONE_NODE_LIST_FIELD(pPartitionKeys); CLONE_NODE_LIST_FIELD(pTags); CLONE_NODE_FIELD(pSubtable); + COPY_SCALAR_FIELD(stbFullTableName); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 919be78125ec33f1121dd61711554b6800400f53..1e1c3f89d83c73e2e08501632e925eb0f36d70d0 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1651,6 +1651,7 @@ static const char* jkTableScanPhysiPlanTags = "Tags"; static const char* jkTableScanPhysiPlanSubtable = "Subtable"; static const char* jkTableScanPhysiPlanAssignBlockUid = "AssignBlockUid"; static const char* jkTableScanPhysiPlanIgnoreUpdate = "IgnoreUpdate"; +static const char* jkTableScanPhysiPlanStbFullTableName = "stbFullTableName"; static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) { const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj; @@ -1719,6 +1720,9 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanIgnoreUpdate, pNode->igCheckUpdate); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddStringToObject(pJson, jkTableScanPhysiPlanStbFullTableName, pNode->stbFullTableName); + } return code; } @@ -1790,6 +1794,9 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetTinyIntValue(pJson, jkTableScanPhysiPlanIgnoreUpdate, &pNode->igCheckUpdate); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetStringValue(pJson, jkTableScanPhysiPlanStbFullTableName, pNode->stbFullTableName); + } return code; } @@ -2454,6 +2461,7 @@ static int32_t jsonToPhysiPartitionNode(const SJson* pJson, void* pObj) { static const char* jkStreamPartitionPhysiPlanTags = "Tags"; static const char* jkStreamPartitionPhysiPlanSubtable = "Subtable"; +static const char* jkStreamPartitionPhysiPlanStbFullTableName = "StbFullTableName"; static int32_t physiStreamPartitionNodeToJson(const void* pObj, SJson* pJson) { const SStreamPartitionPhysiNode* pNode = (const SStreamPartitionPhysiNode*)pObj; @@ -2465,6 +2473,9 @@ static int32_t physiStreamPartitionNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkStreamPartitionPhysiPlanSubtable, nodeToJson, pNode->pSubtable); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddStringToObject(pJson, jkStreamPartitionPhysiPlanStbFullTableName, pNode->stbFullTableName); + } return code; } @@ -2479,6 +2490,9 @@ static int32_t jsonToPhysiStreamPartitionNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkStreamPartitionPhysiPlanSubtable, &pNode->pSubtable); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetStringValue(pJson, jkStreamPartitionPhysiPlanStbFullTableName, pNode->stbFullTableName); + } return code; } diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 66b478004f9dfbc3040047b7ea5d36a37bb95b95..4f964362d12f334d01db5a6ed8cc152e5da8dbca 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -349,6 +349,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect pScan->deleteMark = pCxt->pPlanCxt->deleteMark; pScan->igExpired = pCxt->pPlanCxt->igExpired; pScan->igCheckUpdate = pCxt->pPlanCxt->igCheckUpdate; + pScan->stbFullTableName = pCxt->pPlanCxt->stbFullTableName; } // set columns to scan @@ -1152,6 +1153,7 @@ static int32_t createPartitionLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pS } else { nodesDestroyNode((SNode*)pPartition); } + pPartition->stbFullTableName = pCxt->pPlanCxt->stbFullTableName; return code; } diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index e2c2e4c65563e3b4a49add42d73c004f1ea39bb7..2cfe477593bfc78c0e92a047e111aee768e83b88 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -584,6 +584,9 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp pTableScan->igExpired = pScanLogicNode->igExpired; pTableScan->igCheckUpdate = pScanLogicNode->igCheckUpdate; pTableScan->assignBlockUid = pCxt->pPlanCxt->rSmaQuery ? true : false; + if (pScanLogicNode->stbFullTableName) { + strcpy(pTableScan->stbFullTableName, pScanLogicNode->stbFullTableName); + } int32_t code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode); if (TSDB_CODE_SUCCESS == code) { @@ -1462,6 +1465,9 @@ static int32_t createStreamPartitionPhysiNode(SPhysiPlanContext* pCxt, SNodeList if (TSDB_CODE_SUCCESS == code) { code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pPartLogicNode->pSubtable, &pPart->pSubtable); } + if (pPartLogicNode->stbFullTableName) { + strcpy(pPart->stbFullTableName, pPartLogicNode->stbFullTableName); + } if (TSDB_CODE_SUCCESS == code) { *pPhyNode = (SPhysiNode*)pPart; } else { diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index abd14801a81a55fb66c287562552a5c0f4b41c5a..a19cf7e62f640af9d2f5abf3498a53152678bae5 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -1046,7 +1046,6 @@ _end: } int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) { - qWarn("try to write to cf parname"); #ifdef USE_ROCKSDB if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) { if (tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)) == NULL) {