tsdbMergeTree.c 17.0 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"

18
// SLDataIter =================================================
H
Hongze Cheng 已提交
19
struct SLDataIter {
20
  SRBTreeNode   node;
H
Hongze Cheng 已提交
21
  SSttBlk      *pSttBlk;
H
Hongze Cheng 已提交
22
  SDataFReader *pReader;
H
Hongze Cheng 已提交
23
  int32_t       iStt;
H
Hongze Cheng 已提交
24
  int8_t        backward;
H
Hongze Cheng 已提交
25
  int32_t       iSttBlk;
H
Hongze Cheng 已提交
26 27
  int32_t       iRow;
  SRowInfo      rInfo;
28 29 30
  uint64_t      uid;
  STimeWindow   timeWindow;
  SVersionRange verRange;
31
  SSttBlockLoadInfo* pBlockLoadInfo;
H
Hongze Cheng 已提交
32
};
H
Hongze Cheng 已提交
33

34
// Allocates an array of TSDB_DEFAULT_STT_FILE load-info entries and prepares
// each one: both cache slots are marked empty (-1), the two block data
// buffers are created, and an empty stt block list is allocated.
//
// pSchema, colList and numOfCols are stored in every entry as-is (the
// pointers are not copied).
//
// Returns the array, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY when the
// array itself cannot be allocated.
//
// NOTE(review): a failing tBlockDataCreate() is only recorded in terrno and a
// NULL result of taosArrayInit() is not checked; the array is still returned.
// Callers may want to inspect terrno - confirm the intended contract.
SSttBlockLoadInfo* tCreateLastBlockLoadInfo(STSchema* pSchema, int16_t* colList, int32_t numOfCols) {
  SSttBlockLoadInfo* aInfo = taosMemoryCalloc(TSDB_DEFAULT_STT_FILE, sizeof(SSttBlockLoadInfo));
  if (aInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t iFile = 0; iFile < TSDB_DEFAULT_STT_FILE; ++iFile) {
    SSttBlockLoadInfo* pInfo = &aInfo[iFile];

    pInfo->blockIndex[0] = -1;
    pInfo->blockIndex[1] = -1;
    pInfo->currentLoadBlockIndex = 1;

    // create both cache slots; a failure is reported through terrno only
    for (int32_t iSlot = 0; iSlot < 2; ++iSlot) {
      int32_t code = tBlockDataCreate(&pInfo->blockData[iSlot]);
      if (code) {
        terrno = code;
      }
    }

    pInfo->aSttBlk = taosArrayInit(4, sizeof(SSttBlk));
    pInfo->pSchema = pSchema;
    pInfo->colIds = colList;
    pInfo->numOfCols = numOfCols;
  }

  return aInfo;
}

// Returns every one of the TSDB_DEFAULT_STT_FILE entries to its initial
// state: both cache slots are marked empty, the stt block list is cleared
// and the load statistics are zeroed. The block data buffers are kept.
void resetLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo) {
  for (int32_t iFile = 0; iFile < TSDB_DEFAULT_STT_FILE; ++iFile) {
    SSttBlockLoadInfo* pInfo = &pLoadInfo[iFile];

    pInfo->currentLoadBlockIndex = 1;
    pInfo->blockIndex[0] = -1;
    pInfo->blockIndex[1] = -1;

    taosArrayClear(pInfo->aSttBlk);

    pInfo->elapsedTime = 0;
    pInfo->loadBlocks = 0;
  }
}

// Adds the load statistics of all TSDB_DEFAULT_STT_FILE entries to the
// caller's counters. Both outputs are accumulated into, not overwritten, so
// the caller is expected to initialise *blocks and *el beforehand.
void getLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo, int64_t* blocks, double* el) {
  int32_t iFile = 0;
  while (iFile < TSDB_DEFAULT_STT_FILE) {
    const SSttBlockLoadInfo* pInfo = &pLoadInfo[iFile++];

    *el += pInfo->elapsedTime;
    *blocks += pInfo->loadBlocks;
  }
}

