joinoperator.c 9.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "executorimpl.h"
17 18 19 20
#include "function.h"
#include "os.h"
#include "querynodes.h"
#include "tcompare.h"
21
#include "tdatablock.h"
22
#include "thash.h"
23
#include "tmsg.h"
24 25
#include "ttypes.h"

26
static void         setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode);
27
static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator);
28 29
static void         destroyMergeJoinOperator(void* param, int32_t numOfOutput);
static void         extractTimeCondition(SJoinOperatorInfo* Info, SLogicConditionNode* pLogicConditionNode);
30

31 32
/**
 * Build the merge-join operator for a sort-merge-join physical plan node.
 *
 * @param pDownstream     downstream operators, handed to appendDownstream()
 * @param numOfDownstream number of entries in pDownstream
 * @param pJoinNode       plan node supplying the output block descriptor, the target list,
 *                        the merge condition and the optional pOnConditions / node.pConditions
 * @param pTaskInfo       owning task; on failure its `code` field receives the error code
 * @return the new operator on success, NULL on failure
 */
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
                                           SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t numOfCols = 0;

  SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo));
  SOperatorInfo*     pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pOperator == NULL || pInfo == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  // Store the result block in pInfo immediately so the error path below can release it.
  pInfo->pRes = createResDataBlock(pJoinNode->node.pOutputDataBlockDesc);

  pOperator->exprSupp.pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols);
  pOperator->exprSupp.numOfExprs = numOfCols;
  initResultSizeInfo(&pOperator->resultInfo, 4096);

  pOperator->name = "MergeJoinOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  // The merge condition must be a single operator node whose two operands are the join columns.
  if (nodeType(pJoinNode->pMergeCondition) == QUERY_NODE_OPERATOR) {
    SOperatorNode* pNode = (SOperatorNode*)pJoinNode->pMergeCondition;
    setJoinColumnInfo(&pInfo->leftCol, (SColumnNode*)pNode->pLeft);
    setJoinColumnInfo(&pInfo->rightCol, (SColumnNode*)pNode->pRight);
  } else {
    ASSERT(false);
  }

  // Build the condition that doMergeJoin() applies to the joined rows.
  if (pJoinNode->pOnConditions != NULL && pJoinNode->node.pConditions != NULL) {
    // Both conditions are present: clone them into one logical AND node.
    pInfo->pCondAfterMerge = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION);
    if (pInfo->pCondAfterMerge == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }

    SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pInfo->pCondAfterMerge);
    pLogicCond->condType = LOGIC_COND_TYPE_AND;
    pLogicCond->pParameterList = nodesMakeList();

    code = nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->pOnConditions));
    if (code == TSDB_CODE_SUCCESS) {
      code = nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->node.pConditions));
    }
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  } else if (pJoinNode->pOnConditions != NULL) {
    pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->pOnConditions);
    if (pInfo->pCondAfterMerge == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }
  } else if (pJoinNode->node.pConditions != NULL) {
    pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->node.pConditions);
    if (pInfo->pCondAfterMerge == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }
  } else {
    pInfo->pCondAfterMerge = NULL;
  }

  // Translate the plan node's order constant; anything other than ORDER_DESC is treated as ascending.
  pInfo->inputTsOrder = (pJoinNode->inputTsOrder == ORDER_DESC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
  // TODO: remove this when JoinNode inputTsOrder is ready
  pInfo->inputTsOrder = TSDB_ORDER_ASC;

  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL, NULL, NULL);

  code = appendDownstream(pOperator, pDownstream, numOfDownstream);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  if (pInfo != NULL) {
    // Release the result block here and clear the pointer so the shared teardown
    // below cannot touch it a second time.
    if (pInfo->pRes != NULL) {
      blockDataDestroy(pInfo->pRes);
      pInfo->pRes = NULL;
    }
    // Destroys pCondAfterMerge and frees pInfo itself.
    destroyMergeJoinOperator(pInfo, 0);
  }
  // NOTE(review): pOperator->exprSupp.pExprInfo is not released on this path; confirm
  // which helper owns that cleanup before adding it here.
  taosMemoryFree(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

/**
 * Fill a column-info descriptor from a column node.
 *
 * Copies the node's slot id together with the four attributes of its result
 * type (type, bytes, precision, scale).
 *
 * @param pColumn     descriptor to fill
 * @param pColumnNode column node to read from
 */
void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) {
  // Result data type of the column expression.
  pColumn->type = pColumnNode->node.resType.type;
  pColumn->bytes = pColumnNode->node.resType.bytes;
  pColumn->scale = pColumnNode->node.resType.scale;
  pColumn->precision = pColumnNode->node.resType.precision;

  // Slot id; the join code uses it to index a block's column array.
  pColumn->slotId = pColumnNode->slotId;
}

