joinoperator.c 14.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "executorimpl.h"
17 18 19 20
#include "function.h"
#include "os.h"
#include "querynodes.h"
#include "tcompare.h"
21
#include "tdatablock.h"
22
#include "thash.h"
23
#include "tmsg.h"
24 25
#include "ttypes.h"

26
static void         setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode);
27
static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator);
28
static void         destroyMergeJoinOperator(void* param, int32_t numOfOutput);
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
static void         extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t numOfDownstream,
                                         SSortMergeJoinPhysiNode* pJoinNode);

// Derive the left/right join columns from the merge condition of the join plan node.
//
// The merge condition is expected to be an operator node whose two operands are column nodes, one
// belonging to each downstream. Whichever operand carries the result block id of pDownstream[0] is
// recorded as pInfo->leftCol; the other one becomes pInfo->rightCol.
//
// pInfo            join operator info that receives leftCol / rightCol
// pDownstream      downstream operators; entries [0] and [1] are read
// numOfDownstream  not used by this function
// pJoinNode        plan node providing pMergeCondition
static void extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode) {
  SNode* pCond = pJoinNode->pMergeCondition;
  if (nodeType(pCond) != QUERY_NODE_OPERATOR) {
    // Any other node kind is not supported as a merge condition.
    ASSERT(false);
    return;
  }

  SOperatorNode* pOp = (SOperatorNode*)pCond;
  SColumnNode*   pFirst = (SColumnNode*)pOp->pLeft;
  SColumnNode*   pSecond = (SColumnNode*)pOp->pRight;

  // Does the first operand of the condition come from the left (index 0) downstream?
  bool firstIsLeft = (pFirst->dataBlockId == pDownstream[0]->resultDataBlockId);
  if (firstIsLeft) {
    ASSERT(pSecond->dataBlockId == pDownstream[1]->resultDataBlockId);
  } else {
    ASSERT(pFirst->dataBlockId == pDownstream[1]->resultDataBlockId);
    ASSERT(pSecond->dataBlockId == pDownstream[0]->resultDataBlockId);
  }

  setJoinColumnInfo(&pInfo->leftCol, firstIsLeft ? pFirst : pSecond);
  setJoinColumnInfo(&pInfo->rightCol, firstIsLeft ? pSecond : pFirst);
}
56

57 58
// Create the sort-merge join operator on top of the given downstream operators.
//
// pDownstream / numOfDownstream  child operators, attached through appendDownstream()
// pJoinNode                      plan node supplying the output block description, the target
//                                expressions, the merge condition, the extra filter conditions and
//                                the input timestamp order
// pTaskInfo                      owning task; on failure its `code` field receives the error code
//
// Returns the new operator, or NULL on failure. On failure the result block and the condition tree
// built so far are released together with the operator structures.
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
                                           SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) {
  // Every failure before appendDownstream() is an allocation failure.
  int32_t    code = TSDB_CODE_OUT_OF_MEMORY;
  int32_t    numOfCols = 0;
  SExprInfo* pExprInfo = NULL;

  SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo));
  SOperatorInfo*     pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pOperator == NULL || pInfo == NULL) {
    goto _error;
  }

  // Store the result block in pInfo right away so that the error path can release it.
  pInfo->pRes = createResDataBlock(pJoinNode->node.pOutputDataBlockDesc);
  if (pInfo->pRes == NULL) {
    goto _error;
  }

  pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols);

  initResultSizeInfo(&pOperator->resultInfo, 4096);

  pOperator->name = "MergeJoinOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->exprSupp.pExprInfo = pExprInfo;
  pOperator->exprSupp.numOfExprs = numOfCols;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  extractTimeCondition(pInfo, pDownstream, numOfDownstream, pJoinNode);

  // The filter applied after merging is the AND of the ON conditions and the node conditions when
  // both exist, a clone of whichever one exists otherwise, or NULL when there is none.
  if (pJoinNode->pOnConditions != NULL && pJoinNode->node.pConditions != NULL) {
    pInfo->pCondAfterMerge = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION);
    if (pInfo->pCondAfterMerge == NULL) {
      goto _error;
    }

    SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pInfo->pCondAfterMerge);
    pLogicCond->pParameterList = nodesMakeList();
    nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->pOnConditions));
    nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->node.pConditions));
    pLogicCond->condType = LOGIC_COND_TYPE_AND;
  } else if (pJoinNode->pOnConditions != NULL) {
    pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->pOnConditions);
  } else if (pJoinNode->node.pConditions != NULL) {
    pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->node.pConditions);
  } else {
    pInfo->pCondAfterMerge = NULL;
  }

  // Ascending is the default; only an explicit ORDER_DESC switches the operator to descending.
  pInfo->inputOrder = (pJoinNode->inputTsOrder == ORDER_DESC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;

  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL, NULL, NULL);

  code = appendDownstream(pOperator, pDownstream, numOfDownstream);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  if (pInfo != NULL) {
    // nodesDestroyNode() is also called with a possibly-NULL condition by destroyMergeJoinOperator().
    nodesDestroyNode(pInfo->pCondAfterMerge);
    if (pInfo->pRes != NULL) {
      blockDataDestroy(pInfo->pRes);
    }
  }
  // NOTE(review): pExprInfo is not released here because its destructor is not visible in this
  // file; confirm which routine owns it on this path.
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  pTaskInfo->code = code;  // report the actual failure instead of always OUT_OF_MEMORY
  return NULL;
}