// Releases an array created by tCreateLastBlockLoadInfo(): the two block
// data buffers and the stt block list of each of the TSDB_DEFAULT_STT_FILE
// entries, then the array itself.
//
// Passing NULL is allowed and does nothing.
//
// Always returns NULL so the caller can clear its pointer in one statement:
//   p = destroyLastBlockLoadInfo(p);
void* destroyLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo) {
  if (pLoadInfo == NULL) {
    return NULL;
  }

  for (int32_t i = 0; i < TSDB_DEFAULT_STT_FILE; ++i) {
    tBlockDataDestroy(&pLoadInfo[i].blockData[0], true);
    tBlockDataDestroy(&pLoadInfo[i].blockData[1], true);

    taosArrayDestroy(pLoadInfo[i].aSttBlk);
  }

  taosMemoryFree(pLoadInfo);
  return NULL;
}

H
Haojun Liao 已提交
101
// Returns the block data for the iterator's current stt block (pIter->iSttBlk).
//
// The load info keeps two cached blocks. If either slot already holds the
// requested block it is returned directly. Otherwise the active slot is
// toggled and, when the iterator has a current block descriptor, the block is
// read from the stt file into that slot; the read time and block count are
// added to the load statistics. After a successful read pIter->iRow is
// positioned just outside the block (nRow for a backward scan, -1 otherwise).
//
// Returns NULL and sets terrno when reading the block fails.
static SBlockData* loadLastBlock(SLDataIter *pIter, const char* idStr) {
  SSttBlockLoadInfo* pInfo = pIter->pBlockLoadInfo;

  // cache hit: one of the two slots already holds this block
  for (int32_t iSlot = 0; iSlot < 2; ++iSlot) {
    if (pInfo->blockIndex[iSlot] == pIter->iSttBlk) {
      return &pInfo->blockData[iSlot];
    }
  }

  // cache miss: switch to the other slot
  pInfo->currentLoadBlockIndex ^= 1;
  SBlockData* pBlock = &pInfo->blockData[pInfo->currentLoadBlockIndex];

  if (pIter->pSttBlk == NULL) {
    // no block descriptor to read from; hand back the slot untouched
    return pBlock;
  }

  int64_t st = taosGetTimestampUs();

  // blocks written for a super table are keyed by suid, others by the table uid
  TABLEID id = {0};
  if (pIter->pSttBlk->suid != 0) {
    id.suid = pIter->pSttBlk->suid;
  } else {
    id.uid = pIter->uid;
  }

  tBlockDataInit(pBlock, &id, pInfo->pSchema, pInfo->colIds, pInfo->numOfCols);
  int32_t code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlock);

  double el = (taosGetTimestampUs() - st)/ 1000.0;
  pInfo->elapsedTime += el;
  pInfo->loadBlocks += 1;

  tsdbDebug("read last block, index:%d, last file index:%d, elapsed time:%.2f ms, %s", pIter->iSttBlk, pIter->iStt, el, idStr);

  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk;
  pIter->iRow = (pIter->backward) ? pBlock->nRow : -1;

  return pBlock;
}

152
// find the earliest block that contains the required records
153
static FORCE_INLINE int32_t findEarliestIndex(int32_t index, uint64_t uid, const SSttBlk* pBlockList, int32_t num, int32_t backward) {
154 155
  int32_t i = index;
  int32_t step = backward? 1:-1;
H
Haojun Liao 已提交
156
  while (i >= 0 && i < num && uid >= pBlockList[i].minUid && uid <= pBlockList[i].maxUid) {
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177
    i += step;
  }
  return i - step;
}