void destroyMergeJoinOperator(void* param, int32_t numOfOutput) {
  SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param;
115
  nodesDestroyNode(pJoinOperator->pCondAfterMerge);
116

D
dapan1121 已提交
117
  taosMemoryFreeClear(param);
118
}
119 120 121

/**
 * Write one joined output row.
 *
 * For every output expression, the source column is looked up by the block id and
 * slot id of the expression's first parameter. The value at the matching row is
 * then copied (or a NULL appended) into row `currRow` of `pRes`.
 *
 * @param pOperator   join operator; supplies the output expressions
 * @param pRes        result block to write into
 * @param currRow     destination row index in pRes
 * @param pLeftBlock  current left input block
 * @param leftPos     row index inside pLeftBlock
 * @param pRightBlock current right input block
 * @param rightPos    row index inside pRightBlock
 */
static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t currRow,
                                   SSDataBlock* pLeftBlock, int32_t leftPos, SSDataBlock* pRightBlock,
                                   int32_t rightPos) {
  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
    SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, i);
    SExprInfo*       pExprInfo = &pOperator->exprSupp.pExprInfo[i];

    int32_t blockId = pExprInfo->base.pParam[0].pCol->dataBlockId;
    int32_t slotId = pExprInfo->base.pParam[0].pCol->slotId;

    // Decide the side from the block that was actually passed in, not from the
    // operator's cached pointer; any block id other than the left one means "right".
    SColumnInfoData* pSrc = NULL;
    int32_t          rowIndex = -1;
    if (pLeftBlock->info.blockId == blockId) {
      pSrc = taosArrayGet(pLeftBlock->pDataBlock, slotId);
      rowIndex = leftPos;
    } else {
      pSrc = taosArrayGet(pRightBlock->pDataBlock, slotId);
      rowIndex = rightPos;
    }

    if (colDataIsNull_s(pSrc, rowIndex)) {
      colDataAppendNULL(pDst, currRow);
    } else {
      char* p = colDataGetData(pSrc, rowIndex);
      colDataAppend(pDst, currRow, p, false);
    }
  }
}
151

152 153
/**
 * Position both inputs on a readable row and report the two join-column timestamps.
 *
 * When the current left (right) block is missing or fully consumed, the next block is
 * pulled from downstream 0 (downstream 1) and the position is reset to 0. If either
 * downstream returns NULL, the task status is set to TASK_COMPLETED and false is returned.
 *
 * @param pOperator join operator
 * @param pLeftTs   out: timestamp at the current left position
 * @param pRightTs  out: timestamp at the current right position
 * @return true when both timestamps were written, false when an input is exhausted
 */
static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs, int64_t* pRightTs) {
  SJoinOperatorInfo* pJoinInfo = pOperator->info;

  if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
    SOperatorInfo* ds1 = pOperator->pDownstream[0];
    pJoinInfo->pLeft = ds1->fpSet.getNextFn(ds1);

    pJoinInfo->leftPos = 0;
    if (pJoinInfo->pLeft == NULL) {
      setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
      return false;
    }
  }

  if (pJoinInfo->pRight == NULL || pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
    SOperatorInfo* ds2 = pOperator->pDownstream[1];
    pJoinInfo->pRight = ds2->fpSet.getNextFn(ds2);

    pJoinInfo->rightPos = 0;
    if (pJoinInfo->pRight == NULL) {
      setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
      return false;
    }
  }

  // only the timestamp match support for ordinary table
  SColumnInfoData* pLeftCol = taosArrayGet(pJoinInfo->pLeft->pDataBlock, pJoinInfo->leftCol.slotId);
  SColumnInfoData* pRightCol = taosArrayGet(pJoinInfo->pRight->pDataBlock, pJoinInfo->rightCol.slotId);

  // Check the column types before reinterpreting the cell data as int64_t below.
  ASSERT(pLeftCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  ASSERT(pRightCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);

  char* pLeftVal = colDataGetData(pLeftCol, pJoinInfo->leftPos);
  *pLeftTs = *(int64_t*)pLeftVal;

  char* pRightVal = colDataGetData(pRightCol, pJoinInfo->rightPos);
  *pRightTs = *(int64_t*)pRightVal;

  return true;
}

