joinoperator.c 26.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "operator.h"
20 21
#include "os.h"
#include "querynodes.h"
22
#include "querytask.h"
23
#include "tcompare.h"
24
#include "tdatablock.h"
25
#include "thash.h"
26
#include "tmsg.h"
27 28
#include "ttypes.h"

29 30 31 32 33 34 35 36
/*
 * Pending state of one equal-timestamp join range whose joined rows did not all fit into the
 * previous result block; mergeJoinJoinDownstreamTsRanges() resumes from it.
 */
typedef struct SJoinRowCtx {
  bool    rowRemains;          // true while a range is pending
  int64_t ts;                  // timestamp shared by the pending left/right rows
  SArray* leftRowLocations;    // SRowLocation entries of the pending left rows
  SArray* leftCreatedBlocks;   // SSDataBlock* copies referenced by left row locations
  SArray* rightCreatedBlocks;  // SSDataBlock* copies referenced by right row locations
  int32_t leftRowIdx;          // left row index to resume from
  int32_t rightRowIdx;         // right row index to resume from
  bool    rightUseBuildTable;  // right rows are stored in SJoinOperatorInfo.rightBuildTable
  SArray* rightRowLocations;   // SRowLocation entries of the right rows (NULL when the build table is used)
} SJoinRowCtx;

H
Haojun Liao 已提交
42 43 44 45 46 47 48 49 50 51 52 53 54
typedef struct SJoinOperatorInfo {
  SSDataBlock* pRes;
  int32_t      joinType;
  int32_t      inputOrder;

  SSDataBlock* pLeft;
  int32_t      leftPos;
  SColumnInfo  leftCol;

  SSDataBlock* pRight;
  int32_t      rightPos;
  SColumnInfo  rightCol;
  SNode*       pCondAfterMerge;
S
slzhou 已提交
55
  SNode*       pColEqualOnConditions;
S
shenglian zhou 已提交
56

57 58 59
  SArray*      leftEqOnCondCols;
  char*        leftEqOnCondKeyBuf;
  int32_t      leftEqOnCondKeyLen;
S
shenglian zhou 已提交
60

61 62 63
  SArray*      rightEqOnCondCols;
  char*        rightEqOnCondKeyBuf;
  int32_t      rightEqOnCondKeyLen;
64

65
  SSHashObj*   rightBuildTable;
66
  SJoinRowCtx  rowCtx;
H
Haojun Liao 已提交
67 68
} SJoinOperatorInfo;

69
static void         setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode);
70
static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator);
71
static void         destroyMergeJoinOperator(void* param);
H
Haojun Liao 已提交
72 73
static void         extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t num,
                                         SSortMergeJoinPhysiNode* pJoinNode, const char* idStr);
74

H
Haojun Liao 已提交
75 76
/*
 * Record the timestamp columns of the join's merge condition in pInfo->leftCol / pInfo->rightCol.
 *
 * The merge condition is expected to be an operator node whose operands are column nodes. The
 * operand coming from pDownstream[0] becomes the left column. When both operands report the same
 * data block id, the operand order is kept. If the merge condition is missing or is not an
 * operator node, an error is logged and pInfo->leftCol / pInfo->rightCol are left untouched.
 * `num` is not read here.
 */
static void extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t num,
                                 SSortMergeJoinPhysiNode* pJoinNode, const char* idStr) {
  SNode* pMergeCondition = pJoinNode->pMergeCondition;
  // nodeType() inspects the node itself, so a missing merge condition must be rejected first.
  if (pMergeCondition == NULL || nodeType(pMergeCondition) != QUERY_NODE_OPERATOR) {
    qError("not support this in join operator, %s", idStr);
    return;  // do not handle this
  }

  SOperatorNode* pNode = (SOperatorNode*)pMergeCondition;
  SColumnNode*   col1 = (SColumnNode*)pNode->pLeft;
  SColumnNode*   col2 = (SColumnNode*)pNode->pRight;
  SColumnNode*   leftTsCol = NULL;
  SColumnNode*   rightTsCol = NULL;
  if (col1->dataBlockId == col2->dataBlockId) {
    leftTsCol = col1;
    rightTsCol = col2;
  } else {
    if (col1->dataBlockId == pDownstream[0]->resultDataBlockId) {
      ASSERT(col2->dataBlockId == pDownstream[1]->resultDataBlockId);
      leftTsCol = col1;
      rightTsCol = col2;
    } else {
      // Operands are written right-then-left relative to the downstream order: swap them.
      ASSERT(col1->dataBlockId == pDownstream[1]->resultDataBlockId);
      ASSERT(col2->dataBlockId == pDownstream[0]->resultDataBlockId);
      leftTsCol = col2;
      rightTsCol = col1;
    }
  }
  setJoinColumnInfo(&pInfo->leftCol, leftTsCol);
  setJoinColumnInfo(&pInfo->rightCol, rightTsCol);
}
106

