未验证 提交 aefdf8bb 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #15354 from taosdata/feature/3.0_debug_wxy

feat: super table order by primary key optimization
...@@ -104,6 +104,7 @@ typedef struct SJoinLogicNode { ...@@ -104,6 +104,7 @@ typedef struct SJoinLogicNode {
SNode* pMergeCondition; SNode* pMergeCondition;
SNode* pOnConditions; SNode* pOnConditions;
bool isSingleTableJoin; bool isSingleTableJoin;
EOrder inputTsOrder;
} SJoinLogicNode; } SJoinLogicNode;
typedef struct SAggLogicNode { typedef struct SAggLogicNode {
...@@ -201,6 +202,7 @@ typedef struct SWindowLogicNode { ...@@ -201,6 +202,7 @@ typedef struct SWindowLogicNode {
int64_t watermark; int64_t watermark;
int8_t igExpired; int8_t igExpired;
EWindowAlgorithm windowAlgo; EWindowAlgorithm windowAlgo;
EOrder inputTsOrder;
} SWindowLogicNode; } SWindowLogicNode;
typedef struct SFillLogicNode { typedef struct SFillLogicNode {
...@@ -356,15 +358,14 @@ typedef struct SInterpFuncPhysiNode { ...@@ -356,15 +358,14 @@ typedef struct SInterpFuncPhysiNode {
SNode* pTimeSeries; // SColumnNode SNode* pTimeSeries; // SColumnNode
} SInterpFuncPhysiNode; } SInterpFuncPhysiNode;
typedef struct SJoinPhysiNode { typedef struct SSortMergeJoinPhysiNode {
SPhysiNode node; SPhysiNode node;
EJoinType joinType; EJoinType joinType;
SNode* pMergeCondition; SNode* pMergeCondition;
SNode* pOnConditions; SNode* pOnConditions;
SNodeList* pTargets; SNodeList* pTargets;
} SJoinPhysiNode; EOrder inputTsOrder;
} SSortMergeJoinPhysiNode;
typedef SJoinPhysiNode SSortMergeJoinPhysiNode;
typedef struct SAggPhysiNode { typedef struct SAggPhysiNode {
SPhysiNode node; SPhysiNode node;
......
...@@ -255,6 +255,7 @@ typedef struct SSelectStmt { ...@@ -255,6 +255,7 @@ typedef struct SSelectStmt {
int32_t selectFuncNum; int32_t selectFuncNum;
bool isEmptyResult; bool isEmptyResult;
bool isTimeLineResult; bool isTimeLineResult;
bool isSubquery;
bool hasAggFuncs; bool hasAggFuncs;
bool hasRepeatScanFuncs; bool hasRepeatScanFuncs;
bool hasIndefiniteRowsFunc; bool hasIndefiniteRowsFunc;
......
...@@ -135,7 +135,7 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo ...@@ -135,7 +135,7 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: { case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: {
SJoinPhysiNode *pJoinNode = (SJoinPhysiNode *)pNode; SSortMergeJoinPhysiNode *pJoinNode = (SSortMergeJoinPhysiNode *)pNode;
pPhysiChildren = pJoinNode->node.pChildren; pPhysiChildren = pJoinNode->node.pChildren;
break; break;
} }
...@@ -434,7 +434,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i ...@@ -434,7 +434,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: { case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
STableScanPhysiNode *pTblScanNode = (STableScanPhysiNode *)pNode; STableScanPhysiNode *pTblScanNode = (STableScanPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_ROW_NEW(level,
QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == pNode->type ? EXPLAIN_TBL_MERGE_SCAN_FORMAT : EXPLAIN_TBL_SCAN_FORMAT, QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == pNode->type ? EXPLAIN_TBL_MERGE_SCAN_FORMAT
: EXPLAIN_TBL_SCAN_FORMAT,
pTblScanNode->scan.tableName.tname); pTblScanNode->scan.tableName.tname);
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
if (pResNode->pExecInfo) { if (pResNode->pExecInfo) {
...@@ -613,7 +614,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i ...@@ -613,7 +614,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: { case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: {
SJoinPhysiNode *pJoinNode = (SJoinPhysiNode *)pNode; SSortMergeJoinPhysiNode *pJoinNode = (SSortMergeJoinPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_JOIN_FORMAT, EXPLAIN_JOIN_STRING(pJoinNode->joinType)); EXPLAIN_ROW_NEW(level, EXPLAIN_JOIN_FORMAT, EXPLAIN_JOIN_STRING(pJoinNode->joinType));
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
if (pResNode->pExecInfo) { if (pResNode->pExecInfo) {
......
...@@ -320,6 +320,47 @@ typedef struct STableScanInfo { ...@@ -320,6 +320,47 @@ typedef struct STableScanInfo {
int8_t noTable; int8_t noTable;
} STableScanInfo; } STableScanInfo;
typedef struct STableMergeScanInfo {
STableListInfo* tableListInfo;
int32_t tableStartIndex;
int32_t tableEndIndex;
bool hasGroupId;
uint64_t groupId;
SArray* dataReaders; // array of tsdbReaderT*
SReadHandle readHandle;
int32_t bufPageSize;
uint32_t sortBufSize; // max buffer size for in-memory sort
SArray* pSortInfo;
SSortHandle* pSortHandle;
SSDataBlock* pSortInputBlock;
int64_t startTs; // sort start time
SArray* sortSourceParams;
SFileBlockLoadRecorder readRecorder;
int64_t numOfRows;
SScanInfo scanInfo;
int32_t scanTimes;
SNode* pFilterNode; // filter info, which is push down by optimizer
SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context
SResultRowInfo* pResultRowInfo;
int32_t* rowEntryInfoOffset;
SExprInfo* pExpr;
SSDataBlock* pResBlock;
SArray* pColMatchInfo;
int32_t numOfOutput;
SExprSupp pseudoSup;
SQueryTableDataCond cond;
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag;
// if the upstream is an interval operator, the interval info is also kept here to get the time
// window to check if current data block needs to be loaded.
SInterval interval;
SSampleExecInfo sample; // sample execution info
} STableMergeScanInfo;
typedef struct STagScanInfo { typedef struct STagScanInfo {
SColumnInfo *pCols; SColumnInfo *pCols;
SSDataBlock *pRes; SSDataBlock *pRes;
...@@ -886,7 +927,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition ...@@ -886,7 +927,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SJoinPhysiNode* pJoinNode, SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode,
SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream,
......
...@@ -1325,7 +1325,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColM ...@@ -1325,7 +1325,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColM
extractQualifiedTupleByFilterResult(pBlock, rowRes, keep); extractQualifiedTupleByFilterResult(pBlock, rowRes, keep);
if (pColMatchInfo != NULL) { if (pColMatchInfo != NULL) {
for(int32_t i = 0; i < taosArrayGetSize(pColMatchInfo); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pColMatchInfo); ++i) {
SColMatchInfo* pInfo = taosArrayGet(pColMatchInfo, i); SColMatchInfo* pInfo = taosArrayGet(pColMatchInfo, i);
if (pInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { if (pInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pInfo->targetSlotId); SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pInfo->targetSlotId);
...@@ -1646,10 +1646,10 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) { ...@@ -1646,10 +1646,10 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) {
SFileBlockLoadRecorder* pRecorder = pSummary->pRecoder; SFileBlockLoadRecorder* pRecorder = pSummary->pRecoder;
if (pSummary->pRecoder != NULL) { if (pSummary->pRecoder != NULL) {
qDebug( qDebug(
"%s :cost summary: elapsed time:%.2f ms, total blocks:%d, load block SMA:%d, load data block:%d, total rows:%" "%s :cost summary: elapsed time:%.2f ms, total blocks:%d, load block SMA:%d, load data block:%d, total "
PRId64 ", check rows:%" PRId64, GET_TASKID(pTaskInfo), pSummary->elapsedTime / 1000.0, "rows:%" PRId64 ", check rows:%" PRId64,
pRecorder->totalBlocks, pRecorder->loadBlockStatis, pRecorder->loadBlocks, pRecorder->totalRows, GET_TASKID(pTaskInfo), pSummary->elapsedTime / 1000.0, pRecorder->totalBlocks, pRecorder->loadBlockStatis,
pRecorder->totalCheckedRows); pRecorder->loadBlocks, pRecorder->totalRows, pRecorder->totalCheckedRows);
} }
// qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb, // qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb,
...@@ -2783,11 +2783,16 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan ...@@ -2783,11 +2783,16 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
*order = TSDB_ORDER_ASC; *order = TSDB_ORDER_ASC;
*scanFlag = MAIN_SCAN; *scanFlag = MAIN_SCAN;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN) { } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
STableScanInfo* pTableScanInfo = pOperator->info; STableScanInfo* pTableScanInfo = pOperator->info;
*order = pTableScanInfo->cond.order; *order = pTableScanInfo->cond.order;
*scanFlag = pTableScanInfo->scanFlag; *scanFlag = pTableScanInfo->scanFlag;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN) {
STableMergeScanInfo* pTableScanInfo = pOperator->info;
*order = pTableScanInfo->cond.order;
*scanFlag = pTableScanInfo->scanFlag;
return TSDB_CODE_SUCCESS;
} else { } else {
if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) { if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) {
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
...@@ -3728,7 +3733,7 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) { ...@@ -3728,7 +3733,7 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
} }
// this the tags and pseudo function columns, we only keep the tag columns // this the tags and pseudo function columns, we only keep the tag columns
for(int32_t i = 0; i < numOfTags; ++i) { for (int32_t i = 0; i < numOfTags; ++i) {
STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanPseudoCols, i); STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanPseudoCols, i);
int32_t type = nodeType(pNode->pExpr); int32_t type = nodeType(pNode->pExpr);
...@@ -4165,7 +4170,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4165,7 +4170,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) {
pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo); pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
pOptr = createMergeJoinOperatorInfo(ops, size, (SJoinPhysiNode*)pPhyNode, pTaskInfo); pOptr = createMergeJoinOperatorInfo(ops, size, (SSortMergeJoinPhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, pTaskInfo); pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) {
......
...@@ -28,8 +28,8 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator); ...@@ -28,8 +28,8 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator);
static void destroyMergeJoinOperator(void* param, int32_t numOfOutput); static void destroyMergeJoinOperator(void* param, int32_t numOfOutput);
static void extractTimeCondition(SJoinOperatorInfo* Info, SLogicConditionNode* pLogicConditionNode); static void extractTimeCondition(SJoinOperatorInfo* Info, SLogicConditionNode* pLogicConditionNode);
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SJoinPhysiNode* pJoinNode, SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
SExecTaskInfo* pTaskInfo) { SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) {
SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo)); SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pOperator == NULL || pInfo == NULL) { if (pOperator == NULL || pInfo == NULL) {
......
...@@ -274,7 +274,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca ...@@ -274,7 +274,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
} else { } else {
qDebug("%s data block filter out, elapsed time:%"PRId64, GET_TASKID(pTaskInfo), (et - st)); qDebug("%s data block filter out, elapsed time:%" PRId64, GET_TASKID(pTaskInfo), (et - st));
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1840,9 +1840,12 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { ...@@ -1840,9 +1840,12 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
char tagTypeStr[VARSTR_HEADER_SIZE + 32]; char tagTypeStr[VARSTR_HEADER_SIZE + 32];
int tagTypeLen = sprintf(varDataVal(tagTypeStr), "%s", tDataTypes[tagType].name); int tagTypeLen = sprintf(varDataVal(tagTypeStr), "%s", tDataTypes[tagType].name);
if (tagType == TSDB_DATA_TYPE_VARCHAR) { if (tagType == TSDB_DATA_TYPE_VARCHAR) {
tagTypeLen += sprintf(varDataVal(tagTypeStr) + tagTypeLen, "(%d)", (int32_t)(smr.me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE)); tagTypeLen += sprintf(varDataVal(tagTypeStr) + tagTypeLen, "(%d)",
(int32_t)(smr.me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE));
} else if (tagType == TSDB_DATA_TYPE_NCHAR) { } else if (tagType == TSDB_DATA_TYPE_NCHAR) {
tagTypeLen += sprintf(varDataVal(tagTypeStr) + tagTypeLen, "(%d)", (int32_t)((smr.me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)); tagTypeLen +=
sprintf(varDataVal(tagTypeStr) + tagTypeLen, "(%d)",
(int32_t)((smr.me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
} }
varDataSetLen(tagTypeStr, tagTypeLen); varDataSetLen(tagTypeStr, tagTypeLen);
colDataAppend(pColInfoData, numOfRows, (char*)tagTypeStr, false); colDataAppend(pColInfoData, numOfRows, (char*)tagTypeStr, false);
...@@ -2527,49 +2530,6 @@ _error: ...@@ -2527,49 +2530,6 @@ _error:
return NULL; return NULL;
} }
typedef struct STableMergeScanInfo {
STableListInfo* tableListInfo;
int32_t tableStartIndex;
int32_t tableEndIndex;
bool hasGroupId;
uint64_t groupId;
SArray* dataReaders; // array of tsdbReaderT*
SReadHandle readHandle;
int32_t bufPageSize;
uint32_t sortBufSize; // max buffer size for in-memory sort
SArray* pSortInfo;
SSortHandle* pSortHandle;
SSDataBlock* pSortInputBlock;
int64_t startTs; // sort start time
SArray* sortSourceParams;
SFileBlockLoadRecorder readRecorder;
int64_t numOfRows;
SScanInfo scanInfo;
int32_t scanTimes;
SNode* pFilterNode; // filter info, which is push down by optimizer
SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context
SResultRowInfo* pResultRowInfo;
int32_t* rowEntryInfoOffset;
SExprInfo* pExpr;
SSDataBlock* pResBlock;
SArray* pColMatchInfo;
int32_t numOfOutput;
SExprInfo* pPseudoExpr;
int32_t numOfPseudoExpr;
SqlFunctionCtx* pPseudoCtx;
SQueryTableDataCond cond;
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag;
// if the upstream is an interval operator, the interval info is also kept here to get the time
// window to check if current data block needs to be loaded.
SInterval interval;
SSampleExecInfo sample; // sample execution info
} STableMergeScanInfo;
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle, int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
const char* idStr) { const char* idStr) {
...@@ -2700,9 +2660,9 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc ...@@ -2700,9 +2660,9 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols, true); relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols, true);
// currently only the tbname pseudo column // currently only the tbname pseudo column
if (pTableScanInfo->numOfPseudoExpr > 0) { if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pPseudoExpr, int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo,
pTableScanInfo->numOfPseudoExpr, pBlock, GET_TASKID(pTaskInfo)); pTableScanInfo->pseudoSup.numOfExprs, pBlock, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code); longjmp(pTaskInfo->env, code);
} }
...@@ -2869,29 +2829,31 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) { ...@@ -2869,29 +2829,31 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
STableMergeScanInfo* pInfo = pOperator->info; STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
tsortDestroySortHandle(pInfo->pSortHandle); size_t numReaders = taosArrayGetSize(pInfo->dataReaders);
for (int32_t i = 0; i < numReaders; ++i) {
STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
blockDataDestroy(param->inputBlock);
}
taosArrayClear(pInfo->sortSourceParams); taosArrayClear(pInfo->sortSourceParams);
for (int32_t i = 0; i < taosArrayGetSize(pInfo->dataReaders); ++i) { tsortDestroySortHandle(pInfo->pSortHandle);
for (int32_t i = 0; i < numReaders; ++i) {
STsdbReader* reader = taosArrayGetP(pInfo->dataReaders, i); STsdbReader* reader = taosArrayGetP(pInfo->dataReaders, i);
tsdbReaderClose(reader); tsdbReaderClose(reader);
} }
taosArrayDestroy(pInfo->dataReaders); taosArrayDestroy(pInfo->dataReaders);
pInfo->dataReaders = NULL; pInfo->dataReaders = NULL;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capacity, SOperatorInfo* pOperator) { SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pResBlock, int32_t capacity, SOperatorInfo* pOperator) {
STableMergeScanInfo* pInfo = pOperator->info; STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSDataBlock* p = tsortGetSortedDataBlock(pHandle); blockDataCleanup(pResBlock);
if (p == NULL) { blockDataEnsureCapacity(pResBlock, capacity);
return NULL;
}
blockDataEnsureCapacity(p, capacity);
while (1) { while (1) {
STupleHandle* pTupleHandle = tsortNextTuple(pHandle); STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
...@@ -2899,14 +2861,15 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capa ...@@ -2899,14 +2861,15 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capa
break; break;
} }
appendOneRowToDataBlock(p, pTupleHandle); appendOneRowToDataBlock(pResBlock, pTupleHandle);
if (p->info.rows >= capacity) { if (pResBlock->info.rows >= capacity) {
break; break;
} }
} }
qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), p->info.rows);
return (p->info.rows > 0) ? p : NULL; qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), pResBlock->info.rows);
return (pResBlock->info.rows > 0) ? pResBlock : NULL;
} }
SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
...@@ -2935,7 +2898,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { ...@@ -2935,7 +2898,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
} }
SSDataBlock* pBlock = NULL; SSDataBlock* pBlock = NULL;
while (pInfo->tableStartIndex < tableListSize) { while (pInfo->tableStartIndex < tableListSize) {
pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator); pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock, pOperator->resultInfo.capacity, pOperator);
if (pBlock != NULL) { if (pBlock != NULL) {
pBlock->info.groupId = pInfo->groupId; pBlock->info.groupId = pInfo->groupId;
pOperator->resultInfo.totalRows += pBlock->info.rows; pOperator->resultInfo.totalRows += pBlock->info.rows;
...@@ -2959,6 +2922,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { ...@@ -2959,6 +2922,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) { void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) {
STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param; STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
cleanupQueryTableDataCond(&pTableScanInfo->cond); cleanupQueryTableDataCond(&pTableScanInfo->cond);
taosArrayDestroy(pTableScanInfo->sortSourceParams);
for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->dataReaders); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->dataReaders); ++i) {
STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, i); STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, i);
...@@ -2974,7 +2938,9 @@ void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -2974,7 +2938,9 @@ void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) {
pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock); pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);
taosArrayDestroy(pTableScanInfo->pSortInfo); taosArrayDestroy(pTableScanInfo->pSortInfo);
cleanupExprSupp(&pTableScanInfo->pseudoSup);
taosMemoryFreeClear(pTableScanInfo->rowEntryInfoOffset);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
...@@ -3031,8 +2997,9 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN ...@@ -3031,8 +2997,9 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
} }
if (pTableScanNode->scan.pScanPseudoCols != NULL) { if (pTableScanNode->scan.pScanPseudoCols != NULL) {
pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr); SExprSupp* pSup = &pInfo->pseudoSup;
pInfo->pPseudoCtx = createSqlFunctionCtx(pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, &pInfo->rowEntryInfoOffset); pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs);
pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
} }
pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
......
...@@ -375,6 +375,7 @@ static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) { ...@@ -375,6 +375,7 @@ static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) {
CLONE_NODE_FIELD(pMergeCondition); CLONE_NODE_FIELD(pMergeCondition);
CLONE_NODE_FIELD(pOnConditions); CLONE_NODE_FIELD(pOnConditions);
COPY_SCALAR_FIELD(isSingleTableJoin); COPY_SCALAR_FIELD(isSingleTableJoin);
COPY_SCALAR_FIELD(inputTsOrder);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -440,6 +441,7 @@ static int32_t logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* p ...@@ -440,6 +441,7 @@ static int32_t logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* p
COPY_SCALAR_FIELD(watermark); COPY_SCALAR_FIELD(watermark);
COPY_SCALAR_FIELD(igExpired); COPY_SCALAR_FIELD(igExpired);
COPY_SCALAR_FIELD(windowAlgo); COPY_SCALAR_FIELD(windowAlgo);
COPY_SCALAR_FIELD(inputTsOrder);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -1717,7 +1717,7 @@ static const char* jkJoinPhysiPlanOnConditions = "OnConditions"; ...@@ -1717,7 +1717,7 @@ static const char* jkJoinPhysiPlanOnConditions = "OnConditions";
static const char* jkJoinPhysiPlanTargets = "Targets"; static const char* jkJoinPhysiPlanTargets = "Targets";
static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) { static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) {
const SJoinPhysiNode* pNode = (const SJoinPhysiNode*)pObj; const SSortMergeJoinPhysiNode* pNode = (const SSortMergeJoinPhysiNode*)pObj;
int32_t code = physicPlanNodeToJson(pObj, pJson); int32_t code = physicPlanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
...@@ -1737,7 +1737,7 @@ static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) { ...@@ -1737,7 +1737,7 @@ static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) {
} }
static int32_t jsonToPhysiJoinNode(const SJson* pJson, void* pObj) { static int32_t jsonToPhysiJoinNode(const SJson* pJson, void* pObj) {
SJoinPhysiNode* pNode = (SJoinPhysiNode*)pObj; SSortMergeJoinPhysiNode* pNode = (SSortMergeJoinPhysiNode*)pObj;
int32_t code = jsonToPhysicPlanNode(pJson, pObj); int32_t code = jsonToPhysicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
......
...@@ -468,7 +468,7 @@ static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalk ...@@ -468,7 +468,7 @@ static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalk
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: { case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: {
SJoinPhysiNode* pJoin = (SJoinPhysiNode*)pNode; SSortMergeJoinPhysiNode* pJoin = (SSortMergeJoinPhysiNode*)pNode;
res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext); res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext);
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkPhysiPlan(pJoin->pMergeCondition, order, walker, pContext); res = walkPhysiPlan(pJoin->pMergeCondition, order, walker, pContext);
......
...@@ -287,7 +287,7 @@ SNode* nodesMakeNode(ENodeType type) { ...@@ -287,7 +287,7 @@ SNode* nodesMakeNode(ENodeType type) {
case QUERY_NODE_PHYSICAL_PLAN_PROJECT: case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
return makeNode(type, sizeof(SProjectPhysiNode)); return makeNode(type, sizeof(SProjectPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN:
return makeNode(type, sizeof(SJoinPhysiNode)); return makeNode(type, sizeof(SSortMergeJoinPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG:
return makeNode(type, sizeof(SAggPhysiNode)); return makeNode(type, sizeof(SAggPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
...@@ -883,7 +883,7 @@ void nodesDestroyNode(SNode* pNode) { ...@@ -883,7 +883,7 @@ void nodesDestroyNode(SNode* pNode) {
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: { case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: {
SJoinPhysiNode* pPhyNode = (SJoinPhysiNode*)pNode; SSortMergeJoinPhysiNode* pPhyNode = (SSortMergeJoinPhysiNode*)pNode;
destroyPhysiNode((SPhysiNode*)pPhyNode); destroyPhysiNode((SPhysiNode*)pPhyNode);
nodesDestroyNode(pPhyNode->pMergeCondition); nodesDestroyNode(pPhyNode->pMergeCondition);
nodesDestroyNode(pPhyNode->pOnConditions); nodesDestroyNode(pPhyNode->pOnConditions);
......
...@@ -90,7 +90,7 @@ SNode* createValueNode(SAstCreateContext* pCxt, int32_t dataType, const SToken* ...@@ -90,7 +90,7 @@ SNode* createValueNode(SAstCreateContext* pCxt, int32_t dataType, const SToken*
SNode* createDurationValueNode(SAstCreateContext* pCxt, const SToken* pLiteral); SNode* createDurationValueNode(SAstCreateContext* pCxt, const SToken* pLiteral);
SNode* createDefaultDatabaseCondValue(SAstCreateContext* pCxt); SNode* createDefaultDatabaseCondValue(SAstCreateContext* pCxt);
SNode* createPlaceholderValueNode(SAstCreateContext* pCxt, const SToken* pLiteral); SNode* createPlaceholderValueNode(SAstCreateContext* pCxt, const SToken* pLiteral);
SNode* setProjectionAlias(SAstCreateContext* pCxt, SNode* pNode, const SToken* pAlias); SNode* setProjectionAlias(SAstCreateContext* pCxt, SNode* pNode, SToken* pAlias);
SNode* createLogicConditionNode(SAstCreateContext* pCxt, ELogicConditionType type, SNode* pParam1, SNode* pParam2); SNode* createLogicConditionNode(SAstCreateContext* pCxt, ELogicConditionType type, SNode* pParam1, SNode* pParam2);
SNode* createOperatorNode(SAstCreateContext* pCxt, EOperatorType type, SNode* pLeft, SNode* pRight); SNode* createOperatorNode(SAstCreateContext* pCxt, EOperatorType type, SNode* pLeft, SNode* pRight);
SNode* createBetweenAnd(SAstCreateContext* pCxt, SNode* pExpr, SNode* pLeft, SNode* pRight); SNode* createBetweenAnd(SAstCreateContext* pCxt, SNode* pExpr, SNode* pLeft, SNode* pRight);
......
...@@ -527,6 +527,7 @@ SNode* createTempTableNode(SAstCreateContext* pCxt, SNode* pSubquery, const STok ...@@ -527,6 +527,7 @@ SNode* createTempTableNode(SAstCreateContext* pCxt, SNode* pSubquery, const STok
} }
if (QUERY_NODE_SELECT_STMT == nodeType(pSubquery)) { if (QUERY_NODE_SELECT_STMT == nodeType(pSubquery)) {
strcpy(((SSelectStmt*)pSubquery)->stmtName, tempTable->table.tableAlias); strcpy(((SSelectStmt*)pSubquery)->stmtName, tempTable->table.tableAlias);
((SSelectStmt*)pSubquery)->isSubquery = true;
} else if (QUERY_NODE_SET_OPERATOR == nodeType(pSubquery)) { } else if (QUERY_NODE_SET_OPERATOR == nodeType(pSubquery)) {
strcpy(((SSetOperator*)pSubquery)->stmtName, tempTable->table.tableAlias); strcpy(((SSetOperator*)pSubquery)->stmtName, tempTable->table.tableAlias);
} }
...@@ -637,8 +638,9 @@ SNode* createInterpTimeRange(SAstCreateContext* pCxt, SNode* pStart, SNode* pEnd ...@@ -637,8 +638,9 @@ SNode* createInterpTimeRange(SAstCreateContext* pCxt, SNode* pStart, SNode* pEnd
return createBetweenAnd(pCxt, createPrimaryKeyCol(pCxt), pStart, pEnd); return createBetweenAnd(pCxt, createPrimaryKeyCol(pCxt), pStart, pEnd);
} }
SNode* setProjectionAlias(SAstCreateContext* pCxt, SNode* pNode, const SToken* pAlias) { SNode* setProjectionAlias(SAstCreateContext* pCxt, SNode* pNode, SToken* pAlias) {
CHECK_PARSER_STATUS(pCxt); CHECK_PARSER_STATUS(pCxt);
trimEscape(pAlias);
int32_t len = TMIN(sizeof(((SExprNode*)pNode)->aliasName) - 1, pAlias->n); int32_t len = TMIN(sizeof(((SExprNode*)pNode)->aliasName) - 1, pAlias->n);
strncpy(((SExprNode*)pNode)->aliasName, pAlias->z, len); strncpy(((SExprNode*)pNode)->aliasName, pAlias->z, len);
((SExprNode*)pNode)->aliasName[len] = '\0'; ((SExprNode*)pNode)->aliasName[len] = '\0';
......
...@@ -739,12 +739,13 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo* ...@@ -739,12 +739,13 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo*
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void buildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname, SArray* tagName, uint8_t tagNum) { static void buildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname,
SArray* tagName, uint8_t tagNum) {
pTbReq->type = TD_CHILD_TABLE; pTbReq->type = TD_CHILD_TABLE;
pTbReq->name = strdup(tname); pTbReq->name = strdup(tname);
pTbReq->ctb.suid = suid; pTbReq->ctb.suid = suid;
pTbReq->ctb.tagNum = tagNum; pTbReq->ctb.tagNum = tagNum;
if(sname) pTbReq->ctb.name = strdup(sname); if (sname) pTbReq->ctb.name = strdup(sname);
pTbReq->ctb.pTag = (uint8_t*)pTag; pTbReq->ctb.pTag = (uint8_t*)pTag;
pTbReq->ctb.tagName = taosArrayDup(tagName); pTbReq->ctb.tagName = taosArrayDup(tagName);
pTbReq->commentLen = -1; pTbReq->commentLen = -1;
...@@ -1012,7 +1013,8 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint ...@@ -1012,7 +1013,8 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint
goto end; goto end;
} }
buildCreateTbReq(&pCxt->createTblReq, tName, pTag, pCxt->pTableMeta->suid, pCxt->sTableName, tagName, pCxt->pTableMeta->tableInfo.numOfTags); buildCreateTbReq(&pCxt->createTblReq, tName, pTag, pCxt->pTableMeta->suid, pCxt->sTableName, tagName,
pCxt->pTableMeta->tableInfo.numOfTags);
end: end:
for (int i = 0; i < taosArrayGetSize(pTagVals); ++i) { for (int i = 0; i < taosArrayGetSize(pTagVals); ++i) {
...@@ -1650,7 +1652,6 @@ static int32_t skipUsingClause(SInsertParseSyntaxCxt* pCxt) { ...@@ -1650,7 +1652,6 @@ static int32_t skipUsingClause(SInsertParseSyntaxCxt* pCxt) {
static int32_t collectTableMetaKey(SInsertParseSyntaxCxt* pCxt, SToken* pTbToken) { static int32_t collectTableMetaKey(SInsertParseSyntaxCxt* pCxt, SToken* pTbToken) {
SName name; SName name;
CHECK_CODE(createSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg)); CHECK_CODE(createSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
CHECK_CODE(reserveDbCfgInCache(pCxt->pComCxt->acctId, name.dbname, pCxt->pMetaCache));
CHECK_CODE(reserveUserAuthInCacheExt(pCxt->pComCxt->pUser, &name, AUTH_TYPE_WRITE, pCxt->pMetaCache)); CHECK_CODE(reserveUserAuthInCacheExt(pCxt->pComCxt->pUser, &name, AUTH_TYPE_WRITE, pCxt->pMetaCache));
CHECK_CODE(reserveTableMetaInCacheExt(&name, pCxt->pMetaCache)); CHECK_CODE(reserveTableMetaInCacheExt(&name, pCxt->pMetaCache));
CHECK_CODE(reserveTableVgroupInCacheExt(&name, pCxt->pMetaCache)); CHECK_CODE(reserveTableVgroupInCacheExt(&name, pCxt->pMetaCache));
...@@ -2332,7 +2333,8 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols ...@@ -2332,7 +2333,8 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols
return ret; return ret;
} }
buildCreateTbReq(&smlHandle->tableExecHandle.createTblReq, tableName, pTag, pTableMeta->suid, NULL, tagName, pTableMeta->tableInfo.numOfTags); buildCreateTbReq(&smlHandle->tableExecHandle.createTblReq, tableName, pTag, pTableMeta->suid, NULL, tagName,
pTableMeta->tableInfo.numOfTags);
taosArrayDestroy(tagName); taosArrayDestroy(tagName);
smlHandle->tableExecHandle.createTblReq.ctb.name = taosMemoryMalloc(sTableNameLen + 1); smlHandle->tableExecHandle.createTblReq.ctb.name = taosMemoryMalloc(sTableNameLen + 1);
......
...@@ -92,7 +92,7 @@ static char* getSyntaxErrFormat(int32_t errCode) { ...@@ -92,7 +92,7 @@ static char* getSyntaxErrFormat(int32_t errCode) {
case TSDB_CODE_PAR_INTER_SLIDING_TOO_BIG: case TSDB_CODE_PAR_INTER_SLIDING_TOO_BIG:
return "sliding value no larger than the interval value"; return "sliding value no larger than the interval value";
case TSDB_CODE_PAR_INTER_SLIDING_TOO_SMALL: case TSDB_CODE_PAR_INTER_SLIDING_TOO_SMALL:
return "sliding value can not less than 1% of interval value"; return "sliding value can not less than 1%% of interval value";
case TSDB_CODE_PAR_ONLY_ONE_JSON_TAG: case TSDB_CODE_PAR_ONLY_ONE_JSON_TAG:
return "Only one tag if there is a json tag"; return "Only one tag if there is a json tag";
case TSDB_CODE_PAR_INCORRECT_NUM_OF_COL: case TSDB_CODE_PAR_INCORRECT_NUM_OF_COL:
......
...@@ -339,6 +339,7 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect ...@@ -339,6 +339,7 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
pJoin->joinType = pJoinTable->joinType; pJoin->joinType = pJoinTable->joinType;
pJoin->isSingleTableJoin = pJoinTable->table.singleTable; pJoin->isSingleTableJoin = pJoinTable->table.singleTable;
pJoin->inputTsOrder = ORDER_ASC;
pJoin->node.groupAction = GROUP_ACTION_CLEAR; pJoin->node.groupAction = GROUP_ACTION_CLEAR;
pJoin->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL; pJoin->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL;
pJoin->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL; pJoin->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL;
...@@ -625,14 +626,14 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p ...@@ -625,14 +626,14 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p
static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SWindowLogicNode* pWindow, static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SWindowLogicNode* pWindow,
SLogicNode** pLogicNode) { SLogicNode** pLogicNode) {
int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_WINDOW, fmIsWindowClauseFunc, &pWindow->pFuncs);
if (pCxt->pPlanCxt->streamQuery) { if (pCxt->pPlanCxt->streamQuery) {
pWindow->triggerType = pCxt->pPlanCxt->triggerType; pWindow->triggerType = pCxt->pPlanCxt->triggerType;
pWindow->watermark = pCxt->pPlanCxt->watermark; pWindow->watermark = pCxt->pPlanCxt->watermark;
pWindow->igExpired = pCxt->pPlanCxt->igExpired; pWindow->igExpired = pCxt->pPlanCxt->igExpired;
} }
pWindow->inputTsOrder = ORDER_ASC;
int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_WINDOW, fmIsWindowClauseFunc, &pWindow->pFuncs);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = rewriteExprsForSelect(pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW); code = rewriteExprsForSelect(pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW);
} }
...@@ -861,7 +862,8 @@ static int32_t createProjectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSel ...@@ -861,7 +862,8 @@ static int32_t createProjectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSel
TSWAP(pProject->node.pLimit, pSelect->pLimit); TSWAP(pProject->node.pLimit, pSelect->pLimit);
TSWAP(pProject->node.pSlimit, pSelect->pSlimit); TSWAP(pProject->node.pSlimit, pSelect->pSlimit);
pProject->node.groupAction = GROUP_ACTION_CLEAR; pProject->node.groupAction =
(!pSelect->isSubquery && pCxt->pPlanCxt->streamQuery) ? GROUP_ACTION_KEEP : GROUP_ACTION_CLEAR;
pProject->node.requireDataOrder = DATA_ORDER_LEVEL_NONE; pProject->node.requireDataOrder = DATA_ORDER_LEVEL_NONE;
pProject->node.resultDataOrder = DATA_ORDER_LEVEL_NONE; pProject->node.resultDataOrder = DATA_ORDER_LEVEL_NONE;
......
...@@ -993,25 +993,28 @@ static bool sortPriKeyOptMayBeOptimized(SLogicNode* pNode) { ...@@ -993,25 +993,28 @@ static bool sortPriKeyOptMayBeOptimized(SLogicNode* pNode) {
} }
static int32_t sortPriKeyOptGetScanNodesImpl(SLogicNode* pNode, bool* pNotOptimize, SNodeList** pScanNodes) { static int32_t sortPriKeyOptGetScanNodesImpl(SLogicNode* pNode, bool* pNotOptimize, SNodeList** pScanNodes) {
int32_t code = TSDB_CODE_SUCCESS;
switch (nodeType(pNode)) { switch (nodeType(pNode)) {
case QUERY_NODE_LOGIC_PLAN_SCAN: case QUERY_NODE_LOGIC_PLAN_SCAN: {
if (TSDB_SUPER_TABLE != ((SScanLogicNode*)pNode)->tableType) { SScanLogicNode* pScan = (SScanLogicNode*)pNode;
if (NULL != pScan->pGroupTags) {
*pNotOptimize = true;
return TSDB_CODE_SUCCESS;
}
return nodesListMakeAppend(pScanNodes, (SNode*)pNode); return nodesListMakeAppend(pScanNodes, (SNode*)pNode);
} }
break; case QUERY_NODE_LOGIC_PLAN_JOIN: {
case QUERY_NODE_LOGIC_PLAN_JOIN: int32_t code =
code =
sortPriKeyOptGetScanNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 0), pNotOptimize, pScanNodes); sortPriKeyOptGetScanNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 0), pNotOptimize, pScanNodes);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = code =
sortPriKeyOptGetScanNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 1), pNotOptimize, pScanNodes); sortPriKeyOptGetScanNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 1), pNotOptimize, pScanNodes);
} }
return code; return code;
}
case QUERY_NODE_LOGIC_PLAN_AGG: case QUERY_NODE_LOGIC_PLAN_AGG:
case QUERY_NODE_LOGIC_PLAN_PARTITION:
*pNotOptimize = true; *pNotOptimize = true;
return code; return TSDB_CODE_SUCCESS;
default: default:
break; break;
} }
...@@ -1037,17 +1040,33 @@ static EOrder sortPriKeyOptGetPriKeyOrder(SSortLogicNode* pSort) { ...@@ -1037,17 +1040,33 @@ static EOrder sortPriKeyOptGetPriKeyOrder(SSortLogicNode* pSort) {
return ((SOrderByExprNode*)nodesListGetNode(pSort->pSortKeys, 0))->order; return ((SOrderByExprNode*)nodesListGetNode(pSort->pSortKeys, 0))->order;
} }
static void sortPriKeyOptSetParentOrder(SLogicNode* pNode, EOrder order) {
if (NULL == pNode) {
return;
}
if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode)) {
((SWindowLogicNode*)pNode)->inputTsOrder = order;
} else if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pNode)) {
((SJoinLogicNode*)pNode)->inputTsOrder = order;
}
sortPriKeyOptSetParentOrder(pNode->pParent, order);
}
static int32_t sortPriKeyOptApply(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SSortLogicNode* pSort, static int32_t sortPriKeyOptApply(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SSortLogicNode* pSort,
SNodeList* pScanNodes) { SNodeList* pScanNodes) {
EOrder order = sortPriKeyOptGetPriKeyOrder(pSort); EOrder order = sortPriKeyOptGetPriKeyOrder(pSort);
if (ORDER_DESC == order) {
SNode* pScanNode = NULL; SNode* pScanNode = NULL;
FOREACH(pScanNode, pScanNodes) { FOREACH(pScanNode, pScanNodes) {
SScanLogicNode* pScan = (SScanLogicNode*)pScanNode; SScanLogicNode* pScan = (SScanLogicNode*)pScanNode;
if (pScan->scanSeq[0] > 0) { if (ORDER_DESC == order && pScan->scanSeq[0] > 0) {
TSWAP(pScan->scanSeq[0], pScan->scanSeq[1]); TSWAP(pScan->scanSeq[0], pScan->scanSeq[1]);
} }
if (TSDB_SUPER_TABLE == pScan->tableType) {
pScan->scanType = SCAN_TYPE_TABLE_MERGE;
pScan->node.resultDataOrder = DATA_ORDER_LEVEL_GLOBAL;
pScan->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL;
} }
sortPriKeyOptSetParentOrder(pScan->node.pParent, order);
} }
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pSort->node.pChildren, 0); SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pSort->node.pChildren, 0);
......
...@@ -415,7 +415,6 @@ static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SSubplan* pS ...@@ -415,7 +415,6 @@ static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SSubplan* pS
SScanPhysiNode* pScanPhysiNode, SPhysiNode** pPhyNode) { SScanPhysiNode* pScanPhysiNode, SPhysiNode** pPhyNode) {
int32_t code = createScanCols(pCxt, pScanPhysiNode, pScanLogicNode->pScanCols); int32_t code = createScanCols(pCxt, pScanPhysiNode, pScanLogicNode->pScanCols);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
// Data block describe also needs to be set without scanning column, such as SELECT COUNT(*) FROM t
code = addDataBlockSlots(pCxt, pScanPhysiNode->pScanCols, pScanPhysiNode->node.pOutputDataBlockDesc); code = addDataBlockSlots(pCxt, pScanPhysiNode->pScanCols, pScanPhysiNode->node.pOutputDataBlockDesc);
} }
...@@ -622,8 +621,8 @@ static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, ...@@ -622,8 +621,8 @@ static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode, static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode,
SPhysiNode** pPhyNode) { SPhysiNode** pPhyNode) {
SJoinPhysiNode* pJoin = SSortMergeJoinPhysiNode* pJoin =
(SJoinPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pJoinLogicNode, QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN); (SSortMergeJoinPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pJoinLogicNode, QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN);
if (NULL == pJoin) { if (NULL == pJoin) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
...@@ -975,6 +974,9 @@ static int32_t createInterpFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh ...@@ -975,6 +974,9 @@ static int32_t createInterpFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh
} }
static bool projectCanMergeDataBlock(SProjectLogicNode* pProject) { static bool projectCanMergeDataBlock(SProjectLogicNode* pProject) {
if (GROUP_ACTION_KEEP == pProject->node.groupAction) {
return false;
}
if (DATA_ORDER_LEVEL_NONE == pProject->node.resultDataOrder) { if (DATA_ORDER_LEVEL_NONE == pProject->node.resultDataOrder) {
return true; return true;
} }
......
...@@ -469,7 +469,7 @@ static int32_t stbSplCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pParent ...@@ -469,7 +469,7 @@ static int32_t stbSplCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pParent
return code; return code;
} }
static int32_t stbSplCreateMergeKeysByPrimaryKey(SNode* pPrimaryKey, SNodeList** pMergeKeys) { static int32_t stbSplCreateMergeKeysByPrimaryKey(SNode* pPrimaryKey, EOrder order, SNodeList** pMergeKeys) {
SOrderByExprNode* pMergeKey = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR); SOrderByExprNode* pMergeKey = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
if (NULL == pMergeKey) { if (NULL == pMergeKey) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
...@@ -479,7 +479,7 @@ static int32_t stbSplCreateMergeKeysByPrimaryKey(SNode* pPrimaryKey, SNodeList** ...@@ -479,7 +479,7 @@ static int32_t stbSplCreateMergeKeysByPrimaryKey(SNode* pPrimaryKey, SNodeList**
nodesDestroyNode((SNode*)pMergeKey); nodesDestroyNode((SNode*)pMergeKey);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pMergeKey->order = ORDER_ASC; pMergeKey->order = order;
pMergeKey->nullOrder = NULL_ORDER_FIRST; pMergeKey->nullOrder = NULL_ORDER_FIRST;
return nodesListMakeStrictAppend(pMergeKeys, (SNode*)pMergeKey); return nodesListMakeStrictAppend(pMergeKeys, (SNode*)pMergeKey);
} }
...@@ -491,7 +491,8 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo ...@@ -491,7 +491,8 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo
((SWindowLogicNode*)pPartWindow)->windowAlgo = INTERVAL_ALGO_HASH; ((SWindowLogicNode*)pPartWindow)->windowAlgo = INTERVAL_ALGO_HASH;
((SWindowLogicNode*)pInfo->pSplitNode)->windowAlgo = INTERVAL_ALGO_MERGE; ((SWindowLogicNode*)pInfo->pSplitNode)->windowAlgo = INTERVAL_ALGO_MERGE;
SNodeList* pMergeKeys = NULL; SNodeList* pMergeKeys = NULL;
code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk, &pMergeKeys); code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk,
((SWindowLogicNode*)pInfo->pSplitNode)->inputTsOrder, &pMergeKeys);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow, true); code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow, true);
} }
...@@ -579,7 +580,8 @@ static int32_t stbSplSplitSessionOrStateForBatch(SSplitContext* pCxt, SStableSpl ...@@ -579,7 +580,8 @@ static int32_t stbSplSplitSessionOrStateForBatch(SSplitContext* pCxt, SStableSpl
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pWindow->pChildren, 0); SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pWindow->pChildren, 0);
SNodeList* pMergeKeys = NULL; SNodeList* pMergeKeys = NULL;
int32_t code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pWindow)->pTspk, &pMergeKeys); int32_t code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pWindow)->pTspk,
((SWindowLogicNode*)pWindow)->inputTsOrder, &pMergeKeys);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pChild, pMergeKeys, (SLogicNode*)pChild, true); code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pChild, pMergeKeys, (SLogicNode*)pChild, true);
...@@ -913,27 +915,70 @@ static int32_t stbSplSplitScanNodeWithPartTags(SSplitContext* pCxt, SStableSplit ...@@ -913,27 +915,70 @@ static int32_t stbSplSplitScanNodeWithPartTags(SSplitContext* pCxt, SStableSplit
} }
static SNode* stbSplFindPrimaryKeyFromScan(SScanLogicNode* pScan) { static SNode* stbSplFindPrimaryKeyFromScan(SScanLogicNode* pScan) {
bool find = false;
SNode* pCol = NULL; SNode* pCol = NULL;
FOREACH(pCol, pScan->pScanCols) { FOREACH(pCol, pScan->pScanCols) {
if (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pCol)->colId) { if (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pCol)->colId) {
return pCol; find = true;
break;
} }
} }
if (!find) {
return NULL; return NULL;
}
SNode* pTarget = NULL;
FOREACH(pTarget, pScan->node.pTargets) {
if (nodesEqualNode(pTarget, pCol)) {
return pCol;
}
}
nodesListStrictAppend(pScan->node.pTargets, nodesCloneNode(pCol));
return pCol;
}
static int32_t stbSplCreateMergeScanNode(SScanLogicNode* pScan, SLogicNode** pOutputMergeScan,
SNodeList** pOutputMergeKeys) {
SNodeList* pChildren = pScan->node.pChildren;
pScan->node.pChildren = NULL;
int32_t code = TSDB_CODE_SUCCESS;
SScanLogicNode* pMergeScan = (SScanLogicNode*)nodesCloneNode((SNode*)pScan);
if (NULL == pMergeScan) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
SNodeList* pMergeKeys = NULL;
if (TSDB_CODE_SUCCESS == code) {
pMergeScan->scanType = SCAN_TYPE_TABLE_MERGE;
pMergeScan->node.pChildren = pChildren;
splSetParent((SLogicNode*)pMergeScan);
code = stbSplCreateMergeKeysByPrimaryKey(stbSplFindPrimaryKeyFromScan(pMergeScan),
pMergeScan->scanSeq[0] > 0 ? ORDER_ASC : ORDER_DESC, &pMergeKeys);
}
if (TSDB_CODE_SUCCESS == code) {
*pOutputMergeScan = (SLogicNode*)pMergeScan;
*pOutputMergeKeys = pMergeKeys;
} else {
nodesDestroyNode((SNode*)pMergeScan);
nodesDestroyList(pMergeKeys);
}
return code;
} }
static int32_t stbSplSplitMergeScanNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SScanLogicNode* pScan, static int32_t stbSplSplitMergeScanNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SScanLogicNode* pScan,
bool groupSort) { bool groupSort) {
SLogicNode* pMergeScan = NULL;
SNodeList* pMergeKeys = NULL; SNodeList* pMergeKeys = NULL;
int32_t code = stbSplCreateMergeKeysByPrimaryKey(stbSplFindPrimaryKeyFromScan(pScan), &pMergeKeys); int32_t code = stbSplCreateMergeScanNode(pScan, &pMergeScan, &pMergeKeys);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, (SLogicNode*)pScan, groupSort); code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, pMergeScan, groupSort);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pSubplan->pChildren, code = nodesListMakeStrictAppend(&pSubplan->pChildren,
(SNode*)splCreateScanSubplan(pCxt, (SLogicNode*)pScan, SPLIT_FLAG_STABLE_SPLIT)); (SNode*)splCreateScanSubplan(pCxt, pMergeScan, SPLIT_FLAG_STABLE_SPLIT));
} }
pScan->scanType = SCAN_TYPE_TABLE_MERGE;
++(pCxt->groupId); ++(pCxt->groupId);
return code; return code;
} }
...@@ -978,14 +1023,14 @@ static int32_t stbSplSplitJoinNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) ...@@ -978,14 +1023,14 @@ static int32_t stbSplSplitJoinNode(SSplitContext* pCxt, SStableSplitInfo* pInfo)
} }
static int32_t stbSplCreateMergeKeysForPartitionNode(SLogicNode* pPart, SNodeList** pMergeKeys) { static int32_t stbSplCreateMergeKeysForPartitionNode(SLogicNode* pPart, SNodeList** pMergeKeys) {
SNode* pPrimaryKey = SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pPart->pChildren, 0);
nodesCloneNode(stbSplFindPrimaryKeyFromScan((SScanLogicNode*)nodesListGetNode(pPart->pChildren, 0))); SNode* pPrimaryKey = nodesCloneNode(stbSplFindPrimaryKeyFromScan(pScan));
if (NULL == pPrimaryKey) { if (NULL == pPrimaryKey) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
int32_t code = nodesListAppend(pPart->pTargets, pPrimaryKey); int32_t code = nodesListAppend(pPart->pTargets, pPrimaryKey);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeKeysByPrimaryKey(pPrimaryKey, pMergeKeys); code = stbSplCreateMergeKeysByPrimaryKey(pPrimaryKey, pScan->scanSeq[0] > 0 ? ORDER_ASC : ORDER_DESC, pMergeKeys);
} }
return code; return code;
} }
......
...@@ -124,7 +124,8 @@ int32_t replaceLogicNode(SLogicSubplan* pSubplan, SLogicNode* pOld, SLogicNode* ...@@ -124,7 +124,8 @@ int32_t replaceLogicNode(SLogicSubplan* pSubplan, SLogicNode* pOld, SLogicNode*
} }
static int32_t adjustScanDataRequirement(SScanLogicNode* pScan, EDataOrderLevel requirement) { static int32_t adjustScanDataRequirement(SScanLogicNode* pScan, EDataOrderLevel requirement) {
if (SCAN_TYPE_TABLE != pScan->scanType && SCAN_TYPE_TABLE_MERGE != pScan->scanType) { if ((SCAN_TYPE_TABLE != pScan->scanType && SCAN_TYPE_TABLE_MERGE != pScan->scanType) ||
DATA_ORDER_LEVEL_GLOBAL == pScan->node.requireDataOrder) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// The lowest sort level of scan output data is DATA_ORDER_LEVEL_IN_BLOCK // The lowest sort level of scan output data is DATA_ORDER_LEVEL_IN_BLOCK
......
...@@ -24,9 +24,10 @@ TEST_F(PlanBasicTest, selectClause) { ...@@ -24,9 +24,10 @@ TEST_F(PlanBasicTest, selectClause) {
useDb("root", "test"); useDb("root", "test");
run("SELECT * FROM t1"); run("SELECT * FROM t1");
run("SELECT 1 FROM t1");
run("SELECT * FROM st1"); run("SELECT MAX(c1) c2, c2 FROM t1");
run("SELECT 1 FROM st1");
run("SELECT MAX(c1) c2, c2 FROM st1");
} }
TEST_F(PlanBasicTest, whereClause) { TEST_F(PlanBasicTest, whereClause) {
......
...@@ -53,6 +53,8 @@ TEST_F(PlanOptimizeTest, sortPrimaryKey) { ...@@ -53,6 +53,8 @@ TEST_F(PlanOptimizeTest, sortPrimaryKey) {
run("SELECT c1 FROM t1 ORDER BY ts"); run("SELECT c1 FROM t1 ORDER BY ts");
run("SELECT c1 FROM st1 ORDER BY ts");
run("SELECT c1 FROM t1 ORDER BY ts DESC"); run("SELECT c1 FROM t1 ORDER BY ts DESC");
run("SELECT COUNT(*) FROM t1 INTERVAL(10S) ORDER BY _WSTART DESC"); run("SELECT COUNT(*) FROM t1 INTERVAL(10S) ORDER BY _WSTART DESC");
......
...@@ -512,7 +512,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTER_OFFSET_UNIT, "Cannot use 'year' as ...@@ -512,7 +512,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTER_OFFSET_UNIT, "Cannot use 'year' as
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTER_OFFSET_TOO_BIG, "Interval offset should be shorter than interval") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTER_OFFSET_TOO_BIG, "Interval offset should be shorter than interval")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTER_SLIDING_UNIT, "Does not support sliding when interval is natural month/year") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTER_SLIDING_UNIT, "Does not support sliding when interval is natural month/year")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTER_SLIDING_TOO_BIG, "sliding value no larger than the interval value") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTER_SLIDING_TOO_BIG, "sliding value no larger than the interval value")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTER_SLIDING_TOO_SMALL, "sliding value can not less than 1% of interval value") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTER_SLIDING_TOO_SMALL, "sliding value can not less than 1%% of interval value")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_ONLY_ONE_JSON_TAG, "Only one tag if there is a json tag") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_ONLY_ONE_JSON_TAG, "Only one tag if there is a json tag")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INCORRECT_NUM_OF_COL, "Query block has incorrect number of result columns") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INCORRECT_NUM_OF_COL, "Query block has incorrect number of result columns")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INCORRECT_TIMESTAMP_VAL, "Incorrect TIMESTAMP value") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INCORRECT_TIMESTAMP_VAL, "Incorrect TIMESTAMP value")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册