/**
 * Run the merge loop: append matching rows to pRes until an input is exhausted
 * or the result reaches the operator's row threshold.
 *
 * Equal timestamps emit one joined row and advance both sides. Otherwise only the
 * side that is behind in the configured timestamp order is advanced.
 *
 * @param pOperator join operator
 * @param pRes      result block; new rows are appended after its current row count
 */
static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) {
  SJoinOperatorInfo* pJoinInfo = pOperator->info;
  const bool         ascending = (pJoinInfo->inputTsOrder == TSDB_ORDER_ASC);
  int32_t            numOfRows = pRes->info.rows;

  for (;;) {
    int64_t leftTs = 0;
    int64_t rightTs = 0;
    if (!mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs)) {
      break;
    }

    if (leftTs == rightTs) {
      mergeJoinJoinLeftRight(pOperator, pRes, numOfRows, pJoinInfo->pLeft, pJoinInfo->leftPos, pJoinInfo->pRight,
                             pJoinInfo->rightPos);
      pJoinInfo->leftPos += 1;
      pJoinInfo->rightPos += 1;
      numOfRows += 1;
    } else {
      // The timestamps differ, so exactly one side is behind in scan order.
      const bool leftIsBehind = ascending ? (leftTs < rightTs) : (leftTs > rightTs);
      if (leftIsBehind) {
        pJoinInfo->leftPos += 1;
        if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
          continue;  // left block consumed; the next iteration fetches a new one
        }
      } else {
        pJoinInfo->rightPos += 1;
        if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
          continue;  // right block consumed; the next iteration fetches a new one
        }
      }
    }

    // the pDataBlock are always the same one, no need to call this again
    pRes->info.rows = numOfRows;
    if (pRes->info.rows >= pOperator->resultInfo.threshold) {
      break;
    }
  }
}

SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
  SJoinOperatorInfo* pJoinInfo = pOperator->info;
237

238 239 240 241 242 243 244 245 246 247
  SSDataBlock* pRes = pJoinInfo->pRes;
  blockDataCleanup(pRes);
  blockDataEnsureCapacity(pRes, 4096);
  while (true) {
    int32_t numOfRowsBefore = pRes->info.rows;
    doMergeJoinImpl(pOperator, pRes);
    int32_t numOfNewRows = pRes->info.rows - numOfRowsBefore;
    if (numOfNewRows == 0) {
      break;
    }
248
    if (pJoinInfo->pCondAfterMerge != NULL) {
249
      doFilter(pJoinInfo->pCondAfterMerge, pRes, NULL);
250
    }
251 252 253 254
    if (pRes->info.rows >= pOperator->resultInfo.threshold) {
      break;
    }
  }
255 256
  return (pRes->info.rows > 0) ? pRes : NULL;
}
257

258
/**
 * Take the join columns from the first operator node in a logic condition.
 *
 * Walks the condition's parameter list in order. The first parameter whose node
 * type is QUERY_NODE_OPERATOR supplies leftCol (its left operand) and rightCol
 * (its right operand), and the search stops there. If no such parameter exists,
 * pInfo is left unchanged.
 *
 * @param pInfo               join operator state to update
 * @param pLogicConditionNode logic condition whose parameters are scanned
 */
static void extractTimeCondition(SJoinOperatorInfo* pInfo, SLogicConditionNode* pLogicConditionNode) {
  const int32_t numOfParams = LIST_LENGTH(pLogicConditionNode->pParameterList);

  for (int32_t idx = 0; idx < numOfParams; ++idx) {
    SNode* pParam = nodesListGetNode(pLogicConditionNode->pParameterList, idx);
    if (nodeType(pParam) != QUERY_NODE_OPERATOR) {
      continue;
    }

    SOperatorNode* pOpNode = (SOperatorNode*)pParam;
    setJoinColumnInfo(&pInfo->leftCol, (SColumnNode*)pOpNode->pLeft);
    setJoinColumnInfo(&pInfo->rightCol, (SColumnNode*)pOpNode->pRight);
    return;
  }
}