提交 24a5a3f4 编写于 作者: D dapan1121

fix: compile issues

上级 3ad98b4e
...@@ -180,6 +180,8 @@ int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const ...@@ -180,6 +180,8 @@ int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull); int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull);
int32_t colDataReassignVal(SColumnInfoData* pColumnInfoData, uint32_t dstRowIdx, uint32_t srcRowIdx, const char* pData); int32_t colDataReassignVal(SColumnInfoData* pColumnInfoData, uint32_t dstRowIdx, uint32_t srcRowIdx, const char* pData);
int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, uint32_t numOfRows, bool trimValue); int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, uint32_t numOfRows, bool trimValue);
int32_t colDataCopyNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData,
uint32_t numOfRows, bool isNull);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity, int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity,
const SColumnInfoData* pSource, int32_t numOfRow2); const SColumnInfoData* pSource, int32_t numOfRow2);
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows, int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows,
......
...@@ -271,7 +271,7 @@ int32_t colDataCopyAndReassign(SColumnInfoData* pColumnInfoData, uint32_t curren ...@@ -271,7 +271,7 @@ int32_t colDataCopyAndReassign(SColumnInfoData* pColumnInfoData, uint32_t curren
if (numOfRows > 1) { if (numOfRows > 1) {
int32_t* pOffset = pColumnInfoData->varmeta.offset; int32_t* pOffset = pColumnInfoData->varmeta.offset;
memset(&pOffset[currentRow + 1], &pOffset[currentRow], sizeof(pOffset[0]) * (numOfRows - 1)); memset(&pOffset[currentRow + 1], pOffset[currentRow], sizeof(pOffset[0]) * (numOfRows - 1));
pColumnInfoData->reassigned = true; pColumnInfoData->reassigned = true;
} }
......
...@@ -21,6 +21,14 @@ extern "C" { ...@@ -21,6 +21,14 @@ extern "C" {
#define HASH_JOIN_DEFAULT_PAGE_SIZE 10485760 #define HASH_JOIN_DEFAULT_PAGE_SIZE 10485760
#pragma pack(push, 1)
typedef struct SBufRowInfo {
void* next;
uint16_t pageId;
int32_t offset;
} SBufRowInfo;
#pragma pack(pop)
typedef struct SHJoinCtx { typedef struct SHJoinCtx {
bool rowRemains; bool rowRemains;
SBufRowInfo* pBuildRow; SBufRowInfo* pBuildRow;
...@@ -42,7 +50,6 @@ typedef struct SHJoinColInfo { ...@@ -42,7 +50,6 @@ typedef struct SHJoinColInfo {
int32_t bytes; int32_t bytes;
char* data; char* data;
char* bitMap; char* bitMap;
char* dataInBuf;
} SHJoinColInfo; } SHJoinColInfo;
typedef struct SBufPageInfo { typedef struct SBufPageInfo {
...@@ -51,13 +58,6 @@ typedef struct SBufPageInfo { ...@@ -51,13 +58,6 @@ typedef struct SBufPageInfo {
char* data; char* data;
} SBufPageInfo; } SBufPageInfo;
#pragma pack(push, 1)
typedef struct SBufRowInfo {
void* next;
uint16_t pageId;
int32_t offset;
} SBufRowInfo;
#pragma pack(pop)
typedef struct SGroupData { typedef struct SGroupData {
SBufRowInfo* rows; SBufRowInfo* rows;
...@@ -96,9 +96,6 @@ typedef struct SHJoinOperatorInfo { ...@@ -96,9 +96,6 @@ typedef struct SHJoinOperatorInfo {
SHJoinCtx ctx; SHJoinCtx ctx;
} SHJoinOperatorInfo; } SHJoinOperatorInfo;
static SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator);
static void destroyHashJoinOperator(void* param);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -126,6 +126,8 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode ...@@ -126,6 +126,8 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SHashJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild); SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild);
......
...@@ -27,7 +27,7 @@ ...@@ -27,7 +27,7 @@
#include "ttypes.h" #include "ttypes.h"
#include "hashjoin.h" #include "hashjoin.h"
int32_t initJoinKeyColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) { static int32_t initHJoinKeyColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) {
pTable->keyNum = LIST_LENGTH(pList); pTable->keyNum = LIST_LENGTH(pList);
pTable->keyCols = taosMemoryMalloc(pTable->keyNum * sizeof(SHJoinColInfo)); pTable->keyCols = taosMemoryMalloc(pTable->keyNum * sizeof(SHJoinColInfo));
...@@ -40,9 +40,9 @@ int32_t initJoinKeyColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) { ...@@ -40,9 +40,9 @@ int32_t initJoinKeyColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) {
SNode* pNode = NULL; SNode* pNode = NULL;
FOREACH(pNode, pList) { FOREACH(pNode, pList) {
SColumnNode* pColNode = (SColumnNode*)pNode; SColumnNode* pColNode = (SColumnNode*)pNode;
pTable->keyCols[i]->srcSlot = pColNode->slotId; pTable->keyCols[i].srcSlot = pColNode->slotId;
pTable->keyCols[i]->vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type); pTable->keyCols[i].vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type);
pTable->keyCols[i]->bytes = pColNode->node.resType.bytes; pTable->keyCols[i].bytes = pColNode->node.resType.bytes;
bufSize += pColNode->node.resType.bytes; bufSize += pColNode->node.resType.bytes;
++i; ++i;
} }
...@@ -57,7 +57,7 @@ int32_t initJoinKeyColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) { ...@@ -57,7 +57,7 @@ int32_t initJoinKeyColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void getJoinValColNum(SNodeList* pList, int32_t blkId, int32_t* colNum) { static void getHJoinValColNum(SNodeList* pList, int32_t blkId, int32_t* colNum) {
*colNum = 0; *colNum = 0;
SNode* pNode = NULL; SNode* pNode = NULL;
...@@ -70,7 +70,7 @@ void getJoinValColNum(SNodeList* pList, int32_t blkId, int32_t* colNum) { ...@@ -70,7 +70,7 @@ void getJoinValColNum(SNodeList* pList, int32_t blkId, int32_t* colNum) {
} }
} }
bool valColInKeyCols(int16_t slotId, int32_t keyNum, SHJoinColInfo* pKeys, int32_t* pKeyIdx) { static bool valColInKeyCols(int16_t slotId, int32_t keyNum, SHJoinColInfo* pKeys, int32_t* pKeyIdx) {
for (int32_t i = 0; i < keyNum; ++i) { for (int32_t i = 0; i < keyNum; ++i) {
if (pKeys[i].srcSlot == slotId) { if (pKeys[i].srcSlot == slotId) {
*pKeyIdx = i; *pKeyIdx = i;
...@@ -81,8 +81,8 @@ bool valColInKeyCols(int16_t slotId, int32_t keyNum, SHJoinColInfo* pKeys, int32 ...@@ -81,8 +81,8 @@ bool valColInKeyCols(int16_t slotId, int32_t keyNum, SHJoinColInfo* pKeys, int32
return false; return false;
} }
int32_t initJoinValColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) { static int32_t initJoinValColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) {
getJoinValColNum(pList, pTable->blkId, &pTable->valNum); getHJoinValColNum(pList, pTable->blkId, &pTable->valNum);
if (pTable->valNum == 0) { if (pTable->valNum == 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -133,7 +133,7 @@ int32_t initJoinValColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) { ...@@ -133,7 +133,7 @@ int32_t initJoinValColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) {
} }
int32_t initJoinTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNode, SOperatorInfo** pDownstream, int32_t idx, SQueryStat* pStat) { static int32_t initHJoinTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNode, SOperatorInfo** pDownstream, int32_t idx, SQueryStat* pStat) {
SNodeList* pKeyList = NULL; SNodeList* pKeyList = NULL;
SHJoinTableInfo* pTable = &pJoin->tbs[idx]; SHJoinTableInfo* pTable = &pJoin->tbs[idx];
pTable->downStream = pDownstream[idx]; pTable->downStream = pDownstream[idx];
...@@ -144,11 +144,11 @@ int32_t initJoinTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNo ...@@ -144,11 +144,11 @@ int32_t initJoinTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNo
pKeyList = pJoinNode->pOnRight; pKeyList = pJoinNode->pOnRight;
} }
int32_t code = initJoinKeyColsInfo(pTable, pKeyList); int32_t code = initHJoinKeyColsInfo(pTable, pKeyList);
if (code) { if (code) {
return code; return code;
} }
int32_t code = initJoinValColsInfo(pTable, pJoinNode->pTargets); code = initJoinValColsInfo(pTable, pJoinNode->pTargets);
if (code) { if (code) {
return code; return code;
} }
...@@ -158,7 +158,7 @@ int32_t initJoinTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNo ...@@ -158,7 +158,7 @@ int32_t initJoinTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNo
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void setJoinBuildAndProbeTable(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJoinNode) { static void setHJoinBuildAndProbeTable(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJoinNode) {
int32_t buildIdx = 0; int32_t buildIdx = 0;
int32_t probeIdx = 1; int32_t probeIdx = 1;
...@@ -190,7 +190,7 @@ void setJoinBuildAndProbeTable(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJ ...@@ -190,7 +190,7 @@ void setJoinBuildAndProbeTable(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJ
pInfo->pProbe = &pInfo->tbs[probeIdx]; pInfo->pProbe = &pInfo->tbs[probeIdx];
} }
void buildJoinResColMap(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJoinNode) { static int32_t buildHJoinResColMap(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJoinNode) {
pInfo->pResColNum = pJoinNode->pTargets->length; pInfo->pResColNum = pJoinNode->pTargets->length;
pInfo->pResColMap = taosMemoryCalloc(pJoinNode->pTargets->length, sizeof(int8_t)); pInfo->pResColMap = taosMemoryCalloc(pJoinNode->pTargets->length, sizeof(int8_t));
if (NULL == pInfo->pResColMap) { if (NULL == pInfo->pResColMap) {
...@@ -213,7 +213,7 @@ void buildJoinResColMap(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJoinNode ...@@ -213,7 +213,7 @@ void buildJoinResColMap(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJoinNode
} }
FORCE_INLINE int32_t addPageToJoinBuf(SArray* pRowBufs) { static FORCE_INLINE int32_t addPageToHJoinBuf(SArray* pRowBufs) {
SBufPageInfo page; SBufPageInfo page;
page.pageSize = HASH_JOIN_DEFAULT_PAGE_SIZE; page.pageSize = HASH_JOIN_DEFAULT_PAGE_SIZE;
page.offset = 0; page.offset = 0;
...@@ -226,126 +226,70 @@ FORCE_INLINE int32_t addPageToJoinBuf(SArray* pRowBufs) { ...@@ -226,126 +226,70 @@ FORCE_INLINE int32_t addPageToJoinBuf(SArray* pRowBufs) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t initJoinBufPages(SHJoinOperatorInfo* pInfo) { static int32_t initHJoinBufPages(SHJoinOperatorInfo* pInfo) {
pInfo->pRowBufs = taosArrayInit(32, sizeof(SBufPageInfo)); pInfo->pRowBufs = taosArrayInit(32, sizeof(SBufPageInfo));
if (NULL == pInfo->pRowBufs) { if (NULL == pInfo->pRowBufs) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
return addPageToJoinBuf(pInfo->pRowBufs); return addPageToHJoinBuf(pInfo->pRowBufs);
} }
static void freeHJoinTableInfo(SHJoinTableInfo* pTable) {
taosMemoryFreeClear(pTable->keyCols);
taosMemoryFreeClear(pTable->keyBuf);
taosMemoryFreeClear(pTable->valCols);
taosArrayDestroy(pTable->valVarCols);
}
SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, static void freeHJoinBufPage(void* param) {
SHashJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) { SBufPageInfo* pInfo = (SBufPageInfo*)param;
SHJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SHJoinOperatorInfo)); taosMemoryFree(pInfo->data);
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); }
int32_t code = TSDB_CODE_SUCCESS;
if (pOperator == NULL || pInfo == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
int32_t numOfCols = 0;
pInfo->pRes = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc);
initResultSizeInfo(&pOperator->resultInfo, 4096);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
setOperatorInfo(pOperator, "HashJoinOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
initJoinTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0]);
initJoinTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1]);
setJoinBuildAndProbeTable(pInfo, pJoinNode);
code = buildJoinResColMap(pInfo, pJoinNode);
if (code) {
goto _error;
}
code = initJoinBufPages(pInfo);
if (code) {
goto _error;
}
size_t hashCap = pInfo->pBuild->inputStat.inputRowNum > 0 ? (pInfo->pBuild->inputStat.inputRowNum * 1.5) : 1024;
pInfo->pKeyHash = tSimpleHashInit(hashCap, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
if (pInfo->pKeyHash == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
if (pJoinNode->pFilterConditions != NULL && pJoinNode->node.pConditions != NULL) {
pInfo->pCond = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION);
if (pInfo->pCond == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pInfo->pCond);
pLogicCond->pParameterList = nodesMakeList();
if (pLogicCond->pParameterList == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->pFilterConditions)); static void destroyHJoinKeyHash(SSHashObj** ppHash) {
nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->node.pConditions)); if (NULL == ppHash || NULL == (*ppHash)) {
pLogicCond->condType = LOGIC_COND_TYPE_AND; return;
} else if (pJoinNode->pFilterConditions != NULL) {
pInfo->pCond = nodesCloneNode(pJoinNode->pFilterConditions);
} else if (pJoinNode->node.pConditions != NULL) {
pInfo->pCond = nodesCloneNode(pJoinNode->node.pConditions);
} else {
pInfo->pCond = NULL;
} }
code = filterInitFromNode(pInfo->pCond, &pOperator->exprSupp.pFilterInfo, 0); void* pIte = NULL;
if (code != TSDB_CODE_SUCCESS) { int32_t iter = 0;
goto _error; while ((pIte = tSimpleHashIterate(*ppHash, pIte, &iter)) != NULL) {
SGroupData* pGroup = pIte;
SBufRowInfo* pRow = pGroup->rows;
SBufRowInfo* pNext = NULL;
while (pRow) {
pNext = pRow->next;
taosMemoryFree(pRow);
pRow = pNext;
} }
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doHashJoin, NULL, destroHashJoinOperator, optrDefaultBufFn, NULL);
return pOperator;
_error:
if (pInfo != NULL) {
destroyHashJoinOperator(pInfo);
} }
taosMemoryFree(pOperator); tSimpleHashCleanup(*ppHash);
pTaskInfo->code = code; *ppHash = NULL;
return NULL;
} }
void destroHashJoinOperator(void* param) { static void destroyHashJoinOperator(void* param) {
SHJoinOperatorInfo* pJoinOperator = (SHJoinOperatorInfo*)param; SHJoinOperatorInfo* pJoinOperator = (SHJoinOperatorInfo*)param;
if (pJoinOperator->pColEqualOnConditions != NULL) {
mergeJoinDestoryBuildTable(pJoinOperator->rightBuildTable);
taosMemoryFreeClear(pJoinOperator->rightEqOnCondKeyBuf);
taosArrayDestroy(pJoinOperator->rightEqOnCondCols);
taosMemoryFreeClear(pJoinOperator->leftEqOnCondKeyBuf); destroyHJoinKeyHash(&pJoinOperator->pKeyHash);
taosArrayDestroy(pJoinOperator->leftEqOnCondCols);
}
nodesDestroyNode(pJoinOperator->pCondAfterMerge);
taosArrayDestroy(pJoinOperator->rowCtx.leftCreatedBlocks);
taosArrayDestroy(pJoinOperator->rowCtx.rightCreatedBlocks);
taosArrayDestroy(pJoinOperator->rowCtx.leftRowLocations);
taosArrayDestroy(pJoinOperator->rowCtx.rightRowLocations);
freeHJoinTableInfo(&pJoinOperator->tbs[0]);
freeHJoinTableInfo(&pJoinOperator->tbs[1]);
pJoinOperator->pRes = blockDataDestroy(pJoinOperator->pRes); pJoinOperator->pRes = blockDataDestroy(pJoinOperator->pRes);
taosMemoryFreeClear(pJoinOperator->pResColMap);
taosArrayDestroyEx(pJoinOperator->pRowBufs, freeHJoinBufPage);
nodesDestroyNode(pJoinOperator->pCond);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
FORCE_INLINE char* retrieveColDataFromRowBufs(SArray* pRowBufs, SBufRowInfo* pRow) { static FORCE_INLINE char* retrieveColDataFromRowBufs(SArray* pRowBufs, SBufRowInfo* pRow) {
SBufPageInfo *pPage = taosArrayGet(pRowBufs, pRow->pageId); SBufPageInfo *pPage = taosArrayGet(pRowBufs, pRow->pageId);
return pPage->data + pRow->offset; return pPage->data + pRow->offset;
} }
FORCE_INLINE int32_t copyJoinResRowsToBlock(SHJoinOperatorInfo* pJoin, int32_t rowNum, SBufRowInfo* pStart, SSDataBlock* pRes) { static FORCE_INLINE int32_t copyHJoinResRowsToBlock(SHJoinOperatorInfo* pJoin, int32_t rowNum, SBufRowInfo* pStart, SSDataBlock* pRes) {
SHJoinTableInfo* pBuild = pJoin->pBuild; SHJoinTableInfo* pBuild = pJoin->pBuild;
SHJoinTableInfo* pProbe = pJoin->pProbe; SHJoinTableInfo* pProbe = pJoin->pProbe;
int32_t buildIdx = 0, buildValIdx = 0; int32_t buildIdx = 0, buildValIdx = 0;
...@@ -356,11 +300,17 @@ FORCE_INLINE int32_t copyJoinResRowsToBlock(SHJoinOperatorInfo* pJoin, int32_t r ...@@ -356,11 +300,17 @@ FORCE_INLINE int32_t copyJoinResRowsToBlock(SHJoinOperatorInfo* pJoin, int32_t r
for (int32_t r = 0; r < rowNum; ++r) { for (int32_t r = 0; r < rowNum; ++r) {
char* pData = retrieveColDataFromRowBufs(pJoin->pRowBufs, pRow); char* pData = retrieveColDataFromRowBufs(pJoin->pRowBufs, pRow);
char* pValData = pData + pBuild->valBitMapSize; char* pValData = pData + pBuild->valBitMapSize;
char* pKeyData = pProbe->keyData;
buildIdx = buildValIdx = probeIdx = 0;
for (int32_t i = 0; i < pJoin->pResColNum; ++i) { for (int32_t i = 0; i < pJoin->pResColNum; ++i) {
if (pJoin->pResColMap[i]) { if (pJoin->pResColMap[i]) {
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pBuild->valCols[buildIdx].dstSlot); SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pBuild->valCols[buildIdx].dstSlot);
if (pBuild->valCols[buildIdx].keyCol) { if (pBuild->valCols[buildIdx].keyCol) {
code = colDataSetVal(pDst, pRes->info.rows + r, pKeyData, false);
if (code) {
return code;
}
pKeyData += pBuild->valCols[buildIdx].vardata ? varDataTLen(pKeyData) : pBuild->valCols[buildIdx].bytes;
} else { } else {
if (colDataIsNull_f(pData, buildValIdx)) { if (colDataIsNull_f(pData, buildValIdx)) {
code = colDataSetVal(pDst, pRes->info.rows + r, NULL, true); code = colDataSetVal(pDst, pRes->info.rows + r, NULL, true);
...@@ -377,8 +327,8 @@ FORCE_INLINE int32_t copyJoinResRowsToBlock(SHJoinOperatorInfo* pJoin, int32_t r ...@@ -377,8 +327,8 @@ FORCE_INLINE int32_t copyJoinResRowsToBlock(SHJoinOperatorInfo* pJoin, int32_t r
buildValIdx++; buildValIdx++;
} }
buildIdx++; buildIdx++;
} else { } else if (0 == i) {
SColumnInfoData* pSrc = taosArrayGet(pJoin->ctx.pProbeData, pProbe->valCols[probeIdx].srcSlot); SColumnInfoData* pSrc = taosArrayGet(pJoin->ctx.pProbeData->pDataBlock, pProbe->valCols[probeIdx].srcSlot);
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pProbe->valCols[probeIdx].dstSlot); SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pProbe->valCols[probeIdx].dstSlot);
code = colDataCopyNItems(pDst, pRes->info.rows, colDataGetData(pSrc, pJoin->ctx.probeIdx), rowNum, colDataIsNull_s(pSrc, pJoin->ctx.probeIdx)); code = colDataCopyNItems(pDst, pRes->info.rows, colDataGetData(pSrc, pJoin->ctx.probeIdx), rowNum, colDataIsNull_s(pSrc, pJoin->ctx.probeIdx));
...@@ -395,20 +345,20 @@ FORCE_INLINE int32_t copyJoinResRowsToBlock(SHJoinOperatorInfo* pJoin, int32_t r ...@@ -395,20 +345,20 @@ FORCE_INLINE int32_t copyJoinResRowsToBlock(SHJoinOperatorInfo* pJoin, int32_t r
} }
FORCE_INLINE void appendJoinResToBlock(struct SOperatorInfo* pOperator, SSDataBlock* pRes) { static FORCE_INLINE void appendHJoinResToBlock(struct SOperatorInfo* pOperator, SSDataBlock* pRes) {
SHJoinOperatorInfo* pJoin = pOperator->info; SHJoinOperatorInfo* pJoin = pOperator->info;
SHJoinCtx* pCtx = &pJoin->ctx; SHJoinCtx* pCtx = &pJoin->ctx;
SBufRowInfo* pStart = pCtx->pBuildRow; SBufRowInfo* pStart = pCtx->pBuildRow;
int32_t rowNum = 0; int32_t rowNum = 0;
int32_t resNum = pRes.info.rows; int32_t resNum = pRes->info.rows;
while (pCtx->pBuildRow && resNum < pRes.info.capacity) { while (pCtx->pBuildRow && resNum < pRes->info.capacity) {
rowNum++; rowNum++;
resNum++; resNum++;
pCtx->pBuildRow = pCtx->pBuildRow->next; pCtx->pBuildRow = pCtx->pBuildRow->next;
} }
int32_t code = copyJoinResRowsToBlock(pJoin, rowNum, pStart, pRes); int32_t code = copyHJoinResRowsToBlock(pJoin, rowNum, pStart, pRes);
if (code) { if (code) {
pOperator->pTaskInfo->code = code; pOperator->pTaskInfo->code = code;
T_LONG_JMP(pOperator->pTaskInfo->env, code); T_LONG_JMP(pOperator->pTaskInfo->env, code);
...@@ -418,7 +368,42 @@ FORCE_INLINE void appendJoinResToBlock(struct SOperatorInfo* pOperator, SSDataBl ...@@ -418,7 +368,42 @@ FORCE_INLINE void appendJoinResToBlock(struct SOperatorInfo* pOperator, SSDataBl
pCtx->rowRemains = pCtx->pBuildRow ? true : false; pCtx->rowRemains = pCtx->pBuildRow ? true : false;
} }
void doHashJoinImpl(struct SOperatorInfo* pOperator) {
static FORCE_INLINE void copyKeyColsDataToBuf(SHJoinTableInfo* pTable, int32_t rowIdx, size_t *pBufLen) {
char *pData = NULL;
size_t bufLen = 0;
if (1 == pTable->keyNum) {
if (pTable->keyCols[0].vardata) {
pData = pTable->keyCols[0].data + pTable->keyCols[0].offset[rowIdx];
bufLen = varDataTLen(pData);
} else {
pData = pTable->keyCols[0].data + pTable->keyCols[0].bytes * rowIdx;
bufLen = pTable->keyCols[0].bytes;
}
pTable->keyData = pData;
} else {
for (int32_t i = 0; i < pTable->keyNum; ++i) {
if (pTable->keyCols[i].vardata) {
pData = pTable->keyCols[i].data + pTable->keyCols[i].offset[rowIdx];
memcpy(pTable->keyBuf + bufLen, pData, varDataTLen(pData));
bufLen += varDataTLen(pData);
} else {
pData = pTable->keyCols[i].data + pTable->keyCols[i].bytes * rowIdx;
memcpy(pTable->keyBuf + bufLen, pData, pTable->keyCols[i].bytes);
bufLen += pTable->keyCols[i].bytes;
}
}
pTable->keyData = pTable->keyBuf;
}
if (pBufLen) {
*pBufLen = bufLen;
}
}
static void doHashJoinImpl(struct SOperatorInfo* pOperator) {
SHJoinOperatorInfo* pJoin = pOperator->info; SHJoinOperatorInfo* pJoin = pOperator->info;
SHJoinTableInfo* pProbe = pJoin->pProbe; SHJoinTableInfo* pProbe = pJoin->pProbe;
SHJoinCtx* pCtx = &pJoin->ctx; SHJoinCtx* pCtx = &pJoin->ctx;
...@@ -426,7 +411,7 @@ void doHashJoinImpl(struct SOperatorInfo* pOperator) { ...@@ -426,7 +411,7 @@ void doHashJoinImpl(struct SOperatorInfo* pOperator) {
size_t bufLen = 0; size_t bufLen = 0;
if (pJoin->ctx.pBuildRow) { if (pJoin->ctx.pBuildRow) {
appendJoinResToBlock(pOperator, pRes); appendHJoinResToBlock(pOperator, pRes);
return; return;
} }
...@@ -435,15 +420,15 @@ void doHashJoinImpl(struct SOperatorInfo* pOperator) { ...@@ -435,15 +420,15 @@ void doHashJoinImpl(struct SOperatorInfo* pOperator) {
SGroupData* pGroup = tSimpleHashGet(pJoin->pKeyHash, pProbe->keyData, bufLen); SGroupData* pGroup = tSimpleHashGet(pJoin->pKeyHash, pProbe->keyData, bufLen);
if (pGroup) { if (pGroup) {
pCtx->pBuildRow = pGroup->rows; pCtx->pBuildRow = pGroup->rows;
appendJoinResToBlock(pOperator, pRes); appendHJoinResToBlock(pOperator, pRes);
if (pRes->info.rows >= pRes.info.capacity) { if (pRes->info.rows >= pRes->info.capacity) {
break; break;
} }
} }
} }
} }
int32_t setKeyColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) { static int32_t setKeyColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) {
for (int32_t i = 0; i < pTable->keyNum; ++i) { for (int32_t i = 0; i < pTable->keyNum; ++i) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pTable->keyCols[i].srcSlot); SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pTable->keyCols[i].srcSlot);
if (pTable->keyCols[i].vardata != IS_VAR_DATA_TYPE(pCol->info.type)) { if (pTable->keyCols[i].vardata != IS_VAR_DATA_TYPE(pCol->info.type)) {
...@@ -463,7 +448,7 @@ int32_t setKeyColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) { ...@@ -463,7 +448,7 @@ int32_t setKeyColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t setValColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) { static int32_t setValColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) {
if (!pTable->valColExist) { if (!pTable->valColExist) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -480,7 +465,7 @@ int32_t setValColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) { ...@@ -480,7 +465,7 @@ int32_t setValColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) {
qError("column bytes mismatch, idx:%d, slotId:%d, bytes:%d, %d", i, pTable->valCols[i].srcSlot, pCol->info.bytes, pTable->valCols[i].bytes); qError("column bytes mismatch, idx:%d, slotId:%d, bytes:%d, %d", i, pTable->valCols[i].srcSlot, pCol->info.bytes, pTable->valCols[i].bytes);
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
if (!pTable->valCols[i].vardata)) { if (!pTable->valCols[i].vardata) {
pTable->valCols[i].bitMap = pCol->nullbitmap; pTable->valCols[i].bitMap = pCol->nullbitmap;
} }
pTable->valCols[i].data = pCol->pData; pTable->valCols[i].data = pCol->pData;
...@@ -493,40 +478,8 @@ int32_t setValColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) { ...@@ -493,40 +478,8 @@ int32_t setValColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) {
} }
FORCE_INLINE void copyKeyColsDataToBuf(SHJoinTableInfo* pTable, int32_t rowIdx, size_t *pBufLen) {
char *pData = NULL;
size_t bufLen = 0;
if (1 == pTable->keyNum) {
if (pTable->keyCols[0].vardata) {
pData = pTable->keyCols[0].data + pTable->keyCols[0].offset[rowIdx];
bufLen = varDataTLen(pData);
} else {
pData = pTable->keyCols[0].data + pTable->keyCols[0].bytes * rowIdx;
bufLen = pTable->keyCols[0].bytes;
}
pTable->keyData = pData;
} else {
for (int32_t i = 0; i < pTable->keyNum; ++i) {
if (pTable->keyCols[i].vardata) {
pData = pTable->keyCols[i].data + pTable->keyCols[i].offset[rowIdx];
memcpy(pTable->keyBuf + bufLen, pData, varDataTLen(pData));
bufLen += varDataTLen(pData);
} else {
pData = pTable->keyCols[i].data + pTable->keyCols[i].bytes * rowIdx;
memcpy(pTable->keyBuf + bufLen, pData, pTable->keyCols[i].bytes);
bufLen += pTable->keyCols[i].bytes;
}
}
pTable->keyData = pTable->keyBuf;
}
if (pBufLen) {
*pBufLen = bufLen;
}
}
FORCE_INLINE void copyValColsDataToBuf(SHJoinTableInfo* pTable, int32_t rowIdx) { static FORCE_INLINE void copyValColsDataToBuf(SHJoinTableInfo* pTable, int32_t rowIdx) {
if (!pTable->valColExist) { if (!pTable->valColExist) {
return; return;
} }
...@@ -559,7 +512,7 @@ FORCE_INLINE void copyValColsDataToBuf(SHJoinTableInfo* pTable, int32_t rowIdx) ...@@ -559,7 +512,7 @@ FORCE_INLINE void copyValColsDataToBuf(SHJoinTableInfo* pTable, int32_t rowIdx)
} }
FORCE_INLINE int32_t getValBufFromPages(SArray* pPages, int32_t bufSize, char** pBuf, SBufRowInfo* pRow) { static FORCE_INLINE int32_t getValBufFromPages(SArray* pPages, int32_t bufSize, char** pBuf, SBufRowInfo* pRow) {
if (0 == bufSize) { if (0 == bufSize) {
pRow->pageId = -1; pRow->pageId = -1;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -580,14 +533,14 @@ FORCE_INLINE int32_t getValBufFromPages(SArray* pPages, int32_t bufSize, char** ...@@ -580,14 +533,14 @@ FORCE_INLINE int32_t getValBufFromPages(SArray* pPages, int32_t bufSize, char**
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t code = addPageToJoinBuf(pPages); int32_t code = addPageToHJoinBuf(pPages);
if (code) { if (code) {
return code; return code;
} }
} while (true); } while (true);
} }
FORCE_INLINE int32_t getJoinValBufSize(SHJoinTableInfo* pTable, int32_t rowIdx) { static FORCE_INLINE int32_t getHJoinValBufSize(SHJoinTableInfo* pTable, int32_t rowIdx) {
if (NULL == pTable->valVarCols) { if (NULL == pTable->valVarCols) {
return pTable->valBufSize; return pTable->valBufSize;
} }
...@@ -605,7 +558,7 @@ FORCE_INLINE int32_t getJoinValBufSize(SHJoinTableInfo* pTable, int32_t rowIdx) ...@@ -605,7 +558,7 @@ FORCE_INLINE int32_t getJoinValBufSize(SHJoinTableInfo* pTable, int32_t rowIdx)
} }
int32_t addRowToHashImpl(SHJoinOperatorInfo* pJoin, SGroupData* pGroup, SHJoinTableInfo* pTable, size_t keyLen, int32_t rowIdx) { static int32_t addRowToHashImpl(SHJoinOperatorInfo* pJoin, SGroupData* pGroup, SHJoinTableInfo* pTable, size_t keyLen, int32_t rowIdx) {
SGroupData group = {0}; SGroupData group = {0};
SBufRowInfo* pRow = NULL; SBufRowInfo* pRow = NULL;
...@@ -616,13 +569,13 @@ int32_t addRowToHashImpl(SHJoinOperatorInfo* pJoin, SGroupData* pGroup, SHJoinTa ...@@ -616,13 +569,13 @@ int32_t addRowToHashImpl(SHJoinOperatorInfo* pJoin, SGroupData* pGroup, SHJoinTa
} }
pRow = group.rows; pRow = group.rows;
} else { } else {
pRow = taosMemoryMalloc(sizeof(SBufRowInfo)) pRow = taosMemoryMalloc(sizeof(SBufRowInfo));
if (NULL == pRow) { if (NULL == pRow) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
} }
int32_t code = getValBufFromPages(pJoin->pRowBufs, getJoinValBufSize(pTable, rowIdx), &pTable->valData, pRow); int32_t code = getHJoinValBufSize(pJoin->pRowBufs, getJoinValBufSize(pTable, rowIdx), &pTable->valData, pRow);
if (code) { if (code) {
return code; return code;
} }
...@@ -640,7 +593,7 @@ int32_t addRowToHashImpl(SHJoinOperatorInfo* pJoin, SGroupData* pGroup, SHJoinTa ...@@ -640,7 +593,7 @@ int32_t addRowToHashImpl(SHJoinOperatorInfo* pJoin, SGroupData* pGroup, SHJoinTa
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t addRowToHash(SHJoinOperatorInfo* pJoin, SSDataBlock* pBlock, size_t keyLen, int32_t rowIdx) { static int32_t addRowToHash(SHJoinOperatorInfo* pJoin, SSDataBlock* pBlock, size_t keyLen, int32_t rowIdx) {
SHJoinTableInfo* pBuild = pJoin->pBuild; SHJoinTableInfo* pBuild = pJoin->pBuild;
int32_t code = setValColsData(pBlock, pBuild); int32_t code = setValColsData(pBlock, pBuild);
if (code) { if (code) {
...@@ -658,7 +611,7 @@ int32_t addRowToHash(SHJoinOperatorInfo* pJoin, SSDataBlock* pBlock, size_t keyL ...@@ -658,7 +611,7 @@ int32_t addRowToHash(SHJoinOperatorInfo* pJoin, SSDataBlock* pBlock, size_t keyL
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t addBlockRowsToHash(SSDataBlock* pBlock, SHJoinOperatorInfo* pJoin) { static int32_t addBlockRowsToHash(SSDataBlock* pBlock, SHJoinOperatorInfo* pJoin) {
SHJoinTableInfo* pBuild = pJoin->pBuild; SHJoinTableInfo* pBuild = pJoin->pBuild;
int32_t code = setKeyColsData(pBlock, pBuild); int32_t code = setKeyColsData(pBlock, pBuild);
if (code) { if (code) {
...@@ -677,8 +630,7 @@ int32_t addBlockRowsToHash(SSDataBlock* pBlock, SHJoinOperatorInfo* pJoin) { ...@@ -677,8 +630,7 @@ int32_t addBlockRowsToHash(SSDataBlock* pBlock, SHJoinOperatorInfo* pJoin) {
return code; return code;
} }
int32_t buildJoinKeyHash(struct SOperatorInfo* pOperator) { static int32_t buildHJoinKeyHash(SHJoinOperatorInfo* pJoin) {
SHJoinOperatorInfo* pJoin = pOperator->info;
SSDataBlock* pBlock = NULL; SSDataBlock* pBlock = NULL;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -697,7 +649,7 @@ int32_t buildJoinKeyHash(struct SOperatorInfo* pOperator) { ...@@ -697,7 +649,7 @@ int32_t buildJoinKeyHash(struct SOperatorInfo* pOperator) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void launchBlockHashJoin(struct SOperatorInfo* pOperator, SSDataBlock* pBlock) { static int32_t launchBlockHashJoin(struct SOperatorInfo* pOperator, SSDataBlock* pBlock) {
SHJoinOperatorInfo* pJoin = pOperator->info; SHJoinOperatorInfo* pJoin = pOperator->info;
SHJoinTableInfo* pProbe = pJoin->pProbe; SHJoinTableInfo* pProbe = pJoin->pProbe;
int32_t code = setKeyColsData(pBlock, pProbe); int32_t code = setKeyColsData(pBlock, pProbe);
...@@ -714,9 +666,19 @@ void launchBlockHashJoin(struct SOperatorInfo* pOperator, SSDataBlock* pBlock) { ...@@ -714,9 +666,19 @@ void launchBlockHashJoin(struct SOperatorInfo* pOperator, SSDataBlock* pBlock) {
pJoin->ctx.pProbeData = pBlock; pJoin->ctx.pProbeData = pBlock;
doHashJoinImpl(pOperator); doHashJoinImpl(pOperator);
return TSDB_CODE_SUCCESS;
}
static void setHJoinDone(struct SOperatorInfo* pOperator) {
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
pOperator->status = OP_EXEC_DONE;
SHJoinOperatorInfo* pInfo = pOperator->info;
destroyHJoinKeyHash(&pInfo->pKeyHash);
} }
SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator) { static SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator) {
SHJoinOperatorInfo* pJoin = pOperator->info; SHJoinOperatorInfo* pJoin = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -728,15 +690,14 @@ SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator) { ...@@ -728,15 +690,14 @@ SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator) {
} }
if (NULL == pJoin->pKeyHash) { if (NULL == pJoin->pKeyHash) {
code = buildJoinKeyHash(pJoin); code = buildHJoinKeyHash(pJoin);
if (code) { if (code) {
pTaskInfo->code = code; pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
if (tSimpleHashGetSize(pJoin->pKeyHash) <= 0) { if (tSimpleHashGetSize(pJoin->pKeyHash) <= 0) {
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); setHJoinDone(pOperator);
pOperator->status = OP_EXEC_DONE;
return NULL; return NULL;
} }
} }
...@@ -755,12 +716,15 @@ SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator) { ...@@ -755,12 +716,15 @@ SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator) {
while (true) { while (true) {
SSDataBlock* pBlock = pJoin->pProbe->downStream->fpSet.getNextFn(pJoin->pProbe->downStream); SSDataBlock* pBlock = pJoin->pProbe->downStream->fpSet.getNextFn(pJoin->pProbe->downStream);
if (NULL == pBlock) { if (NULL == pBlock) {
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); setHJoinDone(pOperator);
pOperator->status = OP_EXEC_DONE;
break; break;
} }
launchBlockHashJoin(pOperator, pBlock); code = launchBlockHashJoin(pOperator, pBlock);
if (code) {
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
if (pRes->info.rows < pOperator->resultInfo.threshold) { if (pRes->info.rows < pOperator->resultInfo.threshold) {
continue; continue;
...@@ -776,3 +740,88 @@ SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator) { ...@@ -776,3 +740,88 @@ SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator) {
return (pRes->info.rows > 0) ? pRes : NULL; return (pRes->info.rows > 0) ? pRes : NULL;
} }
SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
SHashJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) {
SHJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SHJoinOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
int32_t code = TSDB_CODE_SUCCESS;
if (pOperator == NULL || pInfo == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
int32_t numOfCols = 0;
pInfo->pRes = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc);
initResultSizeInfo(&pOperator->resultInfo, 4096);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
setOperatorInfo(pOperator, "HashJoinOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
initHJoinTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0]);
initHJoinTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1]);
setHJoinBuildAndProbeTable(pInfo, pJoinNode);
code = buildHJoinResColMap(pInfo, pJoinNode);
if (code) {
goto _error;
}
code = initHJoinBufPages(pInfo);
if (code) {
goto _error;
}
size_t hashCap = pInfo->pBuild->inputStat.inputRowNum > 0 ? (pInfo->pBuild->inputStat.inputRowNum * 1.5) : 1024;
pInfo->pKeyHash = tSimpleHashInit(hashCap, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
if (pInfo->pKeyHash == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
if (pJoinNode->pFilterConditions != NULL && pJoinNode->node.pConditions != NULL) {
pInfo->pCond = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION);
if (pInfo->pCond == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pInfo->pCond);
pLogicCond->pParameterList = nodesMakeList();
if (pLogicCond->pParameterList == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->pFilterConditions));
nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->node.pConditions));
pLogicCond->condType = LOGIC_COND_TYPE_AND;
} else if (pJoinNode->pFilterConditions != NULL) {
pInfo->pCond = nodesCloneNode(pJoinNode->pFilterConditions);
} else if (pJoinNode->node.pConditions != NULL) {
pInfo->pCond = nodesCloneNode(pJoinNode->node.pConditions);
} else {
pInfo->pCond = NULL;
}
code = filterInitFromNode(pInfo->pCond, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doHashJoin, NULL, destroyHashJoinOperator, optrDefaultBufFn, NULL);
return pOperator;
_error:
if (pInfo != NULL) {
destroyHashJoinOperator(pInfo);
}
taosMemoryFree(pOperator);
pTaskInfo->code = code;
return NULL;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册