joinoperator.c 14.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "executorimpl.h"
17 18 19 20
#include "function.h"
#include "os.h"
#include "querynodes.h"
#include "tcompare.h"
21
#include "tdatablock.h"
22
#include "thash.h"
23
#include "tmsg.h"
24 25
#include "ttypes.h"

26
static void         setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode);
27
static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator);
28
static void         destroyMergeJoinOperator(void* param, int32_t numOfOutput);
29 30 31
static void         extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t numOfDownstream,
                                         SSortMergeJoinPhysiNode* pJoinNode);

S
slzhou 已提交
32 33
/*
 * Derive the two join key columns from the merge condition of the plan node.
 *
 * The merge condition must be an operator node; its two operands are treated as
 * column nodes. Whichever operand belongs to the result block of pDownstream[0]
 * becomes pInfo->leftCol, the other one becomes pInfo->rightCol.
 *
 * numOfDownstream is not consulted here; pDownstream[0] and pDownstream[1] are
 * both read.
 */
static void extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t numOfDownstream,
                                 SSortMergeJoinPhysiNode* pJoinNode) {
  SNode* pMergeCondition = pJoinNode->pMergeCondition;
  if (nodeType(pMergeCondition) != QUERY_NODE_OPERATOR) {
    // any other kind of merge condition is not expected
    ASSERT(false);
    return;
  }

  SOperatorNode* pOpNode = (SOperatorNode*)pMergeCondition;
  SColumnNode*   pFirst = (SColumnNode*)pOpNode->pLeft;
  SColumnNode*   pSecond = (SColumnNode*)pOpNode->pRight;

  // true when the first operand comes from the left (index 0) downstream
  bool firstIsLeft = (pFirst->dataBlockId == pDownstream[0]->resultDataBlockId);
  if (firstIsLeft) {
    ASSERT(pSecond->dataBlockId == pDownstream[1]->resultDataBlockId);
  } else {
    ASSERT(pFirst->dataBlockId == pDownstream[1]->resultDataBlockId);
    ASSERT(pSecond->dataBlockId == pDownstream[0]->resultDataBlockId);
  }

  setJoinColumnInfo(&pInfo->leftCol, firstIsLeft ? pFirst : pSecond);
  setJoinColumnInfo(&pInfo->rightCol, firstIsLeft ? pSecond : pFirst);
}
57

58 59
/*
 * Create a merge join operator on top of the given downstream operators.
 *
 * The operator's result block is built from the output data block descriptor of
 * pJoinNode, its expressions from pJoinNode->pTargets, and its join key columns
 * from pJoinNode->pMergeCondition. pOnConditions and node.pConditions (when
 * present) are cloned into pInfo->pCondAfterMerge; if both exist they are
 * combined with a logical AND.
 *
 * @param pDownstream      downstream operators; index 0 is the left input, index 1 the right one
 * @param numOfDownstream  number of entries in pDownstream
 * @param pJoinNode        physical plan node describing the join
 * @param pTaskInfo        task the operator belongs to; receives the error code on failure
 * @return the new operator, or NULL on failure (pTaskInfo->code is set in that case)
 */
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
                                           SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) {
  // every failure before appendDownstream() is an allocation failure
  int32_t code = TSDB_CODE_OUT_OF_MEMORY;

  SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo));
  SOperatorInfo*     pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pOperator == NULL || pInfo == NULL) {
    goto _error;
  }

  SSDataBlock* pResBlock = createResDataBlock(pJoinNode->node.pOutputDataBlockDesc);

  int32_t    numOfCols = 0;
  SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols);

  initResultSizeInfo(&pOperator->resultInfo, 4096);

  pInfo->pRes = pResBlock;
  pOperator->name = "MergeJoinOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->exprSupp.pExprInfo = pExprInfo;
  pOperator->exprSupp.numOfExprs = numOfCols;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  extractTimeCondition(pInfo, pDownstream, numOfDownstream, pJoinNode);

  if (pJoinNode->pOnConditions != NULL && pJoinNode->node.pConditions != NULL) {
    // both filters exist: evaluate "on-conditions AND conditions" after the merge
    pInfo->pCondAfterMerge = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION);
    if (pInfo->pCondAfterMerge == NULL) {
      goto _error;
    }

    SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pInfo->pCondAfterMerge);
    pLogicCond->pParameterList = nodesMakeList();
    nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->pOnConditions));
    nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->node.pConditions));
    pLogicCond->condType = LOGIC_COND_TYPE_AND;
  } else if (pJoinNode->pOnConditions != NULL) {
    pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->pOnConditions);
  } else if (pJoinNode->node.pConditions != NULL) {
    pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->node.pConditions);
  } else {
    pInfo->pCondAfterMerge = NULL;
  }

  // ascending is the default; only an explicit descending input order changes it
  pInfo->inputOrder = TSDB_ORDER_ASC;
  if (pJoinNode->inputTsOrder == ORDER_ASC) {
    pInfo->inputOrder = TSDB_ORDER_ASC;
  } else if (pJoinNode->inputTsOrder == ORDER_DESC) {
    pInfo->inputOrder = TSDB_ORDER_DESC;
  }

  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL, NULL, NULL);
  code = appendDownstream(pOperator, pDownstream, numOfDownstream);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  if (pInfo != NULL) {
    // release what pInfo owns before the struct itself goes away
    nodesDestroyNode(pInfo->pCondAfterMerge);
    if (pInfo->pRes != NULL) {
      blockDataDestroy(pInfo->pRes);
    }
  }
  // NOTE(review): the SExprInfo array stored in pOperator->exprSupp is not released on this
  // path; confirm which cleanup routine owns it and call it here.
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

