提交 c6555c3b 编写于 作者: S shenglian zhou

fix: join operator finished

上级 629ab2b8
......@@ -30,11 +30,11 @@ typedef struct SJoinRowCtx {
bool rowRemains;
int64_t ts;
SArray* leftRowLocations;
SArray* rightRowLocations;
SArray* leftCreatedBlocks;
SArray* rightCreatedBlocks;
int32_t leftRowIdx;
int32_t rightRowIdx;
SSHashObj* buildTableTSRange;
} SJoinRowCtx;
typedef struct SJoinOperatorInfo {
......@@ -50,6 +50,17 @@ typedef struct SJoinOperatorInfo {
int32_t rightPos;
SColumnInfo rightCol;
SNode* pCondAfterMerge;
SNode* pTagEqualConditions;
SArray* leftTagCols;
SArray* leftTagKeys;
char* leftTagKeyBuf;
int32_t leftTagKeyLen;
SArray* rightTagCols;
SArray* rightTagKeys;
char* rightTagKeyBuf;
int32_t rightTagKeyLen;
SJoinRowCtx rowCtx;
} SJoinOperatorInfo;
......@@ -92,6 +103,147 @@ static void extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDown
setJoinColumnInfo(&pInfo->rightCol, rightTsCol);
}
static void extractTagEqualColsFromOper(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstreams, SOperatorNode* pOperNode,
SColumn* pLeft, SColumn* pRight) {
SColumnNode* pLeftNode = (SColumnNode*)pOperNode->pLeft;
SColumnNode* pRightNode = (SColumnNode*)pOperNode->pRight;
if (pLeftNode->dataBlockId == pRightNode->dataBlockId || pLeftNode->dataBlockId == pDownstreams[0]->resultDataBlockId) {
*pLeft = extractColumnFromColumnNode((SColumnNode*)pOperNode->pLeft);
*pRight = extractColumnFromColumnNode((SColumnNode*)pOperNode->pRight);
} else {
*pLeft = extractColumnFromColumnNode((SColumnNode*)pOperNode->pRight);
*pRight = extractColumnFromColumnNode((SColumnNode*)pOperNode->pLeft);
}
}
static void extractTagEqualCondCols(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownStream, SNode* pTagEqualNode,
SArray* leftTagEqCols, SArray* rightTagEqCols) {
SColumn left = {0};
SColumn right = {0};
if (nodeType(pTagEqualNode) == QUERY_NODE_LOGIC_CONDITION && ((SLogicConditionNode*)pTagEqualNode)->condType == LOGIC_COND_TYPE_AND) {
SNode* pNode = NULL;
FOREACH(pNode, ((SLogicConditionNode*)pTagEqualNode)->pParameterList) {
SOperatorNode* pOperNode = (SOperatorNode*)pNode;
extractTagEqualColsFromOper(pInfo, pDownStream, pOperNode, &left, &right);
taosArrayPush(leftTagEqCols, &left);
taosArrayPush(rightTagEqCols, &right);
}
return;
}
if (nodeType(pTagEqualNode) == QUERY_NODE_OPERATOR) {
SOperatorNode* pOperNode = (SOperatorNode*)pTagEqualNode;
extractTagEqualColsFromOper(pInfo, pDownStream, pOperNode, &left, &right);
taosArrayPush(leftTagEqCols, &left);
taosArrayPush(rightTagEqCols, &right);
}
}
static int32_t initTagColsGroupkeys(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) {
*pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys));
if ((*pGroupColVals) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t numOfGroupCols = taosArrayGetSize(pGroupColList);
for (int32_t i = 0; i < numOfGroupCols; ++i) {
SColumn* pCol = (SColumn*)taosArrayGet(pGroupColList, i);
(*keyLen) += pCol->bytes; // actual data + null_flag
SGroupKeys key = {0};
key.bytes = pCol->bytes;
key.type = pCol->type;
key.isNull = false;
key.pData = taosMemoryCalloc(1, pCol->bytes);
if (key.pData == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArrayPush((*pGroupColVals), &key);
}
int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols;
(*keyLen) += nullFlagSize;
(*keyBuf) = taosMemoryCalloc(1, (*keyLen));
if ((*keyBuf) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return TSDB_CODE_SUCCESS;
}
static void fillGroupKeyValsFromTagCols(SArray* pCols, SArray* pGroupKeys, SSDataBlock* pBlock, int32_t rowIndex) {
SColumnDataAgg* pColAgg = NULL;
size_t numOfGroupCols = taosArrayGetSize(pCols);
for (int32_t i = 0; i < numOfGroupCols; ++i) {
SColumn* pCol = (SColumn*) taosArrayGet(pCols, i);
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
// valid range check. todo: return error code.
if (pCol->slotId > taosArrayGetSize(pBlock->pDataBlock)) {
continue;
}
if (pBlock->pBlockAgg != NULL) {
pColAgg = pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched?
}
SGroupKeys* pkey = taosArrayGet(pGroupKeys, i);
if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
pkey->isNull = true;
} else {
pkey->isNull = false;
char* val = colDataGetData(pColInfoData, rowIndex);
if (pkey->type == TSDB_DATA_TYPE_JSON) {
if (tTagIsJson(val)) {
terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
return;
}
int32_t dataLen = getJsonValueLen(val);
memcpy(pkey->pData, val, dataLen);
} else if (IS_VAR_DATA_TYPE(pkey->type)) {
memcpy(pkey->pData, val, varDataTLen(val));
ASSERT(varDataTLen(val) <= pkey->bytes);
} else {
memcpy(pkey->pData, val, pkey->bytes);
}
}
}
}
static int32_t combineGroupKeysIntoBuf(void* pKey, const SArray* pGroupKeys) {
size_t numOfGroupCols = taosArrayGetSize(pGroupKeys);
char* isNull = (char*)pKey;
char* pStart = (char*)pKey + sizeof(int8_t) * numOfGroupCols;
for (int32_t i = 0; i < numOfGroupCols; ++i) {
SGroupKeys* pkey = taosArrayGet(pGroupKeys, i);
if (pkey->isNull) {
isNull[i] = 1;
continue;
}
isNull[i] = 0;
if (pkey->type == TSDB_DATA_TYPE_JSON) {
int32_t dataLen = getJsonValueLen(pkey->pData);
memcpy(pStart, (pkey->pData), dataLen);
pStart += dataLen;
} else if (IS_VAR_DATA_TYPE(pkey->type)) {
varDataCopy(pStart, pkey->pData);
pStart += varDataTLen(pkey->pData);
ASSERT(varDataTLen(pkey->pData) <= pkey->bytes);
} else {
memcpy(pStart, pkey->pData, pkey->bytes);
pStart += pkey->bytes;
}
}
return (int32_t)(pStart - (char*)pKey);
}
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) {
SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo));
......@@ -153,6 +305,13 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
pInfo->inputOrder = TSDB_ORDER_DESC;
}
pInfo->pTagEqualConditions = pJoinNode->pTagEqualCondtions;
pInfo->leftTagCols = taosArrayInit(4, sizeof(SColumn));
pInfo->rightTagCols = taosArrayInit(4, sizeof(SColumn));
extractTagEqualCondCols(pInfo, pDownstream, pInfo->pTagEqualConditions, pInfo->leftTagCols, pInfo->rightTagCols);
initTagColsGroupkeys(&pInfo->leftTagKeys, &pInfo->leftTagKeyLen, &pInfo->leftTagKeyBuf, pInfo->leftTagCols);
initTagColsGroupkeys(&pInfo->rightTagKeys, &pInfo->rightTagKeyLen, &pInfo->rightTagKeyBuf, pInfo->rightTagCols);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeJoin, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL);
code = appendDownstream(pOperator, pDownstream, numOfDownstream);
if (code != TSDB_CODE_SUCCESS) {
......@@ -179,8 +338,22 @@ void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) {
pColumn->scale = pColumnNode->node.resType.scale;
}
static void freeGroupKeyData(void* param) {
SGroupKeys* pKey = (SGroupKeys*)param;
taosMemoryFree(pKey->pData);
}
void destroyMergeJoinOperator(void* param) {
SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param;
taosMemoryFreeClear(pJoinOperator->rightTagKeyBuf);
taosArrayDestroyEx(pJoinOperator->rightTagKeys, freeGroupKeyData);
taosArrayDestroy(pJoinOperator->rightTagCols);
taosMemoryFreeClear(pJoinOperator->leftTagKeyBuf);
taosArrayDestroyEx(pJoinOperator->leftTagKeys, freeGroupKeyData);
taosArrayDestroy(pJoinOperator->leftTagCols);
nodesDestroyNode(pJoinOperator->pCondAfterMerge);
pJoinOperator->pRes = blockDataDestroy(pJoinOperator->pRes);
......@@ -300,22 +473,121 @@ static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator
return 0;
}
static int32_t mergeJoinCreateBuildTable(SJoinOperatorInfo* pInfo, SArray* rightRowLocations,SSHashObj** ppHashObj) {
int32_t buildTableCap = MIN(taosArrayGetSize(rightRowLocations), 4096);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
SSHashObj* buildTable = tSimpleHashInit(buildTableCap, hashFn);
for (int32_t i = 0; i < taosArrayGetSize(rightRowLocations); ++i) {
SRowLocation* rightRow = taosArrayGet(rightRowLocations, i);
fillGroupKeyValsFromTagCols(pInfo->rightTagCols, pInfo->rightTagKeys, rightRow->pDataBlock, rightRow->pos);
int32_t keyLen = combineGroupKeysIntoBuf(pInfo->rightTagKeyBuf, pInfo->rightTagKeys);
SArray** ppRows = tSimpleHashGet(buildTable, pInfo->rightTagKeyBuf, keyLen);
if (!ppRows) {
SArray* rows = taosArrayInit(4, sizeof(SRowLocation));
taosArrayPush(rows, rightRow);
tSimpleHashPut(buildTable, pInfo->rightTagKeyBuf, keyLen, &rows, POINTER_BYTES);
} else {
taosArrayPush(*ppRows, rightRow);
}
}
*ppHashObj = buildTable;
return TSDB_CODE_SUCCESS;
}
static int32_t mergeJoinLeftRowsRightRows(SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t* nRows,
const SArray* leftRowLocations, int32_t leftRowIdx,
int32_t rightRowIdx, SSHashObj* rightTableHash, bool* pReachThreshold) {
*pReachThreshold = false;
uint32_t limitRowNum = pOperator->resultInfo.threshold;
SJoinOperatorInfo* pJoinInfo = pOperator->info;
size_t leftNumJoin = taosArrayGetSize(leftRowLocations);
int32_t i,j;
for (i = leftRowIdx; i < leftNumJoin; ++i, rightRowIdx = 0) {
SRowLocation* leftRow = taosArrayGet(leftRowLocations, i);
fillGroupKeyValsFromTagCols(pJoinInfo->leftTagCols, pJoinInfo->leftTagKeys, leftRow->pDataBlock, leftRow->pos);
int32_t keyLen = combineGroupKeysIntoBuf(pJoinInfo->leftTagKeyBuf, pJoinInfo->leftTagKeys);
SArray** ppRightRows = tSimpleHashGet(rightTableHash, pJoinInfo->leftTagKeyBuf, keyLen);
if (!ppRightRows) {
continue;
}
SArray* pRightRows = *ppRightRows;
size_t rightRowsSize = taosArrayGetSize(pRightRows);
for (j = rightRowIdx; j < rightRowsSize; ++j) {
if (*nRows >= limitRowNum) {
*pReachThreshold = true;
break;
}
SRowLocation* rightRow = taosArrayGet(pRightRows, j);
mergeJoinJoinLeftRight(pOperator, pRes, *nRows, leftRow->pDataBlock, leftRow->pos, rightRow->pDataBlock,
rightRow->pos);
++*nRows;
}
if (*pReachThreshold) {
break;
}
}
if (*pReachThreshold) {
pJoinInfo->rowCtx.rowRemains = true;
pJoinInfo->rowCtx.leftRowIdx = i;
pJoinInfo->rowCtx.rightRowIdx = j;
}
return TSDB_CODE_SUCCESS;
}
static void mergeJoinDestoryBuildTable(SSHashObj* pBuildTable) {
void* p = NULL;
int32_t iter = 0;
while ((p = tSimpleHashIterate(pBuildTable, p, &iter)) != NULL) {
SArray* rows = (*(SArray**)p);
taosArrayDestroy(rows);
}
tSimpleHashCleanup(pBuildTable);
}
static void mergeJoinDestroyTSRangeCtx(SJoinOperatorInfo* pJoinInfo, SArray* leftRowLocations, SArray* leftCreatedBlocks,
SArray* rightCreatedBlocks, SSHashObj* rightTableHash) {
for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) {
SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i);
blockDataDestroy(pBlock);
}
taosArrayDestroy(rightCreatedBlocks);
for (int i = 0; i < taosArrayGetSize(leftCreatedBlocks); ++i) {
SSDataBlock* pBlock = taosArrayGetP(leftCreatedBlocks, i);
blockDataDestroy(pBlock);
}
mergeJoinDestoryBuildTable(rightTableHash);
taosArrayDestroy(leftCreatedBlocks);
taosArrayDestroy(leftRowLocations);
pJoinInfo->rowCtx.rowRemains = false;
pJoinInfo->rowCtx.leftRowLocations = NULL;
pJoinInfo->rowCtx.leftCreatedBlocks = NULL;
pJoinInfo->rowCtx.rightCreatedBlocks = NULL;
pJoinInfo->rowCtx.buildTableTSRange = NULL;
}
static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes,
int32_t* nRows) {
int32_t code = TSDB_CODE_SUCCESS;
SJoinOperatorInfo* pJoinInfo = pOperator->info;
SArray* leftRowLocations = NULL;
SArray* leftCreatedBlocks = NULL;
SArray* rightRowLocations = NULL;
SArray* rightCreatedBlocks = NULL;
int32_t leftRowIdx = 0;
int32_t rightRowIdx = 0;
int32_t i, j;
SSHashObj* rightTableHash = NULL;
if (pJoinInfo->rowCtx.rowRemains) {
leftRowLocations = pJoinInfo->rowCtx.leftRowLocations;
leftCreatedBlocks = pJoinInfo->rowCtx.leftCreatedBlocks;
rightRowLocations = pJoinInfo->rowCtx.rightRowLocations;
rightTableHash = pJoinInfo->rowCtx.buildTableTSRange;
rightCreatedBlocks = pJoinInfo->rowCtx.rightCreatedBlocks;
leftRowIdx = pJoinInfo->rowCtx.leftRowIdx;
rightRowIdx = pJoinInfo->rowCtx.rightRowIdx;
......@@ -323,85 +595,42 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t
leftRowLocations = taosArrayInit(8, sizeof(SRowLocation));
leftCreatedBlocks = taosArrayInit(8, POINTER_BYTES);
rightRowLocations = taosArrayInit(8, sizeof(SRowLocation));
SArray* rightRowLocations = taosArrayInit(8, sizeof(SRowLocation));
rightCreatedBlocks = taosArrayInit(8, POINTER_BYTES);
mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 0, pJoinInfo->leftCol.slotId, pJoinInfo->pLeft,
pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks);
mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight,
pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks);
mergeJoinCreateBuildTable(pJoinInfo, rightRowLocations, &rightTableHash);
taosArrayDestroy(rightRowLocations);
}
size_t leftNumJoin = taosArrayGetSize(leftRowLocations);
size_t rightNumJoin = taosArrayGetSize(rightRowLocations);
uint32_t maxRowNum = *nRows + (leftNumJoin - leftRowIdx - 1) * rightNumJoin + rightNumJoin - rightRowIdx;
uint32_t limitRowNum = maxRowNum;
if (maxRowNum > pOperator->resultInfo.threshold) {
limitRowNum = pOperator->resultInfo.threshold;
if (!pJoinInfo->rowCtx.rowRemains) {
pJoinInfo->rowCtx.rowRemains = true;
pJoinInfo->rowCtx.ts = timestamp;
pJoinInfo->rowCtx.leftRowLocations = leftRowLocations;
pJoinInfo->rowCtx.rightRowLocations = rightRowLocations;
pJoinInfo->rowCtx.leftCreatedBlocks = leftCreatedBlocks;
pJoinInfo->rowCtx.rightCreatedBlocks = rightCreatedBlocks;
}
}
code = blockDataEnsureCapacity(pRes, limitRowNum);
code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.threshold);
if (code != TSDB_CODE_SUCCESS) {
qError("%s can not ensure block capacity for join. left: %zu, right: %zu", GET_TASKID(pOperator->pTaskInfo),
leftNumJoin, rightNumJoin);
qError("%s can not ensure block capacity for join. left: %zu", GET_TASKID(pOperator->pTaskInfo),
leftNumJoin);
}
if (code == TSDB_CODE_SUCCESS) {
bool done = false;
for (i = leftRowIdx; i < leftNumJoin; ++i, rightRowIdx = 0) {
for (j = rightRowIdx; j < rightNumJoin; ++j) {
if (*nRows >= limitRowNum) {
done = true;
break;
}
bool reachThreshold = false;
SRowLocation* leftRow = taosArrayGet(leftRowLocations, i);
SRowLocation* rightRow = taosArrayGet(rightRowLocations, j);
mergeJoinJoinLeftRight(pOperator, pRes, *nRows, leftRow->pDataBlock, leftRow->pos, rightRow->pDataBlock,
rightRow->pos);
++*nRows;
}
if (done) {
break;
}
}
if (maxRowNum > pOperator->resultInfo.threshold) {
pJoinInfo->rowCtx.leftRowIdx = i;
pJoinInfo->rowCtx.rightRowIdx = j;
}
if (code == TSDB_CODE_SUCCESS) {
mergeJoinLeftRowsRightRows(pOperator, pRes, nRows, leftRowLocations, leftRowIdx,
rightRowIdx, rightTableHash, &reachThreshold);
}
if (maxRowNum <= pOperator->resultInfo.threshold) {
for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) {
SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i);
blockDataDestroy(pBlock);
}
taosArrayDestroy(rightCreatedBlocks);
taosArrayDestroy(rightRowLocations);
for (int i = 0; i < taosArrayGetSize(leftCreatedBlocks); ++i) {
SSDataBlock* pBlock = taosArrayGetP(leftCreatedBlocks, i);
blockDataDestroy(pBlock);
}
taosArrayDestroy(leftCreatedBlocks);
taosArrayDestroy(leftRowLocations);
if (!reachThreshold) {
mergeJoinDestroyTSRangeCtx(pJoinInfo, leftRowLocations, leftCreatedBlocks, rightCreatedBlocks,
rightTableHash);
if (pJoinInfo->rowCtx.rowRemains) {
pJoinInfo->rowCtx.rowRemains = false;
pJoinInfo->rowCtx.leftRowLocations = NULL;
pJoinInfo->rowCtx.rightRowLocations = NULL;
pJoinInfo->rowCtx.leftCreatedBlocks = NULL;
pJoinInfo->rowCtx.rightCreatedBlocks = NULL;
}
} else {
pJoinInfo->rowCtx.rowRemains = true;
pJoinInfo->rowCtx.ts = timestamp;
pJoinInfo->rowCtx.leftRowLocations = leftRowLocations;
pJoinInfo->rowCtx.leftCreatedBlocks = leftCreatedBlocks;
pJoinInfo->rowCtx.rightCreatedBlocks = rightCreatedBlocks;
pJoinInfo->rowCtx.buildTableTSRange = rightTableHash;
}
return TSDB_CODE_SUCCESS;
}
......
......@@ -759,6 +759,14 @@ static bool pushDownCondOptIsTagEqualCond(SJoinLogicNode* pJoin, SNode* pCond) {
if (OP_TYPE_EQUAL != pOper->opType) {
return false;
}
if (QUERY_NODE_COLUMN != nodeType(pOper->pLeft) || QUERY_NODE_COLUMN != nodeType(pOper->pRight)) {
return false;
}
SColumnNode* pLeft = (SColumnNode*)(pOper->pLeft);
SColumnNode* pRight = (SColumnNode*)(pOper->pRight);
if (pLeft->node.resType.type != pRight->node.resType.type) {
return false;
}
SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets;
SNodeList* pRightCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1))->pTargets;
if (pushDownCondOptIsTag(pOper->pLeft, pLeftCols)) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册