107
/*
 * Split one equality operator between two column nodes into (left-input column, right-input
 * column). The operator's left operand is kept on the left when both operands share a data block
 * id or when it comes from pDownstreams[0]; otherwise the operands are swapped.
 * pInfo is not read here.
 */
static void extractEqualOnCondColsFromOper(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstreams, SOperatorNode* pOperNode,
                                           SColumn* pLeft, SColumn* pRight) {
  SColumnNode* pFirst = (SColumnNode*)pOperNode->pLeft;
  SColumnNode* pSecond = (SColumnNode*)pOperNode->pRight;

  bool keepOrder = (pFirst->dataBlockId == pSecond->dataBlockId) ||
                   (pFirst->dataBlockId == pDownstreams[0]->resultDataBlockId);
  if (!keepOrder) {
    SColumnNode* pTmp = pFirst;
    pFirst = pSecond;
    pSecond = pTmp;
  }

  *pLeft = extractColumnFromColumnNode(pFirst);
  *pRight = extractColumnFromColumnNode(pSecond);
}

S
slzhou 已提交
120
/*
 * Append the column pairs of the equal-on condition to leftTagEqCols / rightTagEqCols (arrays of
 * SColumn). The condition may be a single operator node or an AND logic condition whose
 * parameters are operator nodes. Any other node contributes nothing.
 */
static void extractEqualOnCondCols(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownStream, SNode* pEqualOnCondNode,
                                   SArray* leftTagEqCols, SArray* rightTagEqCols) {
  SColumn leftCol = {0};
  SColumn rightCol = {0};

  if (nodeType(pEqualOnCondNode) == QUERY_NODE_OPERATOR) {
    extractEqualOnCondColsFromOper(pInfo, pDownStream, (SOperatorNode*)pEqualOnCondNode, &leftCol, &rightCol);
    taosArrayPush(leftTagEqCols, &leftCol);
    taosArrayPush(rightTagEqCols, &rightCol);
    return;
  }

  if (nodeType(pEqualOnCondNode) != QUERY_NODE_LOGIC_CONDITION) {
    return;
  }

  SLogicConditionNode* pLogic = (SLogicConditionNode*)pEqualOnCondNode;
  if (pLogic->condType != LOGIC_COND_TYPE_AND) {
    return;
  }

  SNode* pParam = NULL;
  FOREACH(pParam, pLogic->pParameterList) {
    extractEqualOnCondColsFromOper(pInfo, pDownStream, (SOperatorNode*)pParam, &leftCol, &rightCol);
    taosArrayPush(leftTagEqCols, &leftCol);
    taosArrayPush(rightTagEqCols, &rightCol);
  }
}

143
static int32_t initTagColskeyBuf(int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) {
S
shenglian zhou 已提交
144 145 146 147 148 149 150 151 152
  int32_t numOfGroupCols = taosArrayGetSize(pGroupColList);
  for (int32_t i = 0; i < numOfGroupCols; ++i) {
    SColumn* pCol = (SColumn*)taosArrayGet(pGroupColList, i);
    (*keyLen) += pCol->bytes;  // actual data + null_flag
  }

  int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols;
  (*keyLen) += nullFlagSize;

H
Haojun Liao 已提交
153 154 155 156 157 158
  if (*keyLen >= 0) {

    (*keyBuf) = taosMemoryCalloc(1, (*keyLen));
    if ((*keyBuf) == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
S
shenglian zhou 已提交
159 160 161 162 163
  }

  return TSDB_CODE_SUCCESS;
}

164
/*
 * Serialize the values of pCols at row rowIndex of pBlock into pKey and return the number of
 * bytes written.
 *
 * Layout:
 *   - one null-flag byte per column;
 *   - then the non-NULL values back to back:
 *       JSON by getJsonValueLen(),
 *       other var-length types by varDataTLen(),
 *       fixed-length types by the column's declared width.
 *
 * pKey must be at least as large as the size computed by initTagColskeyBuf() for the same columns.
 */
static int32_t fillKeyBufFromTagCols(SArray* pCols, SSDataBlock* pBlock, int32_t rowIndex, void* pKey) {
  size_t numOfGroupCols = taosArrayGetSize(pCols);
  size_t numOfBlockCols = taosArrayGetSize(pBlock->pDataBlock);

  char* isNull = (char*)pKey;
  char* pStart = (char*)pKey + sizeof(int8_t) * numOfGroupCols;

  for (int32_t i = 0; i < numOfGroupCols; ++i) {
    SColumn* pCol = (SColumn*)taosArrayGet(pCols, i);

    // Valid range check, done before the column is fetched: slotId == numOfBlockCols is already
    // out of range. todo: return error code. Until then, treat the column as NULL so the key bytes
    // are deterministic instead of whatever a previous call left in the buffer.
    if (pCol->slotId < 0 || (size_t)pCol->slotId >= numOfBlockCols) {
      isNull[i] = 1;
      continue;
    }

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
    SColumnDataAgg*  pColAgg = NULL;
    if (pBlock->pBlockAgg != NULL) {
      pColAgg = pBlock->pBlockAgg[pCol->slotId];  // TODO is agg data matched?
    }

    if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
      isNull[i] = 1;
      continue;
    }

    isNull[i] = 0;
    char* val = colDataGetData(pColInfoData, rowIndex);
    if (pCol->type == TSDB_DATA_TYPE_JSON) {
      int32_t dataLen = getJsonValueLen(val);
      memcpy(pStart, val, dataLen);
      pStart += dataLen;
    } else if (IS_VAR_DATA_TYPE(pCol->type)) {
      varDataCopy(pStart, val);
      pStart += varDataTLen(val);
    } else {
      memcpy(pStart, val, pCol->bytes);
      pStart += pCol->bytes;
    }
  }
  return (int32_t)(pStart - (char*)pKey);
}