// Binary search over the stt block list for a block whose [minUid, maxUid]
// range contains `uid`. When several adjacent blocks contain it, the first
// one in scan direction is returned (see findEarliestIndex()).
//
// Returns the block index, or -1 when the list is empty or no block
// contains `uid`.
static int32_t binarySearchForStartBlock(SSttBlk*pBlockList, int32_t num, uint64_t uid, int32_t backward) {
  if (num <= 0) {
    return -1;
  }

  int32_t lo = 0;
  int32_t hi = num - 1;

  // uid lies outside the range covered by the whole list
  if ((uid > pBlockList[hi].maxUid) || (uid < pBlockList[lo].minUid)) {
    return -1;
  }

  for (;;) {
    if (uid >= pBlockList[lo].minUid && uid <= pBlockList[lo].maxUid) {
      return findEarliestIndex(lo, uid, pBlockList, num, backward);
    }

    // uid fell into a gap between blocks
    if (uid > pBlockList[hi].maxUid || uid < pBlockList[lo].minUid) {
      return -1;
    }

    int32_t mid = lo + ((hi - lo + 1) >> 1u);

    if (uid < pBlockList[mid].minUid) {
      hi = mid - 1;
    } else if (uid > pBlockList[mid].maxUid) {
      lo = mid + 1;
    } else {
      return findEarliestIndex(mid, uid, pBlockList, num, backward);
    }
  }
}

198
// Starting from `index`, walks over neighbouring rows whose uid equals `uid`
// and returns the outermost one: the lowest such index for a forward scan,
// the highest one when `backward` is set. `index` itself is expected to hold
// `uid`.
static FORCE_INLINE int32_t findEarliestRow(int32_t index, uint64_t uid, const uint64_t* uidList, int32_t num, int32_t backward) {
  int32_t i = index;
  int32_t step = backward ? 1 : -1;
  while (i >= 0 && i < num && uid == uidList[i]) {
    i += step;
  }
  // i is one step past the last row that still matched
  return i - step;
}

// Binary search in an ascending uid column for the rows that belong to `uid`.
//
// uidList   ascending list of row uids
// num       number of entries in uidList
// uid       table uid to look for
// backward  non-zero: return the last matching row instead of the first
//
// Returns the index of the first matching row in scan direction, or -1 when
// the list is empty/NULL or does not contain `uid`.
static int32_t binarySearchForStartRowIndex(uint64_t* uidList, int32_t num, uint64_t uid, int32_t backward) {
  // an empty list would otherwise be indexed at [-1] below
  if (uidList == NULL || num <= 0) {
    return -1;
  }

  int32_t firstPos = 0;
  int32_t lastPos = num - 1;

  // uid lies outside the range covered by the whole list
  if ((uid > uidList[lastPos]) || (uid < uidList[firstPos])) {
    return -1;
  }

  while (1) {
    if (uid == uidList[firstPos]) {
      return findEarliestRow(firstPos, uid, uidList, num, backward);
    }

    // uid fell between two neighbouring values
    if (uid > uidList[lastPos] || uid < uidList[firstPos]) {
      return -1;
    }

    int32_t numOfRows = lastPos - firstPos + 1;
    int32_t midPos = (numOfRows >> 1u) + firstPos;

    if (uid < uidList[midPos]) {
      lastPos = midPos - 1;
    } else if (uid > uidList[midPos]) {
      firstPos = midPos + 1;
    } else {
      return findEarliestRow(midPos, uid, uidList, num, backward);
    }
  }
}

