提交 88c767bc 编写于 作者: X Xiaoyu Wang

fix: problem of parser and planner

上级 6804c6a4
......@@ -203,9 +203,9 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) {
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData idata = {{0}};
SSlotDescNode* pDescNode = nodesListGetNode(pNode->pSlots, i);
// if (!pDescNode->output) { // todo disable it temporarily
// continue;
// }
// if (!pDescNode->output) { // todo disable it temporarily
// continue;
// }
idata.info.type = pDescNode->dataType.type;
idata.info.bytes = pDescNode->dataType.bytes;
......@@ -833,7 +833,8 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
if (pResult->info.rows > 0 && !createNewColModel) {
colDataMergeCol(pColInfoData, pResult->info.rows, &pResult->info.capacity, pfCtx->input.pData[0], pfCtx->input.numOfRows);
colDataMergeCol(pColInfoData, pResult->info.rows, &pResult->info.capacity, pfCtx->input.pData[0],
pfCtx->input.numOfRows);
} else {
colDataAssign(pColInfoData, pfCtx->input.pData[0], pfCtx->input.numOfRows);
}
......@@ -1873,7 +1874,7 @@ static void updateTableQueryInfoForReverseScan(STableQueryInfo* pTableQueryInfo)
}
void initResultRow(SResultRow* pResultRow) {
// pResultRow->pEntryInfo = (struct SResultRowEntryInfo*)((char*)pResultRow + sizeof(SResultRow));
// pResultRow->pEntryInfo = (struct SResultRowEntryInfo*)((char*)pResultRow + sizeof(SResultRow));
}
/*
......@@ -1885,7 +1886,8 @@ void initResultRow(SResultRow* pResultRow) {
* offset[0] offset[1] offset[2]
*/
// TODO refactor: some function move away
void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage, int32_t numOfExprs, SExecTaskInfo* pTaskInfo) {
void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage, int32_t numOfExprs,
SExecTaskInfo* pTaskInfo) {
SqlFunctionCtx* pCtx = pInfo->pCtx;
SSDataBlock* pDataBlock = pInfo->pRes;
int32_t* rowCellInfoOffset = pInfo->rowCellInfoOffset;
......@@ -2240,12 +2242,13 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased
return 0;
}
void doBuildResultDatablock(SOptrBasicInfo *pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf) {
void doBuildResultDatablock(SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo,
SDiskbasedBuf* pBuf) {
assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup);
int32_t* rowCellOffset = pbInfo->rowCellInfoOffset;
SSDataBlock* pBlock = pbInfo->pRes;
SqlFunctionCtx* pCtx = pbInfo->pCtx;
int32_t* rowCellOffset = pbInfo->rowCellInfoOffset;
SSDataBlock* pBlock = pbInfo->pRes;
SqlFunctionCtx* pCtx = pbInfo->pCtx;
blockDataCleanup(pBlock);
if (!hasRemainDataInCurrentGroup(pGroupResInfo)) {
......@@ -2419,7 +2422,8 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) {
SFileBlockLoadRecorder* pRecorder = pSummary->pRecoder;
if (pSummary->pRecoder != NULL) {
qDebug("%s :cost summary: elapsed time:%" PRId64 " us, first merge:%" PRId64 " us, total blocks:%d, "
qDebug("%s :cost summary: elapsed time:%" PRId64 " us, first merge:%" PRId64
" us, total blocks:%d, "
"load block statis:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64,
GET_TASKID(pTaskInfo), pSummary->elapsedTime, pSummary->firstStageMergeTime, pRecorder->totalBlocks,
pRecorder->loadBlockStatis, pRecorder->loadBlocks, pRecorder->totalRows, pRecorder->totalCheckedRows);
......@@ -2954,7 +2958,7 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
relocateColumnData(pRes, pColList, pBlock->pDataBlock);
taosArrayDestroy(pBlock->pDataBlock);
taosMemoryFree(pBlock);
// blockDataDestroy(pBlock);
// blockDataDestroy(pBlock);
}
pRes->info.rows = numOfRows;
......@@ -4099,8 +4103,8 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
}
// todo handle different group data interpolation
bool n = false;
bool *newgroup = &n;
bool n = false;
bool* newgroup = &n;
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, newgroup, pTaskInfo);
if (pResBlock->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult && pResBlock->info.rows > 0)) {
return pResBlock;
......@@ -4224,7 +4228,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
}
uint32_t defaultPgsz = 4096;
while(defaultPgsz < pAggSup->resultRowSize*4) {
while (defaultPgsz < pAggSup->resultRowSize * 4) {
defaultPgsz <<= 1u;
}
......@@ -4375,7 +4379,7 @@ void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
}
void destroyMergeJoinOperator(void* param, int32_t numOfOutput) {
SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*) param;
SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param;
}
void destroyAggOperatorInfo(void* param, int32_t numOfOutput) {
......@@ -4487,8 +4491,9 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
}
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, int32_t fillType, SNodeListNode* pValueNode,
bool multigroupResult, SExecTaskInfo* pTaskInfo) {
SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock,
int32_t fillType, SNodeListNode* pValueNode, bool multigroupResult,
SExecTaskInfo* pTaskInfo) {
SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
......@@ -4522,8 +4527,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
SResultInfo* pResultInfo = &pOperator->resultInfo;
initResultSizeInfo(pOperator, 4096);
int32_t code = initFillInfo(pInfo, pExpr, numOfCols, pValueNode, *pWindow, pResultInfo->capacity,
pTaskInfo->id.str, pInterval, type);
int32_t code = initFillInfo(pInfo, pExpr, numOfCols, pValueNode, *pWindow, pResultInfo->capacity, pTaskInfo->id.str,
pInterval, type);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......@@ -4736,7 +4741,8 @@ static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t
uint64_t queryId, uint64_t taskId);
static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo);
static SArray* extractColumnInfo(SNodeList* pNodeList);
static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, int32_t type);
static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
int32_t type);
static SArray* createSortInfo(SNodeList* pNodeList);
static SArray* extractPartitionColInfo(SNodeList* pNodeList);
......@@ -4776,15 +4782,15 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
SQueryTableDataCond cond = {0};
int32_t code = initQueryTableDataCond(&cond, pTableScanNode);
int32_t code = initQueryTableDataCond(&cond, pTableScanNode);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
SInterval interval = extractIntervalInfo(pTableScanNode);
SOperatorInfo* pOperator = createTableScanOperatorInfo(pDataReader, &cond, numOfCols, pTableScanNode->dataRequired,
pTableScanNode->scanSeq, pColList, pResBlock, pScanPhyNode->node.pConditions,
&interval, pTableScanNode->ratio, pTaskInfo);
SInterval interval = extractIntervalInfo(pTableScanNode);
SOperatorInfo* pOperator = createTableScanOperatorInfo(
pDataReader, &cond, numOfCols, pTableScanNode->dataRequired, pTableScanNode->scanSeq, pColList, pResBlock,
pScanPhyNode->node.pConditions, &interval, pTableScanNode->ratio, pTaskInfo);
STableScanInfo* pScanInfo = pOperator->info;
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
return pOperator;
......@@ -4795,14 +4801,15 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo,
queryId, taskId);
SArray* tableIdList = extractTableIdList(pTableGroupInfo);
SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
int32_t numOfCols = 0;
SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
int32_t numOfCols = 0;
SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pResBlock, pCols, tableIdList, pTaskInfo,
pScanPhyNode->node.pConditions);
taosArrayDestroy(tableIdList);
......@@ -4822,25 +4829,27 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pTaskInfo, pSysScanPhyNode->showRewrite, pSysScanPhyNode->accountId);
return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*) pPhyNode;
STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;
SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
int32_t code =
doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo,
queryId, taskId);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
int32_t num = 0;
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pScanPhyNode->pScanPseudoCols, NULL, &num);
int32_t numOfOutputCols = 0;
SArray* colList = extractColMatchInfo(pScanPhyNode->pScanPseudoCols, pDescNode, &numOfOutputCols, COL_MATCH_FROM_COL_ID);
SArray* colList =
extractColMatchInfo(pScanPhyNode->pScanPseudoCols, pDescNode, &numOfOutputCols, COL_MATCH_FROM_COL_ID);
SOperatorInfo* pOperator = createTagScanOperatorInfo(pHandle, pExprInfo, num, pResBlock, colList, pTableGroupInfo, pTaskInfo);
SOperatorInfo* pOperator =
createTagScanOperatorInfo(pHandle, pExprInfo, num, pResBlock, colList, pTableGroupInfo, pTaskInfo);
return pOperator;
} else {
ASSERT(0);
......@@ -4887,7 +4896,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOptr = createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pScalarExprInfo, numOfScalarExpr,
pTaskInfo, pTableGroupInfo);
}
} else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == type) {
} else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == type || QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) {
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
......@@ -4925,13 +4934,15 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) {
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark, .calTrigger = pSessionNode->window.triggerType};
STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark,
.calTrigger = pSessionNode->window.triggerType};
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pTaskInfo);
pOptr =
createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*)pPhyNode;
SArray* pColList = extractPartitionColInfo(pPartNode->pPartitionKeys);
......@@ -4957,11 +4968,12 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOptr = createMergeJoinOperatorInfo(ops, size, pExprInfo, num, pResBlock, pJoinNode->pOnConditions, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
SFillPhysiNode* pFillNode = (SFillPhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
SExprInfo* pExprInfo = createExprInfo(pFillNode->pTargets, NULL, &num);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
SExprInfo* pExprInfo = createExprInfo(pFillNode->pTargets, NULL, &num);
SInterval* pInterval = &((SIntervalAggOperatorInfo*)ops[0]->info)->interval;
pOptr = createFillOperatorInfo(ops[0], pExprInfo, num, pInterval, &pFillNode->timeRange, pResBlock, pFillNode->mode, (SNodeListNode*)pFillNode->pValues, false, pTaskInfo);
pOptr = createFillOperatorInfo(ops[0], pExprInfo, num, pInterval, &pFillNode->timeRange, pResBlock, pFillNode->mode,
(SNodeListNode*)pFillNode->pValues, false, pTaskInfo);
} else {
ASSERT(0);
}
......@@ -5101,7 +5113,8 @@ SArray* createSortInfo(SNodeList* pNodeList) {
return pList;
}
SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, int32_t type) {
SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
int32_t type) {
size_t numOfCols = LIST_LENGTH(pNodeList);
SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchInfo));
if (pList == NULL) {
......@@ -5114,10 +5127,10 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
SColMatchInfo c = {0};
c.output = true;
c.colId = pColNode->colId;
c.srcSlotId = pColNode->slotId;
c.matchType = type;
c.output = true;
c.colId = pColNode->colId;
c.srcSlotId = pColNode->slotId;
c.matchType = type;
c.targetSlotId = pNode->slotId;
taosArrayPush(pList, &c);
}
......@@ -5527,8 +5540,8 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
}
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo,
int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition,
SExecTaskInfo* pTaskInfo) {
int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition,
SExecTaskInfo* pTaskInfo) {
SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pOperator == NULL || pInfo == NULL) {
......@@ -5537,14 +5550,14 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
initResultSizeInfo(pOperator, 4096);
pInfo->pRes = pResBlock;
pOperator->name = "MergeJoinOperator";
pInfo->pRes = pResBlock;
pOperator->name = "MergeJoinOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_JOIN;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo;
pOperator->numOfExprs = numOfCols;
pOperator->info = pInfo;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
SOperatorNode* pNode = (SOperatorNode*)pOnCondition;
......@@ -5568,9 +5581,9 @@ _error:
}
void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) {
pColumn->slotId = pColumnNode->slotId;
pColumn->type = pColumnNode->node.resType.type;
pColumn->bytes = pColumnNode->node.resType.bytes;
pColumn->slotId = pColumnNode->slotId;
pColumn->type = pColumnNode->node.resType.type;
pColumn->bytes = pColumnNode->node.resType.bytes;
pColumn->precision = pColumnNode->node.resType.precision;
pColumn->scale = pColumnNode->node.resType.scale;
pColumn->scale = pColumnNode->node.resType.scale;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册