提交 ee16b461 编写于 作者: S shenglian zhou

fix: remove group keys from join operator

上级 03e77288
......@@ -55,12 +55,10 @@ typedef struct SJoinOperatorInfo {
SNode* pTagEqualConditions;
SArray* leftTagCols;
SArray* leftTagKeys;
char* leftTagKeyBuf;
int32_t leftTagKeyLen;
SArray* rightTagCols;
SArray* rightTagKeys;
char* rightTagKeyBuf;
int32_t rightTagKeyLen;
......@@ -141,27 +139,11 @@ static void extractTagEqualCondCols(SJoinOperatorInfo* pInfo, SOperatorInfo** pD
}
}
static int32_t initTagColsGroupkeys(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) {
*pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys));
if ((*pGroupColVals) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
static int32_t initTagColskeyBuf(int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) {
int32_t numOfGroupCols = taosArrayGetSize(pGroupColList);
for (int32_t i = 0; i < numOfGroupCols; ++i) {
SColumn* pCol = (SColumn*)taosArrayGet(pGroupColList, i);
(*keyLen) += pCol->bytes; // actual data + null_flag
SGroupKeys key = {0};
key.bytes = pCol->bytes;
key.type = pCol->type;
key.isNull = false;
key.pData = taosMemoryCalloc(1, pCol->bytes);
if (key.pData == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArrayPush((*pGroupColVals), &key);
}
int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols;
......@@ -175,10 +157,11 @@ static int32_t initTagColsGroupkeys(SArray** pGroupColVals, int32_t* keyLen, cha
return TSDB_CODE_SUCCESS;
}
static void fillGroupKeyValsFromTagCols(SArray* pCols, SArray* pGroupKeys, SSDataBlock* pBlock, int32_t rowIndex) {
static int32_t fillKeyBufFromTagCols(SArray* pCols, SSDataBlock* pBlock, int32_t rowIndex, void* pKey) {
SColumnDataAgg* pColAgg = NULL;
size_t numOfGroupCols = taosArrayGetSize(pCols);
char* isNull = (char*)pKey;
char* pStart = (char*)pKey + sizeof(int8_t) * numOfGroupCols;
for (int32_t i = 0; i < numOfGroupCols; ++i) {
SColumn* pCol = (SColumn*) taosArrayGet(pCols, i);
......@@ -193,56 +176,24 @@ static void fillGroupKeyValsFromTagCols(SArray* pCols, SArray* pGroupKeys, SSDat
pColAgg = pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched?
}
SGroupKeys* pkey = taosArrayGet(pGroupKeys, i);
if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
pkey->isNull = true;
isNull[i] = 1;
} else {
pkey->isNull = false;
isNull[i] = 0;
char* val = colDataGetData(pColInfoData, rowIndex);
if (pkey->type == TSDB_DATA_TYPE_JSON) {
if (tTagIsJson(val)) {
terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
return;
}
if (pCol->type == TSDB_DATA_TYPE_JSON) {
int32_t dataLen = getJsonValueLen(val);
memcpy(pkey->pData, val, dataLen);
} else if (IS_VAR_DATA_TYPE(pkey->type)) {
memcpy(pkey->pData, val, varDataTLen(val));
ASSERT(varDataTLen(val) <= pkey->bytes);
memcpy(pStart, val, dataLen);
pStart += dataLen;
} else if (IS_VAR_DATA_TYPE(pCol->type)) {
varDataCopy(pStart, val);
pStart += varDataTLen(val);
} else {
memcpy(pkey->pData, val, pkey->bytes);
memcpy(pStart, val, pCol->bytes);
pStart += pCol->bytes;
}
}
}
}
static int32_t combineGroupKeysIntoBuf(void* pKey, const SArray* pGroupKeys) {
size_t numOfGroupCols = taosArrayGetSize(pGroupKeys);
char* isNull = (char*)pKey;
char* pStart = (char*)pKey + sizeof(int8_t) * numOfGroupCols;
for (int32_t i = 0; i < numOfGroupCols; ++i) {
SGroupKeys* pkey = taosArrayGet(pGroupKeys, i);
if (pkey->isNull) {
isNull[i] = 1;
continue;
}
isNull[i] = 0;
if (pkey->type == TSDB_DATA_TYPE_JSON) {
int32_t dataLen = getJsonValueLen(pkey->pData);
memcpy(pStart, (pkey->pData), dataLen);
pStart += dataLen;
} else if (IS_VAR_DATA_TYPE(pkey->type)) {
varDataCopy(pStart, pkey->pData);
pStart += varDataTLen(pkey->pData);
ASSERT(varDataTLen(pkey->pData) <= pkey->bytes);
} else {
memcpy(pStart, pkey->pData, pkey->bytes);
pStart += pkey->bytes;
}
}
return (int32_t)(pStart - (char*)pKey);
}
......@@ -312,8 +263,8 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
pInfo->leftTagCols = taosArrayInit(4, sizeof(SColumn));
pInfo->rightTagCols = taosArrayInit(4, sizeof(SColumn));
extractTagEqualCondCols(pInfo, pDownstream, pInfo->pTagEqualConditions, pInfo->leftTagCols, pInfo->rightTagCols);
initTagColsGroupkeys(&pInfo->leftTagKeys, &pInfo->leftTagKeyLen, &pInfo->leftTagKeyBuf, pInfo->leftTagCols);
initTagColsGroupkeys(&pInfo->rightTagKeys, &pInfo->rightTagKeyLen, &pInfo->rightTagKeyBuf, pInfo->rightTagCols);
initTagColskeyBuf(&pInfo->leftTagKeyLen, &pInfo->leftTagKeyBuf, pInfo->leftTagCols);
initTagColskeyBuf(&pInfo->rightTagKeyLen, &pInfo->rightTagKeyBuf, pInfo->rightTagCols);
}
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeJoin, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL);
code = appendDownstream(pOperator, pDownstream, numOfDownstream);
......@@ -341,20 +292,13 @@ void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) {
pColumn->scale = pColumnNode->node.resType.scale;
}
static void freeGroupKeyData(void* param) {
SGroupKeys* pKey = (SGroupKeys*)param;
taosMemoryFree(pKey->pData);
}
void destroyMergeJoinOperator(void* param) {
SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param;
if (pJoinOperator->pTagEqualConditions != NULL) {
taosMemoryFreeClear(pJoinOperator->rightTagKeyBuf);
taosArrayDestroyEx(pJoinOperator->rightTagKeys, freeGroupKeyData);
taosArrayDestroy(pJoinOperator->rightTagCols);
taosMemoryFreeClear(pJoinOperator->leftTagKeyBuf);
taosArrayDestroyEx(pJoinOperator->leftTagKeys, freeGroupKeyData);
taosArrayDestroy(pJoinOperator->leftTagCols);
}
nodesDestroyNode(pJoinOperator->pCondAfterMerge);
......@@ -482,8 +426,7 @@ static int32_t mergeJoinCreateBuildTable(SJoinOperatorInfo* pInfo, SArray* right
SSHashObj* buildTable = tSimpleHashInit(buildTableCap, hashFn);
for (int32_t i = 0; i < taosArrayGetSize(rightRowLocations); ++i) {
SRowLocation* rightRow = taosArrayGet(rightRowLocations, i);
fillGroupKeyValsFromTagCols(pInfo->rightTagCols, pInfo->rightTagKeys, rightRow->pDataBlock, rightRow->pos);
int32_t keyLen = combineGroupKeysIntoBuf(pInfo->rightTagKeyBuf, pInfo->rightTagKeys);
int32_t keyLen = fillKeyBufFromTagCols(pInfo->rightTagCols, rightRow->pDataBlock, rightRow->pos, pInfo->rightTagKeyBuf);
SArray** ppRows = tSimpleHashGet(buildTable, pInfo->rightTagKeyBuf, keyLen);
if (!ppRows) {
SArray* rows = taosArrayInit(4, sizeof(SRowLocation));
......@@ -511,8 +454,7 @@ static int32_t mergeJoinLeftRowsRightRows(SOperatorInfo* pOperator, SSDataBlock*
SRowLocation* leftRow = taosArrayGet(leftRowLocations, i);
SArray* pRightRows = NULL;
if (rightTableHash != NULL) {
fillGroupKeyValsFromTagCols(pJoinInfo->leftTagCols, pJoinInfo->leftTagKeys, leftRow->pDataBlock, leftRow->pos);
int32_t keyLen = combineGroupKeysIntoBuf(pJoinInfo->leftTagKeyBuf, pJoinInfo->leftTagKeys);
int32_t keyLen = fillKeyBufFromTagCols(pJoinInfo->leftTagCols, leftRow->pDataBlock, leftRow->pos, pJoinInfo->leftTagKeyBuf);
SArray** ppRightRows = tSimpleHashGet(rightTableHash, pJoinInfo->leftTagKeyBuf, keyLen);
if (!ppRightRows) {
continue;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册