提交 f36b0be1 编写于 作者: S shenglian zhou

fix: join eq conditions not only for tag

上级 8eacc51a
...@@ -112,7 +112,7 @@ typedef struct SJoinLogicNode { ...@@ -112,7 +112,7 @@ typedef struct SJoinLogicNode {
SNode* pOnConditions; SNode* pOnConditions;
bool isSingleTableJoin; bool isSingleTableJoin;
EOrder inputTsOrder; EOrder inputTsOrder;
SNode* pTagEqualConditions; SNode* pEqualOnConditions;
} SJoinLogicNode; } SJoinLogicNode;
typedef struct SAggLogicNode { typedef struct SAggLogicNode {
...@@ -406,7 +406,7 @@ typedef struct SSortMergeJoinPhysiNode { ...@@ -406,7 +406,7 @@ typedef struct SSortMergeJoinPhysiNode {
SNode* pOnConditions; SNode* pOnConditions;
SNodeList* pTargets; SNodeList* pTargets;
EOrder inputTsOrder; EOrder inputTsOrder;
SNode* pTagEqualCondtions; SNode* pEqualOnCondtions;
} SSortMergeJoinPhysiNode; } SSortMergeJoinPhysiNode;
typedef struct SAggPhysiNode { typedef struct SAggPhysiNode {
...@@ -448,7 +448,7 @@ typedef struct SMergePhysiNode { ...@@ -448,7 +448,7 @@ typedef struct SMergePhysiNode {
bool groupSort; bool groupSort;
} SMergePhysiNode; } SMergePhysiNode;
typedef struct SWinodwPhysiNode { typedef struct SWindowPhysiNode {
SPhysiNode node; SPhysiNode node;
SNodeList* pExprs; // these are expression list of parameter expression of function SNodeList* pExprs; // these are expression list of parameter expression of function
SNodeList* pFuncs; SNodeList* pFuncs;
...@@ -461,10 +461,10 @@ typedef struct SWinodwPhysiNode { ...@@ -461,10 +461,10 @@ typedef struct SWinodwPhysiNode {
EOrder inputTsOrder; EOrder inputTsOrder;
EOrder outputTsOrder; EOrder outputTsOrder;
bool mergeDataBlock; bool mergeDataBlock;
} SWinodwPhysiNode; } SWindowPhysiNode;
typedef struct SIntervalPhysiNode { typedef struct SIntervalPhysiNode {
SWinodwPhysiNode window; SWindowPhysiNode window;
int64_t interval; int64_t interval;
int64_t offset; int64_t offset;
int64_t sliding; int64_t sliding;
...@@ -497,7 +497,7 @@ typedef struct SMultiTableIntervalPhysiNode { ...@@ -497,7 +497,7 @@ typedef struct SMultiTableIntervalPhysiNode {
} SMultiTableIntervalPhysiNode; } SMultiTableIntervalPhysiNode;
typedef struct SSessionWinodwPhysiNode { typedef struct SSessionWinodwPhysiNode {
SWinodwPhysiNode window; SWindowPhysiNode window;
int64_t gap; int64_t gap;
} SSessionWinodwPhysiNode; } SSessionWinodwPhysiNode;
...@@ -506,14 +506,14 @@ typedef SSessionWinodwPhysiNode SStreamSemiSessionWinodwPhysiNode; ...@@ -506,14 +506,14 @@ typedef SSessionWinodwPhysiNode SStreamSemiSessionWinodwPhysiNode;
typedef SSessionWinodwPhysiNode SStreamFinalSessionWinodwPhysiNode; typedef SSessionWinodwPhysiNode SStreamFinalSessionWinodwPhysiNode;
typedef struct SStateWinodwPhysiNode { typedef struct SStateWinodwPhysiNode {
SWinodwPhysiNode window; SWindowPhysiNode window;
SNode* pStateKey; SNode* pStateKey;
} SStateWinodwPhysiNode; } SStateWinodwPhysiNode;
typedef SStateWinodwPhysiNode SStreamStateWinodwPhysiNode; typedef SStateWinodwPhysiNode SStreamStateWinodwPhysiNode;
typedef struct SEventWinodwPhysiNode { typedef struct SEventWinodwPhysiNode {
SWinodwPhysiNode window; SWindowPhysiNode window;
SNode* pStartCond; SNode* pStartCond;
SNode* pEndCond; SNode* pEndCond;
} SEventWinodwPhysiNode; } SEventWinodwPhysiNode;
......
...@@ -52,15 +52,15 @@ typedef struct SJoinOperatorInfo { ...@@ -52,15 +52,15 @@ typedef struct SJoinOperatorInfo {
int32_t rightPos; int32_t rightPos;
SColumnInfo rightCol; SColumnInfo rightCol;
SNode* pCondAfterMerge; SNode* pCondAfterMerge;
SNode* pTagEqualConditions; SNode* pEqualOnConditions;
SArray* leftTagCols; SArray* leftEqOnCondCols;
char* leftTagKeyBuf; char* leftEqOnCondKeyBuf;
int32_t leftTagKeyLen; int32_t leftEqOnCondKeyLen;
SArray* rightTagCols; SArray* rightEqOnCondCols;
char* rightTagKeyBuf; char* rightEqOnCondKeyBuf;
int32_t rightTagKeyLen; int32_t rightEqOnCondKeyLen;
SSHashObj* rightBuildTable; SSHashObj* rightBuildTable;
SJoinRowCtx rowCtx; SJoinRowCtx rowCtx;
...@@ -104,7 +104,7 @@ static void extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDown ...@@ -104,7 +104,7 @@ static void extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDown
setJoinColumnInfo(&pInfo->rightCol, rightTsCol); setJoinColumnInfo(&pInfo->rightCol, rightTsCol);
} }
static void extractTagEqualColsFromOper(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstreams, SOperatorNode* pOperNode, static void extractEqualOnCondColsFromOper(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstreams, SOperatorNode* pOperNode,
SColumn* pLeft, SColumn* pRight) { SColumn* pLeft, SColumn* pRight) {
SColumnNode* pLeftNode = (SColumnNode*)pOperNode->pLeft; SColumnNode* pLeftNode = (SColumnNode*)pOperNode->pLeft;
SColumnNode* pRightNode = (SColumnNode*)pOperNode->pRight; SColumnNode* pRightNode = (SColumnNode*)pOperNode->pRight;
...@@ -125,7 +125,7 @@ static void extractTagEqualCondCols(SJoinOperatorInfo* pInfo, SOperatorInfo** pD ...@@ -125,7 +125,7 @@ static void extractTagEqualCondCols(SJoinOperatorInfo* pInfo, SOperatorInfo** pD
SNode* pNode = NULL; SNode* pNode = NULL;
FOREACH(pNode, ((SLogicConditionNode*)pTagEqualNode)->pParameterList) { FOREACH(pNode, ((SLogicConditionNode*)pTagEqualNode)->pParameterList) {
SOperatorNode* pOperNode = (SOperatorNode*)pNode; SOperatorNode* pOperNode = (SOperatorNode*)pNode;
extractTagEqualColsFromOper(pInfo, pDownStream, pOperNode, &left, &right); extractEqualOnCondColsFromOper(pInfo, pDownStream, pOperNode, &left, &right);
taosArrayPush(leftTagEqCols, &left); taosArrayPush(leftTagEqCols, &left);
taosArrayPush(rightTagEqCols, &right); taosArrayPush(rightTagEqCols, &right);
} }
...@@ -134,7 +134,7 @@ static void extractTagEqualCondCols(SJoinOperatorInfo* pInfo, SOperatorInfo** pD ...@@ -134,7 +134,7 @@ static void extractTagEqualCondCols(SJoinOperatorInfo* pInfo, SOperatorInfo** pD
if (nodeType(pTagEqualNode) == QUERY_NODE_OPERATOR) { if (nodeType(pTagEqualNode) == QUERY_NODE_OPERATOR) {
SOperatorNode* pOperNode = (SOperatorNode*)pTagEqualNode; SOperatorNode* pOperNode = (SOperatorNode*)pTagEqualNode;
extractTagEqualColsFromOper(pInfo, pDownStream, pOperNode, &left, &right); extractEqualOnCondColsFromOper(pInfo, pDownStream, pOperNode, &left, &right);
taosArrayPush(leftTagEqCols, &left); taosArrayPush(leftTagEqCols, &left);
taosArrayPush(rightTagEqCols, &right); taosArrayPush(rightTagEqCols, &right);
} }
...@@ -259,13 +259,13 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t ...@@ -259,13 +259,13 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
pInfo->inputOrder = TSDB_ORDER_DESC; pInfo->inputOrder = TSDB_ORDER_DESC;
} }
pInfo->pTagEqualConditions = pJoinNode->pTagEqualCondtions; pInfo->pEqualOnConditions = pJoinNode->pEqualOnCondtions;
if (pInfo->pTagEqualConditions != NULL) { if (pInfo->pEqualOnConditions != NULL) {
pInfo->leftTagCols = taosArrayInit(4, sizeof(SColumn)); pInfo->leftEqOnCondCols = taosArrayInit(4, sizeof(SColumn));
pInfo->rightTagCols = taosArrayInit(4, sizeof(SColumn)); pInfo->rightEqOnCondCols = taosArrayInit(4, sizeof(SColumn));
extractTagEqualCondCols(pInfo, pDownstream, pInfo->pTagEqualConditions, pInfo->leftTagCols, pInfo->rightTagCols); extractTagEqualCondCols(pInfo, pDownstream, pInfo->pEqualOnConditions, pInfo->leftEqOnCondCols, pInfo->rightEqOnCondCols);
initTagColskeyBuf(&pInfo->leftTagKeyLen, &pInfo->leftTagKeyBuf, pInfo->leftTagCols); initTagColskeyBuf(&pInfo->leftEqOnCondKeyLen, &pInfo->leftEqOnCondKeyBuf, pInfo->leftEqOnCondCols);
initTagColskeyBuf(&pInfo->rightTagKeyLen, &pInfo->rightTagKeyBuf, pInfo->rightTagCols); initTagColskeyBuf(&pInfo->rightEqOnCondKeyLen, &pInfo->rightEqOnCondKeyBuf, pInfo->rightEqOnCondCols);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->rightBuildTable = tSimpleHashInit(256, hashFn); pInfo->rightBuildTable = tSimpleHashInit(256, hashFn);
} }
...@@ -309,13 +309,13 @@ static void mergeJoinDestoryBuildTable(SSHashObj* pBuildTable) { ...@@ -309,13 +309,13 @@ static void mergeJoinDestoryBuildTable(SSHashObj* pBuildTable) {
void destroyMergeJoinOperator(void* param) { void destroyMergeJoinOperator(void* param) {
SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param; SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param;
if (pJoinOperator->pTagEqualConditions != NULL) { if (pJoinOperator->pEqualOnConditions != NULL) {
mergeJoinDestoryBuildTable(pJoinOperator->rightBuildTable); mergeJoinDestoryBuildTable(pJoinOperator->rightBuildTable);
taosMemoryFreeClear(pJoinOperator->rightTagKeyBuf); taosMemoryFreeClear(pJoinOperator->rightEqOnCondKeyBuf);
taosArrayDestroy(pJoinOperator->rightTagCols); taosArrayDestroy(pJoinOperator->rightEqOnCondCols);
taosMemoryFreeClear(pJoinOperator->leftTagKeyBuf); taosMemoryFreeClear(pJoinOperator->leftEqOnCondKeyBuf);
taosArrayDestroy(pJoinOperator->leftTagCols); taosArrayDestroy(pJoinOperator->leftEqOnCondCols);
} }
nodesDestroyNode(pJoinOperator->pCondAfterMerge); nodesDestroyNode(pJoinOperator->pCondAfterMerge);
...@@ -439,12 +439,12 @@ static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator ...@@ -439,12 +439,12 @@ static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator
static int32_t mergeJoinFillBuildTable(SJoinOperatorInfo* pInfo, SArray* rightRowLocations) { static int32_t mergeJoinFillBuildTable(SJoinOperatorInfo* pInfo, SArray* rightRowLocations) {
for (int32_t i = 0; i < taosArrayGetSize(rightRowLocations); ++i) { for (int32_t i = 0; i < taosArrayGetSize(rightRowLocations); ++i) {
SRowLocation* rightRow = taosArrayGet(rightRowLocations, i); SRowLocation* rightRow = taosArrayGet(rightRowLocations, i);
int32_t keyLen = fillKeyBufFromTagCols(pInfo->rightTagCols, rightRow->pDataBlock, rightRow->pos, pInfo->rightTagKeyBuf); int32_t keyLen = fillKeyBufFromTagCols(pInfo->rightEqOnCondCols, rightRow->pDataBlock, rightRow->pos, pInfo->rightEqOnCondKeyBuf);
SArray** ppRows = tSimpleHashGet(pInfo->rightBuildTable, pInfo->rightTagKeyBuf, keyLen); SArray** ppRows = tSimpleHashGet(pInfo->rightBuildTable, pInfo->rightEqOnCondKeyBuf, keyLen);
if (!ppRows) { if (!ppRows) {
SArray* rows = taosArrayInit(4, sizeof(SRowLocation)); SArray* rows = taosArrayInit(4, sizeof(SRowLocation));
taosArrayPush(rows, rightRow); taosArrayPush(rows, rightRow);
tSimpleHashPut(pInfo->rightBuildTable, pInfo->rightTagKeyBuf, keyLen, &rows, POINTER_BYTES); tSimpleHashPut(pInfo->rightBuildTable, pInfo->rightEqOnCondKeyBuf, keyLen, &rows, POINTER_BYTES);
} else { } else {
taosArrayPush(*ppRows, rightRow); taosArrayPush(*ppRows, rightRow);
} }
...@@ -466,8 +466,8 @@ static int32_t mergeJoinLeftRowsRightRows(SOperatorInfo* pOperator, SSDataBlock* ...@@ -466,8 +466,8 @@ static int32_t mergeJoinLeftRowsRightRows(SOperatorInfo* pOperator, SSDataBlock*
SRowLocation* leftRow = taosArrayGet(leftRowLocations, i); SRowLocation* leftRow = taosArrayGet(leftRowLocations, i);
SArray* pRightRows = NULL; SArray* pRightRows = NULL;
if (useBuildTableTSRange) { if (useBuildTableTSRange) {
int32_t keyLen = fillKeyBufFromTagCols(pJoinInfo->leftTagCols, leftRow->pDataBlock, leftRow->pos, pJoinInfo->leftTagKeyBuf); int32_t keyLen = fillKeyBufFromTagCols(pJoinInfo->leftEqOnCondCols, leftRow->pDataBlock, leftRow->pos, pJoinInfo->leftEqOnCondKeyBuf);
SArray** ppRightRows = tSimpleHashGet(pJoinInfo->rightBuildTable, pJoinInfo->leftTagKeyBuf, keyLen); SArray** ppRightRows = tSimpleHashGet(pJoinInfo->rightBuildTable, pJoinInfo->leftEqOnCondKeyBuf, keyLen);
if (!ppRightRows) { if (!ppRightRows) {
continue; continue;
} }
...@@ -567,7 +567,7 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t ...@@ -567,7 +567,7 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t
pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks); pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks);
mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight, mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight,
pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks); pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks);
if (pJoinInfo->pTagEqualConditions != NULL && taosArrayGetSize(rightRowLocations) > 16) { if (pJoinInfo->pEqualOnConditions != NULL && taosArrayGetSize(rightRowLocations) > 16) {
mergeJoinFillBuildTable(pJoinInfo, rightRowLocations); mergeJoinFillBuildTable(pJoinInfo, rightRowLocations);
rightUseBuildTable = true; rightUseBuildTable = true;
taosArrayDestroy(rightRowLocations); taosArrayDestroy(rightRowLocations);
......
...@@ -401,7 +401,7 @@ static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) { ...@@ -401,7 +401,7 @@ static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) {
COPY_SCALAR_FIELD(joinType); COPY_SCALAR_FIELD(joinType);
CLONE_NODE_FIELD(pMergeCondition); CLONE_NODE_FIELD(pMergeCondition);
CLONE_NODE_FIELD(pOnConditions); CLONE_NODE_FIELD(pOnConditions);
CLONE_NODE_FIELD(pTagEqualConditions); CLONE_NODE_FIELD(pEqualOnConditions);
COPY_SCALAR_FIELD(isSingleTableJoin); COPY_SCALAR_FIELD(isSingleTableJoin);
COPY_SCALAR_FIELD(inputTsOrder); COPY_SCALAR_FIELD(inputTsOrder);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -588,7 +588,7 @@ static int32_t physiSysTableScanCopy(const SSystemTableScanPhysiNode* pSrc, SSys ...@@ -588,7 +588,7 @@ static int32_t physiSysTableScanCopy(const SSystemTableScanPhysiNode* pSrc, SSys
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t physiWindowCopy(const SWinodwPhysiNode* pSrc, SWinodwPhysiNode* pDst) { static int32_t physiWindowCopy(const SWindowPhysiNode* pSrc, SWindowPhysiNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, physiNodeCopy); COPY_BASE_OBJECT_FIELD(node, physiNodeCopy);
CLONE_NODE_LIST_FIELD(pExprs); CLONE_NODE_LIST_FIELD(pExprs);
CLONE_NODE_LIST_FIELD(pFuncs); CLONE_NODE_LIST_FIELD(pFuncs);
......
...@@ -1432,7 +1432,7 @@ static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) { ...@@ -1432,7 +1432,7 @@ static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) {
code = tjsonAddObject(pJson, jkJoinLogicPlanOnConditions, nodeToJson, pNode->pOnConditions); code = tjsonAddObject(pJson, jkJoinLogicPlanOnConditions, nodeToJson, pNode->pOnConditions);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkJoinLogicPlanTagEqualConditions, nodeToJson, pNode->pTagEqualConditions); code = tjsonAddObject(pJson, jkJoinLogicPlanTagEqualConditions, nodeToJson, pNode->pEqualOnConditions);
} }
return code; return code;
} }
...@@ -1451,7 +1451,7 @@ static int32_t jsonToLogicJoinNode(const SJson* pJson, void* pObj) { ...@@ -1451,7 +1451,7 @@ static int32_t jsonToLogicJoinNode(const SJson* pJson, void* pObj) {
code = jsonToNodeObject(pJson, jkJoinLogicPlanOnConditions, &pNode->pOnConditions); code = jsonToNodeObject(pJson, jkJoinLogicPlanOnConditions, &pNode->pOnConditions);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkJoinLogicPlanTagEqualConditions, &pNode->pTagEqualConditions); code = jsonToNodeObject(pJson, jkJoinLogicPlanTagEqualConditions, &pNode->pEqualOnConditions);
} }
return code; return code;
} }
...@@ -1905,7 +1905,7 @@ static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) { ...@@ -1905,7 +1905,7 @@ static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) {
code = nodeListToJson(pJson, jkJoinPhysiPlanTargets, pNode->pTargets); code = nodeListToJson(pJson, jkJoinPhysiPlanTargets, pNode->pTargets);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkJoinPhysiPlanTagEqualConditions, nodeToJson, pNode->pTagEqualCondtions); code = tjsonAddObject(pJson, jkJoinPhysiPlanTagEqualConditions, nodeToJson, pNode->pEqualOnCondtions);
} }
return code; return code;
} }
...@@ -1930,7 +1930,7 @@ static int32_t jsonToPhysiJoinNode(const SJson* pJson, void* pObj) { ...@@ -1930,7 +1930,7 @@ static int32_t jsonToPhysiJoinNode(const SJson* pJson, void* pObj) {
code = jsonToNodeList(pJson, jkJoinPhysiPlanTargets, &pNode->pTargets); code = jsonToNodeList(pJson, jkJoinPhysiPlanTargets, &pNode->pTargets);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkJoinPhysiPlanTagEqualConditions, &pNode->pTagEqualCondtions); code = jsonToNodeObject(pJson, jkJoinPhysiPlanTagEqualConditions, &pNode->pEqualOnCondtions);
} }
return code; return code;
} }
...@@ -2135,7 +2135,7 @@ static const char* jkWindowPhysiPlanOutputTsOrder = "outputTsOrder"; ...@@ -2135,7 +2135,7 @@ static const char* jkWindowPhysiPlanOutputTsOrder = "outputTsOrder";
static const char* jkWindowPhysiPlanMergeDataBlock = "MergeDataBlock"; static const char* jkWindowPhysiPlanMergeDataBlock = "MergeDataBlock";
static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) { static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) {
const SWinodwPhysiNode* pNode = (const SWinodwPhysiNode*)pObj; const SWindowPhysiNode* pNode = (const SWindowPhysiNode*)pObj;
int32_t code = physicPlanNodeToJson(pObj, pJson); int32_t code = physicPlanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
...@@ -2176,7 +2176,7 @@ static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) { ...@@ -2176,7 +2176,7 @@ static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) {
} }
static int32_t jsonToPhysiWindowNode(const SJson* pJson, void* pObj) { static int32_t jsonToPhysiWindowNode(const SJson* pJson, void* pObj) {
SWinodwPhysiNode* pNode = (SWinodwPhysiNode*)pObj; SWindowPhysiNode* pNode = (SWindowPhysiNode*)pObj;
int32_t code = jsonToPhysicPlanNode(pJson, pObj); int32_t code = jsonToPhysicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
......
...@@ -2341,7 +2341,7 @@ static int32_t physiJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { ...@@ -2341,7 +2341,7 @@ static int32_t physiJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
code = tlvEncodeEnum(pEncoder, PHY_SORT_MERGE_JOIN_CODE_INPUT_TS_ORDER, pNode->inputTsOrder); code = tlvEncodeEnum(pEncoder, PHY_SORT_MERGE_JOIN_CODE_INPUT_TS_ORDER, pNode->inputTsOrder);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_TAG_EQUAL_CONDITIONS, nodeToMsg, pNode->pTagEqualCondtions); code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_TAG_EQUAL_CONDITIONS, nodeToMsg, pNode->pEqualOnCondtions);
} }
return code; return code;
} }
...@@ -2372,7 +2372,7 @@ static int32_t msgToPhysiJoinNode(STlvDecoder* pDecoder, void* pObj) { ...@@ -2372,7 +2372,7 @@ static int32_t msgToPhysiJoinNode(STlvDecoder* pDecoder, void* pObj) {
code = tlvDecodeEnum(pTlv, &pNode->inputTsOrder, sizeof(pNode->inputTsOrder)); code = tlvDecodeEnum(pTlv, &pNode->inputTsOrder, sizeof(pNode->inputTsOrder));
break; break;
case PHY_SORT_MERGE_JOIN_CODE_TAG_EQUAL_CONDITIONS: case PHY_SORT_MERGE_JOIN_CODE_TAG_EQUAL_CONDITIONS:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pTagEqualCondtions); code = msgToNodeFromTlv(pTlv, (void**)&pNode->pEqualOnCondtions);
break; break;
default: default:
break; break;
...@@ -2639,7 +2639,7 @@ enum { ...@@ -2639,7 +2639,7 @@ enum {
}; };
static int32_t physiWindowNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { static int32_t physiWindowNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SWinodwPhysiNode* pNode = (const SWinodwPhysiNode*)pObj; const SWindowPhysiNode* pNode = (const SWindowPhysiNode*)pObj;
int32_t code = tlvEncodeObj(pEncoder, PHY_WINDOW_CODE_BASE_NODE, physiNodeToMsg, &pNode->node); int32_t code = tlvEncodeObj(pEncoder, PHY_WINDOW_CODE_BASE_NODE, physiNodeToMsg, &pNode->node);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
...@@ -2680,7 +2680,7 @@ static int32_t physiWindowNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { ...@@ -2680,7 +2680,7 @@ static int32_t physiWindowNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
} }
static int32_t msgToPhysiWindowNode(STlvDecoder* pDecoder, void* pObj) { static int32_t msgToPhysiWindowNode(STlvDecoder* pDecoder, void* pObj) {
SWinodwPhysiNode* pNode = (SWinodwPhysiNode*)pObj; SWindowPhysiNode* pNode = (SWindowPhysiNode*)pObj;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
STlv* pTlv = NULL; STlv* pTlv = NULL;
......
...@@ -599,7 +599,7 @@ static void destroyPhysiNode(SPhysiNode* pNode) { ...@@ -599,7 +599,7 @@ static void destroyPhysiNode(SPhysiNode* pNode) {
nodesDestroyNode(pNode->pSlimit); nodesDestroyNode(pNode->pSlimit);
} }
static void destroyWinodwPhysiNode(SWinodwPhysiNode* pNode) { static void destroyWinodwPhysiNode(SWindowPhysiNode* pNode) {
destroyPhysiNode((SPhysiNode*)pNode); destroyPhysiNode((SPhysiNode*)pNode);
nodesDestroyList(pNode->pExprs); nodesDestroyList(pNode->pExprs);
nodesDestroyList(pNode->pFuncs); nodesDestroyList(pNode->pFuncs);
...@@ -1072,7 +1072,7 @@ void nodesDestroyNode(SNode* pNode) { ...@@ -1072,7 +1072,7 @@ void nodesDestroyNode(SNode* pNode) {
destroyLogicNode((SLogicNode*)pLogicNode); destroyLogicNode((SLogicNode*)pLogicNode);
nodesDestroyNode(pLogicNode->pMergeCondition); nodesDestroyNode(pLogicNode->pMergeCondition);
nodesDestroyNode(pLogicNode->pOnConditions); nodesDestroyNode(pLogicNode->pOnConditions);
nodesDestroyNode(pLogicNode->pTagEqualConditions); nodesDestroyNode(pLogicNode->pEqualOnConditions);
break; break;
} }
case QUERY_NODE_LOGIC_PLAN_AGG: { case QUERY_NODE_LOGIC_PLAN_AGG: {
...@@ -1205,7 +1205,7 @@ void nodesDestroyNode(SNode* pNode) { ...@@ -1205,7 +1205,7 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pPhyNode->pMergeCondition); nodesDestroyNode(pPhyNode->pMergeCondition);
nodesDestroyNode(pPhyNode->pOnConditions); nodesDestroyNode(pPhyNode->pOnConditions);
nodesDestroyList(pPhyNode->pTargets); nodesDestroyList(pPhyNode->pTargets);
nodesDestroyNode(pPhyNode->pTagEqualCondtions); nodesDestroyNode(pPhyNode->pEqualOnCondtions);
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: { case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: {
...@@ -1243,7 +1243,7 @@ void nodesDestroyNode(SNode* pNode) { ...@@ -1243,7 +1243,7 @@ void nodesDestroyNode(SNode* pNode) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
destroyWinodwPhysiNode((SWinodwPhysiNode*)pNode); destroyWinodwPhysiNode((SWindowPhysiNode*)pNode);
break; break;
case QUERY_NODE_PHYSICAL_PLAN_FILL: case QUERY_NODE_PHYSICAL_PLAN_FILL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL: { case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL: {
...@@ -1259,19 +1259,19 @@ void nodesDestroyNode(SNode* pNode) { ...@@ -1259,19 +1259,19 @@ void nodesDestroyNode(SNode* pNode) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION: case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION:
destroyWinodwPhysiNode((SWinodwPhysiNode*)pNode); destroyWinodwPhysiNode((SWindowPhysiNode*)pNode);
break; break;
case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE: case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: { case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: {
SStateWinodwPhysiNode* pPhyNode = (SStateWinodwPhysiNode*)pNode; SStateWinodwPhysiNode* pPhyNode = (SStateWinodwPhysiNode*)pNode;
destroyWinodwPhysiNode((SWinodwPhysiNode*)pPhyNode); destroyWinodwPhysiNode((SWindowPhysiNode*)pPhyNode);
nodesDestroyNode(pPhyNode->pStateKey); nodesDestroyNode(pPhyNode->pStateKey);
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT: case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT: { case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT: {
SEventWinodwPhysiNode* pPhyNode = (SEventWinodwPhysiNode*)pNode; SEventWinodwPhysiNode* pPhyNode = (SEventWinodwPhysiNode*)pNode;
destroyWinodwPhysiNode((SWinodwPhysiNode*)pPhyNode); destroyWinodwPhysiNode((SWindowPhysiNode*)pPhyNode);
nodesDestroyNode(pPhyNode->pStartCond); nodesDestroyNode(pPhyNode->pStartCond);
nodesDestroyNode(pPhyNode->pEndCond); nodesDestroyNode(pPhyNode->pEndCond);
break; break;
......
...@@ -740,18 +740,15 @@ static int32_t pushDownCondOptJoinExtractMergeCond(SOptimizeContext* pCxt, SJoin ...@@ -740,18 +740,15 @@ static int32_t pushDownCondOptJoinExtractMergeCond(SOptimizeContext* pCxt, SJoin
return code; return code;
} }
static bool pushDownCondOptIsTag(SNode* pNode, SNodeList* pTableCols) { static bool pushDownCondOptIsTableColumn(SNode* pNode, SNodeList* pTableCols) {
if (QUERY_NODE_COLUMN != nodeType(pNode)) { if (QUERY_NODE_COLUMN != nodeType(pNode)) {
return false; return false;
} }
SColumnNode* pCol = (SColumnNode*)pNode; SColumnNode* pCol = (SColumnNode*)pNode;
if (COLUMN_TYPE_TAG != pCol->colType) {
return false;
}
return pushDownCondOptBelongThisTable(pNode, pTableCols); return pushDownCondOptBelongThisTable(pNode, pTableCols);
} }
static bool pushDownCondOptIsTagEqualCond(SJoinLogicNode* pJoin, SNode* pCond) { static bool pushDownCondOptIsEqualOnCond(SJoinLogicNode* pJoin, SNode* pCond) {
if (QUERY_NODE_OPERATOR != nodeType(pCond)) { if (QUERY_NODE_OPERATOR != nodeType(pCond)) {
return false; return false;
} }
...@@ -770,22 +767,22 @@ static bool pushDownCondOptIsTagEqualCond(SJoinLogicNode* pJoin, SNode* pCond) { ...@@ -770,22 +767,22 @@ static bool pushDownCondOptIsTagEqualCond(SJoinLogicNode* pJoin, SNode* pCond) {
} }
SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets; SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets;
SNodeList* pRightCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1))->pTargets; SNodeList* pRightCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1))->pTargets;
if (pushDownCondOptIsTag(pOper->pLeft, pLeftCols)) { if (pushDownCondOptIsTableColumn(pOper->pLeft, pLeftCols)) {
return pushDownCondOptIsTag(pOper->pRight, pRightCols); return pushDownCondOptIsTableColumn(pOper->pRight, pRightCols);
} else if (pushDownCondOptIsTag(pOper->pLeft, pRightCols)) { } else if (pushDownCondOptIsTableColumn(pOper->pLeft, pRightCols)) {
return pushDownCondOptIsTag(pOper->pRight, pLeftCols); return pushDownCondOptIsTableColumn(pOper->pRight, pLeftCols);
} }
return false; return false;
} }
static int32_t pushDownCondOptJoinExtractTagEqualLogicCond(SJoinLogicNode* pJoin) { static int32_t pushDownCondOptJoinExtractEqualOnLogicCond(SJoinLogicNode* pJoin) {
SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pJoin->pOnConditions); SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pJoin->pOnConditions);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SNodeList* pTagEqualConds = NULL; SNodeList* pTagEqualConds = NULL;
SNode* pCond = NULL; SNode* pCond = NULL;
FOREACH(pCond, pLogicCond->pParameterList) { FOREACH(pCond, pLogicCond->pParameterList) {
if (pushDownCondOptIsTagEqualCond(pJoin, pCond)) { if (pushDownCondOptIsEqualOnCond(pJoin, pCond)) {
code = nodesListMakeAppend(&pTagEqualConds, nodesCloneNode(pCond)); code = nodesListMakeAppend(&pTagEqualConds, nodesCloneNode(pCond));
} }
} }
...@@ -796,7 +793,7 @@ static int32_t pushDownCondOptJoinExtractTagEqualLogicCond(SJoinLogicNode* pJoin ...@@ -796,7 +793,7 @@ static int32_t pushDownCondOptJoinExtractTagEqualLogicCond(SJoinLogicNode* pJoin
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
pJoin->pTagEqualConditions = pTempTagEqCond; pJoin->pEqualOnConditions = pTempTagEqCond;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { } else {
nodesDestroyList(pTagEqualConds); nodesDestroyList(pTagEqualConds);
...@@ -805,18 +802,18 @@ static int32_t pushDownCondOptJoinExtractTagEqualLogicCond(SJoinLogicNode* pJoin ...@@ -805,18 +802,18 @@ static int32_t pushDownCondOptJoinExtractTagEqualLogicCond(SJoinLogicNode* pJoin
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t pushDownCondOptJoinExtractTagEqualCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { static int32_t pushDownCondOptJoinExtractEqualOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
if (NULL == pJoin->pOnConditions) { if (NULL == pJoin->pOnConditions) {
pJoin->pTagEqualConditions = NULL; pJoin->pEqualOnConditions = NULL;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pOnConditions) && if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pOnConditions) &&
LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)(pJoin->pOnConditions))->condType) { LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)(pJoin->pOnConditions))->condType) {
return pushDownCondOptJoinExtractTagEqualLogicCond(pJoin); return pushDownCondOptJoinExtractEqualOnLogicCond(pJoin);
} }
if (pushDownCondOptIsTagEqualCond(pJoin, pJoin->pOnConditions)) { if (pushDownCondOptIsEqualOnCond(pJoin, pJoin->pOnConditions)) {
pJoin->pTagEqualConditions = nodesCloneNode(pJoin->pOnConditions); pJoin->pEqualOnConditions = nodesCloneNode(pJoin->pOnConditions);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -857,7 +854,7 @@ static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* p ...@@ -857,7 +854,7 @@ static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* p
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = pushDownCondOptJoinExtractTagEqualCond(pCxt, pJoin); code = pushDownCondOptJoinExtractEqualOnCond(pCxt, pJoin);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
......
...@@ -705,8 +705,8 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren ...@@ -705,8 +705,8 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
pJoinLogicNode->pOnConditions, &pJoin->pOnConditions); pJoinLogicNode->pOnConditions, &pJoin->pOnConditions);
} }
if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pTagEqualConditions) { if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pEqualOnConditions) {
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pTagEqualConditions, &pJoin->pTagEqualCondtions); code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pEqualOnConditions, &pJoin->pEqualOnCondtions);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin); code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin);
...@@ -1150,7 +1150,7 @@ static int32_t createExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNo ...@@ -1150,7 +1150,7 @@ static int32_t createExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNo
} }
} }
static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWinodwPhysiNode* pWindow, static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowPhysiNode* pWindow,
SWindowLogicNode* pWindowLogicNode) { SWindowLogicNode* pWindowLogicNode) {
pWindow->triggerType = pWindowLogicNode->triggerType; pWindow->triggerType = pWindowLogicNode->triggerType;
pWindow->watermark = pWindowLogicNode->watermark; pWindow->watermark = pWindowLogicNode->watermark;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册