joinoperator.c 17.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "filter.h"
17
#include "executorimpl.h"
18 19 20 21
#include "function.h"
#include "os.h"
#include "querynodes.h"
#include "tcompare.h"
22
#include "tdatablock.h"
23
#include "thash.h"
24
#include "tmsg.h"
25 26
#include "ttypes.h"

27 28 29 30 31 32 33 34 35 36 37
typedef struct SJoinRowCtx {
  bool    rowRemains;
  int64_t ts;
  SArray* leftRowLocations;
  SArray* rightRowLocations;
  SArray* leftCreatedBlocks;
  SArray* rightCreatedBlocks;
  int32_t leftRowIdx;
  int32_t rightRowIdx;
} SJoinRowCtx;

H
Haojun Liao 已提交
38 39 40 41 42 43 44 45 46 47 48 49 50
typedef struct SJoinOperatorInfo {
  SSDataBlock* pRes;
  int32_t      joinType;
  int32_t      inputOrder;

  SSDataBlock* pLeft;
  int32_t      leftPos;
  SColumnInfo  leftCol;

  SSDataBlock* pRight;
  int32_t      rightPos;
  SColumnInfo  rightCol;
  SNode*       pCondAfterMerge;
51 52

  SJoinRowCtx  rowCtx;
H
Haojun Liao 已提交
53 54
} SJoinOperatorInfo;

55
static void         setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode);
56
static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator);
57
static void         destroyMergeJoinOperator(void* param);
H
Haojun Liao 已提交
58 59
static void         extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t num,
                                         SSortMergeJoinPhysiNode* pJoinNode, const char* idStr);
60

H
Haojun Liao 已提交
61 62
static void extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream,  int32_t num,
                                 SSortMergeJoinPhysiNode* pJoinNode, const char* idStr) {
63
  SNode* pMergeCondition = pJoinNode->pMergeCondition;
H
Haojun Liao 已提交
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
  if (nodeType(pMergeCondition) != QUERY_NODE_OPERATOR) {
    qError("not support this in join operator, %s", idStr);
    return;  // do not handle this
  }

  SOperatorNode* pNode = (SOperatorNode*)pMergeCondition;
  SColumnNode*   col1 = (SColumnNode*)pNode->pLeft;
  SColumnNode*   col2 = (SColumnNode*)pNode->pRight;
  SColumnNode*   leftTsCol = NULL;
  SColumnNode*   rightTsCol = NULL;
  if (col1->dataBlockId == col2->dataBlockId) {
    leftTsCol = col1;
    rightTsCol = col2;
  } else {
    if (col1->dataBlockId == pDownstream[0]->resultDataBlockId) {
      ASSERT(col2->dataBlockId == pDownstream[1]->resultDataBlockId);
80 81 82
      leftTsCol = col1;
      rightTsCol = col2;
    } else {
H
Haojun Liao 已提交
83 84 85 86
      ASSERT(col1->dataBlockId == pDownstream[1]->resultDataBlockId);
      ASSERT(col2->dataBlockId == pDownstream[0]->resultDataBlockId);
      leftTsCol = col2;
      rightTsCol = col1;
87
    }
H
Haojun Liao 已提交
88 89 90 91
  }
  setJoinColumnInfo(&pInfo->leftCol, leftTsCol);
  setJoinColumnInfo(&pInfo->rightCol, rightTsCol);
}
92

93 94
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
                                           SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) {
95 96
  SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo));
  SOperatorInfo*     pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
H
Haojun Liao 已提交
97 98

  int32_t code = TSDB_CODE_SUCCESS;
99
  if (pOperator == NULL || pInfo == NULL) {
H
Haojun Liao 已提交
100
    code = TSDB_CODE_OUT_OF_MEMORY;
101 102 103
    goto _error;
  }

H
Haojun Liao 已提交
104
  int32_t      numOfCols = 0;
H
Haojun Liao 已提交
105 106
  pInfo->pRes = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc);

H
Haojun Liao 已提交
107
  SExprInfo*   pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols);
108
  initResultSizeInfo(&pOperator->resultInfo, 4096);