H
Haojun Liao 已提交
238
// Creates an iterator over the rows of table `uid` in stt file `iStt`.
//
// pIter           out: the new iterator; set to NULL when an error is returned
// pReader         reader used to load the stt block list and blocks
// iStt            index of the stt file
// backward        non-zero for a descending scan
// suid            super table uid used to filter the stt block list
// uid             table uid to iterate
// pTimeWindow     timestamp filter, copied into the iterator
// pRange          version filter, copied into the iterator
// pBlockLoadInfo  load info of this stt file; its stt block list is filled
//                 here on first use and reused afterwards
// idStr           identifier appended to debug log lines
//
// On success the iterator is positioned on the first candidate block in scan
// direction, or has no current block (pSttBlk == NULL) when none qualifies.
// Returns 0 on success or an error code.
int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
                       uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo* pBlockLoadInfo,
                       const char* idStr) {
  int32_t code = 0;

  *pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
  if (*pIter == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  (*pIter)->uid = uid;
  (*pIter)->pReader = pReader;
  (*pIter)->iStt = iStt;
  (*pIter)->backward = backward;
  (*pIter)->verRange = *pRange;
  (*pIter)->timeWindow = *pTimeWindow;
  (*pIter)->pBlockLoadInfo = pBlockLoadInfo;

  size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
  if (size == 0) {
    // the stt block list of this file has not been loaded yet
    int64_t st = taosGetTimestampUs();

    code = tsdbReadSttBlk(pReader, iStt, pBlockLoadInfo->aSttBlk);
    if (code) {
      goto _exit;
    }

    // only apply to the child tables, ordinary tables will not incur this filter procedure.
    // NOTE(review): a list with exactly one stt block skips this filter - confirm that is intended.
    size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);

    if (size > 1) {
      SSttBlk *pStart = taosArrayGet(pBlockLoadInfo->aSttBlk, 0);
      SSttBlk *pEnd = taosArrayGet(pBlockLoadInfo->aSttBlk, size - 1);

      if (pStart->suid == pEnd->suid) {
        // first and last block share one suid
        if (pStart->suid != suid) {
          // no qualified stt block existed
          (*pIter)->iSttBlk = -1;
          double el = (taosGetTimestampUs() - st)/1000.0;
          tsdbDebug("load the last file info completed, elapsed time:%.2fms, %s", el, idStr);
          return code;
        }
      } else {
        // keep only the blocks that belong to the requested suid
        SArray *pTmp = taosArrayInit(size, sizeof(SSttBlk));
        if (pTmp == NULL) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          goto _exit;
        }

        for (size_t i = 0; i < size; ++i) {
          SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i);
          uint64_t s = p->suid;
          if (s < suid) {
            continue;
          }

          if (s > suid) {
            break;
          }

          taosArrayPush(pTmp, p);
        }

        taosArrayDestroy(pBlockLoadInfo->aSttBlk);
        pBlockLoadInfo->aSttBlk = pTmp;
      }
    }

    double el = (taosGetTimestampUs() - st)/1000.0;
    tsdbDebug("load the last file info completed, elapsed time:%.2fms, %s", el, idStr);
  }

  size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);

  // find the start block
  (*pIter)->iSttBlk = binarySearchForStartBlock(pBlockLoadInfo->aSttBlk->pData, size, uid, backward);
  if ((*pIter)->iSttBlk != -1) {
    (*pIter)->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, (*pIter)->iSttBlk);
    // start just outside the block; tLDataIterNextRow() steps into it
    (*pIter)->iRow = ((*pIter)->backward) ? (*pIter)->pSttBlk->nRow : -1;
  }

_exit:
  // never hand a half-built iterator back to the caller; it would be leaked
  if (code != TSDB_CODE_SUCCESS && *pIter != NULL) {
    taosMemoryFree(*pIter);
    *pIter = NULL;
  }
  return code;
}

// Frees an iterator created by tLDataIterOpen(). Only the iterator object
// itself is released; the reader and the block load info it points to are
// left untouched.
void tLDataIterClose(SLDataIter *pIter) {
  taosMemoryFree(pIter);
}

