diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 73e487f073ac44d5ddeee0086da203f329b3238d..7a63f87412ef5493d181b27684d70c01b23e78f3 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -52,7 +52,7 @@ typedef struct SExprNode { SArray* pAssociation; } SExprNode; -typedef enum EColumnType { COLUMN_TYPE_COLUMN = 1, COLUMN_TYPE_TAG } EColumnType; +typedef enum EColumnType { COLUMN_TYPE_COLUMN = 1, COLUMN_TYPE_TAG, COLUMN_TYPE_TBNAME } EColumnType; typedef struct SColumnNode { SExprNode node; // QUERY_NODE_COLUMN diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 5eaca60ea5d8c9f9439a7b9139789622ecad7dbf..8290b5628c0a09cff094232546dfdafdf5ce6af4 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -557,6 +557,14 @@ static SNode* physiSessionCopy(const SSessionWinodwPhysiNode* pSrc, SSessionWino return (SNode*)pDst; } +static SNode* physiPartitionCopy(const SPartitionPhysiNode* pSrc, SPartitionPhysiNode* pDst) { + COPY_BASE_OBJECT_FIELD(node, physiNodeCopy); + CLONE_NODE_LIST_FIELD(pExprs); + CLONE_NODE_LIST_FIELD(pPartitionKeys); + CLONE_NODE_LIST_FIELD(pTargets); + return (SNode*)pDst; +} + static SNode* dataBlockDescCopy(const SDataBlockDescNode* pSrc, SDataBlockDescNode* pDst) { COPY_SCALAR_FIELD(dataBlockId); CLONE_NODE_LIST_FIELD(pSlots); @@ -702,6 +710,8 @@ SNode* nodesCloneNode(const SNode* pNode) { case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION: case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION: return physiSessionCopy((const SSessionWinodwPhysiNode*)pNode, (SSessionWinodwPhysiNode*)pDst); + case QUERY_NODE_PHYSICAL_PLAN_PARTITION: + return physiPartitionCopy((const SPartitionPhysiNode*)pNode, (SPartitionPhysiNode*)pDst); default: break; } diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index c8b78fcc8bf6acfc242a975014ef2d11b38c4d23..43c1f211d303ea78b1d9e06ff9e5631e4085f5d8 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -257,7 +257,7 @@ static int32_t checkAuth(SInsertParseContext* pCxt, char* pDbFname, bool* pPass) if (pBasicCtx->async) { return getUserAuthFromCache(pCxt->pMetaCache, pBasicCtx->pUser, pDbFname, AUTH_TYPE_WRITE, pPass); } - SRequestConnInfo conn = {.pTrans = pBasicCtx->pTransporter, + SRequestConnInfo conn = {.pTrans = pBasicCtx->pTransporter, .requestId = pBasicCtx->requestId, .requestObjRefId = pBasicCtx->requestRid, .mgmtEps = pBasicCtx->mgmtEpSet}; @@ -270,11 +270,11 @@ static int32_t getTableSchema(SInsertParseContext* pCxt, SName* pTbName, bool is if (pBasicCtx->async) { return getTableMetaFromCache(pCxt->pMetaCache, pTbName, pTableMeta); } - SRequestConnInfo conn = {.pTrans = pBasicCtx->pTransporter, + SRequestConnInfo conn = {.pTrans = pBasicCtx->pTransporter, .requestId = pBasicCtx->requestId, .requestObjRefId = pBasicCtx->requestRid, .mgmtEps = pBasicCtx->mgmtEpSet}; - + if (isStb) { return catalogGetSTableMeta(pBasicCtx->pCatalog, &conn, pTbName, pTableMeta); } @@ -286,7 +286,7 @@ static int32_t getTableVgroup(SInsertParseContext* pCxt, SName* pTbName, SVgroup if (pBasicCtx->async) { return getTableVgroupFromCache(pCxt->pMetaCache, pTbName, pVg); } - SRequestConnInfo conn = {.pTrans = pBasicCtx->pTransporter, + SRequestConnInfo conn = {.pTrans = pBasicCtx->pTransporter, .requestId = pBasicCtx->requestId, .requestObjRefId = pBasicCtx->requestRid, .mgmtEps = pBasicCtx->mgmtEpSet}; @@ -322,7 +322,7 @@ static int32_t getDBCfg(SInsertParseContext* pCxt, const char* pDbFName, SDbCfgI if (pBasicCtx->async) { CHECK_CODE(getDbCfgFromCache(pCxt->pMetaCache, pDbFName, pInfo)); } else { - SRequestConnInfo conn = {.pTrans = pBasicCtx->pTransporter, + SRequestConnInfo conn = {.pTrans = pBasicCtx->pTransporter, .requestId = pBasicCtx->requestId, .requestObjRefId = pBasicCtx->requestRid, .mgmtEps = pBasicCtx->mgmtEpSet}; @@ -1315,15 +1315,6 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) { destroyBlockArrayList(pCxt->pVgDataBlocks); } -static int32_t checkSchemalessDb(SInsertParseContext* pCxt, char* pDbName) { -// SDbCfgInfo pInfo = {0}; -// char fullName[TSDB_TABLE_FNAME_LEN]; -// snprintf(fullName, sizeof(fullName), "%d.%s", pCxt->pComCxt->acctId, pDbName); -// CHECK_CODE(getDBCfg(pCxt, fullName, &pInfo)); -// return pInfo.schemaless ? TSDB_CODE_SML_INVALID_DB_CONF : TSDB_CODE_SUCCESS; - return TSDB_CODE_SUCCESS; -} - // tb_name // [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)] // [(field1_name, ...)] @@ -1377,8 +1368,6 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { SName name; CHECK_CODE(createSName(&name, &tbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg)); - CHECK_CODE(checkSchemalessDb(pCxt, name.dbname)); - tNameExtractFullName(&name, tbFName); CHECK_CODE(taosHashPut(pCxt->pTableNameHashObj, tbFName, strlen(tbFName), &name, sizeof(SName))); char dbFName[TSDB_DB_FNAME_LEN]; diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index c7490faa0c28524830f16515e2eb16278b894706..36d2efc13f67ff1e7f81e4a4ab021a29fbad0fd9 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -56,8 +56,12 @@ static EDealRes doRewriteExpr(SNode** pNode, void* pContext) { pCol->node.resType = pToBeRewrittenExpr->resType; strcpy(pCol->node.aliasName, pToBeRewrittenExpr->aliasName); strcpy(pCol->colName, ((SExprNode*)pExpr)->aliasName); - if (QUERY_NODE_FUNCTION == nodeType(pExpr) && FUNCTION_TYPE_WSTARTTS == ((SFunctionNode*)pExpr)->funcType) { - pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID; + if (QUERY_NODE_FUNCTION == nodeType(pExpr)) { + if (FUNCTION_TYPE_WSTARTTS == ((SFunctionNode*)pExpr)->funcType) { + pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID; + } else if (FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pExpr)->funcType) { + pCol->colType = COLUMN_TYPE_TBNAME; + } } nodesDestroyNode(*pNode); *pNode = (SNode*)pCol; diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 5249bd913d4208c77a540133be7ed192aa24b0e9..cd3218cf94615649cb96ba157089fb8533dc78a4 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -1042,7 +1042,7 @@ static int32_t smaOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) static EDealRes partTagsOptHasColImpl(SNode* pNode, void* pContext) { if (QUERY_NODE_COLUMN == nodeType(pNode)) { - if (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType) { + if (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType && COLUMN_TYPE_TBNAME != ((SColumnNode*)pNode)->colType) { *(bool*)pContext = true; return DEAL_RES_END; } @@ -1057,9 +1057,9 @@ static bool partTagsOptHasCol(SNodeList* pPartKeys) { } static bool partTagsIsOptimizableNode(SLogicNode* pNode) { - return ((QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode) /*|| + return ((QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode) || (QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) && NULL != ((SAggLogicNode*)pNode)->pGroupKeys && - NULL != ((SAggLogicNode*)pNode)->pAggFuncs)*/) && + NULL != ((SAggLogicNode*)pNode)->pAggFuncs)) && 1 == LIST_LENGTH(pNode->pChildren) && QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(nodesListGetNode(pNode->pChildren, 0))); } @@ -1080,6 +1080,28 @@ static bool partTagsOptMayBeOptimized(SLogicNode* pNode) { return !partTagsOptHasCol(partTagsGetPartKeys(pNode)); } +static EDealRes partTagsOptRebuildTbanmeImpl(SNode** pNode, void* pContext) { + if (QUERY_NODE_COLUMN == nodeType(*pNode) && COLUMN_TYPE_TBNAME == ((SColumnNode*)*pNode)->colType) { + SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION); + if (NULL == pFunc) { + *(int32_t*)pContext = TSDB_CODE_OUT_OF_MEMORY; + return DEAL_RES_ERROR; + } + strcpy(pFunc->functionName, "tbname"); + pFunc->funcType = FUNCTION_TYPE_TBNAME; + nodesDestroyNode(*pNode); + *pNode = (SNode*)pFunc; + return DEAL_RES_IGNORE_CHILD; + } + return DEAL_RES_CONTINUE; +} + +static int32_t partTagsOptRebuildTbanme(SNodeList* pPartKeys) { + int32_t code = TSDB_CODE_SUCCESS; + nodesRewriteExprs(pPartKeys, partTagsOptRebuildTbanmeImpl, &code); + return code; +} + static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { SLogicNode* pNode = optFindPossibleNode(pLogicSubplan->pNode, partTagsOptMayBeOptimized); if (NULL == pNode) { @@ -1096,7 +1118,18 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub nodesDestroyNode((SNode*)pNode); } } else { - TSWAP(((SAggLogicNode*)pNode)->pGroupKeys, pScan->pPartTags); + SNode* pGroupKey = NULL; + FOREACH(pGroupKey, ((SAggLogicNode*)pNode)->pGroupKeys) { + code = nodesListMakeStrictAppend( + &pScan->pPartTags, nodesCloneNode(nodesListGetNode(((SGroupingSetNode*)pGroupKey)->pParameterList, 0))); + if (TSDB_CODE_SUCCESS != code) { + break; + } + } + DESTORY_LIST(((SAggLogicNode*)pNode)->pGroupKeys); + } + if (TSDB_CODE_SUCCESS == code) { + code = partTagsOptRebuildTbanme(pScan->pPartTags); } return code; } @@ -1184,7 +1217,7 @@ static const SOptimizeRule optimizeRuleSet[] = { {.pName = "ConditionPushDown", .optimizeFunc = cpdOptimize}, {.pName = "OrderByPrimaryKey", .optimizeFunc = opkOptimize}, {.pName = "SmaIndex", .optimizeFunc = smaOptimize}, - {.pName = "PartitionByTags", .optimizeFunc = partTagsOptimize}, + // {.pName = "PartitionTags", .optimizeFunc = partTagsOptimize}, {.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize} }; // clang-format on diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 390d665760bcda5d5fd0acfecad27ca3bb82193b..29119bf1d2ee59582b87246ab52af1a3f31b1ab2 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -176,7 +176,7 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); case QUERY_NODE_LOGIC_PLAN_WINDOW: { SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode; - if (WINDOW_TYPE_STATE == pWindow->winType || (!streamQuery && WINDOW_TYPE_SESSION == pWindow->winType) ) { + if (WINDOW_TYPE_STATE == pWindow->winType || (!streamQuery && WINDOW_TYPE_SESSION == pWindow->winType)) { return false; } return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); @@ -380,6 +380,7 @@ static int32_t stbSplCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pParent SExchangeLogicNode* pExchange = NULL; int32_t code = splCreateExchangeNode(pCxt, pPartChild, &pExchange); if (TSDB_CODE_SUCCESS == code) { + pExchange->node.pParent = pParent; code = nodesListMakeAppend(&pParent->pChildren, (SNode*)pExchange); } return code; @@ -484,7 +485,27 @@ static int32_t stbSplSplitSession(SSplitContext* pCxt, SStableSplitInfo* pInfo) } } -static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { +static SNodeList* stbSplGetPartKeys(SLogicNode* pNode) { + if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) { + return ((SScanLogicNode*)pNode)->pPartTags; + } else { + return NULL; + } +} + +static bool stbSplIsPartTbanme(SNodeList* pPartKeys) { + if (NULL == pPartKeys || 1 != LIST_LENGTH(pPartKeys)) { + return false; + } + SNode* pPartKey = nodesListGetNode(pPartKeys, 0); + return QUERY_NODE_FUNCTION == nodeType(pPartKey) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pPartKey)->funcType; +} + +static bool stbSplIsMultiTableWinodw(SWindowLogicNode* pWindow) { + return stbSplIsPartTbanme(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pWindow->node.pChildren, 0))); +} + +static int32_t stbSplSplitWindowForMergeTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) { switch (((SWindowLogicNode*)pInfo->pSplitNode)->winType) { case WINDOW_TYPE_INTERVAL: return stbSplSplitInterval(pCxt, pInfo); @@ -496,6 +517,34 @@ static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInf return TSDB_CODE_PLAN_INTERNAL_ERROR; } +static int32_t stbSplSplitWindowForMultiTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) { + if (pCxt->pPlanCxt->streamQuery) { + SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT); + return TSDB_CODE_SUCCESS; + } + + SExchangeLogicNode* pExchange = NULL; + int32_t code = splCreateExchangeNode(pCxt, pInfo->pSplitNode, &pExchange); + if (TSDB_CODE_SUCCESS == code) { + code = replaceLogicNode(pInfo->pSubplan, pInfo->pSplitNode, (SLogicNode*)pExchange); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, + (SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT)); + } + pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE; + ++(pCxt->groupId); + return code; +} + +static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { + if (stbSplIsMultiTableWinodw((SWindowLogicNode*)pInfo->pSplitNode)) { + return stbSplSplitWindowForMultiTable(pCxt, pInfo); + } else { + return stbSplSplitWindowForMergeTable(pCxt, pInfo); + } +} + static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pOutput) { SNodeList* pFunc = pMergeAgg->pAggFuncs; pMergeAgg->pAggFuncs = NULL; diff --git a/source/libs/planner/test/planOtherTest.cpp b/source/libs/planner/test/planOtherTest.cpp index 12d14369dedb0b5e5823d52454182148855e3ad2..b8963c29f986823057f7f8e38919c56f3bfe2967 100644 --- a/source/libs/planner/test/planOtherTest.cpp +++ b/source/libs/planner/test/planOtherTest.cpp @@ -37,7 +37,9 @@ TEST_F(PlanOtherTest, createStream) { TEST_F(PlanOtherTest, createStreamUseSTable) { useDb("root", "test"); - run("create stream if not exists s1 as select count(*) from st1 interval(10s)"); + run("CREATE STREAM IF NOT EXISTS s1 as SELECT COUNT(*) FROM st1 INTERVAL(10s)"); + + run("CREATE STREAM IF NOT EXISTS s1 as SELECT COUNT(*) FROM st1 PARTITION BY TBNAME INTERVAL(10s)"); } TEST_F(PlanOtherTest, createSmaIndex) { diff --git a/source/util/src/thash.c b/source/util/src/thash.c index 98762d8fb0e4b48907910e6a7e154ed524f57262..8275072748123a4f4fa28b0e8401b967b0d2b099 100644 --- a/source/util/src/thash.c +++ b/source/util/src/thash.c @@ -31,12 +31,12 @@ #define GET_HASH_NODE_DATA(_n) ((char *)(_n) + sizeof(SHashNode)) #define GET_HASH_PNODE(_n) ((SHashNode *)((char *)(_n) - sizeof(SHashNode))) -#define FREE_HASH_NODE(_fp, _n) \ - do { \ - if (_fp != NULL) { \ - (_fp)(_n); \ - } \ - taosMemoryFreeClear(_n); \ +#define FREE_HASH_NODE(_fp, _n) \ + do { \ + if (_fp != NULL) { \ + (_fp)(GET_HASH_NODE_DATA(_n)); \ + } \ + taosMemoryFreeClear(_n); \ } while (0); struct SHashNode { @@ -56,7 +56,7 @@ typedef struct SHashEntry { } SHashEntry; struct SHashObj { - SHashEntry ** hashList; + SHashEntry **hashList; size_t capacity; // number of slots int64_t size; // number of elements in hash table _hash_fn_t hashFp; // hash function @@ -65,7 +65,7 @@ struct SHashObj { SRWLatch lock; // read-write spin lock SHashLockTypeE type; // lock type bool enableUpdate; // enable update - SArray * pMemBlock; // memory block allocated for SHashEntry + SArray *pMemBlock; // memory block allocated for SHashEntry _hash_before_fn_t callbackFp; // function invoked before return the value to caller }; @@ -633,7 +633,7 @@ void taosHashTableResize(SHashObj *pHashObj) { } int64_t st = taosGetTimestampUs(); - void * pNewEntryList = taosMemoryRealloc(pHashObj->hashList, sizeof(void *) * newCapacity); + void *pNewEntryList = taosMemoryRealloc(pHashObj->hashList, sizeof(void *) * newCapacity); if (pNewEntryList == NULL) { // uDebug("cache resize failed due to out of memory, capacity remain:%zu", pHashObj->capacity); return; @@ -642,7 +642,7 @@ void taosHashTableResize(SHashObj *pHashObj) { pHashObj->hashList = pNewEntryList; size_t inc = newCapacity - pHashObj->capacity; - void * p = taosMemoryCalloc(inc, sizeof(SHashEntry)); + void *p = taosMemoryCalloc(inc, sizeof(SHashEntry)); for (int32_t i = 0; i < inc; ++i) { pHashObj->hashList[i + pHashObj->capacity] = (void *)((char *)p + i * sizeof(SHashEntry)); @@ -653,9 +653,9 @@ void taosHashTableResize(SHashObj *pHashObj) { pHashObj->capacity = newCapacity; for (int32_t idx = 0; idx < pHashObj->capacity; ++idx) { SHashEntry *pe = pHashObj->hashList[idx]; - SHashNode * pNode; - SHashNode * pNext; - SHashNode * pPrev = NULL; + SHashNode *pNode; + SHashNode *pNext; + SHashNode *pPrev = NULL; if (pe->num == 0) { assert(pe->next == NULL);