H
Haojun Liao 已提交
109
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
H
Haojun Liao 已提交
110 111

  setOperatorInfo(pOperator, "MergeJoinOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
112 113
  pOperator->exprSupp.pExprInfo = pExprInfo;
  pOperator->exprSupp.numOfExprs = numOfCols;
114

H
Haojun Liao 已提交
115
  extractTimeCondition(pInfo, pDownstream, numOfDownstream, pJoinNode, GET_TASKID(pTaskInfo));
116

117 118
  if (pJoinNode->pOnConditions != NULL && pJoinNode->node.pConditions != NULL) {
    pInfo->pCondAfterMerge = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION);
H
Haojun Liao 已提交
119 120 121 122 123
    if (pInfo->pCondAfterMerge == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }

124 125
    SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pInfo->pCondAfterMerge);
    pLogicCond->pParameterList = nodesMakeList();
H
Haojun Liao 已提交
126 127 128 129 130
    if (pLogicCond->pParameterList == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }

131 132 133 134 135 136 137 138 139
    nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->pOnConditions));
    nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->node.pConditions));
    pLogicCond->condType = LOGIC_COND_TYPE_AND;
  } else if (pJoinNode->pOnConditions != NULL) {
    pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->pOnConditions);
  } else if (pJoinNode->node.pConditions != NULL) {
    pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->node.pConditions);
  } else {
    pInfo->pCondAfterMerge = NULL;
140 141
  }

H
Haojun Liao 已提交
142 143 144 145 146
  code = filterInitFromNode(pInfo->pCondAfterMerge, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

147
  pInfo->inputOrder = TSDB_ORDER_ASC;
148
  if (pJoinNode->inputTsOrder == ORDER_ASC) {
149
    pInfo->inputOrder = TSDB_ORDER_ASC;
150
  } else if (pJoinNode->inputTsOrder == ORDER_DESC) {
151
    pInfo->inputOrder = TSDB_ORDER_DESC;
152 153
  }

154
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeJoin, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL);
H
Haojun Liao 已提交
155
  code = appendDownstream(pOperator, pDownstream, numOfDownstream);
156 157 158 159 160 161
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

162
_error:
H
Haojun Liao 已提交
163 164 165 166
  if (pInfo != NULL) {
    destroyMergeJoinOperator(pInfo);
  }

167
  taosMemoryFree(pOperator);
H
Haojun Liao 已提交
168
  pTaskInfo->code = code;
169 170 171 172 173 174 175 176 177 178 179
  return NULL;
}

void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) {
  pColumn->slotId = pColumnNode->slotId;
  pColumn->type = pColumnNode->node.resType.type;
  pColumn->bytes = pColumnNode->node.resType.bytes;
  pColumn->precision = pColumnNode->node.resType.precision;
  pColumn->scale = pColumnNode->node.resType.scale;
}

180
void destroyMergeJoinOperator(void* param) {
181
  SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param;
182
  nodesDestroyNode(pJoinOperator->pCondAfterMerge);
183

S
shenglian zhou 已提交
184
  pJoinOperator->pRes = blockDataDestroy(pJoinOperator->pRes);
D
dapan1121 已提交
185
  taosMemoryFreeClear(param);
186
}
187 188

static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t currRow,
189 190
                                   SSDataBlock* pLeftBlock, int32_t leftPos, SSDataBlock* pRightBlock,
                                   int32_t rightPos) {
191 192 193 194 195 196 197 198 199 200 201 202
  SJoinOperatorInfo* pJoinInfo = pOperator->info;

  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
    SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, i);

    SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[i];

    int32_t blockId = pExprInfo->base.pParam[0].pCol->dataBlockId;
    int32_t slotId = pExprInfo->base.pParam[0].pCol->slotId;
    int32_t rowIndex = -1;

    SColumnInfoData* pSrc = NULL;
H
Haojun Liao 已提交
203
    if (pLeftBlock->info.id.blockId == blockId) {
204 205
      pSrc = taosArrayGet(pLeftBlock->pDataBlock, slotId);
      rowIndex = leftPos;
206
    } else {
207 208
      pSrc = taosArrayGet(pRightBlock->pDataBlock, slotId);
      rowIndex = rightPos;
209
    }
210

211
    if (colDataIsNull_s(pSrc, rowIndex)) {
212
      colDataSetNULL(pDst, currRow);
213 214
    } else {
      char* p = colDataGetData(pSrc, rowIndex);
215
      colDataSetVal(pDst, currRow, p, false);
216 217 218
    }
  }
}
219
typedef struct SRowLocation {
220 221
  SSDataBlock* pDataBlock;
  int32_t      pos;
222 223
} SRowLocation;