// Moves the iterator to the next stt block (in scan direction) that may hold
// rows for the iterator's uid, time window and version range.
//
// The scan stops early as soon as a block shows that all remaining blocks lie
// beyond the requested uid, time window or version range. On success iSttBlk
// and pSttBlk refer to the block found; otherwise pSttBlk is set to NULL and
// iSttBlk is left advanced by one step.
void tLDataIterNextBlock(SLDataIter *pIter) {
  const int32_t        step = pIter->backward ? -1 : 1;
  SArray              *aSttBlk = pIter->pBlockLoadInfo->aSttBlk;
  const STimeWindow   *pWin = &pIter->timeWindow;
  const SVersionRange *pVer = &pIter->verRange;

  pIter->iSttBlk += step;

  int32_t found = -1;
  size_t  nBlk = aSttBlk->size;

  for (int32_t i = pIter->iSttBlk; i < nBlk && i >= 0; i += step) {
    SSttBlk *p = taosArrayGet(aSttBlk, i);

    // past the uid in scan direction: nothing further can match
    if (!pIter->backward) {
      if (p->minUid > pIter->uid) {
        break;
      }
    } else if (p->maxUid < pIter->uid) {
      break;
    }

    // check uid firstly
    if (!(p->minUid <= pIter->uid && p->maxUid >= pIter->uid)) {
      continue;
    }

    if (!pIter->backward) {
      if (p->minKey > pWin->ekey) {
        break;
      }
    } else if (p->maxKey < pWin->skey) {
      break;
    }

    // check time range secondly
    if (!(p->minKey <= pWin->ekey && p->maxKey >= pWin->skey)) {
      continue;
    }

    if (!pIter->backward) {
      if (p->minVer > pVer->maxVer) {
        break;
      }
    } else if (p->maxVer < pVer->minVer) {
      break;
    }

    // finally the version range
    if (p->minVer <= pVer->maxVer && p->maxVer >= pVer->minVer) {
      found = i;
      break;
    }
  }

  if (found == -1) {
    pIter->pSttBlk = NULL;
    return;
  }

  pIter->iSttBlk = found;
  pIter->pSttBlk = (SSttBlk *)taosArrayGet(aSttBlk, found);
}

H
Haojun Liao 已提交
377
// Starting at pIter->iRow, searches the current block in scan direction for
// the next row that belongs to the iterator's uid and passes the time window
// and version range filters.
//
// On return pIter->iRow is the index of that row, or -1 when the block holds
// no further qualifying row or the block could not be loaded (terrno is set
// by loadLastBlock() in that case).
static void findNextValidRow(SLDataIter *pIter, const char* idStr) {
  int32_t step = pIter->backward ? -1 : 1;
  bool    hasVal = false;
  int32_t i = pIter->iRow;

  SBlockData *pBlockData = loadLastBlock(pIter, idStr);
  if (pBlockData == NULL) {
    // the block could not be read; report "no row" instead of dereferencing NULL
    pIter->iRow = -1;
    return;
  }

  // mostly we only need to find the start position for a given table
  bool atBlockStart = ((i == 0) && (!pIter->backward)) || (i == pBlockData->nRow - 1 && pIter->backward);
  if (atBlockStart && pBlockData->aUid != NULL) {
    i = binarySearchForStartRowIndex((uint64_t*)pBlockData->aUid, pBlockData->nRow, pIter->uid, pIter->backward);
    if (i == -1) {
      pIter->iRow = -1;
      return;
    }
  }

  for (; i < pBlockData->nRow && i >= 0; i += step) {
    // a block with a uid column holds several tables: stop once the rows of
    // the requested table have been passed
    if (pBlockData->aUid != NULL) {
      if (!pIter->backward) {
        if (pBlockData->aUid[i] > pIter->uid) {
          break;
        }
      } else {
        if (pBlockData->aUid[i] < pIter->uid) {
          break;
        }
      }
    }

    int64_t ts = pBlockData->aTSKEY[i];
    if (!pIter->backward) {               // asc
      if (ts > pIter->timeWindow.ekey) {  // no more data
        break;
      } else if (ts < pIter->timeWindow.skey) {
        continue;
      }
    } else {
      if (ts < pIter->timeWindow.skey) {
        break;
      } else if (ts > pIter->timeWindow.ekey) {
        continue;
      }
    }

    int64_t ver = pBlockData->aVersion[i];
    if (ver < pIter->verRange.minVer) {
      continue;
    }

    // todo opt handle desc case
    if (ver > pIter->verRange.maxVer) {
      continue;
    }

    hasVal = true;
    break;
  }

  pIter->iRow = (hasVal) ? i : -1;
}