204 205
/*
 * Create the sort-merge join operator.
 *
 * Inputs:
 *   - pDownstream[0] supplies the left input and pDownstream[1] the right input.
 *   - numOfDownstream is forwarded to extractTimeCondition() and appendDownstream().
 *
 * Behavior:
 *   - Rows are joined when the timestamp columns of pJoinNode->pMergeCondition are equal.
 *   - The ON conditions and the node's own conditions are AND-ed into the filter that
 *     doMergeJoin() applies to the joined rows.
 *   - When pJoinNode->pColEqualOnConditions is set, the equal-column pairs are extracted and a
 *     hash table is prepared for probing right-side rows by key.
 *
 * Returns the new operator, or NULL on failure with pTaskInfo->code set to the error code.
 */
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
                                           SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) {
  SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo));
  SOperatorInfo*     pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));

  int32_t code = TSDB_CODE_SUCCESS;
  if (pOperator == NULL || pInfo == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pInfo->pRes = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc);
  if (pInfo->pRes == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  initResultSizeInfo(&pOperator->resultInfo, 4096);
  code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  int32_t    numOfCols = 0;
  SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols);

  setOperatorInfo(pOperator, "MergeJoinOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
  pOperator->exprSupp.pExprInfo = pExprInfo;
  pOperator->exprSupp.numOfExprs = numOfCols;

  extractTimeCondition(pInfo, pDownstream, numOfDownstream, pJoinNode, GET_TASKID(pTaskInfo));

  if (pJoinNode->pOnConditions != NULL && pJoinNode->node.pConditions != NULL) {
    pInfo->pCondAfterMerge = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION);
    if (pInfo->pCondAfterMerge == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }

    SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pInfo->pCondAfterMerge);
    pLogicCond->pParameterList = nodesMakeList();
    if (pLogicCond->pParameterList == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }

    // A failed clone/append used to be ignored, which could silently drop one side of the AND.
    code = nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->pOnConditions));
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
    code = nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->node.pConditions));
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
    pLogicCond->condType = LOGIC_COND_TYPE_AND;
  } else if (pJoinNode->pOnConditions != NULL) {
    pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->pOnConditions);
    if (pInfo->pCondAfterMerge == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }
  } else if (pJoinNode->node.pConditions != NULL) {
    pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->node.pConditions);
    if (pInfo->pCondAfterMerge == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }
  } else {
    pInfo->pCondAfterMerge = NULL;
  }

  code = filterInitFromNode(pInfo->pCondAfterMerge, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->inputOrder = (pJoinNode->node.inputTsOrder == ORDER_DESC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;

  if (pJoinNode->pColEqualOnConditions != NULL) {
    _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
    pInfo->rightBuildTable = tSimpleHashInit(256, hashFn);
    if (pInfo->rightBuildTable == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }

    // destroyMergeJoinOperator() releases the equal-on resources only when this field is set.
    // It is set once the build table exists, so the error path never hands it a NULL table.
    pInfo->pColEqualOnConditions = pJoinNode->pColEqualOnConditions;
    pInfo->leftEqOnCondCols = taosArrayInit(4, sizeof(SColumn));
    pInfo->rightEqOnCondCols = taosArrayInit(4, sizeof(SColumn));
    if (pInfo->leftEqOnCondCols == NULL || pInfo->rightEqOnCondCols == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }

    extractEqualOnCondCols(pInfo, pDownstream, pInfo->pColEqualOnConditions, pInfo->leftEqOnCondCols, pInfo->rightEqOnCondCols);

    code = initTagColskeyBuf(&pInfo->leftEqOnCondKeyLen, &pInfo->leftEqOnCondKeyBuf, pInfo->leftEqOnCondCols);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
    code = initTagColskeyBuf(&pInfo->rightEqOnCondKeyLen, &pInfo->rightEqOnCondKeyBuf, pInfo->rightEqOnCondCols);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeJoin, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL);
  code = appendDownstream(pOperator, pDownstream, numOfDownstream);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  if (pInfo != NULL) {
    destroyMergeJoinOperator(pInfo);
  }

  taosMemoryFree(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

/*
 * Copy the slot id and result type (type, bytes, precision, scale) of a column node into pColumn.
 * Other fields of pColumn are left as they are.
 */
void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) {
  pColumn->type = pColumnNode->node.resType.type;
  pColumn->bytes = pColumnNode->node.resType.bytes;
  pColumn->precision = pColumnNode->node.resType.precision;
  pColumn->scale = pColumnNode->node.resType.scale;
  pColumn->slotId = pColumnNode->slotId;
}

301 302 303 304 305 306 307 308 309 310 311 312
/* Destroy every row array stored as a value in pBuildTable, then free the table itself. */
static void mergeJoinDestoryBuildTable(SSHashObj* pBuildTable) {
  int32_t iter = 0;
  for (void* pEntry = tSimpleHashIterate(pBuildTable, NULL, &iter); pEntry != NULL;
       pEntry = tSimpleHashIterate(pBuildTable, pEntry, &iter)) {
    SArray* pRows = *(SArray**)pEntry;
    taosArrayDestroy(pRows);
  }

  tSimpleHashCleanup(pBuildTable);
}

313
void destroyMergeJoinOperator(void* param) {
314
  SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param;
S
slzhou 已提交
315
  if (pJoinOperator->pColEqualOnConditions != NULL) {
316
    mergeJoinDestoryBuildTable(pJoinOperator->rightBuildTable);
317 318
    taosMemoryFreeClear(pJoinOperator->rightEqOnCondKeyBuf);
    taosArrayDestroy(pJoinOperator->rightEqOnCondCols);
S
shenglian zhou 已提交
319

320 321
    taosMemoryFreeClear(pJoinOperator->leftEqOnCondKeyBuf);
    taosArrayDestroy(pJoinOperator->leftEqOnCondCols);
322
  }
323
  nodesDestroyNode(pJoinOperator->pCondAfterMerge);
324

325 326 327 328 329
  taosArrayDestroy(pJoinOperator->rowCtx.leftCreatedBlocks);
  taosArrayDestroy(pJoinOperator->rowCtx.rightCreatedBlocks);
  taosArrayDestroy(pJoinOperator->rowCtx.leftRowLocations);
  taosArrayDestroy(pJoinOperator->rowCtx.rightRowLocations);

S
shenglian zhou 已提交
330
  pJoinOperator->pRes = blockDataDestroy(pJoinOperator->pRes);
D
dapan1121 已提交
331
  taosMemoryFreeClear(param);
332
}
333 334

/*
 * Write one joined row into pRes at row currRow. For every output expression, the value (or NULL)
 * comes from pLeftBlock at leftPos when the expression's column belongs to the left block's data
 * block id. Otherwise it comes from pRightBlock at rightPos.
 */
static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t currRow,
                                   SSDataBlock* pLeftBlock, int32_t leftPos, SSDataBlock* pRightBlock,
                                   int32_t rightPos) {
  for (int32_t col = 0; col < pOperator->exprSupp.numOfExprs; ++col) {
    SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[col];
    int32_t    srcBlockId = pExpr->base.pParam[0].pCol->dataBlockId;
    int32_t    srcSlotId = pExpr->base.pParam[0].pCol->slotId;

    bool         fromLeft = (pLeftBlock->info.id.blockId == srcBlockId);
    SSDataBlock* pSrcBlock = fromLeft ? pLeftBlock : pRightBlock;
    int32_t      srcRow = fromLeft ? leftPos : rightPos;

    SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlotId);
    SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, col);

    if (!colDataIsNull_s(pSrc, srcRow)) {
      char* pVal = colDataGetData(pSrc, srcRow);
      colDataSetVal(pDst, currRow, pVal, false);
    } else {
      colDataSetNULL(pDst, currRow);
    }
  }
}
365
typedef struct SRowLocation {
366 367
  SSDataBlock* pDataBlock;
  int32_t      pos;
368 369
} SRowLocation;