/*
 * Copy the slot id and the result type description (type, bytes, precision,
 * scale) of a column node into a SColumnInfo.
 */
void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) {
  // result type attributes of the column expression
  pColumn->type = pColumnNode->node.resType.type;
  pColumn->bytes = pColumnNode->node.resType.bytes;
  pColumn->precision = pColumnNode->node.resType.precision;
  pColumn->scale = pColumnNode->node.resType.scale;

  // position of the column inside its data block
  pColumn->slotId = pColumnNode->slotId;
}

void destroyMergeJoinOperator(void* param, int32_t numOfOutput) {
  SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param;
133
  nodesDestroyNode(pJoinOperator->pCondAfterMerge);
134

D
dapan1121 已提交
135
  taosMemoryFreeClear(param);
136
}
137 138

/*
 * Write one joined output row.
 *
 * For every output expression of the operator, the source column is looked up
 * by the data block id and slot id of the expression's first parameter. The
 * value is taken from row leftPos of pLeftBlock when the block id matches the
 * left block, otherwise from row rightPos of pRightBlock, and is stored in row
 * currRow of the matching column of pRes (NULL values are propagated).
 */
static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t currRow,
                                   SSDataBlock* pLeftBlock, int32_t leftPos, SSDataBlock* pRightBlock,
                                   int32_t rightPos) {
  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
    SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, i);
    SExprInfo*       pExprInfo = &pOperator->exprSupp.pExprInfo[i];

    int32_t blockId = pExprInfo->base.pParam[0].pCol->dataBlockId;
    int32_t slotId = pExprInfo->base.pParam[0].pCol->slotId;

    // anything that does not originate from the left block is read from the right block
    bool             fromLeft = (pLeftBlock->info.blockId == blockId);
    SSDataBlock*     pSrcBlock = fromLeft ? pLeftBlock : pRightBlock;
    int32_t          rowIndex = fromLeft ? leftPos : rightPos;
    SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, slotId);

    if (colDataIsNull_s(pSrc, rowIndex)) {
      colDataAppendNULL(pDst, currRow);
    } else {
      char* p = colDataGetData(pSrc, rowIndex);
      colDataAppend(pDst, currRow, p, false);
    }
  }
}
169
typedef struct SRowLocation {
170 171
  SSDataBlock* pDataBlock;
  int32_t      pos;
172 173
} SRowLocation;

174
// On return, every row of pBlock in [startPos, *pEndPos) has a value equal to timestamp in column tsSlotId.
S
shenglian zhou 已提交
175
/*
 * Collect the leading run of rows in pBlock, starting at startPos, whose value
 * in column tsSlotId equals timestamp.
 *
 * @param pBlock         block to scan
 * @param tsSlotId       slot of the column that is compared against timestamp
 * @param startPos       first row to look at; must be smaller than the row count
 * @param timestamp      value the rows must match
 * @param pEndPos        out: index one past the last matching row (== startPos if none match)
 * @param rowLocations   array of SRowLocation; one entry is appended per matching row
 * @param createdBlocks  array of SSDataBlock*; receives any block allocated here so the
 *                       caller can destroy it
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when a block copy or an array
 *         append fails. *pEndPos is valid in both cases.
 */
