未验证 提交 09218fef 编写于 作者: S shenglian-zhou 提交者: GitHub

Merge pull request #15465 from taosdata/szhou/fix/udf

fix: prepare for multirow join 
...@@ -83,8 +83,6 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t ...@@ -83,8 +83,6 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
} else if (pJoinNode->inputTsOrder == ORDER_DESC) { } else if (pJoinNode->inputTsOrder == ORDER_DESC) {
pInfo->inputTsOrder = TSDB_ORDER_DESC; pInfo->inputTsOrder = TSDB_ORDER_DESC;
} }
//TODO: remove this when JoinNode inputTsOrder is ready
pInfo->inputTsOrder = TSDB_ORDER_ASC;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL, NULL, NULL); createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL, NULL, NULL);
...@@ -116,7 +114,9 @@ void destroyMergeJoinOperator(void* param, int32_t numOfOutput) { ...@@ -116,7 +114,9 @@ void destroyMergeJoinOperator(void* param, int32_t numOfOutput) {
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
static void doJoinOneRow(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t currRow) {
static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t currRow,
SSDataBlock* pLeftBlock, int32_t leftPos, SSDataBlock* pRightBlock, int32_t rightPos) {
SJoinOperatorInfo* pJoinInfo = pOperator->info; SJoinOperatorInfo* pJoinInfo = pOperator->info;
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
...@@ -130,11 +130,11 @@ static void doJoinOneRow(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int ...@@ -130,11 +130,11 @@ static void doJoinOneRow(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int
SColumnInfoData* pSrc = NULL; SColumnInfoData* pSrc = NULL;
if (pJoinInfo->pLeft->info.blockId == blockId) { if (pJoinInfo->pLeft->info.blockId == blockId) {
pSrc = taosArrayGet(pJoinInfo->pLeft->pDataBlock, slotId); pSrc = taosArrayGet(pLeftBlock->pDataBlock, slotId);
rowIndex = pJoinInfo->leftPos; rowIndex = leftPos;
} else { } else {
pSrc = taosArrayGet(pJoinInfo->pRight->pDataBlock, slotId); pSrc = taosArrayGet(pRightBlock->pDataBlock, slotId);
rowIndex = pJoinInfo->rightPos; rowIndex = rightPos;
} }
if (colDataIsNull_s(pSrc, rowIndex)) { if (colDataIsNull_s(pSrc, rowIndex)) {
...@@ -146,60 +146,74 @@ static void doJoinOneRow(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int ...@@ -146,60 +146,74 @@ static void doJoinOneRow(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int
} }
} }
static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) {
static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs, int64_t* pRightTs) {
SJoinOperatorInfo* pJoinInfo = pOperator->info; SJoinOperatorInfo* pJoinInfo = pOperator->info;
int32_t nrows = pRes->info.rows; if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
SOperatorInfo* ds1 = pOperator->pDownstream[0];
pJoinInfo->pLeft = ds1->fpSet.getNextFn(ds1);
bool asc = (pJoinInfo->inputTsOrder == TSDB_ORDER_ASC) ? true : false; pJoinInfo->leftPos = 0;
if (pJoinInfo->pLeft == NULL) {
while (1) { setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
// todo extract method return false;
if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
SOperatorInfo* ds1 = pOperator->pDownstream[0];
pJoinInfo->pLeft = ds1->fpSet.getNextFn(ds1);
pJoinInfo->leftPos = 0;
if (pJoinInfo->pLeft == NULL) {
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
break;
}
} }
}
if (pJoinInfo->pRight == NULL || pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) { if (pJoinInfo->pRight == NULL || pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
SOperatorInfo* ds2 = pOperator->pDownstream[1]; SOperatorInfo* ds2 = pOperator->pDownstream[1];
pJoinInfo->pRight = ds2->fpSet.getNextFn(ds2); pJoinInfo->pRight = ds2->fpSet.getNextFn(ds2);
pJoinInfo->rightPos = 0; pJoinInfo->rightPos = 0;
if (pJoinInfo->pRight == NULL) { if (pJoinInfo->pRight == NULL) {
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
break; return false;
}
} }
}
// only the timestamp match support for ordinary table
SColumnInfoData* pLeftCol = taosArrayGet(pJoinInfo->pLeft->pDataBlock, pJoinInfo->leftCol.slotId);
char* pLeftVal = colDataGetData(pLeftCol, pJoinInfo->leftPos);
*pLeftTs = *(int64_t*)pLeftVal;
SColumnInfoData* pRightCol = taosArrayGet(pJoinInfo->pRight->pDataBlock, pJoinInfo->rightCol.slotId);
char* pRightVal = colDataGetData(pRightCol, pJoinInfo->rightPos);
*pRightTs = *(int64_t*)pRightVal;
ASSERT(pLeftCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
ASSERT(pRightCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
return true;
}
static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) {
SJoinOperatorInfo* pJoinInfo = pOperator->info;
SColumnInfoData* pLeftCol = taosArrayGet(pJoinInfo->pLeft->pDataBlock, pJoinInfo->leftCol.slotId); int32_t nrows = pRes->info.rows;
char* pLeftVal = colDataGetData(pLeftCol, pJoinInfo->leftPos);
SColumnInfoData* pRightCol = taosArrayGet(pJoinInfo->pRight->pDataBlock, pJoinInfo->rightCol.slotId); bool asc = (pJoinInfo->inputTsOrder == TSDB_ORDER_ASC) ? true : false;
char* pRightVal = colDataGetData(pRightCol, pJoinInfo->rightPos);
while (1) {
int64_t leftTs = 0;
int64_t rightTs = 0;
bool hasNextTs = mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs);
if (!hasNextTs) {
break;
}
// only the timestamp match support for ordinary table if (leftTs == rightTs) {
ASSERT(pLeftCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); mergeJoinJoinLeftRight(pOperator, pRes, nrows,
if (*(int64_t*)pLeftVal == *(int64_t*)pRightVal) { pJoinInfo->pLeft, pJoinInfo->leftPos, pJoinInfo->pRight, pJoinInfo->rightPos);
doJoinOneRow(pOperator, pRes, nrows);
pJoinInfo->leftPos += 1; pJoinInfo->leftPos += 1;
pJoinInfo->rightPos += 1; pJoinInfo->rightPos += 1;
nrows += 1; nrows += 1;
} else if (asc && *(int64_t*)pLeftVal < *(int64_t*)pRightVal || } else if (asc && leftTs < rightTs || !asc && leftTs > rightTs) {
!asc && *(int64_t*)pLeftVal > *(int64_t*)pRightVal) {
pJoinInfo->leftPos += 1; pJoinInfo->leftPos += 1;
if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) { if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
continue; continue;
} }
} else if (asc && *(int64_t*)pLeftVal > *(int64_t*)pRightVal || } else if (asc && leftTs > rightTs || !asc && leftTs < rightTs) {
!asc && *(int64_t*)pLeftVal < *(int64_t*)pRightVal) {
pJoinInfo->rightPos += 1; pJoinInfo->rightPos += 1;
if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) { if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
continue; continue;
......
...@@ -1712,6 +1712,7 @@ static int32_t jsonToPhysiProjectNode(const SJson* pJson, void* pObj) { ...@@ -1712,6 +1712,7 @@ static int32_t jsonToPhysiProjectNode(const SJson* pJson, void* pObj) {
} }
static const char* jkJoinPhysiPlanJoinType = "JoinType"; static const char* jkJoinPhysiPlanJoinType = "JoinType";
static const char* jkJoinPhysiPlanInputTsOrder = "InputTsOrder";
static const char* jkJoinPhysiPlanMergeCondition = "MergeCondition"; static const char* jkJoinPhysiPlanMergeCondition = "MergeCondition";
static const char* jkJoinPhysiPlanOnConditions = "OnConditions"; static const char* jkJoinPhysiPlanOnConditions = "OnConditions";
static const char* jkJoinPhysiPlanTargets = "Targets"; static const char* jkJoinPhysiPlanTargets = "Targets";
...@@ -1723,6 +1724,9 @@ static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) { ...@@ -1723,6 +1724,9 @@ static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanJoinType, pNode->joinType); code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanJoinType, pNode->joinType);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanInputTsOrder, pNode->inputTsOrder);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkJoinPhysiPlanMergeCondition, nodeToJson, pNode->pMergeCondition); code = tjsonAddObject(pJson, jkJoinPhysiPlanMergeCondition, nodeToJson, pNode->pMergeCondition);
} }
...@@ -1742,7 +1746,9 @@ static int32_t jsonToPhysiJoinNode(const SJson* pJson, void* pObj) { ...@@ -1742,7 +1746,9 @@ static int32_t jsonToPhysiJoinNode(const SJson* pJson, void* pObj) {
int32_t code = jsonToPhysicPlanNode(pJson, pObj); int32_t code = jsonToPhysicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkJoinPhysiPlanJoinType, pNode->joinType, code); tjsonGetNumberValue(pJson, jkJoinPhysiPlanJoinType, pNode->joinType, code);
; }
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkJoinPhysiPlanInputTsOrder, pNode->inputTsOrder, code);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkJoinPhysiPlanOnConditions, &pNode->pOnConditions); code = jsonToNodeObject(pJson, jkJoinPhysiPlanOnConditions, &pNode->pOnConditions);
......
...@@ -633,6 +633,7 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren ...@@ -633,6 +633,7 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
pJoin->joinType = pJoinLogicNode->joinType; pJoin->joinType = pJoinLogicNode->joinType;
pJoin->inputTsOrder = pJoinLogicNode->inputTsOrder;
setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pMergeCondition, setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pMergeCondition,
&pJoin->pMergeCondition); &pJoin->pMergeCondition);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册