// Copy the slot id and the result-type attributes of a column node into a column info record.
//
// pColumn      destination; its slotId, type, bytes, precision and scale fields are overwritten
// pColumnNode  source column node (read only)
void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) {
  // Result-type attributes of the column.
  pColumn->type = pColumnNode->node.resType.type;
  pColumn->bytes = pColumnNode->node.resType.bytes;
  pColumn->precision = pColumnNode->node.resType.precision;
  pColumn->scale = pColumnNode->node.resType.scale;

  // Slot of the column inside its data block.
  pColumn->slotId = pColumnNode->slotId;
}

void destroyMergeJoinOperator(void* param, int32_t numOfOutput) {
  SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param;
132
  nodesDestroyNode(pJoinOperator->pCondAfterMerge);
133

D
dapan1121 已提交
134
  taosMemoryFreeClear(param);
135
}
136 137

// Emit one joined row: for every output expression copy the referenced source cell into row
// `currRow` of the result block.
//
// pOperator    join operator; its exprSupp describes the output columns
// pRes         result block being filled
// currRow      destination row index in pRes
// pLeftBlock   block holding the left row, leftPos is the row index inside it
// pRightBlock  block holding the right row, rightPos is the row index inside it
//
// The first parameter of each expression names the source block id and slot. A block id equal to
// the left block's id selects the left row; anything else is read from the right row.
static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t currRow,
                                   SSDataBlock* pLeftBlock, int32_t leftPos, SSDataBlock* pRightBlock,
                                   int32_t rightPos) {
  for (int32_t col = 0; col < pOperator->exprSupp.numOfExprs; ++col) {
    SExprInfo*       pExpr = &pOperator->exprSupp.pExprInfo[col];
    SColumnInfoData* pTarget = taosArrayGet(pRes->pDataBlock, col);

    int32_t srcBlockId = pExpr->base.pParam[0].pCol->dataBlockId;
    int32_t srcSlotId = pExpr->base.pParam[0].pCol->slotId;

    // Decide once which side feeds this output column.
    bool         fromLeft = (pLeftBlock->info.blockId == srcBlockId);
    SSDataBlock* pSrcBlock = fromLeft ? pLeftBlock : pRightBlock;
    int32_t      srcRow = fromLeft ? leftPos : rightPos;

    SColumnInfoData* pSource = taosArrayGet(pSrcBlock->pDataBlock, srcSlotId);

    if (colDataIsNull_s(pSource, srcRow)) {
      colDataAppendNULL(pTarget, currRow);
      continue;
    }

    char* pValue = colDataGetData(pSource, srcRow);
    colDataAppend(pTarget, currRow, pValue, false);
  }
}
168
typedef struct SRowLocation {
169 170
  SSDataBlock* pDataBlock;
  int32_t      pos;
171 172
} SRowLocation;

173
// pBlock[tsSlotId][startPos, endPos) == timestamp,
S
shenglian zhou 已提交
174
static int32_t mergeJoinGetBlockRowsEqualTs(SSDataBlock* pBlock, int16_t tsSlotId, int32_t startPos, int64_t timestamp,
175
                                            int32_t* pEndPos, SArray* rowLocations, SArray* createdBlocks) {
176 177
  int32_t numRows = pBlock->info.rows;
  ASSERT(startPos < numRows);
S
shenglian zhou 已提交
178
  SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsSlotId);
179 180

  int32_t i = startPos;
181
  for (; i < numRows; ++i) {
182
    char* pNextVal = colDataGetData(pCol, i);
183
    if (timestamp != *(int64_t*)pNextVal) {
184 185 186 187
      break;
    }
  }
  int32_t endPos = i;
S
shenglian zhou 已提交
188
  *pEndPos = endPos;
189

190 191 192 193
  if (endPos - startPos == 0) {
    return 0;
  }

194
  SSDataBlock* block = pBlock;
195
  bool createdNewBlock = false;