static int32_t mergeJoinGetBlockRowsEqualTs(SSDataBlock* pBlock, int16_t tsSlotId, int32_t startPos, int64_t timestamp,
                                            int32_t* pEndPos, SArray* rowLocations, SArray* createdBlocks) {
  int32_t numRows = pBlock->info.rows;
  ASSERT(startPos < numRows);

  SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsSlotId);

  // advance to the first row whose value differs from the requested timestamp
  int32_t endPos = startPos;
  for (; endPos < numRows; ++endPos) {
    char* pNextVal = colDataGetData(pCol, endPos);
    if (timestamp != *(int64_t*)pNextVal) {
      break;
    }
  }
  *pEndPos = endPos;

  if (endPos == startPos) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* block = pBlock;
  bool         createdNewBlock = false;
  if (endPos == numRows) {
    // The run reaches the end of the block, so the matching rows are copied into a block of
    // their own; presumably because the caller fetches the next downstream block while these
    // rows are still needed -- TODO confirm.
    block = blockDataExtractBlock(pBlock, startPos, endPos - startPos);
    if (block == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    if (taosArrayPush(createdBlocks, &block) == NULL) {
      blockDataDestroy(block);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    createdNewBlock = true;
  }

  SRowLocation location = {0};
  location.pDataBlock = block;
  for (int32_t j = startPos; j < endPos; ++j) {
    // rows of a copied block are re-based to start at index 0
    location.pos = (createdNewBlock ? j - startPos : j);
    if (taosArrayPush(rowLocations, &location) == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }
  return TSDB_CODE_SUCCESS;
}
210

211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237
// whichChild == 0: left child of the join; whichChild == 1: right child of the join
/*
 * Collect all consecutive rows of one join input whose key equals timestamp,
 * continuing into further blocks of that input when a run reaches the end of a block.
 *
 * Starting at row startPos of startDataBlock, matching rows are appended to
 * rowLocations. Whenever the run extends to the last row of the current block,
 * the next block is pulled from pOperator->pDownstream[whichChild] and scanned
 * from row 0. The current block and read position of that input in the join
 * info are updated along the way. When the input is exhausted the task is
 * marked TASK_COMPLETED.
 *
 * @return TSDB_CODE_SUCCESS, or the error reported by mergeJoinGetBlockRowsEqualTs.
 *         The read position is advanced even on error so that the caller keeps
 *         making progress.
 */
static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator, int32_t whichChild, int16_t tsSlotId,
                                                        SSDataBlock* startDataBlock, int32_t startPos,
                                                        int64_t timestamp, SArray* rowLocations,
                                                        SArray* createdBlocks) {
  ASSERT(whichChild == 0 || whichChild == 1);

  SJoinOperatorInfo* pJoinInfo = pOperator->info;
  int32_t            endPos = -1;
  SSDataBlock*       dataBlock = startDataBlock;

  int32_t code =
      mergeJoinGetBlockRowsEqualTs(dataBlock, tsSlotId, startPos, timestamp, &endPos, rowLocations, createdBlocks);

  // the run hit the end of the block: it may continue in the next block of the same input
  while (code == TSDB_CODE_SUCCESS && endPos == dataBlock->info.rows) {
    SOperatorInfo* ds = pOperator->pDownstream[whichChild];
    dataBlock = ds->fpSet.getNextFn(ds);
    if (whichChild == 0) {
      pJoinInfo->leftPos = 0;
      pJoinInfo->pLeft = dataBlock;
    } else if (whichChild == 1) {
      pJoinInfo->rightPos = 0;
      pJoinInfo->pRight = dataBlock;
    }

    if (dataBlock == NULL) {
      setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
      endPos = -1;  // no current block, hence no position to record below
      break;
    }

    code = mergeJoinGetBlockRowsEqualTs(dataBlock, tsSlotId, 0, timestamp, &endPos, rowLocations, createdBlocks);
  }

  if (endPos != -1) {
    if (whichChild == 0) {
      pJoinInfo->leftPos = endPos;
    } else if (whichChild == 1) {
      pJoinInfo->rightPos = endPos;
    }
  }
  return code;
}

251 252 253 254 255 256 257 258
/*
 * Join all left rows and all right rows that carry the given timestamp.
 *
 * The matching rows of both inputs are collected first (which advances the
 * current block and read position of both inputs), then one output row is
 * written to pRes for every (left row, right row) pair and *nRows is
 * incremented accordingly. Temporary blocks created while collecting are
 * destroyed before returning.
 *
 * @param pOperator  the merge join operator
 * @param timestamp  join key value shared by the current left and right rows
 * @param pRes       result block that receives the joined rows
 * @param nRows      in/out: number of rows already in pRes
 * @return TSDB_CODE_SUCCESS, or the first error encountered while collecting rows
 *         or growing pRes. No rows are emitted for this timestamp on error.
 */
static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes,
                                               int32_t* nRows) {
  SJoinOperatorInfo* pJoinInfo = pOperator->info;

  SArray* leftRowLocations = taosArrayInit(8, sizeof(SRowLocation));
  SArray* leftCreatedBlocks = taosArrayInit(8, POINTER_BYTES);
  SArray* rightRowLocations = taosArrayInit(8, sizeof(SRowLocation));
  SArray* rightCreatedBlocks = taosArrayInit(8, POINTER_BYTES);

  // Both inputs are always consumed, even if the left one fails, so that both read positions
  // move past this timestamp.
  int32_t code = mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 0, pJoinInfo->leftCol.slotId, pJoinInfo->pLeft,
                                                          pJoinInfo->leftPos, timestamp, leftRowLocations,
                                                          leftCreatedBlocks);
  int32_t rightCode = mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId,
                                                               pJoinInfo->pRight, pJoinInfo->rightPos, timestamp,
                                                               rightRowLocations, rightCreatedBlocks);
  if (code == TSDB_CODE_SUCCESS) {
    code = rightCode;
  }

  if (code == TSDB_CODE_SUCCESS) {
    size_t leftNumJoin = taosArrayGetSize(leftRowLocations);
    size_t rightNumJoin = taosArrayGetSize(rightRowLocations);

    code = blockDataEnsureCapacity(pRes, *nRows + leftNumJoin * rightNumJoin);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s can not ensure block capacity for join. left: %zu, right: %zu", GET_TASKID(pOperator->pTaskInfo), leftNumJoin, rightNumJoin);
    } else {
      // cross product of the matching rows of both sides
      for (size_t i = 0; i < leftNumJoin; ++i) {
        SRowLocation* leftRow = taosArrayGet(leftRowLocations, i);
        for (size_t j = 0; j < rightNumJoin; ++j) {
          SRowLocation* rightRow = taosArrayGet(rightRowLocations, j);
          mergeJoinJoinLeftRight(pOperator, pRes, *nRows, leftRow->pDataBlock, leftRow->pos, rightRow->pDataBlock,
                                 rightRow->pos);
          ++*nRows;
        }
      }
    }
  }

  for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) {
    SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i);
    blockDataDestroy(pBlock);
  }
  taosArrayDestroy(rightCreatedBlocks);
  taosArrayDestroy(rightRowLocations);

  for (int i = 0; i < taosArrayGetSize(leftCreatedBlocks); ++i) {
    SSDataBlock* pBlock = taosArrayGetP(leftCreatedBlocks, i);
    blockDataDestroy(pBlock);
  }
  taosArrayDestroy(leftCreatedBlocks);
  taosArrayDestroy(leftRowLocations);

  return code;
}