224
// pBlock[tsSlotId][startPos, endPos) == timestamp,
S
shenglian zhou 已提交
225
static int32_t mergeJoinGetBlockRowsEqualTs(SSDataBlock* pBlock, int16_t tsSlotId, int32_t startPos, int64_t timestamp,
226
                                            int32_t* pEndPos, SArray* rowLocations, SArray* createdBlocks) {
227 228
  int32_t numRows = pBlock->info.rows;
  ASSERT(startPos < numRows);
S
shenglian zhou 已提交
229
  SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsSlotId);
230 231

  int32_t i = startPos;
232
  for (; i < numRows; ++i) {
233
    char* pNextVal = colDataGetData(pCol, i);
234
    if (timestamp != *(int64_t*)pNextVal) {
235 236 237 238
      break;
    }
  }
  int32_t endPos = i;
S
shenglian zhou 已提交
239
  *pEndPos = endPos;
240

241 242 243 244
  if (endPos - startPos == 0) {
    return 0;
  }

245
  SSDataBlock* block = pBlock;
S
slzhou 已提交
246
  bool         createdNewBlock = false;
S
shenglian zhou 已提交
247
  if (endPos == numRows) {
S
slzhou 已提交
248
    block = blockDataExtractBlock(pBlock, startPos, endPos - startPos);
S
shenglian zhou 已提交
249
    taosArrayPush(createdBlocks, &block);
250
    createdNewBlock = true;
251
  }
252
  SRowLocation location = {0};
253 254
  for (int32_t j = startPos; j < endPos; ++j) {
    location.pDataBlock = block;
S
slzhou 已提交
255
    location.pos = (createdNewBlock ? j - startPos : j);
256
    taosArrayPush(rowLocations, &location);
257 258 259
  }
  return 0;
}
260

261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287
// whichChild == 0, left child of join; whichChild ==1, right child of join
static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator, int32_t whichChild, int16_t tsSlotId,
                                                        SSDataBlock* startDataBlock, int32_t startPos,
                                                        int64_t timestamp, SArray* rowLocations,
                                                        SArray* createdBlocks) {
  ASSERT(whichChild == 0 || whichChild == 1);

  SJoinOperatorInfo* pJoinInfo = pOperator->info;
  int32_t            endPos = -1;
  SSDataBlock*       dataBlock = startDataBlock;
  mergeJoinGetBlockRowsEqualTs(dataBlock, tsSlotId, startPos, timestamp, &endPos, rowLocations, createdBlocks);
  while (endPos == dataBlock->info.rows) {
    SOperatorInfo* ds = pOperator->pDownstream[whichChild];
    dataBlock = ds->fpSet.getNextFn(ds);
    if (whichChild == 0) {
      pJoinInfo->leftPos = 0;
      pJoinInfo->pLeft = dataBlock;
    } else if (whichChild == 1) {
      pJoinInfo->rightPos = 0;
      pJoinInfo->pRight = dataBlock;
    }

    if (dataBlock == NULL) {
      setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
      endPos = -1;
      break;
    }
288

289 290 291 292 293 294 295 296 297
    mergeJoinGetBlockRowsEqualTs(dataBlock, tsSlotId, 0, timestamp, &endPos, rowLocations, createdBlocks);
  }
  if (endPos != -1) {
    if (whichChild == 0) {
      pJoinInfo->leftPos = endPos;
    } else if (whichChild == 1) {
      pJoinInfo->rightPos = endPos;
    }
  }
298 299 300
  return 0;
}

301 302
static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes,
                                               int32_t* nRows) {
303
  int32_t code = TSDB_CODE_SUCCESS;
304
  SJoinOperatorInfo* pJoinInfo = pOperator->info;
305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322
  SArray* leftRowLocations = NULL;
  SArray* leftCreatedBlocks = NULL;
  SArray* rightRowLocations = NULL;
  SArray* rightCreatedBlocks = NULL;
  int32_t leftRowIdx = 0;
  int32_t rightRowIdx = 0;
  int32_t i, j;
  
  if (pJoinInfo->rowCtx.rowRemains) {
    leftRowLocations = pJoinInfo->rowCtx.leftRowLocations;
    leftCreatedBlocks = pJoinInfo->rowCtx.leftCreatedBlocks;
    rightRowLocations = pJoinInfo->rowCtx.rightRowLocations;
    rightCreatedBlocks = pJoinInfo->rowCtx.rightCreatedBlocks;
    leftRowIdx = pJoinInfo->rowCtx.leftRowIdx;
    rightRowIdx = pJoinInfo->rowCtx.rightRowIdx;
  } else {
    leftRowLocations = taosArrayInit(8, sizeof(SRowLocation));
    leftCreatedBlocks = taosArrayInit(8, POINTER_BYTES);
323

324 325
    rightRowLocations = taosArrayInit(8, sizeof(SRowLocation));
    rightCreatedBlocks = taosArrayInit(8, POINTER_BYTES);
326

327 328 329 330 331 332
    mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 0, pJoinInfo->leftCol.slotId, pJoinInfo->pLeft,
                                             pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks);
    mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight,
                                             pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks);
  }
  
