joinoperator.c 14.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "executorimpl.h"
17 18 19 20
#include "function.h"
#include "os.h"
#include "querynodes.h"
#include "tcompare.h"
21
#include "tdatablock.h"
22
#include "thash.h"
23
#include "tmsg.h"
24 25
#include "ttypes.h"

26
static void         setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode);
27
static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator);
28
static void         destroyMergeJoinOperator(void* param, int32_t numOfOutput);
29 30 31
static void         extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t numOfDownstream,
                                         SSortMergeJoinPhysiNode* pJoinNode);

S
slzhou 已提交
32 33
/*
 * Work out which operand of the merge condition refers to which child and
 * record the two key columns in pInfo->leftCol and pInfo->rightCol.
 *
 * The merge condition must be an operator node (anything else trips the
 * assertion and nothing is recorded). The operand whose dataBlockId equals the
 * result block id of pDownstream[0] becomes the left key column, the other one
 * the right key column. numOfDownstream is not read; pDownstream[0] and
 * pDownstream[1] are.
 */
static void extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t numOfDownstream,
                                 SSortMergeJoinPhysiNode* pJoinNode) {
  SNode* pCond = pJoinNode->pMergeCondition;
  if (nodeType(pCond) != QUERY_NODE_OPERATOR) {
    ASSERT(false);
    return;
  }

  SOperatorNode* pOp = (SOperatorNode*)pCond;
  SColumnNode*   pFirst = (SColumnNode*)pOp->pLeft;
  SColumnNode*   pSecond = (SColumnNode*)pOp->pRight;

  bool firstIsLeft = (pFirst->dataBlockId == pDownstream[0]->resultDataBlockId);
  if (firstIsLeft) {
    ASSERT(pSecond->dataBlockId == pDownstream[1]->resultDataBlockId);
  } else {
    ASSERT(pFirst->dataBlockId == pDownstream[1]->resultDataBlockId);
    ASSERT(pSecond->dataBlockId == pDownstream[0]->resultDataBlockId);
  }

  setJoinColumnInfo(&pInfo->leftCol, firstIsLeft ? pFirst : pSecond);
  setJoinColumnInfo(&pInfo->rightCol, firstIsLeft ? pSecond : pFirst);
}
57

58 59
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
                                           SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) {
60 61 62 63 64 65
  SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo));
  SOperatorInfo*     pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pOperator == NULL || pInfo == NULL) {
    goto _error;
  }

66
  SSDataBlock* pResBlock = createResDataBlock(pJoinNode->node.pOutputDataBlockDesc);
67

68
  int32_t    numOfCols = 0;
69 70
  SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols);

71
  initResultSizeInfo(&pOperator->resultInfo, 4096);
72

73 74
  pInfo->pRes = pResBlock;
  pOperator->name = "MergeJoinOperator";
75
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
76 77 78 79 80 81
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->exprSupp.pExprInfo = pExprInfo;
  pOperator->exprSupp.numOfExprs = numOfCols;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;
82

83
  extractTimeCondition(pInfo, pDownstream, numOfDownstream, pJoinNode);
84

85 86 87 88 89 90 91 92 93 94 95 96 97
  if (pJoinNode->pOnConditions != NULL && pJoinNode->node.pConditions != NULL) {
    pInfo->pCondAfterMerge = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION);
    SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pInfo->pCondAfterMerge);
    pLogicCond->pParameterList = nodesMakeList();
    nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->pOnConditions));
    nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->node.pConditions));
    pLogicCond->condType = LOGIC_COND_TYPE_AND;
  } else if (pJoinNode->pOnConditions != NULL) {
    pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->pOnConditions);
  } else if (pJoinNode->node.pConditions != NULL) {
    pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->node.pConditions);
  } else {
    pInfo->pCondAfterMerge = NULL;
98 99
  }

100
  pInfo->inputOrder = TSDB_ORDER_ASC;
101
  if (pJoinNode->inputTsOrder == ORDER_ASC) {
102
    pInfo->inputOrder = TSDB_ORDER_ASC;
103
  } else if (pJoinNode->inputTsOrder == ORDER_DESC) {
104
    pInfo->inputOrder = TSDB_ORDER_DESC;
105 106
  }

107 108 109 110 111 112 113 114 115
  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL, NULL, NULL);
  int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

116
_error:
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}

