未验证 提交 1b19d3b0 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18384 from taosdata/fix/3.0_bugfix_wxy

enh: last cache optimize
...@@ -171,6 +171,7 @@ typedef struct SExchangeLogicNode { ...@@ -171,6 +171,7 @@ typedef struct SExchangeLogicNode {
SLogicNode node; SLogicNode node;
int32_t srcStartGroupId; int32_t srcStartGroupId;
int32_t srcEndGroupId; int32_t srcEndGroupId;
bool seqRecvData;
} SExchangeLogicNode; } SExchangeLogicNode;
typedef struct SMergeLogicNode { typedef struct SMergeLogicNode {
...@@ -416,6 +417,7 @@ typedef struct SExchangePhysiNode { ...@@ -416,6 +417,7 @@ typedef struct SExchangePhysiNode {
int32_t srcEndGroupId; int32_t srcEndGroupId;
bool singleChannel; bool singleChannel;
SNodeList* pSrcEndPoints; // element is SDownstreamSource, scheduler fill by calling qSetSuplanExecutionNode SNodeList* pSrcEndPoints; // element is SDownstreamSource, scheduler fill by calling qSetSuplanExecutionNode
bool seqRecvData;
} SExchangePhysiNode; } SExchangePhysiNode;
typedef struct SMergePhysiNode { typedef struct SMergePhysiNode {
......
...@@ -51,9 +51,9 @@ typedef struct SSourceDataInfo { ...@@ -51,9 +51,9 @@ typedef struct SSourceDataInfo {
const char* taskId; const char* taskId;
} SSourceDataInfo; } SSourceDataInfo;
static void destroyExchangeOperatorInfo(void* param); static void destroyExchangeOperatorInfo(void* param);
static void freeBlock(void* pParam); static void freeBlock(void* pParam);
static void freeSourceDataInfo(void* param); static void freeSourceDataInfo(void* param);
static void* setAllSourcesCompleted(SOperatorInfo* pOperator); static void* setAllSourcesCompleted(SOperatorInfo* pOperator);
static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code); static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code);
...@@ -62,7 +62,8 @@ static int32_t getCompletedSources(const SArray* pArray); ...@@ -62,7 +62,8 @@ static int32_t getCompletedSources(const SArray* pArray);
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator); static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator);
static int32_t seqLoadRemoteData(SOperatorInfo* pOperator); static int32_t seqLoadRemoteData(SOperatorInfo* pOperator);
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator); static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator);
static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf); static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock,
bool holdDataInBuf);
static int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo); static int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo);
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo, static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
...@@ -106,7 +107,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn ...@@ -106,7 +107,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
if (pRsp->numOfRows == 0) { if (pRsp->numOfRows == 0) {
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64
", totalRows:%" PRIu64 ", try next %d/%" PRIzu, ", totalRows:%" PRIu64 ", try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows,
pExchangeInfo->loadInfo.totalRows, i + 1, totalSources); pExchangeInfo->loadInfo.totalRows, i + 1, totalSources);
taosMemoryFreeClear(pDataInfo->pRsp); taosMemoryFreeClear(pDataInfo->pRsp);
...@@ -125,14 +126,14 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn ...@@ -125,14 +126,14 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
if (pRsp->completed == 1) { if (pRsp->completed == 1) {
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
" execId:%d index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " execId:%d index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
", total:%.2f Kb, try next %d/%" PRIzu, ", total:%.2f Kb, try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks,
pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, i + 1,
i + 1, totalSources); totalSources);
} else { } else {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
" execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb", " execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb",
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfBlocks, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfBlocks,
pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0); pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
} }
...@@ -157,7 +158,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn ...@@ -157,7 +158,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
} }
} }
_error: _error:
pTaskInfo->code = code; pTaskInfo->code = code;
} }
...@@ -298,17 +299,19 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode ...@@ -298,17 +299,19 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self}; SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self};
qAppendTaskStopInfo(pTaskInfo, &stopInfo); qAppendTaskStopInfo(pTaskInfo, &stopInfo);
pInfo->seqLoadData = false; pInfo->seqLoadData = pExNode->seqRecvData;
pInfo->pTransporter = pTransporter; pInfo->pTransporter = pTransporter;
setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, destroyExchangeOperatorInfo, NULL); pOperator->fpSet =
createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, destroyExchangeOperatorInfo, NULL);
return pOperator; return pOperator;
_error: _error:
if (pInfo != NULL) { if (pInfo != NULL) {
doDestroyExchangeOperatorInfo(pInfo); doDestroyExchangeOperatorInfo(pInfo);
} }
...@@ -379,7 +382,8 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -379,7 +382,8 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
} else { } else {
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
pSourceDataInfo->code = code; pSourceDataInfo->code = code;
qDebug("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code), pExchangeInfo); qDebug("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code),
pExchangeInfo);
} }
pSourceDataInfo->status = EX_SOURCE_DATA_READY; pSourceDataInfo->status = EX_SOURCE_DATA_READY;
...@@ -427,14 +431,14 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas ...@@ -427,14 +431,14 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
taosMemoryFree(pWrapper); taosMemoryFree(pWrapper);
return pTaskInfo->code; return pTaskInfo->code;
} }
void* msg = taosMemoryCalloc(1, msgSize); void* msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) { if (NULL == msg) {
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
taosMemoryFree(pWrapper); taosMemoryFree(pWrapper);
return pTaskInfo->code; return pTaskInfo->code;
} }
if (tSerializeSResFetchReq(msg, msgSize, &req) < 0) { if (tSerializeSResFetchReq(msg, msgSize, &req) < 0) {
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
taosMemoryFree(pWrapper); taosMemoryFree(pWrapper);
...@@ -524,7 +528,7 @@ void* setAllSourcesCompleted(SOperatorInfo* pOperator) { ...@@ -524,7 +528,7 @@ void* setAllSourcesCompleted(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 ", %.2f Kb, elapsed:%.2f ms", qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 ", %.2f Kb, elapsed:%.2f ms",
GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
pLoadInfo->totalElapsed / 1000.0); pLoadInfo->totalElapsed / 1000.0);
...@@ -574,7 +578,7 @@ int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) { ...@@ -574,7 +578,7 @@ int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
if (isTaskKilled(pTaskInfo)) { if (isTaskKilled(pTaskInfo)) {
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
} }
tsem_post(&pExchangeInfo->ready); tsem_post(&pExchangeInfo->ready);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -614,13 +618,15 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { ...@@ -614,13 +618,15 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current); doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
tsem_wait(&pExchangeInfo->ready); tsem_wait(&pExchangeInfo->ready);
if (isTaskKilled(pTaskInfo)) { if (isTaskKilled(pTaskInfo)) {
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
} }
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current); SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
if (pDataInfo->code != TSDB_CODE_SUCCESS) { if (pDataInfo->code != TSDB_CODE_SUCCESS) {
...@@ -634,7 +640,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { ...@@ -634,7 +640,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
if (pRsp->numOfRows == 0) { if (pRsp->numOfRows == 0) {
qDebug("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d %d of total completed, rowsOfSource:%" PRIu64 qDebug("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d %d of total completed, rowsOfSource:%" PRIu64
", totalRows:%" PRIu64 " try next", ", totalRows:%" PRIu64 " try next",
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pExchangeInfo->current + 1, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pExchangeInfo->current + 1,
pDataInfo->totalRows, pLoadInfo->totalRows); pDataInfo->totalRows, pLoadInfo->totalRows);
...@@ -652,7 +658,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { ...@@ -652,7 +658,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp; SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
if (pRsp->completed == 1) { if (pRsp->completed == 1) {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, rowsOfSource:%" PRIu64 qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, rowsOfSource:%" PRIu64
", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows,
pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1,
totalSources); totalSources);
...@@ -661,7 +667,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { ...@@ -661,7 +667,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
pExchangeInfo->current += 1; pExchangeInfo->current += 1;
} else { } else {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64 qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64
", totalBytes:%" PRIu64, ", totalBytes:%" PRIu64,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows,
pLoadInfo->totalRows, pLoadInfo->totalSize); pLoadInfo->totalRows, pLoadInfo->totalSize);
} }
...@@ -673,7 +679,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { ...@@ -673,7 +679,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
_error: _error:
pTaskInfo->code = code; pTaskInfo->code = code;
return code; return code;
} }
......
...@@ -434,6 +434,7 @@ static int32_t logicExchangeCopy(const SExchangeLogicNode* pSrc, SExchangeLogicN ...@@ -434,6 +434,7 @@ static int32_t logicExchangeCopy(const SExchangeLogicNode* pSrc, SExchangeLogicN
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
COPY_SCALAR_FIELD(srcStartGroupId); COPY_SCALAR_FIELD(srcStartGroupId);
COPY_SCALAR_FIELD(srcEndGroupId); COPY_SCALAR_FIELD(srcEndGroupId);
COPY_SCALAR_FIELD(seqRecvData);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -1864,6 +1864,7 @@ static int32_t jsonToPhysiAggNode(const SJson* pJson, void* pObj) { ...@@ -1864,6 +1864,7 @@ static int32_t jsonToPhysiAggNode(const SJson* pJson, void* pObj) {
static const char* jkExchangePhysiPlanSrcStartGroupId = "SrcStartGroupId"; static const char* jkExchangePhysiPlanSrcStartGroupId = "SrcStartGroupId";
static const char* jkExchangePhysiPlanSrcEndGroupId = "SrcEndGroupId"; static const char* jkExchangePhysiPlanSrcEndGroupId = "SrcEndGroupId";
static const char* jkExchangePhysiPlanSrcEndPoints = "SrcEndPoints"; static const char* jkExchangePhysiPlanSrcEndPoints = "SrcEndPoints";
static const char* jkExchangePhysiPlanSeqRecvData = "SeqRecvData";
static int32_t physiExchangeNodeToJson(const void* pObj, SJson* pJson) { static int32_t physiExchangeNodeToJson(const void* pObj, SJson* pJson) {
const SExchangePhysiNode* pNode = (const SExchangePhysiNode*)pObj; const SExchangePhysiNode* pNode = (const SExchangePhysiNode*)pObj;
...@@ -1878,6 +1879,9 @@ static int32_t physiExchangeNodeToJson(const void* pObj, SJson* pJson) { ...@@ -1878,6 +1879,9 @@ static int32_t physiExchangeNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkExchangePhysiPlanSrcEndPoints, pNode->pSrcEndPoints); code = nodeListToJson(pJson, jkExchangePhysiPlanSrcEndPoints, pNode->pSrcEndPoints);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkExchangePhysiPlanSeqRecvData, pNode->seqRecvData);
}
return code; return code;
} }
...@@ -1895,6 +1899,9 @@ static int32_t jsonToPhysiExchangeNode(const SJson* pJson, void* pObj) { ...@@ -1895,6 +1899,9 @@ static int32_t jsonToPhysiExchangeNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkExchangePhysiPlanSrcEndPoints, &pNode->pSrcEndPoints); code = jsonToNodeList(pJson, jkExchangePhysiPlanSrcEndPoints, &pNode->pSrcEndPoints);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkExchangePhysiPlanSeqRecvData, &pNode->seqRecvData);
}
return code; return code;
} }
......
...@@ -2428,7 +2428,8 @@ enum { ...@@ -2428,7 +2428,8 @@ enum {
PHY_EXCHANGE_CODE_SRC_START_GROUP_ID, PHY_EXCHANGE_CODE_SRC_START_GROUP_ID,
PHY_EXCHANGE_CODE_SRC_END_GROUP_ID, PHY_EXCHANGE_CODE_SRC_END_GROUP_ID,
PHY_EXCHANGE_CODE_SINGLE_CHANNEL, PHY_EXCHANGE_CODE_SINGLE_CHANNEL,
PHY_EXCHANGE_CODE_SRC_ENDPOINTS PHY_EXCHANGE_CODE_SRC_ENDPOINTS,
PHY_EXCHANGE_CODE_SEQ_RECV_DATA
}; };
static int32_t physiExchangeNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { static int32_t physiExchangeNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
...@@ -2447,6 +2448,9 @@ static int32_t physiExchangeNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { ...@@ -2447,6 +2448,9 @@ static int32_t physiExchangeNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_EXCHANGE_CODE_SRC_ENDPOINTS, nodeListToMsg, pNode->pSrcEndPoints); code = tlvEncodeObj(pEncoder, PHY_EXCHANGE_CODE_SRC_ENDPOINTS, nodeListToMsg, pNode->pSrcEndPoints);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, PHY_EXCHANGE_CODE_SEQ_RECV_DATA, pNode->seqRecvData);
}
return code; return code;
} }
...@@ -2473,6 +2477,9 @@ static int32_t msgToPhysiExchangeNode(STlvDecoder* pDecoder, void* pObj) { ...@@ -2473,6 +2477,9 @@ static int32_t msgToPhysiExchangeNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_EXCHANGE_CODE_SRC_ENDPOINTS: case PHY_EXCHANGE_CODE_SRC_ENDPOINTS:
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pSrcEndPoints); code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pSrcEndPoints);
break; break;
case PHY_EXCHANGE_CODE_SEQ_RECV_DATA:
code = tlvDecodeBool(pTlv, &pNode->seqRecvData);
break;
default: default:
break; break;
} }
......
...@@ -3685,9 +3685,19 @@ static int32_t translateInsertProject(STranslateContext* pCxt, SInsertStmt* pIns ...@@ -3685,9 +3685,19 @@ static int32_t translateInsertProject(STranslateContext* pCxt, SInsertStmt* pIns
return addOrderByPrimaryKeyToQuery(pCxt, pPrimaryKeyExpr, pInsert->pQuery); return addOrderByPrimaryKeyToQuery(pCxt, pPrimaryKeyExpr, pInsert->pQuery);
} }
static int32_t translateInsertTable(STranslateContext* pCxt, SNode* pTable) {
int32_t code = translateFrom(pCxt, pTable);
if (TSDB_CODE_SUCCESS == code && TSDB_CHILD_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType &&
TSDB_NORMAL_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType) {
code = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR,
"insert data into super table is not supported");
}
return code;
}
static int32_t translateInsert(STranslateContext* pCxt, SInsertStmt* pInsert) { static int32_t translateInsert(STranslateContext* pCxt, SInsertStmt* pInsert) {
pCxt->pCurrStmt = (SNode*)pInsert; pCxt->pCurrStmt = (SNode*)pInsert;
int32_t code = translateFrom(pCxt, pInsert->pTable); int32_t code = translateInsertTable(pCxt, pInsert->pTable);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = translateInsertCols(pCxt, pInsert); code = translateInsertCols(pCxt, pInsert);
} }
...@@ -7089,9 +7099,10 @@ static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) { ...@@ -7089,9 +7099,10 @@ static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) {
static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, STableMeta* pTableMeta, static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, STableMeta* pTableMeta,
SVAlterTbReq* pReq) { SVAlterTbReq* pReq) {
SSchema* pSchema = getColSchema(pTableMeta, pStmt->colName); SSchema* pSchema = getTagSchema(pTableMeta, pStmt->colName);
if (NULL == pSchema) { if (NULL == pSchema) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ALTER_TABLE); return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ALTER_TABLE, "Invalid tag name: %s",
pStmt->colName);
} }
pReq->tagName = strdup(pStmt->colName); pReq->tagName = strdup(pStmt->colName);
......
...@@ -36,6 +36,7 @@ static int32_t createQueryLogicNode(SLogicPlanContext* pCxt, SNode* pStmt, SLogi ...@@ -36,6 +36,7 @@ static int32_t createQueryLogicNode(SLogicPlanContext* pCxt, SNode* pStmt, SLogi
typedef struct SRewriteExprCxt { typedef struct SRewriteExprCxt {
int32_t errCode; int32_t errCode;
SNodeList* pExprs; SNodeList* pExprs;
bool* pOutputs;
} SRewriteExprCxt; } SRewriteExprCxt;
static void setColumnInfo(SFunctionNode* pFunc, SColumnNode* pCol) { static void setColumnInfo(SFunctionNode* pFunc, SColumnNode* pCol) {
...@@ -63,14 +64,30 @@ static void setColumnInfo(SFunctionNode* pFunc, SColumnNode* pCol) { ...@@ -63,14 +64,30 @@ static void setColumnInfo(SFunctionNode* pFunc, SColumnNode* pCol) {
} }
static EDealRes doRewriteExpr(SNode** pNode, void* pContext) { static EDealRes doRewriteExpr(SNode** pNode, void* pContext) {
SRewriteExprCxt* pCxt = (SRewriteExprCxt*)pContext;
switch (nodeType(*pNode)) { switch (nodeType(*pNode)) {
case QUERY_NODE_COLUMN: {
if (NULL != pCxt->pOutputs) {
SNode* pExpr;
int32_t index = 0;
FOREACH(pExpr, pCxt->pExprs) {
if (QUERY_NODE_GROUPING_SET == nodeType(pExpr)) {
pExpr = nodesListGetNode(((SGroupingSetNode*)pExpr)->pParameterList, 0);
}
if (nodesEqualNode(pExpr, *pNode)) {
pCxt->pOutputs[index] = true;
break;
}
}
}
break;
}
case QUERY_NODE_OPERATOR: case QUERY_NODE_OPERATOR:
case QUERY_NODE_LOGIC_CONDITION: case QUERY_NODE_LOGIC_CONDITION:
case QUERY_NODE_FUNCTION: case QUERY_NODE_FUNCTION:
case QUERY_NODE_CASE_WHEN: { case QUERY_NODE_CASE_WHEN: {
SRewriteExprCxt* pCxt = (SRewriteExprCxt*)pContext; SNode* pExpr;
SNode* pExpr; int32_t index = 0;
int32_t index = 0;
FOREACH(pExpr, pCxt->pExprs) { FOREACH(pExpr, pCxt->pExprs) {
if (QUERY_NODE_GROUPING_SET == nodeType(pExpr)) { if (QUERY_NODE_GROUPING_SET == nodeType(pExpr)) {
pExpr = nodesListGetNode(((SGroupingSetNode*)pExpr)->pParameterList, 0); pExpr = nodesListGetNode(((SGroupingSetNode*)pExpr)->pParameterList, 0);
...@@ -89,6 +106,9 @@ static EDealRes doRewriteExpr(SNode** pNode, void* pContext) { ...@@ -89,6 +106,9 @@ static EDealRes doRewriteExpr(SNode** pNode, void* pContext) {
} }
nodesDestroyNode(*pNode); nodesDestroyNode(*pNode);
*pNode = (SNode*)pCol; *pNode = (SNode*)pCol;
if (NULL != pCxt->pOutputs) {
pCxt->pOutputs[index] = true;
}
return DEAL_RES_IGNORE_CHILD; return DEAL_RES_IGNORE_CHILD;
} }
++index; ++index;
...@@ -121,7 +141,7 @@ static EDealRes doNameExpr(SNode* pNode, void* pContext) { ...@@ -121,7 +141,7 @@ static EDealRes doNameExpr(SNode* pNode, void* pContext) {
static int32_t rewriteExprForSelect(SNode* pExpr, SSelectStmt* pSelect, ESqlClause clause) { static int32_t rewriteExprForSelect(SNode* pExpr, SSelectStmt* pSelect, ESqlClause clause) {
nodesWalkExpr(pExpr, doNameExpr, NULL); nodesWalkExpr(pExpr, doNameExpr, NULL);
SRewriteExprCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pExprs = NULL}; SRewriteExprCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pExprs = NULL, .pOutputs = NULL};
cxt.errCode = nodesListMakeAppend(&cxt.pExprs, pExpr); cxt.errCode = nodesListMakeAppend(&cxt.pExprs, pExpr);
if (TSDB_CODE_SUCCESS == cxt.errCode) { if (TSDB_CODE_SUCCESS == cxt.errCode) {
nodesRewriteSelectStmt(pSelect, clause, doRewriteExpr, &cxt); nodesRewriteSelectStmt(pSelect, clause, doRewriteExpr, &cxt);
...@@ -130,23 +150,50 @@ static int32_t rewriteExprForSelect(SNode* pExpr, SSelectStmt* pSelect, ESqlClau ...@@ -130,23 +150,50 @@ static int32_t rewriteExprForSelect(SNode* pExpr, SSelectStmt* pSelect, ESqlClau
return cxt.errCode; return cxt.errCode;
} }
static int32_t rewriteExprsForSelect(SNodeList* pExprs, SSelectStmt* pSelect, ESqlClause clause) { static int32_t cloneRewriteExprs(SNodeList* pExprs, bool* pOutputs, SNodeList** pRewriteExpr) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t index = 0;
SNode* pExpr = NULL;
FOREACH(pExpr, pExprs) {
if (pOutputs[index]) {
code = nodesListMakeStrictAppend(pRewriteExpr, nodesCloneNode(pExpr));
if (TSDB_CODE_SUCCESS != code) {
NODES_DESTORY_LIST(*pRewriteExpr);
break;
}
}
}
return code;
}
static int32_t rewriteExprsForSelect(SNodeList* pExprs, SSelectStmt* pSelect, ESqlClause clause,
SNodeList** pRewriteExprs) {
nodesWalkExprs(pExprs, doNameExpr, NULL); nodesWalkExprs(pExprs, doNameExpr, NULL);
SRewriteExprCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs}; SRewriteExprCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs, .pOutputs = NULL};
if (NULL != pRewriteExprs) {
cxt.pOutputs = taosMemoryCalloc(LIST_LENGTH(pExprs), sizeof(bool));
if (NULL == cxt.pOutputs) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
nodesRewriteSelectStmt(pSelect, clause, doRewriteExpr, &cxt); nodesRewriteSelectStmt(pSelect, clause, doRewriteExpr, &cxt);
if (TSDB_CODE_SUCCESS == cxt.errCode && NULL != pRewriteExprs) {
cxt.errCode = cloneRewriteExprs(pExprs, cxt.pOutputs, pRewriteExprs);
}
taosMemoryFree(cxt.pOutputs);
return cxt.errCode; return cxt.errCode;
} }
static int32_t rewriteExpr(SNodeList* pExprs, SNode** pTarget) { static int32_t rewriteExpr(SNodeList* pExprs, SNode** pTarget) {
nodesWalkExprs(pExprs, doNameExpr, NULL); nodesWalkExprs(pExprs, doNameExpr, NULL);
SRewriteExprCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs}; SRewriteExprCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs, .pOutputs = NULL};
nodesRewriteExpr(pTarget, doRewriteExpr, &cxt); nodesRewriteExpr(pTarget, doRewriteExpr, &cxt);
return cxt.errCode; return cxt.errCode;
} }
static int32_t rewriteExprs(SNodeList* pExprs, SNodeList* pTarget) { static int32_t rewriteExprs(SNodeList* pExprs, SNodeList* pTarget) {
nodesWalkExprs(pExprs, doNameExpr, NULL); nodesWalkExprs(pExprs, doNameExpr, NULL);
SRewriteExprCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs}; SRewriteExprCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs, .pOutputs = NULL};
nodesRewriteExprs(pTarget, doRewriteExpr, &cxt); nodesRewriteExprs(pTarget, doRewriteExpr, &cxt);
return cxt.errCode; return cxt.errCode;
} }
...@@ -311,7 +358,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect ...@@ -311,7 +358,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
// rewrite the expression in subsequent clauses // rewrite the expression in subsequent clauses
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = rewriteExprsForSelect(pScan->pScanPseudoCols, pSelect, SQL_CLAUSE_FROM); code = rewriteExprsForSelect(pScan->pScanPseudoCols, pSelect, SQL_CLAUSE_FROM, NULL);
} }
pScan->scanType = getScanType(pCxt, pScan->pScanPseudoCols, pScan->pScanCols, pScan->tableType, pSelect->tagScan); pScan->scanType = getScanType(pCxt, pScan->pScanPseudoCols, pScan->pScanCols, pScan->tableType, pSelect->tagScan);
...@@ -509,23 +556,20 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, ...@@ -509,23 +556,20 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
// rewrite the expression in subsequent clauses // rewrite the expression in subsequent clauses
if (TSDB_CODE_SUCCESS == code && NULL != pAgg->pAggFuncs) { if (TSDB_CODE_SUCCESS == code && NULL != pAgg->pAggFuncs) {
code = rewriteExprsForSelect(pAgg->pAggFuncs, pSelect, SQL_CLAUSE_GROUP_BY); code = rewriteExprsForSelect(pAgg->pAggFuncs, pSelect, SQL_CLAUSE_GROUP_BY, NULL);
} }
if (NULL != pSelect->pGroupByList) { if (NULL != pSelect->pGroupByList) {
if (NULL != pAgg->pGroupKeys) { pAgg->pGroupKeys = nodesCloneList(pSelect->pGroupByList);
code = nodesListStrictAppendList(pAgg->pGroupKeys, nodesCloneList(pSelect->pGroupByList)); if (NULL == pAgg->pGroupKeys) {
} else { code = TSDB_CODE_OUT_OF_MEMORY;
pAgg->pGroupKeys = nodesCloneList(pSelect->pGroupByList);
if (NULL == pAgg->pGroupKeys) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
} }
} }
// rewrite the expression in subsequent clauses // rewrite the expression in subsequent clauses
SNodeList* pOutputGroupKeys = NULL;
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = rewriteExprsForSelect(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_GROUP_BY); code = rewriteExprsForSelect(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_GROUP_BY, &pOutputGroupKeys);
} }
if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pHaving) { if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pHaving) {
...@@ -536,9 +580,11 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, ...@@ -536,9 +580,11 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
} }
// set the output // set the output
if (TSDB_CODE_SUCCESS == code && NULL != pAgg->pGroupKeys) { if (TSDB_CODE_SUCCESS == code && NULL != pOutputGroupKeys) {
code = createColumnByRewriteExprs(pAgg->pGroupKeys, &pAgg->node.pTargets); code = createColumnByRewriteExprs(pOutputGroupKeys, &pAgg->node.pTargets);
} }
nodesDestroyList(pOutputGroupKeys);
if (TSDB_CODE_SUCCESS == code && NULL != pAgg->pAggFuncs) { if (TSDB_CODE_SUCCESS == code && NULL != pAgg->pAggFuncs) {
code = createColumnByRewriteExprs(pAgg->pAggFuncs, &pAgg->node.pTargets); code = createColumnByRewriteExprs(pAgg->pAggFuncs, &pAgg->node.pTargets);
} }
...@@ -574,7 +620,7 @@ static int32_t createIndefRowsFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt ...@@ -574,7 +620,7 @@ static int32_t createIndefRowsFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt
// indefinite rows functions and _select_values functions // indefinite rows functions and _select_values functions
int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_SELECT, fmIsVectorFunc, &pIdfRowsFunc->pFuncs); int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_SELECT, fmIsVectorFunc, &pIdfRowsFunc->pFuncs);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = rewriteExprsForSelect(pIdfRowsFunc->pFuncs, pSelect, SQL_CLAUSE_SELECT); code = rewriteExprsForSelect(pIdfRowsFunc->pFuncs, pSelect, SQL_CLAUSE_SELECT, NULL);
} }
// set the output // set the output
...@@ -612,7 +658,7 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p ...@@ -612,7 +658,7 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p
// interp functions and _group_key functions // interp functions and _group_key functions
int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_SELECT, isInterpFunc, &pInterpFunc->pFuncs); int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_SELECT, isInterpFunc, &pInterpFunc->pFuncs);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = rewriteExprsForSelect(pInterpFunc->pFuncs, pSelect, SQL_CLAUSE_SELECT); code = rewriteExprsForSelect(pInterpFunc->pFuncs, pSelect, SQL_CLAUSE_SELECT, NULL);
} }
if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pFill) { if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pFill) {
...@@ -656,7 +702,7 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm ...@@ -656,7 +702,7 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm
int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_WINDOW, fmIsWindowClauseFunc, &pWindow->pFuncs); int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_WINDOW, fmIsWindowClauseFunc, &pWindow->pFuncs);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = rewriteExprsForSelect(pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW); code = rewriteExprsForSelect(pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW, NULL);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
...@@ -854,10 +900,10 @@ static int32_t createFillLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect ...@@ -854,10 +900,10 @@ static int32_t createFillLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
int32_t code = partFillExprs(pSelect, &pFill->pFillExprs, &pFill->pNotFillExprs); int32_t code = partFillExprs(pSelect, &pFill->pFillExprs, &pFill->pNotFillExprs);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = rewriteExprsForSelect(pFill->pFillExprs, pSelect, SQL_CLAUSE_FILL); code = rewriteExprsForSelect(pFill->pFillExprs, pSelect, SQL_CLAUSE_FILL, NULL);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = rewriteExprsForSelect(pFill->pNotFillExprs, pSelect, SQL_CLAUSE_FILL); code = rewriteExprsForSelect(pFill->pNotFillExprs, pSelect, SQL_CLAUSE_FILL, NULL);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExprs(pFill->pFillExprs, &pFill->node.pTargets); code = createColumnByRewriteExprs(pFill->pFillExprs, &pFill->node.pTargets);
...@@ -1066,7 +1112,7 @@ static int32_t createDistinctLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSe ...@@ -1066,7 +1112,7 @@ static int32_t createDistinctLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSe
// rewrite the expression in subsequent clauses // rewrite the expression in subsequent clauses
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = rewriteExprsForSelect(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_DISTINCT); code = rewriteExprsForSelect(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_DISTINCT, NULL);
} }
// set the output // set the output
......
...@@ -1476,19 +1476,33 @@ static bool partTagsHasIndefRowsSelectFunc(SNodeList* pFuncs) { ...@@ -1476,19 +1476,33 @@ static bool partTagsHasIndefRowsSelectFunc(SNodeList* pFuncs) {
return false; return false;
} }
static int32_t partTagsRewriteGroupTagsToFuncs(SNodeList* pGroupTags, int32_t start, SNodeList* pAggFuncs) { static bool partTagsNeedOutput(SNode* pExpr, SNodeList* pTargets) {
bool hasIndefRowsSelectFunc = partTagsHasIndefRowsSelectFunc(pAggFuncs); SNode* pOutput = NULL;
FOREACH(pOutput, pTargets) {
if (QUERY_NODE_COLUMN == nodeType(pExpr)) {
if (nodesEqualNode(pExpr, pOutput)) {
return true;
}
} else if (0 == strcmp(((SExprNode*)pExpr)->aliasName, ((SColumnNode*)pOutput)->colName)) {
return true;
}
}
return false;
}
static int32_t partTagsRewriteGroupTagsToFuncs(SNodeList* pGroupTags, int32_t start, SAggLogicNode* pAgg) {
bool hasIndefRowsSelectFunc = partTagsHasIndefRowsSelectFunc(pAgg->pAggFuncs);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t index = 0; int32_t index = 0;
SNode* pNode = NULL; SNode* pNode = NULL;
FOREACH(pNode, pGroupTags) { FOREACH(pNode, pGroupTags) {
if (index++ < start) { if (index++ < start || !partTagsNeedOutput(pNode, pAgg->node.pTargets)) {
continue; continue;
} }
if (hasIndefRowsSelectFunc) { if (hasIndefRowsSelectFunc) {
code = nodesListStrictAppend(pAggFuncs, partTagsCreateWrapperFunc("_select_value", pNode)); code = nodesListStrictAppend(pAgg->pAggFuncs, partTagsCreateWrapperFunc("_select_value", pNode));
} else { } else {
code = nodesListStrictAppend(pAggFuncs, partTagsCreateWrapperFunc("_group_key", pNode)); code = nodesListStrictAppend(pAgg->pAggFuncs, partTagsCreateWrapperFunc("_group_key", pNode));
} }
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
break; break;
...@@ -1541,7 +1555,7 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub ...@@ -1541,7 +1555,7 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
} }
NODES_DESTORY_LIST(pAgg->pGroupKeys); NODES_DESTORY_LIST(pAgg->pGroupKeys);
if (TSDB_CODE_SUCCESS == code && start >= 0) { if (TSDB_CODE_SUCCESS == code && start >= 0) {
code = partTagsRewriteGroupTagsToFuncs(pScan->pGroupTags, start, pAgg->pAggFuncs); code = partTagsRewriteGroupTagsToFuncs(pScan->pGroupTags, start, pAgg);
} }
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
......
...@@ -1064,6 +1064,7 @@ static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogic ...@@ -1064,6 +1064,7 @@ static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogic
pExchange->srcStartGroupId = pExchangeLogicNode->srcStartGroupId; pExchange->srcStartGroupId = pExchangeLogicNode->srcStartGroupId;
pExchange->srcEndGroupId = pExchangeLogicNode->srcEndGroupId; pExchange->srcEndGroupId = pExchangeLogicNode->srcEndGroupId;
pExchange->seqRecvData = pExchangeLogicNode->seqRecvData;
*pPhyNode = (SPhysiNode*)pExchange; *pPhyNode = (SPhysiNode*)pExchange;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -292,6 +292,43 @@ static bool stbSplNeedSplitJoin(bool streamQuery, SJoinLogicNode* pJoin) { ...@@ -292,6 +292,43 @@ static bool stbSplNeedSplitJoin(bool streamQuery, SJoinLogicNode* pJoin) {
return true; return true;
} }
static SNodeList* stbSplGetPartKeys(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
return ((SScanLogicNode*)pNode)->pGroupTags;
} else if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
return ((SPartitionLogicNode*)pNode)->pPartitionKeys;
} else {
return NULL;
}
}
static bool stbSplHasPartTbname(SNodeList* pPartKeys) {
if (NULL == pPartKeys) {
return false;
}
SNode* pPartKey = NULL;
FOREACH(pPartKey, pPartKeys) {
if (QUERY_NODE_GROUPING_SET == nodeType(pPartKey)) {
pPartKey = nodesListGetNode(((SGroupingSetNode*)pPartKey)->pParameterList, 0);
}
if ((QUERY_NODE_FUNCTION == nodeType(pPartKey) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pPartKey)->funcType) ||
(QUERY_NODE_COLUMN == nodeType(pPartKey) && COLUMN_TYPE_TBNAME == ((SColumnNode*)pPartKey)->colType)) {
return true;
}
}
return false;
}
static bool stbSplIsPartTableAgg(SAggLogicNode* pAgg) {
if (NULL != pAgg->pGroupKeys) {
return stbSplHasPartTbname(pAgg->pGroupKeys);
}
if (1 != LIST_LENGTH(pAgg->node.pChildren)) {
return false;
}
return stbSplHasPartTbname(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0)));
}
static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
switch (nodeType(pNode)) { switch (nodeType(pNode)) {
case QUERY_NODE_LOGIC_PLAN_SCAN: case QUERY_NODE_LOGIC_PLAN_SCAN:
...@@ -301,7 +338,9 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { ...@@ -301,7 +338,9 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
case QUERY_NODE_LOGIC_PLAN_PARTITION: case QUERY_NODE_LOGIC_PLAN_PARTITION:
return streamQuery ? false : stbSplIsMultiTbScanChild(streamQuery, pNode); return streamQuery ? false : stbSplIsMultiTbScanChild(streamQuery, pNode);
case QUERY_NODE_LOGIC_PLAN_AGG: case QUERY_NODE_LOGIC_PLAN_AGG:
return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); return (!stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) ||
stbSplIsPartTableAgg((SAggLogicNode*)pNode)) &&
stbSplHasMultiTbScan(streamQuery, pNode);
case QUERY_NODE_LOGIC_PLAN_WINDOW: case QUERY_NODE_LOGIC_PLAN_WINDOW:
return stbSplNeedSplitWindow(streamQuery, pNode); return stbSplNeedSplitWindow(streamQuery, pNode);
case QUERY_NODE_LOGIC_PLAN_SORT: case QUERY_NODE_LOGIC_PLAN_SORT:
...@@ -676,27 +715,8 @@ static int32_t stbSplSplitState(SSplitContext* pCxt, SStableSplitInfo* pInfo) { ...@@ -676,27 +715,8 @@ static int32_t stbSplSplitState(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
} }
} }
static SNodeList* stbSplGetPartKeys(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
return ((SScanLogicNode*)pNode)->pGroupTags;
} else if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
return ((SPartitionLogicNode*)pNode)->pPartitionKeys;
} else {
return NULL;
}
}
static bool stbSplIsPartTbanme(SNodeList* pPartKeys) {
if (NULL == pPartKeys || 1 != LIST_LENGTH(pPartKeys)) {
return false;
}
SNode* pPartKey = nodesListGetNode(pPartKeys, 0);
return (QUERY_NODE_FUNCTION == nodeType(pPartKey) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pPartKey)->funcType) ||
(QUERY_NODE_COLUMN == nodeType(pPartKey) && COLUMN_TYPE_TBNAME == ((SColumnNode*)pPartKey)->colType);
}
static bool stbSplIsPartTableWinodw(SWindowLogicNode* pWindow) { static bool stbSplIsPartTableWinodw(SWindowLogicNode* pWindow) {
return stbSplIsPartTbanme(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pWindow->node.pChildren, 0))); return stbSplHasPartTbname(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pWindow->node.pChildren, 0)));
} }
static int32_t stbSplSplitWindowForCrossTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) { static int32_t stbSplSplitWindowForCrossTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
...@@ -713,6 +733,17 @@ static int32_t stbSplSplitWindowForCrossTable(SSplitContext* pCxt, SStableSplitI ...@@ -713,6 +733,17 @@ static int32_t stbSplSplitWindowForCrossTable(SSplitContext* pCxt, SStableSplitI
return TSDB_CODE_PLAN_INTERNAL_ERROR; return TSDB_CODE_PLAN_INTERNAL_ERROR;
} }
static bool stbSplNeedSeqRecvData(SLogicNode* pNode) {
if (NULL == pNode) {
return false;
}
if (NULL != pNode->pLimit || NULL != pNode->pSlimit) {
return true;
}
return stbSplNeedSeqRecvData(pNode->pParent);
}
static int32_t stbSplSplitWindowForPartTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) { static int32_t stbSplSplitWindowForPartTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
if (pCxt->pPlanCxt->streamQuery) { if (pCxt->pPlanCxt->streamQuery) {
SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT); SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT);
...@@ -728,6 +759,7 @@ static int32_t stbSplSplitWindowForPartTable(SSplitContext* pCxt, SStableSplitIn ...@@ -728,6 +759,7 @@ static int32_t stbSplSplitWindowForPartTable(SSplitContext* pCxt, SStableSplitIn
code = replaceLogicNode(pInfo->pSubplan, pInfo->pSplitNode, (SLogicNode*)pExchange); code = replaceLogicNode(pInfo->pSubplan, pInfo->pSplitNode, (SLogicNode*)pExchange);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
pExchange->seqRecvData = stbSplNeedSeqRecvData((SLogicNode*)pExchange);
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
(SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT)); (SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT));
} }
...@@ -797,7 +829,17 @@ static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pO ...@@ -797,7 +829,17 @@ static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pO
return code; return code;
} }
static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { static int32_t stbSplSplitAggNodeForPartTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pInfo->pSplitNode, SUBPLAN_TYPE_MERGE);
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
(SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT));
}
++(pCxt->groupId);
return code;
}
static int32_t stbSplSplitAggNodeForCrossTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SLogicNode* pPartAgg = NULL; SLogicNode* pPartAgg = NULL;
int32_t code = stbSplCreatePartAggNode((SAggLogicNode*)pInfo->pSplitNode, &pPartAgg); int32_t code = stbSplCreatePartAggNode((SAggLogicNode*)pInfo->pSplitNode, &pPartAgg);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
...@@ -812,6 +854,13 @@ static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) ...@@ -812,6 +854,13 @@ static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo)
return code; return code;
} }
static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
if (stbSplIsPartTableAgg((SAggLogicNode*)pInfo->pSplitNode)) {
return stbSplSplitAggNodeForPartTable(pCxt, pInfo);
}
return stbSplSplitAggNodeForCrossTable(pCxt, pInfo);
}
static SNode* stbSplCreateColumnNode(SExprNode* pExpr) { static SNode* stbSplCreateColumnNode(SExprNode* pExpr) {
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) { if (NULL == pCol) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册