未验证 提交 6d8566ca 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #11807 from taosdata/feature/3.0_wxy

enh: order by primary key optimize
cmake_minimum_required(VERSION 3.16)
set(CMAKE_VERBOSE_MAKEFILE ON)
set(CMAKE_VERBOSE_MAKEFILE OFF)
#set output directory
SET(LIBRARY_OUTPUT_PATH ${PROJECT_BINARY_DIR}/build/lib)
......
......@@ -46,7 +46,7 @@ typedef struct SScanLogicNode {
struct STableMeta* pMeta;
SVgroupsInfo* pVgroupList;
EScanType scanType;
uint8_t scanFlag; // denotes reversed scan of data or not
uint8_t scanSeq[2]; // first is scan count, and second is reverse scan count
STimeWindow scanRange;
SName tableName;
bool showRewrite;
......@@ -189,9 +189,6 @@ typedef struct SScanPhysiNode {
SNodeList* pScanCols;
uint64_t uid; // unique id of the table
int8_t tableType;
int32_t order; // scan order: TSDB_ORDER_ASC|TSDB_ORDER_DESC
int32_t count; // repeat count
int32_t reverse; // reverse scan count
SName tableName;
} SScanPhysiNode;
......@@ -207,7 +204,7 @@ typedef struct SSystemTableScanPhysiNode {
typedef struct STableScanPhysiNode {
SScanPhysiNode scan;
uint8_t scanFlag; // denotes reversed scan of data or not
uint8_t scanSeq[2]; // first is scan count, and second is reverse scan count
STimeWindow scanRange;
double ratio;
int32_t dataRequired;
......
......@@ -37,6 +37,8 @@ typedef struct SPlanContext {
bool isStmtQuery;
void* pTransporter;
struct SCatalog* pCatalog;
char* pMsg;
int32_t msgLen;
} SPlanContext;
// Create the physical plan for the query, according to the AST.
......
......@@ -623,6 +623,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_INVALID_DAYS_VALUE TAOS_DEF_ERROR_CODE(0, 0x2636)
#define TSDB_CODE_PAR_OFFSET_LESS_ZERO TAOS_DEF_ERROR_CODE(0, 0x2637)
#define TSDB_CODE_PAR_SLIMIT_LEAK_PARTITION_BY TAOS_DEF_ERROR_CODE(0, 0x2638)
#define TSDB_CODE_PAR_INVALID_TOPIC_QUERY TAOS_DEF_ERROR_CODE(0, 0x2639)
//planner
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)
......
......@@ -232,7 +232,9 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra
.mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
.pAstRoot = pQuery->pRoot,
.showRewrite = pQuery->showRewrite,
.pTransporter = pRequest->pTscObj->pAppInfo->pTransporter
.pTransporter = pRequest->pTscObj->pAppInfo->pTransporter,
.pMsg = pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE
};
int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
if (TSDB_CODE_SUCCESS == code) {
......
......@@ -344,11 +344,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTagScanNode->node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_LOOPS_FORMAT, pTagScanNode->count);
if (pTagScanNode->reverse) {
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_REVERSE_FORMAT, pTagScanNode->reverse);
}
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
......@@ -361,10 +356,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pTagScanNode->order));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendVerboseExecInfo(pResNode->pExecInfo, tbuf, &tlen));
if (tlen) {
......@@ -388,11 +379,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTblScanNode->scan.node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_LOOPS_FORMAT, pTblScanNode->scan.count);
if (pTblScanNode->scan.reverse) {
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_REVERSE_FORMAT, pTblScanNode->scan.reverse);
}
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
......@@ -405,10 +391,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pTblScanNode->scan.order));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_TIMERANGE_FORMAT, pTblScanNode->scanRange.skey, pTblScanNode->scanRange.ekey);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
......@@ -434,11 +416,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pSTblScanNode->scan.node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_LOOPS_FORMAT, pSTblScanNode->scan.count);
if (pSTblScanNode->scan.reverse) {
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_REVERSE_FORMAT, pSTblScanNode->scan.reverse);
}
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
......@@ -451,10 +428,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pSTblScanNode->scan.order));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
if (pSTblScanNode->scan.node.pConditions) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
QRY_ERR_RET(nodesNodeToSQL(pSTblScanNode->scan.node.pConditions, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
......
......@@ -6575,9 +6575,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
.offset = pTableScanNode->offset,
};
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pTableScanNode->dataRequired,
pScanPhyNode->count, pScanPhyNode->reverse, pColList, pResBlock,
pScanPhyNode->node.pConditions, &interval, pTableScanNode->ratio, pTaskInfo);
return createTableScanOperatorInfo(pDataReader, pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC,
numOfCols, pTableScanNode->dataRequired, pTableScanNode->scanSeq[0], pTableScanNode->scanSeq[1], pColList,
pResBlock, pScanPhyNode->node.pConditions, &interval, pTableScanNode->ratio, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createResDataBlock(pExchange->node.pOutputDataBlockDesc);
......@@ -6725,7 +6725,7 @@ static tsdbReaderT createDataReaderImpl(STableScanPhysiNode* pTableScanNode, STa
void* readHandle, uint64_t queryId, uint64_t taskId) {
STsdbQueryCond cond = {.loadExternalRows = false};
cond.order = pTableScanNode->scan.order;
cond.order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
cond.numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols);
cond.colList = taosMemoryCalloc(cond.numOfCols, sizeof(SColumnInfo));
if (cond.colList == NULL) {
......
......@@ -671,9 +671,6 @@ static int32_t jsonToName(const SJson* pJson, void* pObj) {
static const char* jkScanPhysiPlanScanCols = "ScanCols";
static const char* jkScanPhysiPlanTableId = "TableId";
static const char* jkScanPhysiPlanTableType = "TableType";
static const char* jkScanPhysiPlanScanOrder = "ScanOrder";
static const char* jkScanPhysiPlanScanCount = "ScanCount";
static const char* jkScanPhysiPlanReverseScanCount = "ReverseScanCount";
static const char* jkScanPhysiPlanTableName = "TableName";
static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) {
......@@ -689,15 +686,6 @@ static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanTableType, pNode->tableType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanScanOrder, pNode->order);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanScanCount, pNode->count);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanReverseScanCount, pNode->reverse);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkScanPhysiPlanTableName, nameToJson, &pNode->tableName);
}
......@@ -718,15 +706,6 @@ static int32_t jsonToPhysiScanNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkScanPhysiPlanTableType, &pNode->tableType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkScanPhysiPlanScanOrder, &pNode->order);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkScanPhysiPlanScanCount, &pNode->count);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkScanPhysiPlanReverseScanCount, &pNode->reverse);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonToObject(pJson, jkScanPhysiPlanTableName, jsonToName, &pNode->tableName);
}
......@@ -742,7 +721,8 @@ static int32_t jsonToPhysiTagScanNode(const SJson* pJson, void* pObj) {
return jsonToPhysiScanNode(pJson, pObj);
}
static const char* jkTableScanPhysiPlanScanFlag = "ScanFlag";
static const char* jkTableScanPhysiPlanScanCount = "ScanCount";
static const char* jkTableScanPhysiPlanReverseScanCount = "ReverseScanCount";
static const char* jkTableScanPhysiPlanStartKey = "StartKey";
static const char* jkTableScanPhysiPlanEndKey = "EndKey";
static const char* jkTableScanPhysiPlanRatio = "Ratio";
......@@ -759,7 +739,10 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
int32_t code = physiScanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanScanFlag, pNode->scanFlag);
code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanScanCount, pNode->scanSeq[0]);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanReverseScanCount, pNode->scanSeq[1]);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanStartKey, pNode->scanRange.skey);
......@@ -800,7 +783,10 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
int32_t code = jsonToPhysiScanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetUTinyIntValue(pJson, jkTableScanPhysiPlanScanFlag, &pNode->scanFlag);
code = tjsonGetUTinyIntValue(pJson, jkTableScanPhysiPlanScanCount, &pNode->scanSeq[0]);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetUTinyIntValue(pJson, jkTableScanPhysiPlanReverseScanCount, &pNode->scanSeq[1]);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkTableScanPhysiPlanStartKey, &pNode->scanRange.skey);
......
......@@ -611,6 +611,7 @@ function_expression(A) ::= function_name(B) NK_LP expression_list(C) NK_RP(D).
function_expression(A) ::= star_func(B) NK_LP star_func_para_list(C) NK_RP(D). { A = createRawExprNodeExt(pCxt, &B, &D, createFunctionNode(pCxt, &B, C)); }
function_expression(A) ::= CAST(B) NK_LP expression(C) AS type_name(D) NK_RP(E). { A = createRawExprNodeExt(pCxt, &B, &E, createCastFunctionNode(pCxt, releaseRawExprNode(pCxt, C), D)); }
function_expression(A) ::= noarg_func(B) NK_LP NK_RP(C). { A = createRawExprNodeExt(pCxt, &B, &C, createFunctionNodeNoArg(pCxt, &B)); }
//function_expression(A) ::= NOW(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
%type noarg_func { SToken }
%destructor noarg_func { }
......
......@@ -83,7 +83,7 @@ static EDealRes calcConstOperator(SOperatorNode** pNode, void* pContext) {
static EDealRes calcConstFunction(SFunctionNode** pNode, void* pContext) {
SFunctionNode* pFunc = *pNode;
if (!fmIsScalarFunc(pFunc->funcId)) {
if (!fmIsScalarFunc(pFunc->funcId) || fmIsUserDefinedFunc(pFunc->funcId)) {
return DEAL_RES_CONTINUE;
}
SNode* pParam = NULL;
......
......@@ -2616,37 +2616,62 @@ static int32_t translateDropComponentNode(STranslateContext* pCxt, SDropComponen
(FSerializeFunc)tSerializeSCreateDropMQSBNodeReq, &dropReq);
}
static int32_t translateCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt) {
SCMCreateTopicReq createReq = {0};
static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pStmt, SCMCreateTopicReq* pReq) {
SName name;
// tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->topicName, strlen(pStmt->topicName));
// tNameGetFullDbName(&name, pReq->name);
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->topicName, &name), pReq->name);
pReq->igExists = pStmt->ignoreExists;
pReq->withTbName = pStmt->pOptions->withTable;
pReq->withSchema = pStmt->pOptions->withSchema;
pReq->withTag = pStmt->pOptions->withTag;
pReq->sql = strdup(pCxt->pParseCxt->pSql);
if (NULL == pReq->sql) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = TSDB_CODE_SUCCESS;
if (NULL != pStmt->pQuery) {
strcpy(pReq->subscribeDbName, ((SRealTableNode*)(((SSelectStmt*)pStmt->pQuery)->pFromTable))->table.dbName);
pCxt->pParseCxt->topicQuery = true;
int32_t code = translateQuery(pCxt, pStmt->pQuery);
code = translateQuery(pCxt, pStmt->pQuery);
if (TSDB_CODE_SUCCESS == code) {
code = nodesNodeToString(pStmt->pQuery, false, &createReq.ast, NULL);
}
if (TSDB_CODE_SUCCESS != code) {
return code;
code = nodesNodeToString(pStmt->pQuery, false, &pReq->ast, NULL);
}
} else {
strcpy(createReq.subscribeDbName, pStmt->subscribeDbName);
strcpy(pReq->subscribeDbName, pStmt->subscribeDbName);
}
createReq.sql = strdup(pCxt->pParseCxt->pSql);
if (NULL == createReq.sql) {
return TSDB_CODE_OUT_OF_MEMORY;
return code;
}
static int32_t checkCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt) {
if (NULL == pStmt->pQuery) {
return TSDB_CODE_SUCCESS;
}
SName name;
// tNameSetDbName(&name, pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, strlen(pCxt->pParseCxt->db));
// tNameGetFullDbName(&name, createReq.name);
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->topicName, &name), createReq.name);
createReq.igExists = pStmt->ignoreExists;
createReq.withTbName = pStmt->pOptions->withTable;
createReq.withSchema = pStmt->pOptions->withSchema;
createReq.withTag = pStmt->pOptions->withTag;
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt->pQuery)) {
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
if (!pSelect->isDistinct && QUERY_NODE_REAL_TABLE == nodeType(pSelect->pFromTable) && NULL == pSelect->pGroupByList &&
NULL == pSelect->pLimit && NULL == pSelect->pSlimit && NULL == pSelect->pOrderByList && NULL == pSelect->pPartitionByList) {
return TSDB_CODE_SUCCESS;
}
}
int32_t code = buildCmdMsg(pCxt, TDMT_MND_CREATE_TOPIC, (FSerializeFunc)tSerializeSCMCreateTopicReq, &createReq);
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TOPIC_QUERY);
}
static int32_t translateCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt) {
SCMCreateTopicReq createReq = {0};
int32_t code = checkCreateTopic(pCxt, pStmt);
if (TSDB_CODE_SUCCESS == code) {
code = buildCreateTopicReq(pCxt, pStmt, &createReq);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildCmdMsg(pCxt, TDMT_MND_CREATE_TOPIC, (FSerializeFunc)tSerializeSCMCreateTopicReq, &createReq);
}
tFreeSCMCreateTopicReq(&createReq);
return code;
}
......
......@@ -128,6 +128,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
return "soffset/offset can not be less than 0";
case TSDB_CODE_PAR_SLIMIT_LEAK_PARTITION_BY:
return "slimit/soffset only available for PARTITION BY query";
case TSDB_CODE_PAR_INVALID_TOPIC_QUERY:
return "Invalid topic query";
case TSDB_CODE_OUT_OF_MEMORY:
return "Out of memory";
default:
......
......@@ -199,7 +199,8 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
TSWAP(pScan->pMeta, pRealTable->pMeta, STableMeta*);
TSWAP(pScan->pVgroupList, pRealTable->pVgroupList, SVgroupsInfo*);
pScan->scanFlag = MAIN_SCAN;
pScan->scanSeq[0] = 1;
pScan->scanSeq[1] = 0;
pScan->scanRange = TSWINDOW_INITIALIZER;
pScan->tableName.type = TSDB_TABLE_NAME_T;
pScan->tableName.acctId = pCxt->pPlanCxt->acctId;
......
......@@ -20,11 +20,14 @@
#define OPTIMIZE_FLAG_MASK(n) (1 << n)
#define OPTIMIZE_FLAG_OSD OPTIMIZE_FLAG_MASK(0)
#define OPTIMIZE_FLAG_CPD OPTIMIZE_FLAG_MASK(1)
#define OPTIMIZE_FLAG_OPK OPTIMIZE_FLAG_MASK(2)
#define OPTIMIZE_FLAG_SET_MASK(val, mask) (val) |= (mask)
#define OPTIMIZE_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0)
typedef struct SOptimizeContext {
SPlanContext* pPlanCxt;
bool optimized;
} SOptimizeContext;
......@@ -57,7 +60,23 @@ typedef enum ECondAction {
// after supporting outer join, there are other possibilities
} ECondAction;
EDealRes haveNormalColImpl(SNode* pNode, void* pContext) {
typedef bool (*FMayBeOptimized)(SLogicNode* pNode);
static SLogicNode* optFindPossibleNode(SLogicNode* pNode, FMayBeOptimized func) {
if (func(pNode)) {
return pNode;
}
SNode* pChild;
FOREACH(pChild, pNode->pChildren) {
SLogicNode* pScanNode = optFindPossibleNode((SLogicNode*)pChild, func);
if (NULL != pScanNode) {
return pScanNode;
}
}
return NULL;
}
EDealRes osdHaveNormalColImpl(SNode* pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
*((bool*)pContext) = (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType);
return *((bool*)pContext) ? DEAL_RES_END : DEAL_RES_IGNORE_CHILD;
......@@ -65,9 +84,9 @@ EDealRes haveNormalColImpl(SNode* pNode, void* pContext) {
return DEAL_RES_CONTINUE;
}
static bool haveNormalCol(SNodeList* pList) {
static bool osdHaveNormalCol(SNodeList* pList) {
bool res = false;
nodesWalkExprsPostOrder(pList, haveNormalColImpl, &res);
nodesWalkExprsPostOrder(pList, osdHaveNormalColImpl, &res);
return res;
}
......@@ -89,21 +108,7 @@ static bool osdMayBeOptimized(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent)) {
return (WINDOW_TYPE_INTERVAL == ((SWindowLogicNode*)pNode->pParent)->winType);
}
return !haveNormalCol(((SAggLogicNode*)pNode->pParent)->pGroupKeys);
}
static SLogicNode* osdFindPossibleScanNode(SLogicNode* pNode) {
if (osdMayBeOptimized(pNode)) {
return pNode;
}
SNode* pChild;
FOREACH(pChild, pNode->pChildren) {
SLogicNode* pScanNode = osdFindPossibleScanNode((SLogicNode*)pChild);
if (NULL != pScanNode) {
return pScanNode;
}
}
return NULL;
return !osdHaveNormalCol(((SAggLogicNode*)pNode->pParent)->pGroupKeys);
}
static SNodeList* osdGetAllFuncs(SLogicNode* pNode) {
......@@ -138,7 +143,7 @@ static int32_t osdGetRelatedFuncs(SScanLogicNode* pScan, SNodeList** pSdrFuncs,
}
static int32_t osdMatch(SOptimizeContext* pCxt, SLogicNode* pLogicNode, SOsdInfo* pInfo) {
pInfo->pScan = (SScanLogicNode*)osdFindPossibleScanNode(pLogicNode);
pInfo->pScan = (SScanLogicNode*)optFindPossibleNode(pLogicNode, osdMayBeOptimized);
if (NULL == pInfo->pScan) {
return TSDB_CODE_SUCCESS;
}
......@@ -345,7 +350,7 @@ static int32_t cpdCalcTimeRange(SScanLogicNode* pScan, SNode** pPrimaryKeyCond,
}
static int32_t cpdOptimizeScanCondition(SOptimizeContext* pCxt, SScanLogicNode* pScan) {
if (NULL == pScan->node.pConditions) {
if (NULL == pScan->node.pConditions || OPTIMIZE_FLAG_TEST_MASK(pScan->node.optimizedFlag, OPTIMIZE_FLAG_CPD)) {
return TSDB_CODE_SUCCESS;
}
......@@ -359,7 +364,10 @@ static int32_t cpdOptimizeScanCondition(SOptimizeContext* pCxt, SScanLogicNode*
pScan->node.pConditions = pOtherCond;
}
if (TSDB_CODE_SUCCESS != code) {
if (TSDB_CODE_SUCCESS == code) {
OPTIMIZE_FLAG_SET_MASK(pScan->node.optimizedFlag, OPTIMIZE_FLAG_CPD);
pCxt->optimized = true;
} else {
nodesDestroyNode(pPrimaryKeyCond);
nodesDestroyNode(pOtherCond);
}
......@@ -367,7 +375,7 @@ static int32_t cpdOptimizeScanCondition(SOptimizeContext* pCxt, SScanLogicNode*
return code;
}
static bool belongThisTable(SNode* pCondCol, SNodeList* pTableCols) {
static bool cpdBelongThisTable(SNode* pCondCol, SNodeList* pTableCols) {
SNode* pTableCol = NULL;
FOREACH(pTableCol, pTableCols) {
if (nodesEqualNode(pCondCol, pTableCol)) {
......@@ -380,9 +388,9 @@ static bool belongThisTable(SNode* pCondCol, SNodeList* pTableCols) {
static EDealRes cpdIsMultiTableCondImpl(SNode* pNode, void* pContext) {
SCpdIsMultiTableCondCxt* pCxt = pContext;
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
if (belongThisTable(pNode, pCxt->pLeftCols)) {
if (cpdBelongThisTable(pNode, pCxt->pLeftCols)) {
pCxt->havaLeftCol = true;
} else if (belongThisTable(pNode, pCxt->pRightCols)) {
} else if (cpdBelongThisTable(pNode, pCxt->pRightCols)) {
pCxt->haveRightCol = true;
}
return pCxt->havaLeftCol && pCxt->haveRightCol ? DEAL_RES_END : DEAL_RES_CONTINUE;
......@@ -508,11 +516,76 @@ static int32_t cpdPushCondToChild(SOptimizeContext* pCxt, SLogicNode* pChild, SN
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
static bool cpdIsPrimaryKey(SNode* pNode, SNodeList* pTableCols) {
if (QUERY_NODE_COLUMN != nodeType(pNode)) {
return false;
}
SColumnNode* pCol = (SColumnNode*)pNode;
if (PRIMARYKEY_TIMESTAMP_COL_ID != pCol->colId) {
return false;
}
return cpdBelongThisTable(pNode, pTableCols);
}
static bool cpdIsPrimaryKeyEqualCond(SJoinLogicNode* pJoin, SNode* pCond) {
if (QUERY_NODE_OPERATOR != nodeType(pCond)) {
return false;
}
SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets;
SNodeList* pRightCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1))->pTargets;
SOperatorNode* pOper = (SOperatorNode*)pJoin->pOnConditions;
if (cpdIsPrimaryKey(pOper->pLeft, pLeftCols)) {
return cpdIsPrimaryKey(pOper->pRight, pRightCols);
} else if (cpdIsPrimaryKey(pOper->pLeft, pRightCols)) {
return cpdIsPrimaryKey(pOper->pRight, pLeftCols);
}
return false;
}
static int32_t cpdCheckOpCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNode* pOnCond) {
if (!cpdIsPrimaryKeyEqualCond(pJoin, pOnCond)) {
snprintf(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, "l.ts = r.ts is expected in join expression");
return TSDB_CODE_FAILED;
}
return TSDB_CODE_SUCCESS;
}
static int32_t cpdCheckLogicCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SLogicConditionNode* pOnCond) {
if (LOGIC_COND_TYPE_AND != pOnCond->condType) {
snprintf(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, "l.ts = r.ts is expected in join expression");
return TSDB_CODE_FAILED;
}
SNode* pCond = NULL;
FOREACH(pCond, pOnCond->pParameterList) {
if (!cpdIsPrimaryKeyEqualCond(pJoin, pCond)) {
snprintf(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, "l.ts = r.ts is expected in join expression");
return TSDB_CODE_FAILED;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t cpdCheckJoinOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
if (NULL == pJoin->pOnConditions) {
snprintf(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, "not support cross join");
return TSDB_CODE_FAILED;
}
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pOnConditions)) {
return cpdCheckLogicCond(pCxt, pJoin, (SLogicConditionNode*)pJoin->pOnConditions);
} else {
return cpdCheckOpCond(pCxt, pJoin, pJoin->pOnConditions);
}
}
static int32_t cpdPushJoinCondition(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
if (NULL == pJoin->node.pConditions) {
if (OPTIMIZE_FLAG_TEST_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_CPD)) {
return TSDB_CODE_SUCCESS;
}
if (NULL == pJoin->node.pConditions) {
return cpdCheckJoinOnCond(pCxt, pJoin);
}
SNode* pOnCond = NULL;
SNode* pLeftChildCond = NULL;
SNode* pRightChildCond = NULL;
......@@ -527,7 +600,11 @@ static int32_t cpdPushJoinCondition(SOptimizeContext* pCxt, SJoinLogicNode* pJoi
code = cpdPushCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1), &pRightChildCond);
}
if (TSDB_CODE_SUCCESS != code) {
if (TSDB_CODE_SUCCESS == code) {
OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_CPD);
pCxt->optimized = true;
code = cpdCheckJoinOnCond(pCxt, pJoin);
} else {
nodesDestroyNode(pOnCond);
nodesDestroyNode(pLeftChildCond);
nodesDestroyNode(pRightChildCond);
......@@ -572,15 +649,129 @@ static int32_t cpdOptimize(SOptimizeContext* pCxt, SLogicNode* pLogicNode) {
return cpdPushCondition(pCxt, pLogicNode);
}
static bool opkIsPrimaryKeyOrderBy(SNodeList* pSortKeys) {
if (1 != LIST_LENGTH(pSortKeys)) {
return false;
}
SNode* pNode = ((SOrderByExprNode*)nodesListGetNode(pSortKeys, 0))->pExpr;
return (QUERY_NODE_COLUMN == nodeType(pNode) ? (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pNode)->colId) : false);
}
static bool opkSortMayBeOptimized(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_SORT != nodeType(pNode)) {
return false;
}
if (OPTIMIZE_FLAG_TEST_MASK(pNode->optimizedFlag, OPTIMIZE_FLAG_OPK)) {
return false;
}
return true;
}
static int32_t opkGetScanNodesImpl(SLogicNode* pNode, bool* pNotOptimize, SNodeList** pScanNodes) {
int32_t code = TSDB_CODE_SUCCESS;
switch (nodeType(pNode)) {
case QUERY_NODE_LOGIC_PLAN_SCAN:
return nodesListMakeAppend(pScanNodes, pNode);
case QUERY_NODE_LOGIC_PLAN_JOIN:
code = opkGetScanNodesImpl(nodesListGetNode(pNode->pChildren, 0), pNotOptimize, pScanNodes);
if (TSDB_CODE_SUCCESS == code) {
code = opkGetScanNodesImpl(nodesListGetNode(pNode->pChildren, 1), pNotOptimize, pScanNodes);
}
return code;
case QUERY_NODE_LOGIC_PLAN_AGG:
*pNotOptimize = true;
return code;
default:
break;
}
if (1 != LIST_LENGTH(pNode->pChildren)) {
*pNotOptimize = true;
}
return opkGetScanNodesImpl(nodesListGetNode(pNode->pChildren, 0), pNotOptimize, pScanNodes);
}
static int32_t opkGetScanNodes(SLogicNode* pNode, SNodeList** pScanNodes) {
bool notOptimize = false;
int32_t code = opkGetScanNodesImpl(pNode, &notOptimize, pScanNodes);
if (TSDB_CODE_SUCCESS != code || notOptimize) {
nodesClearList(*pScanNodes);
}
return code;
}
static EOrder opkGetPrimaryKeyOrder(SSortLogicNode* pSort) {
return ((SOrderByExprNode*)nodesListGetNode(pSort->pSortKeys, 0))->order;
}
static SNode* opkRewriteDownNode(SSortLogicNode* pSort) {
SNode* pDownNode = nodesListGetNode(pSort->node.pChildren, 0);
// todo
pSort->node.pChildren = NULL;
return pDownNode;
}
static int32_t opkDoOptimized(SOptimizeContext* pCxt, SSortLogicNode* pSort, SNodeList* pScanNodes) {
EOrder order = opkGetPrimaryKeyOrder(pSort);
if (ORDER_DESC == order) {
SNode* pScan = NULL;
FOREACH(pScan, pScanNodes) {
((SScanLogicNode*)pScan)->scanSeq[0] = 0;
((SScanLogicNode*)pScan)->scanSeq[1] = 1;
}
}
if (NULL == pSort->node.pParent) {
// todo
return TSDB_CODE_SUCCESS;
}
SNode* pDownNode = opkRewriteDownNode(pSort);
SNode* pNode;
FOREACH(pNode, pSort->node.pParent->pChildren) {
if (nodesEqualNode(pNode, pSort)) {
REPLACE_NODE(pDownNode);
break;
}
}
nodesDestroyNode(pSort);
return TSDB_CODE_SUCCESS;
}
static int32_t opkOptimizeImpl(SOptimizeContext* pCxt, SSortLogicNode* pSort) {
OPTIMIZE_FLAG_SET_MASK(pSort->node.optimizedFlag, OPTIMIZE_FLAG_OPK);
if (!opkIsPrimaryKeyOrderBy(pSort->pSortKeys) || 1 != LIST_LENGTH(pSort->node.pChildren)) {
return TSDB_CODE_SUCCESS;
}
SNodeList* pScanNodes = NULL;
int32_t code = opkGetScanNodes(nodesListGetNode(pSort->node.pChildren, 0), &pScanNodes);
if (TSDB_CODE_SUCCESS == code && NULL != pScanNodes) {
code = opkDoOptimized(pCxt, pSort, pScanNodes);
}
nodesClearList(pScanNodes);
return code;
}
static int32_t opkOptimize(SOptimizeContext* pCxt, SLogicNode* pLogicNode) {
SSortLogicNode* pSort = (SSortLogicNode*)optFindPossibleNode(pLogicNode, opkSortMayBeOptimized);
if (NULL == pSort) {
return TSDB_CODE_SUCCESS;
}
return opkOptimizeImpl(pCxt, pSort);
}
static const SOptimizeRule optimizeRuleSet[] = {
{ .pName = "OptimizeScanData", .optimizeFunc = osdOptimize },
{ .pName = "ConditionPushDown", .optimizeFunc = cpdOptimize }
{ .pName = "ConditionPushDown", .optimizeFunc = cpdOptimize },
{ .pName = "OrderByPrimaryKey", .optimizeFunc = opkOptimize }
};
static const int32_t optimizeRuleNum = (sizeof(optimizeRuleSet) / sizeof(SOptimizeRule));
static int32_t applyOptimizeRule(SLogicNode* pLogicNode) {
SOptimizeContext cxt = { .optimized = false };
static int32_t applyOptimizeRule(SPlanContext* pCxt, SLogicNode* pLogicNode) {
SOptimizeContext cxt = { .pPlanCxt = pCxt, .optimized = false };
do {
cxt.optimized = false;
for (int32_t i = 0; i < optimizeRuleNum; ++i) {
......@@ -594,5 +785,5 @@ static int32_t applyOptimizeRule(SLogicNode* pLogicNode) {
}
int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode) {
return applyOptimizeRule(pLogicNode);
return applyOptimizeRule(pCxt, pLogicNode);
}
......@@ -398,9 +398,6 @@ static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SScanLogicNo
if (TSDB_CODE_SUCCESS == code) {
pScanPhysiNode->uid = pScanLogicNode->pMeta->uid;
pScanPhysiNode->tableType = pScanLogicNode->pMeta->tableType;
pScanPhysiNode->order = TSDB_ORDER_ASC;
pScanPhysiNode->count = 1;
pScanPhysiNode->reverse = 0;
memcpy(&pScanPhysiNode->tableName, &pScanLogicNode->tableName, sizeof(SName));
}
......@@ -432,7 +429,7 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
return TSDB_CODE_OUT_OF_MEMORY;
}
pTableScan->scanFlag = pScanLogicNode->scanFlag;
memcpy(pTableScan->scanSeq, pScanLogicNode->scanSeq, sizeof(pScanLogicNode->scanSeq));
pTableScan->scanRange = pScanLogicNode->scanRange;
pTableScan->ratio = pScanLogicNode->ratio;
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "planTestUtil.h"
#include "planner.h"
using namespace std;
class PlanOptimizeTest : public PlannerTestBase {
};
TEST_F(PlanOptimizeTest, orderByPrimaryKey) {
useDb("root", "test");
run("select * from t1 order by ts");
run("select * from t1 order by ts desc");
run("select c1 from t1 order by ts");
run("select c1 from t1 order by ts desc");
}
......@@ -1247,26 +1247,28 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
}
int32_t nowFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
if (inputNum != 1) {
return TSDB_CODE_FAILED;
int64_t ts = taosGetTimestamp(TSDB_TIME_PRECISION_MILLI);
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
colDataAppendInt64(pOutput->columnData, i, &ts);
}
colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 0));
pOutput->numOfRows = pInput->numOfRows;
return TSDB_CODE_SUCCESS;
}
int32_t todayFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
if (inputNum != 1) {
return TSDB_CODE_FAILED;
int64_t ts = taosGetTimestampToday(TSDB_TIME_PRECISION_MILLI);
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
colDataAppendInt64(pOutput->columnData, i, &ts);
}
colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 0));
pOutput->numOfRows = pInput->numOfRows;
return TSDB_CODE_SUCCESS;
}
int32_t timezoneFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
if (inputNum != 1) {
return TSDB_CODE_FAILED;
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
colDataAppend(pOutput->columnData, i, tsTimezoneStr, false);
}
colDataAppend(pOutput->columnData, pOutput->numOfRows, (char *)colDataGetData(pInput->columnData, 0), false);
pOutput->numOfRows = pInput->numOfRows;
return TSDB_CODE_SUCCESS;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册