298 299
/*
 * Read the join key of the current left row and of the current right row.
 *
 * If an input has no current block, or its read position is past the last row
 * of the current block, the next block is fetched from the corresponding
 * downstream operator and the position is reset to 0. The left input is
 * refilled before the right one.
 *
 * @param pLeftTs   out: key of the current left row
 * @param pRightTs  out: key of the current right row
 * @return true when both keys were read; false when either input is exhausted,
 *         in which case the task is marked TASK_COMPLETED and the outputs are
 *         left untouched
 */
static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs, int64_t* pRightTs) {
  SJoinOperatorInfo* pJoinInfo = pOperator->info;

  bool needLeftBlock = (pJoinInfo->pLeft == NULL) || (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows);
  if (needLeftBlock) {
    SOperatorInfo* pLeftChild = pOperator->pDownstream[0];
    pJoinInfo->pLeft = pLeftChild->fpSet.getNextFn(pLeftChild);
    pJoinInfo->leftPos = 0;

    if (NULL == pJoinInfo->pLeft) {
      setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
      return false;
    }
  }

  bool needRightBlock = (pJoinInfo->pRight == NULL) || (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows);
  if (needRightBlock) {
    SOperatorInfo* pRightChild = pOperator->pDownstream[1];
    pJoinInfo->pRight = pRightChild->fpSet.getNextFn(pRightChild);
    pJoinInfo->rightPos = 0;

    if (NULL == pJoinInfo->pRight) {
      setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
      return false;
    }
  }

  // the join key is read as a 64-bit value from the key column of each side
  SColumnInfoData* pLeftCol = taosArrayGet(pJoinInfo->pLeft->pDataBlock, pJoinInfo->leftCol.slotId);
  char*            pLeftVal = colDataGetData(pLeftCol, pJoinInfo->leftPos);
  *pLeftTs = *(int64_t*)pLeftVal;

  SColumnInfoData* pRightCol = taosArrayGet(pJoinInfo->pRight->pDataBlock, pJoinInfo->rightCol.slotId);
  char*            pRightVal = colDataGetData(pRightCol, pJoinInfo->rightPos);
  *pRightTs = *(int64_t*)pRightVal;

  // both key columns are expected to be timestamp columns
  ASSERT(pLeftCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  ASSERT(pRightCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  return true;
}

/*
 * Run the merge loop until pRes holds at least resultInfo.threshold rows or one
 * of the inputs is exhausted.
 *
 * In each round the current keys of both inputs are compared. Equal keys are
 * joined into pRes; otherwise the input that lags behind in the configured
 * input order is advanced by one row.
 */
static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) {
  SJoinOperatorInfo* pJoinInfo = pOperator->info;
  const bool         ascending = (pJoinInfo->inputOrder == TSDB_ORDER_ASC);
  int32_t            rowCount = pRes->info.rows;

  for (;;) {
    int64_t leftTs = 0;
    int64_t rightTs = 0;
    if (!mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs)) {
      break;
    }

    if (leftTs == rightTs) {
      // NOTE(review): the status returned by mergeJoinJoinDownstreamTsRanges is not inspected here
      mergeJoinJoinDownstreamTsRanges(pOperator, leftTs, pRes, &rowCount);
    } else {
      // the keys differ, so exactly one side lags behind in scan order
      const bool leftLags = ascending ? (leftTs < rightTs) : (leftTs > rightTs);
      if (leftLags) {
        pJoinInfo->leftPos += 1;
        if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
          continue;  // left block consumed; the next round fetches a new one
        }
      } else {
        pJoinInfo->rightPos += 1;
        if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
          continue;  // right block consumed; the next round fetches a new one
        }
      }
    }

    // publish the row count and stop once the block is full enough
    pRes->info.rows = rowCount;
    if (pRes->info.rows >= pOperator->resultInfo.threshold) {
      break;
    }
  }
}

SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
  SJoinOperatorInfo* pJoinInfo = pOperator->info;
376

377 378 379 380 381 382 383 384 385 386
  SSDataBlock* pRes = pJoinInfo->pRes;
  blockDataCleanup(pRes);
  blockDataEnsureCapacity(pRes, 4096);
  while (true) {
    int32_t numOfRowsBefore = pRes->info.rows;
    doMergeJoinImpl(pOperator, pRes);
    int32_t numOfNewRows = pRes->info.rows - numOfRowsBefore;
    if (numOfNewRows == 0) {
      break;
    }
387
    if (pJoinInfo->pCondAfterMerge != NULL) {
388
      doFilter(pJoinInfo->pCondAfterMerge, pRes, NULL);
389
    }
390 391 392 393
    if (pRes->info.rows >= pOperator->resultInfo.threshold) {
      break;
    }
  }
394 395
  return (pRes->info.rows > 0) ? pRes : NULL;
}