/*
 * Fill the executor-side descriptor of a join key column from its plan node:
 * the node's slot id plus the type, byte width, precision and scale of its
 * result type. Fields of *pColumn not listed here are left unchanged.
 */
void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) {
  SColumnInfo*       dst = pColumn;
  const SColumnNode* src = pColumnNode;

  // result-type attributes first, then the slot the column occupies in its block
  dst->type = src->node.resType.type;
  dst->bytes = src->node.resType.bytes;
  dst->precision = src->node.resType.precision;
  dst->scale = src->node.resType.scale;
  dst->slotId = src->slotId;
}

void destroyMergeJoinOperator(void* param, int32_t numOfOutput) {
  SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param;
133
  nodesDestroyNode(pJoinOperator->pCondAfterMerge);
134

D
dapan1121 已提交
135
  taosMemoryFreeClear(param);
136
}
137 138

/*
 * Write one joined row at index currRow of pRes by combining row leftPos of
 * pLeftBlock with row rightPos of pRightBlock.
 *
 * Output column i is copied from the column referenced by the first parameter
 * of expression i: when that column's dataBlockId equals pLeftBlock's blockId
 * the value comes from the left row, otherwise from the right row. NULL values
 * are written as NULL.
 */
static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t currRow,
                                   SSDataBlock* pLeftBlock, int32_t leftPos, SSDataBlock* pRightBlock,
                                   int32_t rightPos) {
  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
    SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, i);
    SExprInfo*       pExprInfo = &pOperator->exprSupp.pExprInfo[i];

    int32_t blockId = pExprInfo->base.pParam[0].pCol->dataBlockId;
    int32_t slotId = pExprInfo->base.pParam[0].pCol->slotId;

    bool             fromLeft = (pLeftBlock->info.blockId == blockId);
    SSDataBlock*     pSrcBlock = fromLeft ? pLeftBlock : pRightBlock;
    int32_t          rowIndex = fromLeft ? leftPos : rightPos;
    SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, slotId);

    if (colDataIsNull_s(pSrc, rowIndex)) {
      colDataAppendNULL(pDst, currRow);
    } else {
      char* p = colDataGetData(pSrc, rowIndex);
      colDataAppend(pDst, currRow, p, false);
    }
  }
}
169
typedef struct SRowLocation {
170 171
  SSDataBlock* pDataBlock;
  int32_t      pos;
172 173
} SRowLocation;

174
// pBlock[tsSlotId][startPos, endPos) == timestamp,
S
shenglian zhou 已提交
175
static int32_t mergeJoinGetBlockRowsEqualTs(SSDataBlock* pBlock, int16_t tsSlotId, int32_t startPos, int64_t timestamp,
176
                                            int32_t* pEndPos, SArray* rowLocations, SArray* createdBlocks) {
177 178
  int32_t numRows = pBlock->info.rows;
  ASSERT(startPos < numRows);
S
shenglian zhou 已提交
179
  SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsSlotId);
180 181

  int32_t i = startPos;
182
  for (; i < numRows; ++i) {
183
    char* pNextVal = colDataGetData(pCol, i);
184
    if (timestamp != *(int64_t*)pNextVal) {
185 186 187 188
      break;
    }
  }
  int32_t endPos = i;
S
shenglian zhou 已提交
189
  *pEndPos = endPos;
190

191 192 193 194
  if (endPos - startPos == 0) {
    return 0;
  }

195
  SSDataBlock* block = pBlock;
S
slzhou 已提交
196
  bool         createdNewBlock = false;
S
shenglian zhou 已提交
197
  if (endPos == numRows) {
S
slzhou 已提交
198
    block = blockDataExtractBlock(pBlock, startPos, endPos - startPos);
S
shenglian zhou 已提交
199
    taosArrayPush(createdBlocks, &block);
200
    createdNewBlock = true;
201
  }
202
  SRowLocation location = {0};
203 204
  for (int32_t j = startPos; j < endPos; ++j) {
    location.pDataBlock = block;
S
slzhou 已提交
205
    location.pos = (createdNewBlock ? j - startPos : j);
206
    taosArrayPush(rowLocations, &location);
207 208 209
  }
  return 0;
}
210