S
shenglian zhou 已提交
196
  if (endPos == numRows) {
197
    block = blockDataExtractBlock(pBlock, startPos, endPos-startPos);
S
shenglian zhou 已提交
198
    taosArrayPush(createdBlocks, &block);
199
    createdNewBlock = true;
200
  }
201
  SRowLocation location = {0};
202 203
  for (int32_t j = startPos; j < endPos; ++j) {
    location.pDataBlock = block;
204 205
    location.pos = ( createdNewBlock ? j - startPos : j);
    taosArrayPush(rowLocations, &location);
206 207 208
  }
  return 0;
}
209

210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236
// Gather every row of one join input whose timestamp equals `timestamp`, following the run across
// downstream block boundaries.
//
// whichChild == 0: left child of the join; whichChild == 1: right child of the join.
//
// Scanning starts at startPos of startDataBlock. Whenever the run reaches the end of the current
// block, the next block is fetched from the selected downstream, the operator's current block and
// position for that side are reset to it, and scanning continues from row 0. When the downstream
// is exhausted the task status is set to TASK_COMPLETED and the function returns. Otherwise the
// position for that side is left on the first row that did not match.
//
// Matching rows are appended to rowLocations; blocks copied along the way go to createdBlocks.
// Always returns 0.
static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator, int32_t whichChild, int16_t tsSlotId,
                                                        SSDataBlock* startDataBlock, int32_t startPos,
                                                        int64_t timestamp, SArray* rowLocations,
                                                        SArray* createdBlocks) {
  ASSERT(whichChild == 0 || whichChild == 1);

  SJoinOperatorInfo* pJoinInfo = pOperator->info;
  SSDataBlock*       pCurrent = startDataBlock;
  int32_t            endPos = -1;

  mergeJoinGetBlockRowsEqualTs(pCurrent, tsSlotId, startPos, timestamp, &endPos, rowLocations, createdBlocks);

  // The run touched the end of the block: it may continue in the next one.
  while (endPos == pCurrent->info.rows) {
    SOperatorInfo* pChild = pOperator->pDownstream[whichChild];
    pCurrent = pChild->fpSet.getNextFn(pChild);

    switch (whichChild) {
      case 0:
        pJoinInfo->leftPos = 0;
        pJoinInfo->pLeft = pCurrent;
        break;
      case 1:
        pJoinInfo->rightPos = 0;
        pJoinInfo->pRight = pCurrent;
        break;
      default:
        break;
    }

    if (pCurrent == NULL) {
      // No more input on this side; the position stays at 0 as set above.
      setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
      return 0;
    }

    mergeJoinGetBlockRowsEqualTs(pCurrent, tsSlotId, 0, timestamp, &endPos, rowLocations, createdBlocks);
  }

  if (endPos == -1) {
    return 0;
  }

  switch (whichChild) {
    case 0:
      pJoinInfo->leftPos = endPos;
      break;
    case 1:
      pJoinInfo->rightPos = endPos;
      break;
    default:
      break;
  }
  return 0;
}

250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290
static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes,
                                               int32_t* nRows) {
  SJoinOperatorInfo* pJoinInfo = pOperator->info;
  SArray*            leftRowLocations = taosArrayInit(8, sizeof(SRowLocation));
  SArray*            leftCreatedBlocks = taosArrayInit(8, POINTER_BYTES);

  SArray* rightRowLocations = taosArrayInit(8, sizeof(SRowLocation));
  SArray* rightCreatedBlocks = taosArrayInit(8, POINTER_BYTES);

  mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 0, pJoinInfo->leftCol.slotId, pJoinInfo->pLeft,
                                           pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks);
  mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight,
                                           pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks);

  size_t leftNumJoin = taosArrayGetSize(leftRowLocations);
  size_t rightNumJoin = taosArrayGetSize(rightRowLocations);
  for (int32_t i = 0; i < leftNumJoin; ++i) {
    for (int32_t j = 0; j < rightNumJoin; ++j) {
      SRowLocation* leftRow = taosArrayGet(leftRowLocations, i);
      SRowLocation* rightRow = taosArrayGet(rightRowLocations, j);
      mergeJoinJoinLeftRight(pOperator, pRes, *nRows, leftRow->pDataBlock, leftRow->pos, rightRow->pDataBlock,
                             rightRow->pos);
      ++*nRows;
    }
  }

  for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) {
    SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i);
    blockDataDestroy(pBlock);
  }
  taosArrayDestroy(rightCreatedBlocks);
  taosArrayDestroy(rightRowLocations);
  for (int i = 0; i < taosArrayGetSize(leftCreatedBlocks); ++i) {
    SSDataBlock* pBlock = taosArrayGetP(leftCreatedBlocks, i);
    blockDataDestroy(pBlock);
  }
  taosArrayDestroy(leftCreatedBlocks);
  taosArrayDestroy(leftRowLocations);
  return TSDB_CODE_SUCCESS;
}