333 334
  size_t leftNumJoin = taosArrayGetSize(leftRowLocations);
  size_t rightNumJoin = taosArrayGetSize(rightRowLocations);
335 336 337 338 339 340 341 342 343 344 345 346 347 348 349
  uint32_t maxRowNum = *nRows + (leftNumJoin - leftRowIdx - 1) * rightNumJoin + rightNumJoin - rightRowIdx;
  uint32_t limitRowNum = maxRowNum;
  if (maxRowNum > pOperator->resultInfo.threshold) {
    limitRowNum = pOperator->resultInfo.threshold;
    if (!pJoinInfo->rowCtx.rowRemains) {
      pJoinInfo->rowCtx.rowRemains = true;
      pJoinInfo->rowCtx.ts = timestamp;
      pJoinInfo->rowCtx.leftRowLocations = leftRowLocations;
      pJoinInfo->rowCtx.rightRowLocations = rightRowLocations;
      pJoinInfo->rowCtx.leftCreatedBlocks = leftCreatedBlocks;
      pJoinInfo->rowCtx.rightCreatedBlocks = rightCreatedBlocks;
    }
  }

  code = blockDataEnsureCapacity(pRes, limitRowNum);
350
  if (code != TSDB_CODE_SUCCESS) {
H
Hongze Cheng 已提交
351 352
    qError("%s can not ensure block capacity for join. left: %zu, right: %zu", GET_TASKID(pOperator->pTaskInfo),
           leftNumJoin, rightNumJoin);
353
  }
354 355

  
356
  if (code == TSDB_CODE_SUCCESS) {
357 358 359 360 361 362 363 364
    bool done = false;
    for (i = leftRowIdx; i < leftNumJoin; ++i, rightRowIdx = 0) {
      for (j = rightRowIdx; j < rightNumJoin; ++j) {
        if (*nRows >= limitRowNum) {
          done = true;
          break;
        }

H
Hongze Cheng 已提交
365 366 367 368 369
        SRowLocation* leftRow = taosArrayGet(leftRowLocations, i);
        SRowLocation* rightRow = taosArrayGet(rightRowLocations, j);
        mergeJoinJoinLeftRight(pOperator, pRes, *nRows, leftRow->pDataBlock, leftRow->pos, rightRow->pDataBlock,
                               rightRow->pos);
        ++*nRows;
370
      }
371 372 373
      if (done) {
        break;
      }
H
Hongze Cheng 已提交
374
    }
375

376 377 378 379
    if (maxRowNum > pOperator->resultInfo.threshold) {
      pJoinInfo->rowCtx.leftRowIdx = i;
      pJoinInfo->rowCtx.rightRowIdx = j;
    }
380
  }
381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402

  if (maxRowNum <= pOperator->resultInfo.threshold) {
    for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) {
      SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i);
      blockDataDestroy(pBlock);
    }
    taosArrayDestroy(rightCreatedBlocks);
    taosArrayDestroy(rightRowLocations);
    for (int i = 0; i < taosArrayGetSize(leftCreatedBlocks); ++i) {
      SSDataBlock* pBlock = taosArrayGetP(leftCreatedBlocks, i);
      blockDataDestroy(pBlock);
    }
    taosArrayDestroy(leftCreatedBlocks);
    taosArrayDestroy(leftRowLocations);

    if (pJoinInfo->rowCtx.rowRemains) {
      pJoinInfo->rowCtx.rowRemains = false;
      pJoinInfo->rowCtx.leftRowLocations = NULL;
      pJoinInfo->rowCtx.rightRowLocations = NULL;
      pJoinInfo->rowCtx.leftCreatedBlocks = NULL;
      pJoinInfo->rowCtx.rightCreatedBlocks = NULL;
    }
