提交 ddb39a8d 编写于 作者: X Xiaoyu Wang

fix: some problems of planner

上级 570f036c
......@@ -275,7 +275,12 @@ typedef struct SScanPhysiNode {
typedef SScanPhysiNode STagScanPhysiNode;
typedef SScanPhysiNode SBlockDistScanPhysiNode;
typedef SScanPhysiNode SLastRowScanPhysiNode;
typedef struct SLastRowScanPhysiNode {
SScanPhysiNode scan;
SNodeList* pGroupTags;
bool groupSort;
} SLastRowScanPhysiNode;
typedef struct SSystemTableScanPhysiNode {
SScanPhysiNode scan;
......
......@@ -206,7 +206,7 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo
}
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN: {
SLastRowScanPhysiNode *lastRowPhysiNode = (SLastRowScanPhysiNode *)pNode;
pPhysiChildren = lastRowPhysiNode->node.pChildren;
pPhysiChildren = lastRowPhysiNode->scan.node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT: {
......@@ -1209,19 +1209,19 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
}
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN: {
SLastRowScanPhysiNode *pLastRowNode = (SLastRowScanPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_LASTROW_SCAN_FORMAT, pLastRowNode->tableName.tname);
EXPLAIN_ROW_NEW(level, EXPLAIN_LASTROW_SCAN_FORMAT, pLastRowNode->scan.tableName.tname);
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pLastRowNode->pScanCols->length);
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pLastRowNode->scan.pScanCols->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
if (pLastRowNode->pScanPseudoCols) {
EXPLAIN_ROW_APPEND(EXPLAIN_PSEUDO_COLUMNS_FORMAT, pLastRowNode->pScanPseudoCols->length);
if (pLastRowNode->scan.pScanPseudoCols) {
EXPLAIN_ROW_APPEND(EXPLAIN_PSEUDO_COLUMNS_FORMAT, pLastRowNode->scan.pScanPseudoCols->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pLastRowNode->node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pLastRowNode->scan.node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
......@@ -1230,15 +1230,15 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
if (verbose) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT,
nodesGetOutputNumFromSlotList(pLastRowNode->node.pOutputDataBlockDesc->pSlots));
nodesGetOutputNumFromSlotList(pLastRowNode->scan.node.pOutputDataBlockDesc->pSlots));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pLastRowNode->node.pOutputDataBlockDesc->outputRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pLastRowNode->scan.node.pOutputDataBlockDesc->outputRowSize);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
if (pLastRowNode->node.pConditions) {
if (pLastRowNode->scan.node.pConditions) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
QRY_ERR_RET(nodesNodeToSQL(pLastRowNode->node.pConditions, tbuf + VARSTR_HEADER_SIZE,
QRY_ERR_RET(nodesNodeToSQL(pLastRowNode->scan.node.pConditions, tbuf + VARSTR_HEADER_SIZE,
TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
......
......@@ -40,10 +40,10 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SRead
pInfo->pTableList = pTableList;
pInfo->readHandle = *readHandle;
pInfo->pRes = createResDataBlock(pScanNode->node.pOutputDataBlockDesc);
pInfo->pRes = createResDataBlock(pScanNode->scan.node.pOutputDataBlockDesc);
int32_t numOfCols = 0;
pInfo->pColMatchInfo = extractColMatchInfo(pScanNode->pScanCols, pScanNode->node.pOutputDataBlockDesc, &numOfCols,
pInfo->pColMatchInfo = extractColMatchInfo(pScanNode->scan.pScanCols, pScanNode->scan.node.pOutputDataBlockDesc, &numOfCols,
COL_MATCH_FROM_COL_ID);
int32_t code = extractTargetSlotId(pInfo->pColMatchInfo, pTaskInfo, &pInfo->pSlotIds);
if (code != TSDB_CODE_SUCCESS) {
......@@ -53,10 +53,10 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SRead
tsdbLastRowReaderOpen(readHandle->vnode, LASTROW_RETRIEVE_TYPE_SINGLE, pTableList, taosArrayGetSize(pInfo->pColMatchInfo),
&pInfo->pLastrowReader);
if (pScanNode->pScanPseudoCols != NULL) {
if (pScanNode->scan.pScanPseudoCols != NULL) {
SExprSupp* pPseudoExpr = &pInfo->pseudoExprSup;
pPseudoExpr->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pPseudoExpr->numOfExprs);
pPseudoExpr->pExprInfo = createExprInfo(pScanNode->scan.pScanPseudoCols, NULL, &pPseudoExpr->numOfExprs);
pPseudoExpr->pCtx = createSqlFunctionCtx(pPseudoExpr->pExprInfo, pPseudoExpr->numOfExprs, &pPseudoExpr->rowEntryInfoOffset);
}
......
......@@ -4444,21 +4444,21 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
// return NULL;
// }
int32_t code = extractTableSchemaInfo(pHandle, pScanNode->uid, pTaskInfo);
int32_t code = extractTableSchemaInfo(pHandle, pScanNode->scan.uid, pTaskInfo);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
return NULL;
}
pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
if (pScanNode->tableType == TSDB_SUPER_TABLE) {
code = vnodeGetAllTableList(pHandle->vnode, pScanNode->uid, pTableListInfo->pTableList);
if (pScanNode->scan.tableType == TSDB_SUPER_TABLE) {
code = vnodeGetAllTableList(pHandle->vnode, pScanNode->scan.uid, pTableListInfo->pTableList);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = terrno;
return NULL;
}
} else { // Create one table group.
STableKeyInfo info = {.lastKey = 0, .uid = pScanNode->uid, .groupId = 0};
STableKeyInfo info = {.lastKey = 0, .uid = pScanNode->scan.uid, .groupId = 0};
taosArrayPush(pTableListInfo->pTableList, &info);
}
......
......@@ -402,7 +402,6 @@ typedef struct SGroupKeyInfo {
(x) += step; \
} while (0)
#define STATE_COMP(_op, _lval, _param) STATE_COMP_IMPL(_op, _lval, GET_STATE_VAL(_param))
#define GET_STATE_VAL(param) ((param.nType == TSDB_DATA_TYPE_BIGINT) ? (param.i) : (param.d))
......@@ -986,7 +985,7 @@ int32_t avgFunctionMerge(SqlFunctionCtx* pCtx) {
int32_t start = pInput->startRowIndex;
for(int32_t i = start; i < start + pInput->numOfRows; ++i) {
for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
char* data = colDataGetData(pCol, i);
SAvgRes* pInputInfo = (SAvgRes*)varDataVal(data);
avgTransferInfo(pInputInfo, pInfo);
......@@ -2559,7 +2558,7 @@ int32_t apercentileFunctionMerge(SqlFunctionCtx* pCtx) {
int32_t start = pInput->startRowIndex;
for(int32_t i = start; i < start + pInput->numOfRows; ++i) {
for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
char* data = colDataGetData(pCol, i);
SAPercentileInfo* pInputInfo = (SAPercentileInfo*)varDataVal(data);
apercentileTransferInfo(pInputInfo, pInfo);
......@@ -2925,7 +2924,7 @@ static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuer
int32_t start = pInput->startRowIndex;
int32_t numOfElems = 0;
for(int32_t i = start; i < start + pInput->numOfRows; ++i) {
for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
char* data = colDataGetData(pCol, i);
SFirstLastRes* pInputInfo = (SFirstLastRes*)varDataVal(data);
firstLastTransferInfo(pCtx, pInputInfo, pInfo, isFirstQuery);
......@@ -2951,7 +2950,7 @@ int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0;
SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(pResInfo);
colDataAppend(pCol, pBlock->info.rows, pRes->buf, pRes->isNull||pResInfo->isNullRes);
colDataAppend(pCol, pBlock->info.rows, pRes->buf, pRes->isNull || pResInfo->isNullRes);
// handle selectivity
STuplePos* pTuplePos = (STuplePos*)(pRes->buf + pRes->bytes + sizeof(TSKEY));
......@@ -3754,7 +3753,7 @@ int32_t spreadFunctionMerge(SqlFunctionCtx* pCtx) {
int32_t start = pInput->startRowIndex;
for(int32_t i = start; i < start + pInput->numOfRows; ++i) {
for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
char* data = colDataGetData(pCol, i);
SSpreadInfo* pInputInfo = (SSpreadInfo*)varDataVal(data);
spreadTransferInfo(pInputInfo, pInfo);
......@@ -3926,7 +3925,7 @@ int32_t elapsedFunctionMerge(SqlFunctionCtx* pCtx) {
int32_t start = pInput->startRowIndex;
for(int32_t i = start; i < start + pInput->numOfRows; ++i) {
for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
char* data = colDataGetData(pCol, i);
SElapsedInfo* pInputInfo = (SElapsedInfo*)varDataVal(data);
elapsedTransferInfo(pInputInfo, pInfo);
......@@ -4191,13 +4190,9 @@ static int32_t histogramFunctionImpl(SqlFunctionCtx* pCtx, bool isPartial) {
return TSDB_CODE_SUCCESS;
}
int32_t histogramFunction(SqlFunctionCtx* pCtx) {
return histogramFunctionImpl(pCtx, false);
}
int32_t histogramFunction(SqlFunctionCtx* pCtx) { return histogramFunctionImpl(pCtx, false); }
int32_t histogramFunctionPartial(SqlFunctionCtx* pCtx) {
return histogramFunctionImpl(pCtx, true);
}
int32_t histogramFunctionPartial(SqlFunctionCtx* pCtx) { return histogramFunctionImpl(pCtx, true); }
static void histogramTransferInfo(SHistoFuncInfo* pInput, SHistoFuncInfo* pOutput) {
pOutput->normalized = pInput->normalized;
......@@ -4219,7 +4214,7 @@ int32_t histogramFunctionMerge(SqlFunctionCtx* pCtx) {
int32_t start = pInput->startRowIndex;
for(int32_t i = start; i < start + pInput->numOfRows; ++i) {
for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
char* data = colDataGetData(pCol, i);
SHistoFuncInfo* pInputInfo = (SHistoFuncInfo*)varDataVal(data);
histogramTransferInfo(pInputInfo, pInfo);
......@@ -4440,7 +4435,7 @@ int32_t hllFunctionMerge(SqlFunctionCtx* pCtx) {
int32_t start = pInput->startRowIndex;
for(int32_t i = start; i < start + pInput->numOfRows; ++i) {
for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
char* data = colDataGetData(pCol, i);
SHLLInfo* pInputInfo = (SHLLInfo*)varDataVal(data);
hllTransferInfo(pInputInfo, pInfo);
......@@ -6010,7 +6005,6 @@ int32_t lastrowFunction(SqlFunctionCtx* pCtx) {
char* data = colDataGetData(pInputCol, i);
TSKEY cts = getRowPTs(pInput->pPTS, i);
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
if (colDataIsNull_s(pInputCol, i)) {
pInfo->isNull = true;
} else {
......@@ -6023,7 +6017,6 @@ int32_t lastrowFunction(SqlFunctionCtx* pCtx) {
}
pInfo->ts = cts;
pInfo->hasResult = true;
pResInfo->numOfRes = 1;
if (pCtx->subsidiaries.num > 0) {
......@@ -6034,6 +6027,8 @@ int32_t lastrowFunction(SqlFunctionCtx* pCtx) {
copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
}
}
pInfo->hasResult = true;
}
}
......
......@@ -1468,9 +1468,36 @@ static int32_t jsonToPhysiScanNode(const SJson* pJson, void* pObj) {
return code;
}
static int32_t physiTagScanNodeToJson(const void* pObj, SJson* pJson) { return physiScanNodeToJson(pObj, pJson); }
static const char* jkLastRowScanPhysiPlanGroupTags = "GroupTags";
static const char* jkLastRowScanPhysiPlanGroupSort = "GroupSort";
static int32_t jsonToPhysiTagScanNode(const SJson* pJson, void* pObj) { return jsonToPhysiScanNode(pJson, pObj); }
static int32_t physiLastRowScanNodeToJson(const void* pObj, SJson* pJson) {
const SLastRowScanPhysiNode* pNode = (const SLastRowScanPhysiNode*)pObj;
int32_t code = physiScanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkLastRowScanPhysiPlanGroupTags, pNode->pGroupTags);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkLastRowScanPhysiPlanGroupSort, pNode->groupSort);
}
return code;
}
static int32_t jsonToPhysiLastRowScanNode(const SJson* pJson, void* pObj) {
SLastRowScanPhysiNode* pNode = (SLastRowScanPhysiNode*)pObj;
int32_t code = jsonToPhysiScanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkLastRowScanPhysiPlanGroupTags, &pNode->pGroupTags);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkLastRowScanPhysiPlanGroupSort, &pNode->groupSort);
}
return code;
}
static const char* jkTableScanPhysiPlanScanCount = "ScanCount";
static const char* jkTableScanPhysiPlanReverseScanCount = "ReverseScanCount";
......@@ -4315,8 +4342,9 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return logicPlanToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
return physiScanNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
return physiTagScanNodeToJson(pObj, pJson);
return physiLastRowScanNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
......@@ -4461,8 +4489,10 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToLogicPlan(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
return jsonToPhysiScanNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
return jsonToPhysiTagScanNode(pJson, pObj);
return jsonToPhysiLastRowScanNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
......
......@@ -481,8 +481,6 @@ static ENodeType getScanOperatorType(EScanType scanType) {
return QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN;
case SCAN_TYPE_BLOCK_INFO:
return QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN;
case SCAN_TYPE_LAST_ROW:
return QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN;
default:
break;
}
......@@ -502,6 +500,24 @@ static int32_t createSimpleScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSub
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, pScan, pPhyNode);
}
static int32_t createLastRowScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
SPhysiNode** pPhyNode) {
SLastRowScanPhysiNode* pScan =
(SLastRowScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN);
if (NULL == pScan) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pScan->pGroupTags = nodesCloneList(pScanLogicNode->pGroupTags);
if (NULL != pScanLogicNode->pGroupTags && NULL == pScan->pGroupTags) {
nodesDestroyNode((SNode*)pScan);
return TSDB_CODE_OUT_OF_MEMORY;
}
pScan->groupSort = pScanLogicNode->groupSort;
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
}
static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
SPhysiNode** pPhyNode) {
STableScanPhysiNode* pTableScan = (STableScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode,
......@@ -583,8 +599,9 @@ static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
switch (pScanLogicNode->scanType) {
case SCAN_TYPE_TAG:
case SCAN_TYPE_BLOCK_INFO:
case SCAN_TYPE_LAST_ROW:
return createSimpleScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
case SCAN_TYPE_LAST_ROW:
return createLastRowScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
case SCAN_TYPE_TABLE:
return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
case SCAN_TYPE_SYSTEM_TABLE:
......
......@@ -364,6 +364,8 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic
pMergeWindow->node.pTargets = NULL;
SNodeList* pChildren = pMergeWindow->node.pChildren;
pMergeWindow->node.pChildren = NULL;
SNode* pConditions = pMergeWindow->node.pConditions;
pMergeWindow->node.pConditions = NULL;
int32_t code = TSDB_CODE_SUCCESS;
SWindowLogicNode* pPartWin = (SWindowLogicNode*)nodesCloneNode((SNode*)pMergeWindow);
......@@ -373,6 +375,7 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic
if (TSDB_CODE_SUCCESS == code) {
pMergeWindow->node.pTargets = pTargets;
pMergeWindow->node.pConditions = pConditions;
pPartWin->node.pChildren = pChildren;
splSetParent((SLogicNode*)pPartWin);
code = stbSplRewriteFuns(pFunc, &pPartWin->pFuncs, &pMergeWindow->pFuncs);
......
......@@ -108,6 +108,8 @@ TEST_F(PlanBasicTest, lastRowFunc) {
run("SELECT LAST_ROW(c1) FROM st1");
run("SELECT LAST_ROW(c1) FROM st1 PARTITION BY TBNAME");
run("SELECT LAST_ROW(c1), SUM(c3) FROM t1");
}
......
......@@ -36,6 +36,8 @@ TEST_F(PlanSubqeuryTest, basic) {
run("SELECT * FROM (SELECT NOW() FROM t1)");
run("SELECT NOW() FROM (SELECT * FROM t1) ORDER BY ts");
run("SELECT * FROM (SELECT AVG(c1) a FROM st1 INTERVAL(10s)) WHERE a > 1");
}
TEST_F(PlanSubqeuryTest, doubleGroupBy) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册