H
Haojun Liao 已提交
443
// Advances the iterator to the next qualifying row, moving on to following
// stt blocks when the current one is exhausted.
//
// Returns true when a row is available through tLDataIterGet(). Returns false
// when the iterator has no more data, or when a block could not be loaded
// (terrno is set by loadLastBlock() in that case).
bool tLDataIterNextRow(SLDataIter *pIter, const char* idStr) {
  int32_t step = pIter->backward ? -1 : 1;

  // no qualified last file block in current file, no need to fetch row
  if (pIter->pSttBlk == NULL) {
    return false;
  }

  int32_t     iBlockL = pIter->iSttBlk;
  SBlockData *pBlockData = loadLastBlock(pIter, idStr);
  if (pBlockData == NULL) {
    return false;
  }

  pIter->iRow += step;

  while (1) {
    findNextValidRow(pIter, idStr);

    if (pIter->iRow >= 0 && pIter->iRow < pBlockData->nRow) {
      break;  // found a row in the current block
    }

    tLDataIterNextBlock(pIter);
    if (pIter->pSttBlk == NULL) {  // no more data
      return false;
    }

    if (iBlockL != pIter->iSttBlk) {
      pBlockData = loadLastBlock(pIter, idStr);
      if (pBlockData == NULL) {
        return false;
      }

      // Start at the first row of the new block in scan direction. This must
      // not rely on loadLastBlock() resetting iRow: it only does so when the
      // block is actually read, not when it is served from the cache.
      pIter->iRow = pIter->backward ? pBlockData->nRow - 1 : 0;
    }
  }

  pIter->rInfo.suid = pBlockData->suid;
  pIter->rInfo.uid = pBlockData->uid;
  pIter->rInfo.row = tsdbRowFromBlockData(pBlockData, pIter->iRow);

  return true;
}

H
Hongze Cheng 已提交
486
// Returns the row info filled in by the last successful tLDataIterNextRow().
// The pointer refers to storage inside the iterator.
SRowInfo *tLDataIterGet(SLDataIter *pIter) {
  return &pIter->rInfo;
}
H
Hongze Cheng 已提交
487 488

// SMergeTree =================================================
H
Hongze Cheng 已提交
489
// Red-black tree comparator for SLDataIter nodes. p1 and p2 point
// sizeof(SRBTreeNode) bytes past the start of their iterators. Iterators are
// ordered by the key of their current row: timestamp first, then version.
// Returns -1, 0 or 1.
static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) {
  SLDataIter *pLhs = (SLDataIter *)(((uint8_t *)p1) - sizeof(SRBTreeNode));
  SLDataIter *pRhs = (SLDataIter *)(((uint8_t *)p2) - sizeof(SRBTreeNode));

  TSDBKEY lhs = TSDBROW_KEY(&pLhs->rInfo.row);
  TSDBKEY rhs = TSDBROW_KEY(&pRhs->rInfo.row);

  if (lhs.ts != rhs.ts) {
    return (lhs.ts < rhs.ts) ? -1 : 1;
  }

  if (lhs.version != rhs.version) {
    return (lhs.version < rhs.version) ? -1 : 1;
  }

  return 0;
}