370
// pBlock[tsSlotId][startPos, endPos) == timestamp,
S
shenglian zhou 已提交
371
static int32_t mergeJoinGetBlockRowsEqualTs(SSDataBlock* pBlock, int16_t tsSlotId, int32_t startPos, int64_t timestamp,
372
                                            int32_t* pEndPos, SArray* rowLocations, SArray* createdBlocks) {
373 374
  int32_t numRows = pBlock->info.rows;
  ASSERT(startPos < numRows);
S
shenglian zhou 已提交
375
  SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsSlotId);
376 377

  int32_t i = startPos;
378
  for (; i < numRows; ++i) {
379
    char* pNextVal = colDataGetData(pCol, i);
380
    if (timestamp != *(int64_t*)pNextVal) {
381 382 383 384
      break;
    }
  }
  int32_t endPos = i;
S
shenglian zhou 已提交
385
  *pEndPos = endPos;
386

387 388 389 390
  if (endPos - startPos == 0) {
    return 0;
  }

391
  SSDataBlock* block = pBlock;
S
slzhou 已提交
392
  bool         createdNewBlock = false;
S
shenglian zhou 已提交
393
  if (endPos == numRows) {
S
slzhou 已提交
394
    block = blockDataExtractBlock(pBlock, startPos, endPos - startPos);
S
shenglian zhou 已提交
395
    taosArrayPush(createdBlocks, &block);
396
    createdNewBlock = true;
397
  }
