提交 31dc5533 编写于 作者: X Xiaoyu Wang

feat: sma index optimize

上级 dd38da79
......@@ -2493,14 +2493,14 @@ int32_t tSerializeSTableIndexReq(void* buf, int32_t bufLen, STableIndexReq* pReq
int32_t tDeserializeSTableIndexReq(void* buf, int32_t bufLen, STableIndexReq* pReq);
typedef struct {
int8_t intervalUnit;
int8_t slidingUnit;
int64_t interval;
int64_t offset;
int64_t sliding;
int64_t dstTbUid;
int32_t dstVgId; // for stream
char* expr;
int8_t intervalUnit;
int8_t slidingUnit;
int64_t interval;
int64_t offset;
int64_t sliding;
int64_t dstTbUid;
int32_t dstVgId;
char* expr;
} STableIndexInfo;
typedef struct {
......@@ -2510,7 +2510,6 @@ typedef struct {
int32_t tSerializeSTableIndexRsp(void* buf, int32_t bufLen, const STableIndexRsp* pRsp);
int32_t tDeserializeSTableIndexRsp(void* buf, int32_t bufLen, STableIndexRsp* pRsp);
typedef struct {
int8_t mqMsgType;
int32_t code;
......@@ -2751,8 +2750,8 @@ typedef struct {
char* msg;
} SVDeleteReq;
int32_t tSerializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq);
int32_t tDeserializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq);
int32_t tSerializeSVDeleteReq(void* buf, int32_t bufLen, SVDeleteReq* pReq);
int32_t tDeserializeSVDeleteReq(void* buf, int32_t bufLen, SVDeleteReq* pReq);
typedef struct {
int64_t affectedRows;
......
......@@ -272,7 +272,7 @@ int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo);
int32_t catalogGetTableIndex(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* tbFName, SArray** pRes);
int32_t catalogGetTableIndex(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pName, SArray** pRes);
int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo* pInfo);
......
......@@ -1136,13 +1136,15 @@ int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps
CTG_API_LEAVE(ctgGetIndexInfoFromMnode(CTG_PARAMS_LIST(), indexName, pInfo, NULL));
}
int32_t catalogGetTableIndex(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* tbFName, SArray** pRes) {
int32_t catalogGetTableIndex(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pName, SArray** pRes) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == tbFName || NULL == pRes) {
if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pName || NULL == pRes) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(pName, tbFName);
CTG_API_LEAVE(ctgGetTbIndexFromMnode(CTG_PARAMS_LIST(), tbFName, pRes, NULL));
}
......
......@@ -46,6 +46,7 @@ typedef struct SParseMetaCache {
SHashObj* pDbInfo; // key is tbFName, element is SDbInfo*
SHashObj* pUserAuth; // key is SUserAuthInfo serialized string, element is bool indicating whether or not to pass
SHashObj* pUdf; // key is funcName, element is SFuncInfo*
SHashObj* pSmaIndex; // key is tbFName, element is SArray<STableIndexInfo>*
} SParseMetaCache;
int32_t generateSyntaxErrMsg(SMsgBuf* pBuf, int32_t errCode, ...);
......@@ -58,7 +59,7 @@ int32_t getNumOfColumns(const STableMeta* pTableMeta);
int32_t getNumOfTags(const STableMeta* pTableMeta);
STableComInfo getTableInfo(const STableMeta* pTableMeta);
STableMeta* tableMetaDup(const STableMeta* pTableMeta);
int32_t parseJsontoTagData(const char* json, SArray* pTagVals, STag **ppTag, SMsgBuf* pMsgBuf);
int32_t parseJsontoTagData(const char* json, SArray* pTagVals, STag** ppTag, SMsgBuf* pMsgBuf);
int32_t trimString(const char* src, int32_t len, char* dst, int32_t dlen);
......@@ -84,6 +85,7 @@ int32_t getDbCfgFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SDb
int32_t getUserAuthFromCache(SParseMetaCache* pMetaCache, const char* pUser, const char* pDbFName, AUTH_TYPE type,
bool* pPass);
int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFuncInfo* pInfo);
int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName, SArray** pIndexes);
#ifdef __cplusplus
}
......
......@@ -25,6 +25,9 @@
#include "tglobal.h"
#include "ttime.h"
#define SMA_TABLE_NAME "#sma_table"
#define SMA_COL_NAME_PREFIX "#sma_col_"
#define generateDealNodeErrMsg(pCxt, code, ...) \
(pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, code, ##__VA_ARGS__), DEAL_RES_ERROR)
......@@ -260,6 +263,28 @@ static int32_t getUdfInfo(STranslateContext* pCxt, SFunctionNode* pFunc) {
return code;
}
static int32_t getTableIndex(STranslateContext* pCxt, const char* pDbName, const char* pTableName, SArray** pIndexes) {
SParseContext* pParCxt = pCxt->pParseCxt;
SName name;
toName(pParCxt->acctId, pDbName, pTableName, &name);
int32_t code = TSDB_CODE_SUCCESS;
if (pParCxt->async) {
code = getTableIndexFromCache(pCxt->pMetaCache, &name, pIndexes);
} else {
code = collectUseDatabase(&name, pCxt->pDbs);
if (TSDB_CODE_SUCCESS == code) {
code = collectUseTable(&name, pCxt->pTables);
}
if (TSDB_CODE_SUCCESS == code) {
code = catalogGetTableIndex(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, &name, pIndexes);
}
}
if (TSDB_CODE_SUCCESS != code) {
parserError("getTableIndex error, code:%s, dbName:%s, tbName:%s", tstrerror(code), pDbName, pTableName);
}
return code;
}
static int32_t initTranslateContext(SParseContext* pParseCxt, SParseMetaCache* pMetaCache, STranslateContext* pCxt) {
pCxt->pParseCxt = pParseCxt;
pCxt->errCode = TSDB_CODE_SUCCESS;
......@@ -334,6 +359,10 @@ static bool isIndefiniteRowsFunc(const SNode* pNode) {
return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsIndefiniteRowsFunc(((SFunctionNode*)pNode)->funcId));
}
static bool isVectorFunc(const SNode* pNode) {
return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsVectorFunc(((SFunctionNode*)pNode)->funcId));
}
static bool isDistinctOrderBy(STranslateContext* pCxt) {
return (SQL_CLAUSE_ORDER_BY == pCxt->currClause && pCxt->pCurrSelectStmt->isDistinct);
}
......@@ -1787,7 +1816,7 @@ static int64_t getMonthsFromTimeVal(int64_t val, int32_t fromPrecision, char uni
return -1;
}
static int32_t checkIntervalWindow(STranslateContext* pCxt, SNode* pWhere, SIntervalWindowNode* pInterval) {
static int32_t checkIntervalWindow(STranslateContext* pCxt, SIntervalWindowNode* pInterval) {
uint8_t precision = ((SColumnNode*)pInterval->pCol)->node.resType.precision;
SValueNode* pInter = (SValueNode*)pInterval->pInterval;
......@@ -1829,7 +1858,15 @@ static int32_t checkIntervalWindow(STranslateContext* pCxt, SNode* pWhere, SInte
}
}
return translateFill(pCxt, pWhere, pInterval);
return TSDB_CODE_SUCCESS;
}
static int32_t translateIntervalWindow(STranslateContext* pCxt, SSelectStmt* pSelect, SIntervalWindowNode* pInterval) {
int32_t code = checkIntervalWindow(pCxt, pInterval);
if (TSDB_CODE_SUCCESS == code) {
code = translateFill(pCxt, pSelect->pWhere, pInterval);
}
return code;
}
static EDealRes checkStateExpr(SNode* pNode, void* pContext) {
......@@ -1851,13 +1888,13 @@ static EDealRes checkStateExpr(SNode* pNode, void* pContext) {
return DEAL_RES_CONTINUE;
}
static int32_t checkStateWindow(STranslateContext* pCxt, SStateWindowNode* pState) {
static int32_t translateStateWindow(STranslateContext* pCxt, SStateWindowNode* pState) {
nodesWalkExprPostOrder(pState->pExpr, checkStateExpr, pCxt);
// todo check for "function not support for state_window"
return pCxt->errCode;
}
static int32_t checkSessionWindow(STranslateContext* pCxt, SSessionWindowNode* pSession) {
static int32_t translateSessionWindow(STranslateContext* pCxt, SSessionWindowNode* pSession) {
if ('y' == pSession->pGap->unit || 'n' == pSession->pGap->unit || 0 == pSession->pGap->datum.i) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_SESSION_GAP);
}
......@@ -1868,14 +1905,14 @@ static int32_t checkSessionWindow(STranslateContext* pCxt, SSessionWindowNode* p
return TSDB_CODE_SUCCESS;
}
static int32_t checkWindow(STranslateContext* pCxt, SSelectStmt* pSelect) {
static int32_t translateSpecificWindow(STranslateContext* pCxt, SSelectStmt* pSelect) {
switch (nodeType(pSelect->pWindow)) {
case QUERY_NODE_STATE_WINDOW:
return checkStateWindow(pCxt, (SStateWindowNode*)pSelect->pWindow);
return translateStateWindow(pCxt, (SStateWindowNode*)pSelect->pWindow);
case QUERY_NODE_SESSION_WINDOW:
return checkSessionWindow(pCxt, (SSessionWindowNode*)pSelect->pWindow);
return translateSessionWindow(pCxt, (SSessionWindowNode*)pSelect->pWindow);
case QUERY_NODE_INTERVAL_WINDOW:
return checkIntervalWindow(pCxt, pSelect->pWhere, (SIntervalWindowNode*)pSelect->pWindow);
return translateIntervalWindow(pCxt, pSelect, (SIntervalWindowNode*)pSelect->pWindow);
default:
break;
}
......@@ -1889,7 +1926,7 @@ static int32_t translateWindow(STranslateContext* pCxt, SSelectStmt* pSelect) {
pCxt->currClause = SQL_CLAUSE_WINDOW;
int32_t code = translateExpr(pCxt, &pSelect->pWindow);
if (TSDB_CODE_SUCCESS == code) {
code = checkWindow(pCxt, pSelect);
code = translateSpecificWindow(pCxt, pSelect);
}
return code;
}
......@@ -1965,6 +2002,270 @@ static int32_t rewriteTimelineFunc(STranslateContext* pCxt, SSelectStmt* pSelect
nodesWalkSelectStmt(pSelect, SQL_CLAUSE_FROM, rewriteTimelineFuncImpl, pCxt);
return pCxt->errCode;
}
#if 0
static bool mayBeApplySmaIndex(SSelectStmt* pSelect) {
if (NULL == pSelect->pWindow || QUERY_NODE_INTERVAL_WINDOW != nodeType(pSelect->pWindow) ||
NULL != ((SIntervalWindowNode*)pSelect->pWindow)->pFill ||
QUERY_NODE_REAL_TABLE != nodeType(pSelect->pFromTable) || NULL != pSelect->pWhere ||
NULL != pSelect->pPartitionByList || NULL != pSelect->pGroupByList || NULL != pSelect->pHaving) {
return false;
}
return true;
}
static bool equalIntervalWindow(SIntervalWindowNode* pInterval, SNode* pWhere, STableIndexInfo* pIndex) {
int64_t interval = ((SValueNode*)pInterval->pInterval)->datum.i;
int8_t intervalUnit = ((SValueNode*)pInterval->pInterval)->unit;
int64_t offset = (NULL != pInterval->pOffset ? ((SValueNode*)pInterval->pOffset)->datum.i : 0);
int64_t sliding = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->datum.i : interval);
int8_t slidingUnit = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->unit : intervalUnit);
if (interval != pIndex->interval || intervalUnit != pIndex->intervalUnit || offset != pIndex->offset ||
sliding != pIndex->sliding || slidingUnit != pIndex->slidingUnit) {
return false;
}
// todo
if (NULL != pWhere) {
return false;
}
return true;
}
typedef struct SSmaIndexMatchFuncsCxt {
int32_t errCode;
uint64_t tableId;
SNodeList* pSmaFuncs;
SNodeList* pUseFuncs;
SNodeList* pUseCols;
SArray* pUseMap;
bool match;
} SSmaIndexMatchFuncsCxt;
static SColumnNode* createColumnFromSmaFunc(uint64_t tableId, int32_t index, SExprNode* pSmaFunc) {
SColumnNode* pCol = nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) {
return NULL;
}
pCol->tableId = tableId;
pCol->tableType = TSDB_SUPER_TABLE;
pCol->colId = index + 2; // skip timestamp primary col
pCol->colType = COLUMN_TYPE_COLUMN;
snprintf(pCol->colName, sizeof(pCol->colName), SMA_COL_NAME_PREFIX "%d", pCol->colId);
strcpy(pCol->tableName, SMA_TABLE_NAME);
strcpy(pCol->tableAlias, SMA_TABLE_NAME);
pCol->node.resType = pSmaFunc->resType;
strcpy(pCol->node.aliasName, pSmaFunc->aliasName);
return pCol;
}
static int32_t collectSmaFunc(SSmaIndexMatchFuncsCxt* pCxt, int32_t index, SNode* pSmaFunc) {
if (NULL == pCxt->pUseMap) {
int32_t nfuncs = LIST_LENGTH(pCxt->pSmaFuncs);
pCxt->pUseMap = taosArrayInit(nfuncs, sizeof(int32_t));
if (NULL == pCxt->pUseMap) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t initPos = -1;
for (int32_t i = 0; i < nfuncs; ++i) {
taosArrayPush(pCxt->pUseMap, &initPos);
}
}
int32_t pos = *(int32_t*)taosArrayGet(pCxt->pUseMap, index);
if (pos < 0) {
pos = LIST_LENGTH(pCxt->pUseFuncs);
taosArraySet(pCxt->pUseMap, index, &pos);
int32_t code =
nodesListMakeStrictAppend(&pCxt->pUseCols, createColumnFromSmaFunc(pCxt->tableId, index, (SExprNode*)pSmaFunc));
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pCxt->pUseFuncs, nodesCloneNode(pSmaFunc));
}
return code;
}
return TSDB_CODE_SUCCESS;
}
static int32_t findSmaFunc(SSmaIndexMatchFuncsCxt* pCxt, SNode* pNode, bool* pFound) {
if (!isAggFunc(pNode)) {
return TSDB_CODE_SUCCESS;
}
int32_t index = 0;
SNode* pSmaFunc = NULL;
FOREACH(pSmaFunc, pCxt->pSmaFuncs) {
if (nodesEqualNode(pSmaFunc, pNode)) {
*pFound = true;
return collectSmaFunc(pCxt, index, pSmaFunc);
}
++index;
}
return TSDB_CODE_SUCCESS;
}
static EDealRes matchSmaFuncsImpl(SNode* pNode, void* pContext) {
SSmaIndexMatchFuncsCxt* pCxt = pContext;
bool found = false;
pCxt->errCode = findSmaFunc(pCxt, pNode, &found);
if (TSDB_CODE_SUCCESS != pCxt->errCode) {
return DEAL_RES_ERROR;
}
if (found) {
pCxt->match = true;
return DEAL_RES_IGNORE_CHILD;
}
if (isVectorFunc(pNode)) {
pCxt->match = false;
return DEAL_RES_END;
}
return DEAL_RES_CONTINUE;
}
static int32_t matchSmaFuncs(SSelectStmt* pSelect, STableIndexInfo* pIndex, SNodeList* pSmaFuncs, SNodeList** pFuncs,
SNodeList** pCols) {
SSmaIndexMatchFuncsCxt cxt = {.errCode = TSDB_CODE_SUCCESS,
.tableId = pIndex->dstTbUid,
.pSmaFuncs = pSmaFuncs,
.pUseFuncs = NULL,
.pUseCols = NULL,
.pUseMap = NULL,
.match = false};
nodesWalkExprs(pSelect->pProjectionList, matchSmaFuncsImpl, &cxt);
if (TSDB_CODE_SUCCESS == cxt.errCode && cxt.match) {
*pFuncs = cxt.pUseFuncs;
*pCols = cxt.pUseCols;
} else {
nodesDestroyList(cxt.pUseFuncs);
nodesDestroyList(cxt.pUseCols);
}
taosArrayDestroy(cxt.pUseMap);
return cxt.errCode;
}
static int32_t couldApplySmaIndex(STranslateContext* pCxt, SSelectStmt* pSelect, STableIndexInfo* pIndex,
SNodeList** pFuncs, SNodeList** pCols) {
if (!equalIntervalWindow((SIntervalWindowNode*)pSelect->pWindow, pSelect->pWhere, pIndex)) {
return TSDB_CODE_SUCCESS;
}
SNodeList* pSmaFuncs = NULL;
int32_t code = nodesStringToList(pIndex->expr, &pSmaFuncs);
if (TSDB_CODE_SUCCESS == code) {
pCxt->currClause = SQL_CLAUSE_SELECT;
code = translateExprList(pCxt, pSmaFuncs);
}
if (TSDB_CODE_SUCCESS == code) {
code = matchSmaFuncs(pSelect, pIndex, pSmaFuncs, pFuncs, pCols);
}
nodesDestroyList(pSmaFuncs);
return code;
}
typedef struct SSmaIndexRewriteFuncsCxt {
int32_t errCode;
SNodeList* pFuncs;
SNodeList* pCols;
} SSmaIndexRewriteFuncsCxt;
static EDealRes rewriteFuncBySmaIndex(SNode** pNode, void* pContext) {
if (isAggFunc(*pNode)) {
SSmaIndexRewriteFuncsCxt* pCxt = pContext;
SNode* pFunc;
int32_t index = 0;
FOREACH(pFunc, pCxt->pFuncs) {
if (nodesEqualNode(pFunc, *pNode)) {
SNode* pNew = nodesCloneNode(nodesListGetNode(pCxt->pCols, index));
if (NULL == pNew) {
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
}
nodesDestroyNode(*pNode);
*pNode = pNew;
return DEAL_RES_IGNORE_CHILD;
}
++index;
}
}
return DEAL_RES_CONTINUE;
}
static int32_t rewriteTableBySmaIndex(SSelectStmt* pSelect, STableIndexInfo* pMatchIndex) {
nodesDestroyNode(pSelect->pFromTable);
SRealTableNode* pRealTable = nodesMakeNode(QUERY_NODE_REAL_TABLE);
if (NULL == pRealTable) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pRealTable->table.singleTable = true;
strcpy(pRealTable->table.tableName, SMA_TABLE_NAME);
pRealTable->pMeta = taosMemoryCalloc(1, sizeof(STableMeta));
if (NULL == pRealTable->pMeta) {
nodesDestroyNode(pRealTable);
return TSDB_CODE_OUT_OF_MEMORY;
}
pRealTable->pMeta->vgId = pMatchIndex->dstVgId;
pRealTable->pMeta->uid = pMatchIndex->dstTbUid;
pRealTable->pVgroupList = taosMemoryCalloc(1, sizeof(SVgroupsInfo));
if (NULL == pRealTable->pVgroupList) {
nodesDestroyNode(pRealTable);
return TSDB_CODE_OUT_OF_MEMORY;
}
// todo
pSelect->pFromTable = (SNode*)pRealTable;
return TSDB_CODE_SUCCESS;
}
static int32_t rewriteSelectBySmaIndex(SSelectStmt* pSelect, STableIndexInfo* pMatchIndex, SNodeList* pFuncs,
SNodeList* pCols) {
SSmaIndexRewriteFuncsCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pFuncs = pFuncs, .pCols = pCols};
nodesRewriteExprs(pSelect->pProjectionList, rewriteFuncBySmaIndex, &cxt);
if (TSDB_CODE_SUCCESS == cxt.errCode) {
cxt.errCode = rewriteTableBySmaIndex(pSelect, pMatchIndex);
}
return cxt.errCode;
}
static int32_t attemptApplySmaIndexImpl(STranslateContext* pCxt, SSelectStmt* pSelect, SArray* pIndexes) {
if (NULL == pIndexes) {
return TSDB_CODE_SUCCESS;
}
int32_t nindexes = taosArrayGetSize(pIndexes);
for (int32_t i = 0; i < nindexes; ++i) {
STableIndexInfo* pIndex = taosArrayGet(pIndexes, i);
SNodeList* pFuncs = NULL;
SNodeList* pCols = NULL;
if (TSDB_CODE_SUCCESS != couldApplySmaIndex(pCxt, pSelect, pIndex, &pFuncs, &pCols)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (NULL != pFuncs) {
int32_t code = rewriteSelectBySmaIndex(pSelect, pIndex, pFuncs, pCols);
nodesDestroyList(pFuncs);
nodesDestroyList(pCols);
return code;
}
}
return TSDB_CODE_SUCCESS;
}
static void destroySmaIndex(void* p) { taosMemoryFree(((STableIndexInfo*)p)->expr); }
static int32_t attemptApplySmaIndex(STranslateContext* pCxt, SSelectStmt* pSelect) {
SRealTableNode* pRealTable = (SRealTableNode*)pSelect->pFromTable;
SArray* pIndexes = NULL;
int32_t code = getTableIndex(pCxt, pRealTable->table.dbName, pRealTable->table.tableName, &pIndexes);
if (TSDB_CODE_SUCCESS == code) {
code = attemptApplySmaIndexImpl(pCxt, pSelect, pIndexes);
}
taosArrayDestroyEx(pIndexes, destroySmaIndex);
return code;
}
static int32_t attemptApplyIndex(STranslateContext* pCxt, SSelectStmt* pSelect) {
// if (mayBeApplySmaIndex(pSelect)) {
// return attemptApplySmaIndex(pCxt, pSelect);
// }
return TSDB_CODE_SUCCESS;
}
#endif
static int32_t translateSelect(STranslateContext* pCxt, SSelectStmt* pSelect) {
pCxt->pCurrSelectStmt = pSelect;
......
......@@ -836,3 +836,7 @@ int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFun
memcpy(pInfo, *pRes, sizeof(SFuncInfo));
return TSDB_CODE_SUCCESS;
}
int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName, SArray** pIndexes) {
return TSDB_CODE_PAR_INTERNAL_ERROR;
}
......@@ -57,6 +57,7 @@ class MockCatalogService {
void createSubTable(const std::string& db, const std::string& stbname, const std::string& tbname, int16_t vgid);
void showTables() const;
void createFunction(const std::string& func, int8_t funcType, int8_t outputType, int32_t outputLen, int32_t bufSize);
void createSmaIndex(const SMCreateSmaReq* pReq);
int32_t catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta) const;
int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const;
......
......@@ -38,9 +38,9 @@ extern "C" {
int32_t generateUsageErrMsg(char* pBuf, int32_t len, int32_t errCode, ...);
int32_t createColumnByRewriteExps(SNodeList* pExprs, SNodeList** pList);
int32_t createLogicPlan(SPlanContext* pCxt, SLogicNode** pLogicNode);
int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode);
int32_t splitLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SLogicSubplan** pLogicSubplan);
int32_t createLogicPlan(SPlanContext* pCxt, SLogicSubplan** pLogicSubplan);
int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan);
int32_t splitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan);
int32_t scaleOutLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SQueryLogicPlan** pLogicPlan);
int32_t createPhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList);
......
......@@ -1138,11 +1138,40 @@ static int32_t createQueryLogicNode(SLogicPlanContext* pCxt, SNode* pStmt, SLogi
return TSDB_CODE_FAILED;
}
int32_t createLogicPlan(SPlanContext* pCxt, SLogicNode** pLogicNode) {
static void doSetLogicNodeParent(SLogicNode* pNode, SLogicNode* pParent) {
pNode->pParent = pParent;
SNode* pChild;
FOREACH(pChild, pNode->pChildren) { doSetLogicNodeParent((SLogicNode*)pChild, pNode); }
}
static void setLogicNodeParent(SLogicNode* pNode) { doSetLogicNodeParent(pNode, NULL); }
int32_t createLogicPlan(SPlanContext* pCxt, SLogicSubplan** pLogicSubplan) {
SLogicPlanContext cxt = {.pPlanCxt = pCxt};
int32_t code = createQueryLogicNode(&cxt, pCxt->pAstRoot, pLogicNode);
if (TSDB_CODE_SUCCESS != code) {
return code;
SLogicSubplan* pSubplan = (SLogicSubplan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
if (NULL == pSubplan) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return TSDB_CODE_SUCCESS;
pSubplan->id.queryId = pCxt->queryId;
pSubplan->id.groupId = 1;
pSubplan->id.subplanId = 1;
int32_t code = createQueryLogicNode(&cxt, pCxt->pAstRoot, &pSubplan->pNode);
if (TSDB_CODE_SUCCESS == code) {
setLogicNodeParent(pSubplan->pNode);
if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY == nodeType(pSubplan->pNode)) {
pSubplan->subplanType = SUBPLAN_TYPE_MODIFY;
} else {
pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
}
}
if (TSDB_CODE_SUCCESS == code) {
*pLogicSubplan = pSubplan;
} else {
nodesDestroyNode(pSubplan);
}
return code;
}
......@@ -773,4 +773,6 @@ static int32_t applyOptimizeRule(SPlanContext* pCxt, SLogicNode* pLogicNode) {
return TSDB_CODE_SUCCESS;
}
int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode) { return applyOptimizeRule(pCxt, pLogicNode); }
int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) {
return applyOptimizeRule(pCxt, pLogicSubplan->pNode);
}
......@@ -915,14 +915,6 @@ static int32_t applySplitRule(SPlanContext* pCxt, SLogicSubplan* pSubplan) {
return TSDB_CODE_SUCCESS;
}
static void doSetLogicNodeParent(SLogicNode* pNode, SLogicNode* pParent) {
pNode->pParent = pParent;
SNode* pChild;
FOREACH(pChild, pNode->pChildren) { doSetLogicNodeParent((SLogicNode*)pChild, pNode); }
}
static void setLogicNodeParent(SLogicNode* pNode) { doSetLogicNodeParent(pNode, NULL); }
static void setVgroupsInfo(SLogicNode* pNode, SLogicSubplan* pSubplan) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
TSWAP(((SScanLogicNode*)pNode)->pVgroupList, pSubplan->pVgroupList);
......@@ -933,37 +925,10 @@ static void setVgroupsInfo(SLogicNode* pNode, SLogicSubplan* pSubplan) {
FOREACH(pChild, pNode->pChildren) { setVgroupsInfo((SLogicNode*)pChild, pSubplan); }
}
int32_t splitLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SLogicSubplan** pLogicSubplan) {
SLogicSubplan* pSubplan = (SLogicSubplan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
if (NULL == pSubplan) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pSubplan->pNode = nodesCloneNode(pLogicNode);
if (NULL == pSubplan->pNode) {
nodesDestroyNode(pSubplan);
return TSDB_CODE_OUT_OF_MEMORY;
}
pSubplan->id.queryId = pCxt->queryId;
pSubplan->id.groupId = 1;
setLogicNodeParent(pSubplan->pNode);
int32_t code = TSDB_CODE_SUCCESS;
if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY == nodeType(pLogicNode)) {
pSubplan->subplanType = SUBPLAN_TYPE_MODIFY;
TSWAP(((SVnodeModifyLogicNode*)pLogicNode)->pDataBlocks, ((SVnodeModifyLogicNode*)pSubplan->pNode)->pDataBlocks);
setVgroupsInfo(pSubplan->pNode, pSubplan);
} else {
pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
code = applySplitRule(pCxt, pSubplan);
}
if (TSDB_CODE_SUCCESS == code) {
*pLogicSubplan = pSubplan;
} else {
nodesDestroyNode(pSubplan);
int32_t splitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) {
if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY == nodeType(pLogicSubplan->pNode)) {
setVgroupsInfo(pLogicSubplan->pNode, pLogicSubplan);
return TSDB_CODE_SUCCESS;
}
return code;
}
\ No newline at end of file
return applySplitRule(pCxt, pLogicSubplan);
}
......@@ -26,16 +26,15 @@ static void dumpQueryPlan(SQueryPlan* pPlan) {
}
int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNodeList) {
SLogicNode* pLogicNode = NULL;
SLogicSubplan* pLogicSubplan = NULL;
SQueryLogicPlan* pLogicPlan = NULL;
int32_t code = createLogicPlan(pCxt, &pLogicNode);
int32_t code = createLogicPlan(pCxt, &pLogicSubplan);
if (TSDB_CODE_SUCCESS == code) {
code = optimizeLogicPlan(pCxt, pLogicNode);
code = optimizeLogicPlan(pCxt, pLogicSubplan);
}
if (TSDB_CODE_SUCCESS == code) {
code = splitLogicPlan(pCxt, pLogicNode, &pLogicSubplan);
code = splitLogicPlan(pCxt, pLogicSubplan);
}
if (TSDB_CODE_SUCCESS == code) {
code = scaleOutLogicPlan(pCxt, pLogicSubplan, &pLogicPlan);
......@@ -47,7 +46,6 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo
dumpQueryPlan(*pPlan);
}
nodesDestroyNode(pLogicNode);
nodesDestroyNode(pLogicSubplan);
nodesDestroyNode(pLogicPlan);
terrno = code;
......
......@@ -42,7 +42,7 @@ TEST_F(PlanOtherTest, createStreamUseSTable) {
TEST_F(PlanOtherTest, createSmaIndex) {
useDb("root", "test");
run("create sma index index1 on t1 function(max(c1), min(c3 + 10), sum(c4)) interval(10s)");
run("CREATE SMA INDEX idx1 ON t1 FUNCTION(MAX(c1), MIN(c3 + 10), SUM(c4)) INTERVAL(10s)");
}
TEST_F(PlanOtherTest, explain) {
......
......@@ -104,13 +104,12 @@ class PlannerTestBaseImpl {
SPlanContext cxt = {0};
setPlanContext(pQuery, &cxt);
SLogicNode* pLogicNode = nullptr;
doCreateLogicPlan(&cxt, &pLogicNode);
SLogicSubplan* pLogicSubplan = nullptr;
doCreateLogicPlan(&cxt, &pLogicSubplan);
doOptimizeLogicPlan(&cxt, pLogicNode);
doOptimizeLogicPlan(&cxt, pLogicSubplan);
SLogicSubplan* pLogicSubplan = nullptr;
doSplitLogicPlan(&cxt, pLogicNode, &pLogicSubplan);
doSplitLogicPlan(&cxt, pLogicSubplan);
SQueryLogicPlan* pLogicPlan = nullptr;
doScaleOutLogicPlan(&cxt, pLogicSubplan, &pLogicPlan);
......@@ -164,13 +163,12 @@ class PlannerTestBaseImpl {
SPlanContext cxt = {0};
setPlanContext(stmtEnv_.pQuery_, &cxt);
SLogicNode* pLogicNode = nullptr;
doCreateLogicPlan(&cxt, &pLogicNode);
SLogicSubplan* pLogicSubplan = nullptr;
doCreateLogicPlan(&cxt, &pLogicSubplan);
doOptimizeLogicPlan(&cxt, pLogicNode);
doOptimizeLogicPlan(&cxt, pLogicSubplan);
SLogicSubplan* pLogicSubplan = nullptr;
doSplitLogicPlan(&cxt, pLogicNode, &pLogicSubplan);
doSplitLogicPlan(&cxt, pLogicSubplan);
SQueryLogicPlan* pLogicPlan = nullptr;
doScaleOutLogicPlan(&cxt, pLogicSubplan, &pLogicPlan);
......@@ -324,19 +322,19 @@ class PlannerTestBaseImpl {
res_.ast_ = toString(pQuery->pRoot);
}
void doCreateLogicPlan(SPlanContext* pCxt, SLogicNode** pLogicNode) {
DO_WITH_THROW(createLogicPlan, pCxt, pLogicNode);
res_.rawLogicPlan_ = toString((SNode*)(*pLogicNode));
void doCreateLogicPlan(SPlanContext* pCxt, SLogicSubplan** pLogicSubplan) {
DO_WITH_THROW(createLogicPlan, pCxt, pLogicSubplan);
res_.rawLogicPlan_ = toString((SNode*)(*pLogicSubplan));
}
void doOptimizeLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode) {
DO_WITH_THROW(optimizeLogicPlan, pCxt, pLogicNode);
res_.optimizedLogicPlan_ = toString((SNode*)pLogicNode);
void doOptimizeLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) {
DO_WITH_THROW(optimizeLogicPlan, pCxt, pLogicSubplan);
res_.optimizedLogicPlan_ = toString((SNode*)pLogicSubplan);
}
void doSplitLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SLogicSubplan** pLogicSubplan) {
DO_WITH_THROW(splitLogicPlan, pCxt, pLogicNode, pLogicSubplan);
res_.splitLogicPlan_ = toString((SNode*)(*pLogicSubplan));
void doSplitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) {
DO_WITH_THROW(splitLogicPlan, pCxt, pLogicSubplan);
res_.splitLogicPlan_ = toString((SNode*)(pLogicSubplan));
}
void doScaleOutLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SQueryLogicPlan** pLogicPlan) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册