511
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
512 513
                       STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
                       bool destroyLoadInfo, const char *idStr) {
H
Hongze Cheng 已提交
514
  pMTree->backward = backward;
515
  pMTree->pIter = NULL;
516
  pMTree->pIterList = taosArrayInit(4, POINTER_BYTES);
517 518 519
  if (pMTree->pIterList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
520

H
Haojun Liao 已提交
521 522
  pMTree->idStr = idStr;

H
Hongze Cheng 已提交
523
  tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
524 525
  int32_t code = TSDB_CODE_SUCCESS;

526 527
  pMTree->pLoadInfo = pBlockLoadInfo;
  pMTree->destroyLoadInfo = destroyLoadInfo;
528

H
Hongze Cheng 已提交
529
  for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) {  // open all last file
530
    struct SLDataIter* pIter = NULL;
531
    code = tLDataIterOpen(&pIter, pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange, &pMTree->pLoadInfo[i], pMTree->idStr);
532 533 534 535
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }

H
Haojun Liao 已提交
536
    bool hasVal = tLDataIterNextRow(pIter, pMTree->idStr);
537
    if (hasVal) {
538 539
      taosArrayPush(pMTree->pIterList, &pIter);
      tMergeTreeAddIter(pMTree, pIter);
540
    } else {
541
      tLDataIterClose(pIter);
542 543
    }
  }
544 545 546

  return code;

H
Hongze Cheng 已提交
547
_end:
548 549
  tMergeTreeClose(pMTree);
  return code;
H
Hongze Cheng 已提交
550
}
H
Hongze Cheng 已提交
551

552 553
// Inserts an iterator into the merge tree's red-black tree. The iterator is
// passed as its tree node, which is the first member of SLDataIter.
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter) {
  tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pIter);
}

H
Hongze Cheng 已提交
554
// Advances the merge tree to the next row in merged order.
//
// The current iterator (held outside the tree) is stepped first. If it is
// exhausted it is dropped; if its new row compares greater than the tree's
// minimum it is put back into the tree. When there is no current iterator
// afterwards, the tree's minimum is removed from the tree and becomes the
// current iterator.
//
// Returns true when a current row exists, false when all iterators are
// exhausted.
bool tMergeTreeNext(SMergeTree *pMTree) {
  if (pMTree->pIter != NULL) {
    if (!tLDataIterNextRow(pMTree->pIter, pMTree->idStr)) {
      pMTree->pIter = NULL;
    }

    // compare with min in RB Tree
    SLDataIter *pMin = (SLDataIter *)tRBTreeMin(&pMTree->rbt);
    if (pMTree->pIter && pMin) {
      int32_t c = pMTree->rbt.cmprFn(RBTREE_NODE_PAYLOAD(&pMTree->pIter->node), RBTREE_NODE_PAYLOAD(&pMin->node));
      if (c <= 0) {
        // the two iterators are asserted not to compare equal
        ASSERT(c);
      } else {
        tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter);
        pMTree->pIter = NULL;
      }
    }
  }

  if (pMTree->pIter == NULL) {
    SLDataIter *pNext = (SLDataIter *)tRBTreeMin(&pMTree->rbt);
    pMTree->pIter = pNext;
    if (pNext) {
      tRBTreeDrop(&pMTree->rbt, (SRBTreeNode *)pNext);
    }
  }

  return pMTree->pIter != NULL;
}

H
Hongze Cheng 已提交
587
// Returns a copy of the current iterator's row. pMTree->pIter is dereferenced
// without a check, so this may only be called while a current iterator exists.
TSDBROW tMergeTreeGetRow(SMergeTree *pMTree) {
  return pMTree->pIter->rInfo.row;
}
588

H
Hongze Cheng 已提交
589
// Closes every iterator recorded in pIterList, destroys the list and clears
// the current iterator. The block load info is destroyed as well when the
// tree was opened with destroyLoadInfo set; the flag is cleared afterwards.
void tMergeTreeClose(SMergeTree *pMTree) {
  size_t nIter = taosArrayGetSize(pMTree->pIterList);

  int32_t idx = 0;
  while (idx < nIter) {
    SLDataIter *pIter = taosArrayGetP(pMTree->pIterList, idx);
    tLDataIterClose(pIter);
    ++idx;
  }

  pMTree->pIterList = taosArrayDestroy(pMTree->pIterList);
  pMTree->pIter = NULL;

  if (!pMTree->destroyLoadInfo) {
    return;
  }

  pMTree->pLoadInfo = destroyLastBlockLoadInfo(pMTree->pLoadInfo);
  pMTree->destroyLoadInfo = false;
}