398
  SRowLocation location = {0};
399 400
  for (int32_t j = startPos; j < endPos; ++j) {
    location.pDataBlock = block;
S
slzhou 已提交
401
    location.pos = (createdNewBlock ? j - startPos : j);
402
    taosArrayPush(rowLocations, &location);
403 404 405
  }
  return 0;
}
406

407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433
/*
 * Collect all rows of one join input whose timestamp equals `timestamp`, starting at
 * startDataBlock[startPos]. whichChild == 0 selects the left child, 1 the right child.
 *
 * When the run reaches the end of a block, further blocks are pulled from that child. After each
 * pull, pJoinInfo->pLeft/leftPos (or pRight/rightPos) are set to the new block and row 0. If the
 * child returns NULL, the task is marked completed and the position is not updated further.
 * Otherwise, the child's position is set to the first row after the run. Always returns 0.
 */
static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator, int32_t whichChild, int16_t tsSlotId,
                                                        SSDataBlock* startDataBlock, int32_t startPos,
                                                        int64_t timestamp, SArray* rowLocations,
                                                        SArray* createdBlocks) {
  ASSERT(whichChild == 0 || whichChild == 1);

  SJoinOperatorInfo* pJoinInfo = pOperator->info;
  int32_t*           pPos = NULL;
  SSDataBlock**      ppBlock = NULL;
  if (whichChild == 0) {
    pPos = &pJoinInfo->leftPos;
    ppBlock = &pJoinInfo->pLeft;
  } else if (whichChild == 1) {
    pPos = &pJoinInfo->rightPos;
    ppBlock = &pJoinInfo->pRight;
  }

  SSDataBlock* pBlock = startDataBlock;
  int32_t      endPos = -1;
  mergeJoinGetBlockRowsEqualTs(pBlock, tsSlotId, startPos, timestamp, &endPos, rowLocations, createdBlocks);

  // A run that ends exactly at the block boundary may continue in the child's next block.
  while (endPos == pBlock->info.rows) {
    SOperatorInfo* pChild = pOperator->pDownstream[whichChild];
    pBlock = pChild->fpSet.getNextFn(pChild);
    if (pPos != NULL) {
      *pPos = 0;
      *ppBlock = pBlock;
    }

    if (pBlock == NULL) {
      setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
      return 0;
    }

    mergeJoinGetBlockRowsEqualTs(pBlock, tsSlotId, 0, timestamp, &endPos, rowLocations, createdBlocks);
  }

  if (pPos != NULL) {
    *pPos = endPos;
  }
  return 0;
}

447
/*
 * Group the right-side rows in rightRowLocations by their equal-on key into
 * pInfo->rightBuildTable. Each key maps to an SArray* of SRowLocation, and
 * pInfo->rightEqOnCondKeyBuf is used as scratch space for the key.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the error reported by tSimpleHashPut().
 * On error, rows inserted so far stay in the table.
 */