291 292
// Make sure both join inputs are positioned on an unread row and report the timestamps of those rows.
//
// A side whose current block is missing or fully consumed is refilled from its downstream operator
// (index 0 for the left side, index 1 for the right side) and its position is reset to 0.
//
// pLeftTs / pRightTs  receive the timestamp at the current left / right position
//
// Returns false, after setting the task status to TASK_COMPLETED, as soon as either downstream has
// no more blocks; returns true otherwise.
static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs, int64_t* pRightTs) {
  SJoinOperatorInfo* pJoinInfo = pOperator->info;

  bool leftConsumed = (pJoinInfo->pLeft == NULL) || (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows);
  if (leftConsumed) {
    SOperatorInfo* pLeftChild = pOperator->pDownstream[0];
    pJoinInfo->pLeft = pLeftChild->fpSet.getNextFn(pLeftChild);
    pJoinInfo->leftPos = 0;

    if (pJoinInfo->pLeft == NULL) {
      setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
      return false;
    }
  }

  bool rightConsumed = (pJoinInfo->pRight == NULL) || (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows);
  if (rightConsumed) {
    SOperatorInfo* pRightChild = pOperator->pDownstream[1];
    pJoinInfo->pRight = pRightChild->fpSet.getNextFn(pRightChild);
    pJoinInfo->rightPos = 0;

    if (pJoinInfo->pRight == NULL) {
      setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
      return false;
    }
  }

  // only the timestamp match support for ordinary table
  SColumnInfoData* pLeftCol = taosArrayGet(pJoinInfo->pLeft->pDataBlock, pJoinInfo->leftCol.slotId);
  char*            pLeftVal = colDataGetData(pLeftCol, pJoinInfo->leftPos);
  *pLeftTs = *(int64_t*)pLeftVal;

  SColumnInfoData* pRightCol = taosArrayGet(pJoinInfo->pRight->pDataBlock, pJoinInfo->rightCol.slotId);
  char*            pRightVal = colDataGetData(pRightCol, pJoinInfo->rightPos);
  *pRightTs = *(int64_t*)pRightVal;

  // Both join columns must be timestamp columns.
  ASSERT(pLeftCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  ASSERT(pRightCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  return true;
}

// Run the merge loop: walk both inputs in timestamp order and append joined rows to pRes until an
// input is exhausted or the row count reaches the operator's result threshold.
//
// Equal timestamps are joined. Otherwise the side that is behind in the configured input order
// (smaller timestamp when ascending, larger when descending) advances by one row.
static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) {
  SJoinOperatorInfo* pJoinInfo = pOperator->info;

  int32_t    numRows = pRes->info.rows;
  const bool ascending = (pJoinInfo->inputOrder == TSDB_ORDER_ASC);

  for (;;) {
    int64_t leftTs = 0;
    int64_t rightTs = 0;
    if (!mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs)) {
      break;
    }

    if (leftTs == rightTs) {
      mergeJoinJoinDownstreamTsRanges(pOperator, leftTs, pRes, &numRows);
    } else {
      // The timestamps differ, so exactly one side is behind.
      bool leftBehind = ascending ? (leftTs < rightTs) : (leftTs > rightTs);
      if (leftBehind) {
        pJoinInfo->leftPos += 1;
        if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
          continue;  // left block consumed; the next iteration fetches a new one
        }
      } else {
        pJoinInfo->rightPos += 1;
        if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
          continue;  // right block consumed; the next iteration fetches a new one
        }
      }
    }

    // the pDataBlock are always the same one, no need to call this again
    pRes->info.rows = numRows;
    if (pRes->info.rows >= pOperator->resultInfo.threshold) {
      break;
    }
  }
}

SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
  SJoinOperatorInfo* pJoinInfo = pOperator->info;
369

370 371 372 373 374 375 376 377 378 379
  SSDataBlock* pRes = pJoinInfo->pRes;
  blockDataCleanup(pRes);
  blockDataEnsureCapacity(pRes, 4096);
  while (true) {
    int32_t numOfRowsBefore = pRes->info.rows;
    doMergeJoinImpl(pOperator, pRes);
    int32_t numOfNewRows = pRes->info.rows - numOfRowsBefore;
    if (numOfNewRows == 0) {
      break;
    }
380
    if (pJoinInfo->pCondAfterMerge != NULL) {
381
      doFilter(pJoinInfo->pCondAfterMerge, pRes, NULL);
382
    }
383 384 385 386
    if (pRes->info.rows >= pOperator->resultInfo.threshold) {
      break;
    }
  }
387 388
  return (pRes->info.rows > 0) ? pRes : NULL;
}