403 404 405 406
  }
  return TSDB_CODE_SUCCESS;
}

407 408
static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs, int64_t* pRightTs) {
  SJoinOperatorInfo* pJoinInfo = pOperator->info;
409

410 411 412
  if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
    SOperatorInfo* ds1 = pOperator->pDownstream[0];
    pJoinInfo->pLeft = ds1->fpSet.getNextFn(ds1);
413

414 415 416 417
    pJoinInfo->leftPos = 0;
    if (pJoinInfo->pLeft == NULL) {
      setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
      return false;
418
    }
419
  }
420

421 422 423
  if (pJoinInfo->pRight == NULL || pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
    SOperatorInfo* ds2 = pOperator->pDownstream[1];
    pJoinInfo->pRight = ds2->fpSet.getNextFn(ds2);
424

425 426 427 428
    pJoinInfo->rightPos = 0;
    if (pJoinInfo->pRight == NULL) {
      setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
      return false;
429
    }
430 431 432 433 434 435 436 437 438 439 440 441 442 443 444
  }
  // only the timestamp match support for ordinary table
  SColumnInfoData* pLeftCol = taosArrayGet(pJoinInfo->pLeft->pDataBlock, pJoinInfo->leftCol.slotId);
  char*            pLeftVal = colDataGetData(pLeftCol, pJoinInfo->leftPos);
  *pLeftTs = *(int64_t*)pLeftVal;

  SColumnInfoData* pRightCol = taosArrayGet(pJoinInfo->pRight->pDataBlock, pJoinInfo->rightCol.slotId);
  char*            pRightVal = colDataGetData(pRightCol, pJoinInfo->rightPos);
  *pRightTs = *(int64_t*)pRightVal;

  return true;
}

static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) {
  SJoinOperatorInfo* pJoinInfo = pOperator->info;
445

446
  int32_t nrows = pRes->info.rows;
447

448
  bool asc = (pJoinInfo->inputOrder == TSDB_ORDER_ASC) ? true : false;
449

450 451 452
  while (1) {
    int64_t leftTs = 0;
    int64_t rightTs = 0;
453 454 455 456 457 458 459 460
    if (pJoinInfo->rowCtx.rowRemains) {
      leftTs = pJoinInfo->rowCtx.ts;
      rightTs = pJoinInfo->rowCtx.ts;
    } else {
      bool    hasNextTs = mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs);
      if (!hasNextTs) {
        break;
      }
461
    }
462

463
    if (leftTs == rightTs) {
464
      mergeJoinJoinDownstreamTsRanges(pOperator, leftTs, pRes, &nrows);
H
Haojun Liao 已提交
465
    } else if ((asc && leftTs < rightTs) || (!asc && leftTs > rightTs)) {
466 467
      pJoinInfo->leftPos += 1;

468
      if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows && pRes->info.rows < pOperator->resultInfo.threshold) {
469 470
        continue;
      }
H
Haojun Liao 已提交
471
    } else if ((asc && leftTs > rightTs) || (!asc && leftTs < rightTs)) {
472
      pJoinInfo->rightPos += 1;
473
      if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows && pRes->info.rows < pOperator->resultInfo.threshold) {
474 475 476 477 478 479
        continue;
      }
    }

    // the pDataBlock are always the same one, no need to call this again
    pRes->info.rows = nrows;
H
Haojun Liao 已提交
480
    pRes->info.dataLoad = 1;
481 482 483 484
    if (pRes->info.rows >= pOperator->resultInfo.threshold) {
      break;
    }
  }
485 486 487 488
}

SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
  SJoinOperatorInfo* pJoinInfo = pOperator->info;
489

490 491
  SSDataBlock* pRes = pJoinInfo->pRes;
  blockDataCleanup(pRes);
H
Haojun Liao 已提交
492

493 494 495 496 497 498 499
  while (true) {
    int32_t numOfRowsBefore = pRes->info.rows;
    doMergeJoinImpl(pOperator, pRes);
    int32_t numOfNewRows = pRes->info.rows - numOfRowsBefore;
    if (numOfNewRows == 0) {
      break;
    }
H
Haojun Liao 已提交
500 501
    if (pOperator->exprSupp.pFilterInfo != NULL) {
      doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
502
    }
503 504 505 506
    if (pRes->info.rows >= pOperator->resultInfo.threshold) {
      break;
    }
  }
507 508
  return (pRes->info.rows > 0) ? pRes : NULL;
}