static int32_t mergeJoinFillBuildTable(SJoinOperatorInfo* pInfo, SArray* rightRowLocations) {
  for (int32_t i = 0; i < taosArrayGetSize(rightRowLocations); ++i) {
    SRowLocation* rightRow = taosArrayGet(rightRowLocations, i);

    int32_t  keyLen = fillKeyBufFromTagCols(pInfo->rightEqOnCondCols, rightRow->pDataBlock, rightRow->pos, pInfo->rightEqOnCondKeyBuf);
    SArray** ppRows = tSimpleHashGet(pInfo->rightBuildTable, pInfo->rightEqOnCondKeyBuf, keyLen);
    if (ppRows != NULL) {
      if (taosArrayPush(*ppRows, rightRow) == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      continue;
    }

    SArray* rows = taosArrayInit(4, sizeof(SRowLocation));
    if (rows == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    if (taosArrayPush(rows, rightRow) == NULL) {
      taosArrayDestroy(rows);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // The table stores the array pointer; if the put fails nobody else owns `rows`.
    int32_t code = tSimpleHashPut(pInfo->rightBuildTable, pInfo->rightEqOnCondKeyBuf, keyLen, &rows, POINTER_BYTES);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(rows);
      return code;
    }
  }
  return TSDB_CODE_SUCCESS;
}

/*
 * Emit joined rows for every (left row, right row) pair of one equal-timestamp range into pRes,
 * starting at row *nRows and advancing it. The walk starts at leftRowLocations[leftRowIdx] and,
 * for that first left row only, at right index rightRowIdx.
 *
 * Right rows come from one of two places:
 *   - useBuildTableTSRange: the pJoinInfo->rightBuildTable entry matching the left row's
 *     equal-on key; left rows without a match are skipped;
 *   - otherwise: rightRowLocations.
 *
 * If *nRows reaches pOperator->resultInfo.threshold before all pairs are emitted, *pReachThreshold
 * is set and the resume position is stored in pJoinInfo->rowCtx. Always returns TSDB_CODE_SUCCESS.
 */
static int32_t mergeJoinLeftRowsRightRows(SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t* nRows,
                                          const SArray* leftRowLocations, int32_t leftRowIdx,
                                          int32_t rightRowIdx, bool useBuildTableTSRange, SArray* rightRowLocations,
                                          bool* pReachThreshold) {
  SJoinOperatorInfo* pJoinInfo = pOperator->info;
  uint32_t           limitRowNum = pOperator->resultInfo.threshold;
  size_t             leftNumJoin = taosArrayGetSize(leftRowLocations);
  int32_t            leftIdx = leftRowIdx;
  int32_t            rightIdx = rightRowIdx;

  *pReachThreshold = false;

  for (; leftIdx < leftNumJoin; ++leftIdx, rightIdx = 0) {
    SRowLocation* leftRow = taosArrayGet(leftRowLocations, leftIdx);
    SArray*       pRightRows = rightRowLocations;

    if (useBuildTableTSRange) {
      int32_t  keyLen = fillKeyBufFromTagCols(pJoinInfo->leftEqOnCondCols, leftRow->pDataBlock, leftRow->pos, pJoinInfo->leftEqOnCondKeyBuf);
      SArray** ppRightRows = tSimpleHashGet(pJoinInfo->rightBuildTable, pJoinInfo->leftEqOnCondKeyBuf, keyLen);
      if (ppRightRows == NULL) {
        continue;  // no right row shares this left row's key
      }
      pRightRows = *ppRightRows;
    }

    size_t rightRowsSize = taosArrayGetSize(pRightRows);
    for (; rightIdx < rightRowsSize; ++rightIdx) {
      if (*nRows >= limitRowNum) {
        // Result block is full: remember the pair to resume from.
        *pReachThreshold = true;
        pJoinInfo->rowCtx.rowRemains = true;
        pJoinInfo->rowCtx.leftRowIdx = leftIdx;
        pJoinInfo->rowCtx.rightRowIdx = rightIdx;
        return TSDB_CODE_SUCCESS;
      }

      SRowLocation* rightRow = taosArrayGet(pRightRows, rightIdx);
      mergeJoinJoinLeftRight(pOperator, pRes, *nRows, leftRow->pDataBlock, leftRow->pos, rightRow->pDataBlock,
                             rightRow->pos);
      ++*nRows;
    }
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Release the state collected for one equal-timestamp range and reset pJoinInfo->rowCtx.
 *
 * Destroys:
 *   - the block copies listed in leftCreatedBlocks / rightCreatedBlocks, and those arrays;
 *   - both row location arrays;
 *   - when rightUseBuildTable is set, every row array stored in pJoinInfo->rightBuildTable.
 *
 * The build table itself is cleared (not freed) so it can be reused.
 */
static void mergeJoinDestroyTSRangeCtx(SJoinOperatorInfo* pJoinInfo, SArray* leftRowLocations, SArray* leftCreatedBlocks,
                                       SArray* rightCreatedBlocks, bool rightUseBuildTable, SArray* rightRowLocations) {
  size_t numRightBlocks = taosArrayGetSize(rightCreatedBlocks);
  for (size_t k = 0; k < numRightBlocks; ++k) {
    blockDataDestroy(taosArrayGetP(rightCreatedBlocks, k));
  }
  taosArrayDestroy(rightCreatedBlocks);

  size_t numLeftBlocks = taosArrayGetSize(leftCreatedBlocks);
  for (size_t k = 0; k < numLeftBlocks; ++k) {
    blockDataDestroy(taosArrayGetP(leftCreatedBlocks, k));
  }
  taosArrayDestroy(leftCreatedBlocks);

  if (rightRowLocations != NULL) {
    taosArrayDestroy(rightRowLocations);
  }
  taosArrayDestroy(leftRowLocations);

  if (rightUseBuildTable) {
    int32_t iter = 0;
    for (void* pEntry = tSimpleHashIterate(pJoinInfo->rightBuildTable, NULL, &iter); pEntry != NULL;
         pEntry = tSimpleHashIterate(pJoinInfo->rightBuildTable, pEntry, &iter)) {
      taosArrayDestroy(*(SArray**)pEntry);
    }
    tSimpleHashClear(pJoinInfo->rightBuildTable);
  }

  SJoinRowCtx* pCtx = &pJoinInfo->rowCtx;
  pCtx->rowRemains = false;
  pCtx->leftRowLocations = NULL;
  pCtx->leftCreatedBlocks = NULL;
  pCtx->rightCreatedBlocks = NULL;
  pCtx->rightUseBuildTable = false;
  pCtx->rightRowLocations = NULL;
}

546 547
/*
 * Join all left and right rows whose timestamp equals `timestamp`, appending joined rows to pRes
 * starting at row *nRows and advancing *nRows.
 *
 * A fresh call collects the equal-timestamp rows of both inputs, possibly across several
 * downstream blocks. When equal-on columns are configured and the right side has more than 16
 * such rows, those rows are moved into pJoinInfo->rightBuildTable keyed by the equal-on columns.
 * If pRes reaches the row threshold, the collected state is parked in pJoinInfo->rowCtx and the
 * next call resumes from it; otherwise the state is released.
 *
 * Returns TSDB_CODE_SUCCESS, or the error from blockDataEnsureCapacity(). In the error case no rows
 * are joined for this range and its state is released.
 */
static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes,
                                               int32_t* nRows) {
  SJoinOperatorInfo* pJoinInfo = pOperator->info;
  SArray*            leftRowLocations = NULL;
  SArray*            rightRowLocations = NULL;
  SArray*            leftCreatedBlocks = NULL;
  SArray*            rightCreatedBlocks = NULL;
  int32_t            leftRowIdx = 0;
  int32_t            rightRowIdx = 0;
  bool               rightUseBuildTable = false;

  if (pJoinInfo->rowCtx.rowRemains) {
    // Resume a range whose joined rows did not fit into the previous result block.
    leftRowLocations = pJoinInfo->rowCtx.leftRowLocations;
    leftCreatedBlocks = pJoinInfo->rowCtx.leftCreatedBlocks;
    rightUseBuildTable = pJoinInfo->rowCtx.rightUseBuildTable;
    rightRowLocations = pJoinInfo->rowCtx.rightRowLocations;
    rightCreatedBlocks = pJoinInfo->rowCtx.rightCreatedBlocks;
    leftRowIdx = pJoinInfo->rowCtx.leftRowIdx;
    rightRowIdx = pJoinInfo->rowCtx.rightRowIdx;
  } else {
    leftRowLocations = taosArrayInit(8, sizeof(SRowLocation));
    leftCreatedBlocks = taosArrayInit(8, POINTER_BYTES);

    rightRowLocations = taosArrayInit(8, sizeof(SRowLocation));
    rightCreatedBlocks = taosArrayInit(8, POINTER_BYTES);

    mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 0, pJoinInfo->leftCol.slotId, pJoinInfo->pLeft,
                                             pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks);
    mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight,
                                             pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks);

    // Large right ranges are probed by equal-on key instead of being scanned for every left row
    // (16 is the size threshold chosen here).
    if (pJoinInfo->pColEqualOnConditions != NULL && taosArrayGetSize(rightRowLocations) > 16) {
      mergeJoinFillBuildTable(pJoinInfo, rightRowLocations);
      rightUseBuildTable = true;
      taosArrayDestroy(rightRowLocations);
      rightRowLocations = NULL;
    }
  }

  size_t  leftNumJoin = taosArrayGetSize(leftRowLocations);
  int32_t code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.threshold);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s can not ensure block capacity for join. left: %zu", GET_TASKID(pOperator->pTaskInfo),
           leftNumJoin);
  }

  bool reachThreshold = false;
  if (code == TSDB_CODE_SUCCESS) {
    mergeJoinLeftRowsRightRows(pOperator, pRes, nRows, leftRowLocations, leftRowIdx,
                               rightRowIdx, rightUseBuildTable, rightRowLocations, &reachThreshold);
  }

  if (!reachThreshold) {
    mergeJoinDestroyTSRangeCtx(pJoinInfo, leftRowLocations, leftCreatedBlocks, rightCreatedBlocks,
                               rightUseBuildTable, rightRowLocations);
  } else {
    // Park the range so the next call continues where this one stopped.
    pJoinInfo->rowCtx.rowRemains = true;
    pJoinInfo->rowCtx.ts = timestamp;
    pJoinInfo->rowCtx.leftRowLocations = leftRowLocations;
    pJoinInfo->rowCtx.leftCreatedBlocks = leftCreatedBlocks;
    pJoinInfo->rowCtx.rightCreatedBlocks = rightCreatedBlocks;
    pJoinInfo->rowCtx.rightUseBuildTable = rightUseBuildTable;
    pJoinInfo->rowCtx.rightRowLocations = rightRowLocations;
  }
  return code;
}