211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237
/*
 * Collect the locations of all rows of one child (whichChild == 0: left child,
 * whichChild == 1: right child) whose timestamp equals `timestamp`. The scan
 * starts at row startPos of startDataBlock and continues into the following
 * downstream blocks for as long as the run of equal timestamps reaches the end
 * of the current block.
 *
 * On return the child's current block and position in SJoinOperatorInfo point
 * just past the run. If the child runs out of blocks, its current block becomes
 * NULL (position 0) and the task is marked TASK_COMPLETED.
 *
 * The scan and position updates always complete; the return value is the first
 * error reported while collecting rows, or TSDB_CODE_SUCCESS.
 */
static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator, int32_t whichChild, int16_t tsSlotId,
                                                        SSDataBlock* startDataBlock, int32_t startPos,
                                                        int64_t timestamp, SArray* rowLocations,
                                                        SArray* createdBlocks) {
  ASSERT(whichChild == 0 || whichChild == 1);

  SJoinOperatorInfo* pJoinInfo = pOperator->info;
  SSDataBlock**      ppCurBlock = (whichChild == 0) ? &pJoinInfo->pLeft : &pJoinInfo->pRight;
  int32_t*           pCurPos = (whichChild == 0) ? &pJoinInfo->leftPos : &pJoinInfo->rightPos;
  SOperatorInfo*     pChild = pOperator->pDownstream[whichChild];

  SSDataBlock* pBlock = startDataBlock;
  int32_t      endPos = -1;
  int32_t      code =
      mergeJoinGetBlockRowsEqualTs(pBlock, tsSlotId, startPos, timestamp, &endPos, rowLocations, createdBlocks);

  // The run can only continue into the next block if it reached the end of this one.
  while (endPos == pBlock->info.rows) {
    pBlock = pChild->fpSet.getNextFn(pChild);
    *ppCurBlock = pBlock;
    *pCurPos = 0;
    if (pBlock == NULL) {
      setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
      return code;
    }

    int32_t blockCode =
        mergeJoinGetBlockRowsEqualTs(pBlock, tsSlotId, 0, timestamp, &endPos, rowLocations, createdBlocks);
    if (code == TSDB_CODE_SUCCESS) {
      code = blockCode;
    }
  }

  *pCurPos = endPos;
  return code;
}

251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291
static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes,
                                               int32_t* nRows) {
  SJoinOperatorInfo* pJoinInfo = pOperator->info;
  SArray*            leftRowLocations = taosArrayInit(8, sizeof(SRowLocation));
  SArray*            leftCreatedBlocks = taosArrayInit(8, POINTER_BYTES);

  SArray* rightRowLocations = taosArrayInit(8, sizeof(SRowLocation));
  SArray* rightCreatedBlocks = taosArrayInit(8, POINTER_BYTES);

  mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 0, pJoinInfo->leftCol.slotId, pJoinInfo->pLeft,
                                           pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks);
  mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight,
                                           pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks);

  size_t leftNumJoin = taosArrayGetSize(leftRowLocations);
  size_t rightNumJoin = taosArrayGetSize(rightRowLocations);
  for (int32_t i = 0; i < leftNumJoin; ++i) {
    for (int32_t j = 0; j < rightNumJoin; ++j) {
      SRowLocation* leftRow = taosArrayGet(leftRowLocations, i);
      SRowLocation* rightRow = taosArrayGet(rightRowLocations, j);
      mergeJoinJoinLeftRight(pOperator, pRes, *nRows, leftRow->pDataBlock, leftRow->pos, rightRow->pDataBlock,
                             rightRow->pos);
      ++*nRows;
    }
  }

  for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) {
    SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i);
    blockDataDestroy(pBlock);
  }
  taosArrayDestroy(rightCreatedBlocks);
  taosArrayDestroy(rightRowLocations);
  for (int i = 0; i < taosArrayGetSize(leftCreatedBlocks); ++i) {
    SSDataBlock* pBlock = taosArrayGetP(leftCreatedBlocks, i);
    blockDataDestroy(pBlock);
  }
  taosArrayDestroy(leftCreatedBlocks);
  taosArrayDestroy(leftRowLocations);
  return TSDB_CODE_SUCCESS;
}

292 293
/*
 * Make sure both children have a current row, then report the timestamps
 * found at the current left and right positions.
 *
 * A child whose current block is missing or fully consumed is moved to its
 * next downstream block, with its position reset to 0. As soon as a child has
 * no more blocks (the left child is checked first), the task is marked
 * TASK_COMPLETED and false is returned. Otherwise *pLeftTs and *pRightTs are
 * filled and true is returned.
 */
