提交 896c1b3a 编写于 作者: X Xiaoyu Wang

feat: optimize partition by tbname

上级 aca91eb8
......@@ -52,7 +52,7 @@ typedef struct SExprNode {
SArray* pAssociation;
} SExprNode;
typedef enum EColumnType { COLUMN_TYPE_COLUMN = 1, COLUMN_TYPE_TAG } EColumnType;
typedef enum EColumnType { COLUMN_TYPE_COLUMN = 1, COLUMN_TYPE_TAG, COLUMN_TYPE_TBNAME } EColumnType;
typedef struct SColumnNode {
SExprNode node; // QUERY_NODE_COLUMN
......
......@@ -257,7 +257,7 @@ static int32_t checkAuth(SInsertParseContext* pCxt, char* pDbFname, bool* pPass)
if (pBasicCtx->async) {
return getUserAuthFromCache(pCxt->pMetaCache, pBasicCtx->pUser, pDbFname, AUTH_TYPE_WRITE, pPass);
}
SRequestConnInfo conn = {.pTrans = pBasicCtx->pTransporter,
SRequestConnInfo conn = {.pTrans = pBasicCtx->pTransporter,
.requestId = pBasicCtx->requestId,
.requestObjRefId = pBasicCtx->requestRid,
.mgmtEps = pBasicCtx->mgmtEpSet};
......@@ -270,11 +270,11 @@ static int32_t getTableSchema(SInsertParseContext* pCxt, SName* pTbName, bool is
if (pBasicCtx->async) {
return getTableMetaFromCache(pCxt->pMetaCache, pTbName, pTableMeta);
}
SRequestConnInfo conn = {.pTrans = pBasicCtx->pTransporter,
SRequestConnInfo conn = {.pTrans = pBasicCtx->pTransporter,
.requestId = pBasicCtx->requestId,
.requestObjRefId = pBasicCtx->requestRid,
.mgmtEps = pBasicCtx->mgmtEpSet};
if (isStb) {
return catalogGetSTableMeta(pBasicCtx->pCatalog, &conn, pTbName, pTableMeta);
}
......@@ -286,7 +286,7 @@ static int32_t getTableVgroup(SInsertParseContext* pCxt, SName* pTbName, SVgroup
if (pBasicCtx->async) {
return getTableVgroupFromCache(pCxt->pMetaCache, pTbName, pVg);
}
SRequestConnInfo conn = {.pTrans = pBasicCtx->pTransporter,
SRequestConnInfo conn = {.pTrans = pBasicCtx->pTransporter,
.requestId = pBasicCtx->requestId,
.requestObjRefId = pBasicCtx->requestRid,
.mgmtEps = pBasicCtx->mgmtEpSet};
......@@ -322,7 +322,7 @@ static int32_t getDBCfg(SInsertParseContext* pCxt, const char* pDbFName, SDbCfgI
if (pBasicCtx->async) {
CHECK_CODE(getDbCfgFromCache(pCxt->pMetaCache, pDbFName, pInfo));
} else {
SRequestConnInfo conn = {.pTrans = pBasicCtx->pTransporter,
SRequestConnInfo conn = {.pTrans = pBasicCtx->pTransporter,
.requestId = pBasicCtx->requestId,
.requestObjRefId = pBasicCtx->requestRid,
.mgmtEps = pBasicCtx->mgmtEpSet};
......@@ -1315,15 +1315,6 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) {
destroyBlockArrayList(pCxt->pVgDataBlocks);
}
static int32_t checkSchemalessDb(SInsertParseContext* pCxt, char* pDbName) {
// SDbCfgInfo pInfo = {0};
// char fullName[TSDB_TABLE_FNAME_LEN];
// snprintf(fullName, sizeof(fullName), "%d.%s", pCxt->pComCxt->acctId, pDbName);
// CHECK_CODE(getDBCfg(pCxt, fullName, &pInfo));
// return pInfo.schemaless ? TSDB_CODE_SML_INVALID_DB_CONF : TSDB_CODE_SUCCESS;
return TSDB_CODE_SUCCESS;
}
// tb_name
// [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
// [(field1_name, ...)]
......@@ -1377,8 +1368,6 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
SName name;
CHECK_CODE(createSName(&name, &tbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
CHECK_CODE(checkSchemalessDb(pCxt, name.dbname));
tNameExtractFullName(&name, tbFName);
CHECK_CODE(taosHashPut(pCxt->pTableNameHashObj, tbFName, strlen(tbFName), &name, sizeof(SName)));
char dbFName[TSDB_DB_FNAME_LEN];
......
......@@ -56,8 +56,12 @@ static EDealRes doRewriteExpr(SNode** pNode, void* pContext) {
pCol->node.resType = pToBeRewrittenExpr->resType;
strcpy(pCol->node.aliasName, pToBeRewrittenExpr->aliasName);
strcpy(pCol->colName, ((SExprNode*)pExpr)->aliasName);
if (QUERY_NODE_FUNCTION == nodeType(pExpr) && FUNCTION_TYPE_WSTARTTS == ((SFunctionNode*)pExpr)->funcType) {
pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
if (QUERY_NODE_FUNCTION == nodeType(pExpr)) {
if (FUNCTION_TYPE_WSTARTTS == ((SFunctionNode*)pExpr)->funcType) {
pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
} else if (FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pExpr)->funcType) {
pCol->colType = COLUMN_TYPE_TBNAME;
}
}
nodesDestroyNode(*pNode);
*pNode = (SNode*)pCol;
......
......@@ -1042,7 +1042,7 @@ static int32_t smaOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan)
static EDealRes partTagsOptHasColImpl(SNode* pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
if (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType) {
if (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType && COLUMN_TYPE_TBNAME != ((SColumnNode*)pNode)->colType) {
*(bool*)pContext = true;
return DEAL_RES_END;
}
......@@ -1080,6 +1080,28 @@ static bool partTagsOptMayBeOptimized(SLogicNode* pNode) {
return !partTagsOptHasCol(partTagsGetPartKeys(pNode));
}
static EDealRes partTagsOptRebuildTbanmeImpl(SNode** pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(*pNode) && COLUMN_TYPE_TBNAME == ((SColumnNode*)*pNode)->colType) {
SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
if (NULL == pFunc) {
*(int32_t*)pContext = TSDB_CODE_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
}
strcpy(pFunc->functionName, "tbname");
pFunc->funcType = FUNCTION_TYPE_TBNAME;
nodesDestroyNode(*pNode);
*pNode = (SNode*)pFunc;
return DEAL_RES_IGNORE_CHILD;
}
return DEAL_RES_CONTINUE;
}
static int32_t partTagsOptRebuildTbanme(SNodeList* pPartKeys) {
int32_t code = TSDB_CODE_SUCCESS;
nodesRewriteExprs(pPartKeys, partTagsOptRebuildTbanmeImpl, &code);
return code;
}
static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
SLogicNode* pNode = optFindPossibleNode(pLogicSubplan->pNode, partTagsOptMayBeOptimized);
if (NULL == pNode) {
......@@ -1096,7 +1118,18 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
nodesDestroyNode((SNode*)pNode);
}
} else {
TSWAP(((SAggLogicNode*)pNode)->pGroupKeys, pScan->pPartTags);
SNode* pGroupKey = NULL;
FOREACH(pGroupKey, ((SAggLogicNode*)pNode)->pGroupKeys) {
code = nodesListMakeStrictAppend(
&pScan->pPartTags, nodesCloneNode(nodesListGetNode(((SGroupingSetNode*)pGroupKey)->pParameterList, 0)));
if (TSDB_CODE_SUCCESS != code) {
break;
}
}
DESTORY_LIST(((SAggLogicNode*)pNode)->pGroupKeys);
}
if (TSDB_CODE_SUCCESS == code) {
code = partTagsOptRebuildTbanme(pScan->pPartTags);
}
return code;
}
......@@ -1184,7 +1217,7 @@ static const SOptimizeRule optimizeRuleSet[] = {
{.pName = "ConditionPushDown", .optimizeFunc = cpdOptimize},
{.pName = "OrderByPrimaryKey", .optimizeFunc = opkOptimize},
{.pName = "SmaIndex", .optimizeFunc = smaOptimize},
{.pName = "PartitionByTags", .optimizeFunc = partTagsOptimize},
{.pName = "PartitionTags", .optimizeFunc = partTagsOptimize},
{.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize}
};
// clang-format on
......
......@@ -176,7 +176,7 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
case QUERY_NODE_LOGIC_PLAN_WINDOW: {
SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode;
if (WINDOW_TYPE_STATE == pWindow->winType || (!streamQuery && WINDOW_TYPE_SESSION == pWindow->winType) ) {
if (WINDOW_TYPE_STATE == pWindow->winType || (!streamQuery && WINDOW_TYPE_SESSION == pWindow->winType)) {
return false;
}
return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
......@@ -380,6 +380,7 @@ static int32_t stbSplCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pParent
SExchangeLogicNode* pExchange = NULL;
int32_t code = splCreateExchangeNode(pCxt, pPartChild, &pExchange);
if (TSDB_CODE_SUCCESS == code) {
pExchange->node.pParent = pParent;
code = nodesListMakeAppend(&pParent->pChildren, (SNode*)pExchange);
}
return code;
......@@ -484,7 +485,27 @@ static int32_t stbSplSplitSession(SSplitContext* pCxt, SStableSplitInfo* pInfo)
}
}
static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
static SNodeList* stbSplGetPartKeys(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
return ((SScanLogicNode*)pNode)->pPartTags;
} else {
return NULL;
}
}
static bool stbSplIsPartTbanme(SNodeList* pPartKeys) {
if (NULL == pPartKeys || 1 != LIST_LENGTH(pPartKeys)) {
return false;
}
SNode* pPartKey = nodesListGetNode(pPartKeys, 0);
return QUERY_NODE_FUNCTION == nodeType(pPartKey) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pPartKey)->funcType;
}
static bool stbSplIsMultiTableWinodw(SWindowLogicNode* pWindow) {
return stbSplIsPartTbanme(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pWindow->node.pChildren, 0)));
}
static int32_t stbSplSplitWindowForMergeTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
switch (((SWindowLogicNode*)pInfo->pSplitNode)->winType) {
case WINDOW_TYPE_INTERVAL:
return stbSplSplitInterval(pCxt, pInfo);
......@@ -496,6 +517,34 @@ static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInf
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
static int32_t stbSplSplitWindowForMultiTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
if (pCxt->pPlanCxt->streamQuery) {
SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT);
return TSDB_CODE_SUCCESS;
}
SExchangeLogicNode* pExchange = NULL;
int32_t code = splCreateExchangeNode(pCxt, pInfo->pSplitNode, &pExchange);
if (TSDB_CODE_SUCCESS == code) {
code = replaceLogicNode(pInfo->pSubplan, pInfo->pSplitNode, (SLogicNode*)pExchange);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
(SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT));
}
pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
++(pCxt->groupId);
return code;
}
static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
if (stbSplIsMultiTableWinodw((SWindowLogicNode*)pInfo->pSplitNode)) {
return stbSplSplitWindowForMultiTable(pCxt, pInfo);
} else {
return stbSplSplitWindowForMergeTable(pCxt, pInfo);
}
}
static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pOutput) {
SNodeList* pFunc = pMergeAgg->pAggFuncs;
pMergeAgg->pAggFuncs = NULL;
......
......@@ -37,7 +37,9 @@ TEST_F(PlanOtherTest, createStream) {
TEST_F(PlanOtherTest, createStreamUseSTable) {
useDb("root", "test");
run("create stream if not exists s1 as select count(*) from st1 interval(10s)");
run("CREATE STREAM IF NOT EXISTS s1 as SELECT COUNT(*) FROM st1 INTERVAL(10s)");
run("CREATE STREAM IF NOT EXISTS s1 as SELECT COUNT(*) FROM st1 PARTITION BY TBNAME INTERVAL(10s)");
}
TEST_F(PlanOtherTest, createSmaIndex) {
......
......@@ -31,12 +31,12 @@
#define GET_HASH_NODE_DATA(_n) ((char *)(_n) + sizeof(SHashNode))
#define GET_HASH_PNODE(_n) ((SHashNode *)((char *)(_n) - sizeof(SHashNode)))
#define FREE_HASH_NODE(_fp, _n) \
do { \
if (_fp != NULL) { \
(_fp)(_n); \
} \
taosMemoryFreeClear(_n); \
#define FREE_HASH_NODE(_fp, _n) \
do { \
if (_fp != NULL) { \
(_fp)(GET_HASH_NODE_DATA(_n)); \
} \
taosMemoryFreeClear(_n); \
} while (0);
struct SHashNode {
......@@ -56,7 +56,7 @@ typedef struct SHashEntry {
} SHashEntry;
struct SHashObj {
SHashEntry ** hashList;
SHashEntry **hashList;
size_t capacity; // number of slots
int64_t size; // number of elements in hash table
_hash_fn_t hashFp; // hash function
......@@ -65,7 +65,7 @@ struct SHashObj {
SRWLatch lock; // read-write spin lock
SHashLockTypeE type; // lock type
bool enableUpdate; // enable update
SArray * pMemBlock; // memory block allocated for SHashEntry
SArray *pMemBlock; // memory block allocated for SHashEntry
_hash_before_fn_t callbackFp; // function invoked before return the value to caller
};
......@@ -633,7 +633,7 @@ void taosHashTableResize(SHashObj *pHashObj) {
}
int64_t st = taosGetTimestampUs();
void * pNewEntryList = taosMemoryRealloc(pHashObj->hashList, sizeof(void *) * newCapacity);
void *pNewEntryList = taosMemoryRealloc(pHashObj->hashList, sizeof(void *) * newCapacity);
if (pNewEntryList == NULL) {
// uDebug("cache resize failed due to out of memory, capacity remain:%zu", pHashObj->capacity);
return;
......@@ -642,7 +642,7 @@ void taosHashTableResize(SHashObj *pHashObj) {
pHashObj->hashList = pNewEntryList;
size_t inc = newCapacity - pHashObj->capacity;
void * p = taosMemoryCalloc(inc, sizeof(SHashEntry));
void *p = taosMemoryCalloc(inc, sizeof(SHashEntry));
for (int32_t i = 0; i < inc; ++i) {
pHashObj->hashList[i + pHashObj->capacity] = (void *)((char *)p + i * sizeof(SHashEntry));
......@@ -653,9 +653,9 @@ void taosHashTableResize(SHashObj *pHashObj) {
pHashObj->capacity = newCapacity;
for (int32_t idx = 0; idx < pHashObj->capacity; ++idx) {
SHashEntry *pe = pHashObj->hashList[idx];
SHashNode * pNode;
SHashNode * pNext;
SHashNode * pPrev = NULL;
SHashNode *pNode;
SHashNode *pNext;
SHashNode *pPrev = NULL;
if (pe->num == 0) {
assert(pe->next == NULL);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册