616 617
static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs, int64_t* pRightTs) {
  SJoinOperatorInfo* pJoinInfo = pOperator->info;
618

619 620 621
  if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
    SOperatorInfo* ds1 = pOperator->pDownstream[0];
    pJoinInfo->pLeft = ds1->fpSet.getNextFn(ds1);
622

623 624 625 626
    pJoinInfo->leftPos = 0;
    if (pJoinInfo->pLeft == NULL) {
      setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
      return false;
627
    }
628
  }
629

630 631 632
  if (pJoinInfo->pRight == NULL || pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
    SOperatorInfo* ds2 = pOperator->pDownstream[1];
    pJoinInfo->pRight = ds2->fpSet.getNextFn(ds2);
633

634 635 636 637
    pJoinInfo->rightPos = 0;
    if (pJoinInfo->pRight == NULL) {
      setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
      return false;
638
    }
639 640 641 642 643 644 645 646 647 648 649 650 651 652 653
  }
  // only the timestamp match support for ordinary table
  SColumnInfoData* pLeftCol = taosArrayGet(pJoinInfo->pLeft->pDataBlock, pJoinInfo->leftCol.slotId);
  char*            pLeftVal = colDataGetData(pLeftCol, pJoinInfo->leftPos);
  *pLeftTs = *(int64_t*)pLeftVal;

  SColumnInfoData* pRightCol = taosArrayGet(pJoinInfo->pRight->pDataBlock, pJoinInfo->rightCol.slotId);
  char*            pRightVal = colDataGetData(pRightCol, pJoinInfo->rightPos);
  *pRightTs = *(int64_t*)pRightVal;

  return true;
}