static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs, int64_t* pRightTs) {
  SJoinOperatorInfo* pJoinInfo = pOperator->info;

  bool leftDrained = (pJoinInfo->pLeft == NULL) || (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows);
  if (leftDrained) {
    SOperatorInfo* pLeftChild = pOperator->pDownstream[0];
    pJoinInfo->pLeft = pLeftChild->fpSet.getNextFn(pLeftChild);
    pJoinInfo->leftPos = 0;
    if (pJoinInfo->pLeft == NULL) {
      setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
      return false;
    }
  }

  bool rightDrained = (pJoinInfo->pRight == NULL) || (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows);
  if (rightDrained) {
    SOperatorInfo* pRightChild = pOperator->pDownstream[1];
    pJoinInfo->pRight = pRightChild->fpSet.getNextFn(pRightChild);
    pJoinInfo->rightPos = 0;
    if (pJoinInfo->pRight == NULL) {
      setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
      return false;
    }
  }

  // Only a timestamp equality key is supported for ordinary tables.
  SColumnInfoData* pLeftKey = taosArrayGet(pJoinInfo->pLeft->pDataBlock, pJoinInfo->leftCol.slotId);
  SColumnInfoData* pRightKey = taosArrayGet(pJoinInfo->pRight->pDataBlock, pJoinInfo->rightCol.slotId);
  ASSERT(pLeftKey->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  ASSERT(pRightKey->info.type == TSDB_DATA_TYPE_TIMESTAMP);

  char* pLeftRaw = colDataGetData(pLeftKey, pJoinInfo->leftPos);
  char* pRightRaw = colDataGetData(pRightKey, pJoinInfo->rightPos);
  *pLeftTs = *(int64_t*)pLeftRaw;
  *pRightTs = *(int64_t*)pRightRaw;
  return true;
}

/*
 * Merge the two sorted children into pRes until one child is exhausted or pRes
 * holds at least resultInfo.threshold rows.
 *
 * On equal timestamps the whole run of matching rows on both sides is joined.
 * Otherwise the side that is behind in the input order (smaller timestamp when
 * ascending, larger when descending) is advanced by one row.
 */
static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) {
  SJoinOperatorInfo* pJoinInfo = pOperator->info;
  const bool         ascending = (pJoinInfo->inputOrder == TSDB_ORDER_ASC);
  int32_t            numOfRows = pRes->info.rows;

  for (;;) {
    int64_t leftTs = 0;
    int64_t rightTs = 0;
    if (!mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs)) {
      break;
    }

    if (leftTs == rightTs) {
      mergeJoinJoinDownstreamTsRanges(pOperator, leftTs, pRes, &numOfRows);
    } else {
      bool leftIsBehind = ascending ? (leftTs < rightTs) : (leftTs > rightTs);
      if (leftIsBehind) {
        pJoinInfo->leftPos += 1;
        if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
          continue;
        }
      } else {
        pJoinInfo->rightPos += 1;
        if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
          continue;
        }
      }
    }

    // Rows are appended straight into pRes's columns; publish the new row count.
    pRes->info.rows = numOfRows;
    if (pRes->info.rows >= pOperator->resultInfo.threshold) {
      break;
    }
  }
}

SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
  SJoinOperatorInfo* pJoinInfo = pOperator->info;
370

371 372 373 374 375 376 377 378 379 380
  SSDataBlock* pRes = pJoinInfo->pRes;
  blockDataCleanup(pRes);
  blockDataEnsureCapacity(pRes, 4096);
  while (true) {
    int32_t numOfRowsBefore = pRes->info.rows;
    doMergeJoinImpl(pOperator, pRes);
    int32_t numOfNewRows = pRes->info.rows - numOfRowsBefore;
    if (numOfNewRows == 0) {
      break;
    }
381
    if (pJoinInfo->pCondAfterMerge != NULL) {
382
      doFilter(pJoinInfo->pCondAfterMerge, pRes, NULL);
383
    }
384 385 386 387
    if (pRes->info.rows >= pOperator->resultInfo.threshold) {
      break;
    }
  }
388 389
  return (pRes->info.rows > 0) ? pRes : NULL;
}