/*
 * Append joined rows to pRes until it holds at least pOperator->resultInfo.threshold rows or an
 * input is exhausted.
 *
 * A range left pending in pJoinInfo->rowCtx is resumed first. Otherwise the current timestamps
 * of both inputs are compared:
 *   - equal timestamps are joined by mergeJoinJoinDownstreamTsRanges();
 *   - otherwise the side whose timestamp comes first in input order (smaller for ascending,
 *     larger for descending) advances one row.
 */
static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) {
  SJoinOperatorInfo* pJoinInfo = pOperator->info;
  int32_t            nrows = pRes->info.rows;
  bool               asc = (pJoinInfo->inputOrder == TSDB_ORDER_ASC);

  for (;;) {
    int64_t leftTs = 0;
    int64_t rightTs = 0;

    if (!pJoinInfo->rowCtx.rowRemains) {
      if (!mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs)) {
        break;
      }
    } else {
      leftTs = pJoinInfo->rowCtx.ts;
      rightTs = pJoinInfo->rowCtx.ts;
    }

    bool leftComesFirst = asc ? (leftTs < rightTs) : (leftTs > rightTs);
    bool rightComesFirst = asc ? (leftTs > rightTs) : (leftTs < rightTs);

    if (leftTs == rightTs) {
      mergeJoinJoinDownstreamTsRanges(pOperator, leftTs, pRes, &nrows);
    } else if (leftComesFirst) {
      pJoinInfo->leftPos += 1;
      // nrows is unchanged on this path, so the bookkeeping below can be skipped.
      bool leftBlockDone = (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows);
      if (leftBlockDone && pRes->info.rows < pOperator->resultInfo.threshold) {
        continue;
      }
    } else if (rightComesFirst) {
      pJoinInfo->rightPos += 1;
      bool rightBlockDone = (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows);
      if (rightBlockDone && pRes->info.rows < pOperator->resultInfo.threshold) {
        continue;
      }
    }

    // the pDataBlock are always the same one, no need to call this again
    pRes->info.rows = nrows;
    pRes->info.dataLoad = 1;
    pRes->info.scanFlag = MAIN_SCAN;
    if (pRes->info.rows >= pOperator->resultInfo.threshold) {
      break;
    }
  }
}

SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
  SJoinOperatorInfo* pJoinInfo = pOperator->info;
699

700 701
  SSDataBlock* pRes = pJoinInfo->pRes;
  blockDataCleanup(pRes);
H
Haojun Liao 已提交
702

703 704 705 706 707 708 709
  while (true) {
    int32_t numOfRowsBefore = pRes->info.rows;
    doMergeJoinImpl(pOperator, pRes);
    int32_t numOfNewRows = pRes->info.rows - numOfRowsBefore;
    if (numOfNewRows == 0) {
      break;
    }
H
Haojun Liao 已提交
710 711
    if (pOperator->exprSupp.pFilterInfo != NULL) {
      doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
712
    }
713 714 715 716
    if (pRes->info.rows >= pOperator->resultInfo.threshold) {
      break;
    }
  }
717 718
  return (pRes->info.rows > 0) ? pRes : NULL;
}