tsdbMergeTree.c 18.9 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"

18
// SLDataIter =================================================
H
Hongze Cheng 已提交
19
struct SLDataIter {
H
Hongze Cheng 已提交
20 21 22 23 24 25 26 27 28 29 30 31
  SRBTreeNode        node;
  SSttBlk           *pSttBlk;
  SDataFReader      *pReader;
  int32_t            iStt;
  int8_t             backward;
  int32_t            iSttBlk;
  int32_t            iRow;
  SRowInfo           rInfo;
  uint64_t           uid;
  STimeWindow        timeWindow;
  SVersionRange      verRange;
  SSttBlockLoadInfo *pBlockLoadInfo;
H
Hongze Cheng 已提交
32
};
H
Hongze Cheng 已提交
33

H
Hongze Cheng 已提交
34 35
// Allocate an array of TSDB_DEFAULT_STT_FILE load-info entries, one per stt file slot.
//
// Every entry starts with both cached block slots empty (blockIndex == -1), an empty
// stt block list, and the caller's schema / column list (stored by reference, not copied).
//
// Returns the array, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY when the array itself
// cannot be allocated. A failure while initialising an individual entry is reported through
// terrno only; the array is still returned.
// NOTE(review): callers probably want to check terrno after a non-NULL return - confirm.
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols) {
  SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(TSDB_DEFAULT_STT_FILE, sizeof(SSttBlockLoadInfo));
  if (pLoadInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < TSDB_DEFAULT_STT_FILE; ++i) {
    SSttBlockLoadInfo *pInfo = &pLoadInfo[i];

    pInfo->blockIndex[0] = -1;
    pInfo->blockIndex[1] = -1;
    pInfo->currentLoadBlockIndex = 1;

    for (int32_t j = 0; j < 2; ++j) {
      int32_t code = tBlockDataCreate(&pInfo->blockData[j]);
      if (code) {
        terrno = code;
      }
    }

    pInfo->aSttBlk = taosArrayInit(4, sizeof(SSttBlk));
    if (pInfo->aSttBlk == NULL) {
      // previously unchecked: report it the same way as the block-data failures above
      terrno = TSDB_CODE_OUT_OF_MEMORY;
    }

    pInfo->pSchema = pSchema;
    pInfo->colIds = colList;
    pInfo->numOfCols = numOfCols;
  }

  return pLoadInfo;
}

H
Hongze Cheng 已提交
65 66
// Put every entry of the load-info array back into its "nothing loaded" state:
// both cached block slots are marked empty, the stt block list is cleared (not freed),
// and the load statistics are zeroed.
void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
  int32_t idx = 0;
  while (idx < TSDB_DEFAULT_STT_FILE) {
    SSttBlockLoadInfo *pEntry = &pLoadInfo[idx];

    pEntry->currentLoadBlockIndex = 1;
    pEntry->blockIndex[0] = -1;
    pEntry->blockIndex[1] = -1;

    taosArrayClear(pEntry->aSttBlk);

    pEntry->elapsedTime = 0;
    pEntry->loadBlocks = 0;
    pEntry->sttBlockLoaded = false;

    ++idx;
  }
}

H
Hongze Cheng 已提交
79 80
// Add the load statistics of every entry to the caller's accumulators.
// *blocks and *el are incremented, not overwritten, so the caller chooses the start value.
void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el) {
  int32_t idx = 0;
  while (idx < TSDB_DEFAULT_STT_FILE) {
    const SSttBlockLoadInfo *pEntry = &pLoadInfo[idx];
    *el += pEntry->elapsedTime;
    *blocks += pEntry->loadBlocks;
    ++idx;
  }
}

H
Hongze Cheng 已提交
86 87
// Release a load-info array created by tCreateLastBlockLoadInfo(): both cached block
// buffers and the stt block list of every entry, then the array itself.
//
// Accepts NULL (no-op). Always returns NULL so callers can write
// `p = destroyLastBlockLoadInfo(p);`.
void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
  if (pLoadInfo == NULL) {
    return NULL;
  }

  for (int32_t i = 0; i < TSDB_DEFAULT_STT_FILE; ++i) {
    tBlockDataDestroy(&pLoadInfo[i].blockData[0], true);
    tBlockDataDestroy(&pLoadInfo[i].blockData[1], true);

    taosArrayDestroy(pLoadInfo[i].aSttBlk);
  }

  taosMemoryFree(pLoadInfo);
  return NULL;
}

H
Hongze Cheng 已提交
102
// Return the block data for the iterator's current stt block (pIter->iSttBlk).
//
// The load info keeps two block buffers; blockIndex[k] records which stt block slot k
// holds (-1 = empty). On a hit the matching buffer is returned directly. On a miss the
// other slot (currentLoadBlockIndex is toggled) is refilled from the file and pIter->iRow
// is positioned just outside the block (nRow when iterating backward, -1 otherwise).
//
// Returns NULL without touching terrno when the iterator has no current block or no schema
// is available; returns NULL with terrno set when initialising or reading the block fails.
static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
  int32_t code = 0;

  SSttBlockLoadInfo *pInfo = pIter->pBlockLoadInfo;
  if (pInfo->blockIndex[0] == pIter->iSttBlk) {
    if (pInfo->currentLoadBlockIndex != 0) {
      tsdbDebug("current load index is set to 0, block index:%d, file index:%d, due to uid:%" PRIu64 ", load data, %s",
                pIter->iSttBlk, pIter->iStt, pIter->uid, idStr);
      pInfo->currentLoadBlockIndex = 0;
    }
    return &pInfo->blockData[0];
  }

  if (pInfo->blockIndex[1] == pIter->iSttBlk) {
    if (pInfo->currentLoadBlockIndex != 1) {
      tsdbDebug("current load index is set to 1, block index:%d, file index:%d, due to uid:%" PRIu64 ", load data, %s",
                pIter->iSttBlk, pIter->iStt, pIter->uid, idStr);
      pInfo->currentLoadBlockIndex = 1;
    }
    return &pInfo->blockData[1];
  }

  if (pIter->pSttBlk == NULL || pInfo->pSchema == NULL) {
    return NULL;
  }

  // current block not loaded yet
  pInfo->currentLoadBlockIndex ^= 1;
  int64_t st = taosGetTimestampUs();

  int32_t     slot = pInfo->currentLoadBlockIndex;
  SBlockData *pBlock = &pInfo->blockData[slot];

  // The slot is about to be overwritten. Mark it empty first so that a failure below cannot
  // leave a stale blockIndex pointing at clobbered data (it would be served as a cache hit).
  pInfo->blockIndex[slot] = -1;

  TABLEID id = {0};
  if (pIter->pSttBlk->suid != 0) {
    id.suid = pIter->pSttBlk->suid;
  } else {
    id.uid = pIter->uid;
  }

  code = tBlockDataInit(pBlock, &id, pInfo->pSchema, pInfo->colIds, pInfo->numOfCols);
  if (code != TSDB_CODE_SUCCESS) {
    goto _exit;
  }

  code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlock);
  if (code != TSDB_CODE_SUCCESS) {
    goto _exit;
  }

  double el = (taosGetTimestampUs() - st) / 1000.0;
  pInfo->elapsedTime += el;
  pInfo->loadBlocks += 1;

  tsdbDebug("read last block, total load:%d, trigger by uid:%" PRIu64
            ", last file index:%d, last block index:%d, entry:%d, rows:%d, %p, elapsed time:%.2f ms, %s",
            pInfo->loadBlocks, pIter->uid, pIter->iStt, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock->nRow, pBlock, el,
            idStr);

  pInfo->blockIndex[slot] = pIter->iSttBlk;
  pIter->iRow = (pIter->backward) ? pBlock->nRow : -1;

  tsdbDebug("last block index list:%d, %d, rowIndex:%d %s", pInfo->blockIndex[0], pInfo->blockIndex[1], pIter->iRow, idStr);
  return pBlock;

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
  }

  return NULL;
}

174
// find the earliest block that contains the required records
H
Hongze Cheng 已提交
175 176
// Starting from a block that contains uid, walk over the neighbouring blocks that also
// contain uid and return the index of the last one that does.
// The walk goes towards index 0 for forward iteration and towards num - 1 for backward
// iteration, so the result is the first block to visit in iteration order.
static FORCE_INLINE int32_t findEarliestIndex(int32_t index, uint64_t uid, const SSttBlk *pBlockList, int32_t num,
                                              int32_t backward) {
  const int32_t delta = backward ? 1 : -1;
  int32_t       pos = index;

  for (;;) {
    if (pos < 0 || pos >= num) {
      break;
    }

    const SSttBlk *pBlk = &pBlockList[pos];
    if (!(uid >= pBlk->minUid && uid <= pBlk->maxUid)) {
      break;
    }

    pos += delta;
  }

  // pos is one step past the last matching block
  return pos - delta;
}

H
Hongze Cheng 已提交
185
// Binary-search the stt block list for a block whose [minUid, maxUid] range contains uid,
// then let findEarliestIndex() pick the first such block in iteration order.
// Returns the block index, or -1 when the list is empty or no block covers uid.
// NOTE(review): the search assumes pBlockList is ordered by uid range - confirm with the writer side.
static int32_t binarySearchForStartBlock(SSttBlk *pBlockList, int32_t num, uint64_t uid, int32_t backward) {
  if (num <= 0) {
    return -1;
  }

  int32_t lo = 0;
  int32_t hi = num - 1;

  // uid falls outside the range covered by the whole list
  if ((uid > pBlockList[hi].maxUid) || (uid < pBlockList[lo].minUid)) {
    return -1;
  }

  for (;;) {
    if (uid >= pBlockList[lo].minUid && uid <= pBlockList[lo].maxUid) {
      return findEarliestIndex(lo, uid, pBlockList, num, backward);
    }

    // uid sits in a gap outside the remaining [lo, hi] window
    if (uid > pBlockList[hi].maxUid || uid < pBlockList[lo].minUid) {
      return -1;
    }

    int32_t span = hi - lo + 1;
    int32_t mid = (span >> 1u) + lo;

    if (uid < pBlockList[mid].minUid) {
      hi = mid - 1;
      continue;
    }

    if (uid > pBlockList[mid].maxUid) {
      lo = mid + 1;
      continue;
    }

    return findEarliestIndex(mid, uid, pBlockList, num, backward);
  }
}

H
Hongze Cheng 已提交
221 222
// Starting from a row whose uid equals the target, walk over the adjacent rows with the
// same uid and return the index of the last one that matches.
// The walk goes towards index 0 for forward iteration and towards num - 1 for backward
// iteration, so the result is the first row to visit in iteration order.
static FORCE_INLINE int32_t findEarliestRow(int32_t index, uint64_t uid, const uint64_t *uidList, int32_t num,
                                            int32_t backward) {
  const int32_t delta = backward ? 1 : -1;
  int32_t       pos = index;

  for (;;) {
    if (pos < 0 || pos >= num) {
      break;
    }
    if (uidList[pos] != uid) {
      break;
    }
    pos += delta;
  }

  // pos is one step past the last matching row
  return pos - delta;
}

H
Hongze Cheng 已提交
231
// Binary-search a uid column for the target uid, then let findEarliestRow() pick the first
// matching row in iteration order.
// Returns the row index, or -1 when the list is empty/NULL or does not contain uid.
// NOTE(review): the search assumes uidList is sorted in ascending order - confirm.
static int32_t binarySearchForStartRowIndex(uint64_t *uidList, int32_t num, uint64_t uid, int32_t backward) {
  // An empty list used to read uidList[-1] below.
  if (uidList == NULL || num <= 0) {
    return -1;
  }

  int32_t firstPos = 0;
  int32_t lastPos = num - 1;

  // uid falls outside the range covered by the whole list
  if ((uid > uidList[lastPos]) || (uid < uidList[firstPos])) {
    return -1;
  }

  while (1) {
    if (uid == uidList[firstPos]) {
      return findEarliestRow(firstPos, uid, uidList, num, backward);
    }

    // uid sits in a gap outside the remaining [firstPos, lastPos] window
    if (uid > uidList[lastPos] || uid < uidList[firstPos]) {
      return -1;
    }

    int32_t numOfRows = lastPos - firstPos + 1;
    int32_t midPos = (numOfRows >> 1u) + firstPos;

    if (uid < uidList[midPos]) {
      lastPos = midPos - 1;
    } else if (uid > uidList[midPos]) {
      firstPos = midPos + 1;
    } else {
      return findEarliestRow(midPos, uid, uidList, num, backward);
    }
  }
}

H
Haojun Liao 已提交
262
// Create an iterator over the rows of table `uid` in stt file `iStt`.
//
// The first open against a given pBlockLoadInfo reads the file's stt block list and keeps
// only the blocks belonging to `suid`; later opens reuse that list. The iterator is then
// positioned on the first block (in iteration order) whose uid range covers `uid`.
// When no block qualifies, the iterator is still returned with iSttBlk == -1 and
// pSttBlk == NULL.
//
// *pTimeWindow and *pRange are copied; pBlockLoadInfo is stored by reference.
//
// Returns TSDB_CODE_SUCCESS with *pIter set, or an error code with *pIter == NULL.
// sttBlockLoaded is set only once the block list is completely loaded and filtered, so a
// failed load is retried by the next open instead of being treated as cached.
int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
                       uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo,
                       const char *idStr) {
  int32_t code = TSDB_CODE_SUCCESS;

  *pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
  if (*pIter == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  (*pIter)->uid = uid;
  (*pIter)->pReader = pReader;
  (*pIter)->iStt = iStt;
  (*pIter)->backward = backward;
  (*pIter)->verRange = *pRange;
  (*pIter)->timeWindow = *pTimeWindow;

  (*pIter)->pBlockLoadInfo = pBlockLoadInfo;

  if (!pBlockLoadInfo->sttBlockLoaded) {
    int64_t st = taosGetTimestampUs();

    code = tsdbReadSttBlk(pReader, iStt, pBlockLoadInfo->aSttBlk);
    if (code) {
      goto _exit;
    }

    // only apply to the child tables, ordinary tables will not incur this filter procedure.
    size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);

    if (size >= 1) {
      SSttBlk *pStart = taosArrayGet(pBlockLoadInfo->aSttBlk, 0);
      SSttBlk *pEnd = taosArrayGet(pBlockLoadInfo->aSttBlk, size - 1);

      // all identical
      if (pStart->suid == pEnd->suid) {
        if (pStart->suid != suid) {
          // no qualified stt block existed
          taosArrayClear(pBlockLoadInfo->aSttBlk);
          pBlockLoadInfo->sttBlockLoaded = true;

          (*pIter)->iSttBlk = -1;
          double el = (taosGetTimestampUs() - st) / 1000.0;
          tsdbDebug("load the last file info completed, elapsed time:%.2fms, %s", el, idStr);
          return code;
        }
      } else {
        // blocks of several super tables: keep only the ones that belong to suid
        SArray *pTmp = taosArrayInit(size, sizeof(SSttBlk));
        if (pTmp == NULL) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          goto _exit;
        }

        for (int32_t i = 0; i < (int32_t)size; ++i) {
          SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i);
          uint64_t s = p->suid;
          if (s < suid) {
            continue;
          }

          if (s > suid) {
            break;
          }

          taosArrayPush(pTmp, p);
        }

        taosArrayDestroy(pBlockLoadInfo->aSttBlk);
        pBlockLoadInfo->aSttBlk = pTmp;
      }
    }

    pBlockLoadInfo->sttBlockLoaded = true;

    double el = (taosGetTimestampUs() - st) / 1000.0;
    tsdbDebug("load the last file info completed, elapsed time:%.2fms, %s", el, idStr);
  }

  size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);

  // find the start block
  (*pIter)->iSttBlk = binarySearchForStartBlock(pBlockLoadInfo->aSttBlk->pData, size, uid, backward);
  if ((*pIter)->iSttBlk != -1) {
    (*pIter)->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, (*pIter)->iSttBlk);
    (*pIter)->iRow = ((*pIter)->backward) ? (*pIter)->pSttBlk->nRow : -1;
  }

  return code;

_exit:
  taosMemoryFree(*pIter);
  *pIter = NULL;  // do not hand a dangling pointer back to the caller
  return code;
}

H
Hongze Cheng 已提交
350
// Free an iterator created by tLDataIterOpen(). Only the iterator object itself is
// released; the load info it points to is left alone.
void tLDataIterClose(SLDataIter *pIter) {
  taosMemoryFree(pIter);
}
H
Hongze Cheng 已提交
351

H
Hongze Cheng 已提交
352
// Advance the iterator to the next stt block (in iteration order) that can hold rows for
// pIter->uid inside the iterator's time window and version range.
//
// Blocks are checked against uid range, then key range, then version range. The scan stops
// as soon as a block lies entirely past the wanted range in the direction of travel.
// On success pIter->iSttBlk / pIter->pSttBlk describe the new block; otherwise
// pIter->pSttBlk is set to NULL, which marks the iterator as exhausted.
void tLDataIterNextBlock(SLDataIter *pIter, const char *idStr) {
  int32_t step = pIter->backward ? -1 : 1;
  int32_t oldIndex = pIter->iSttBlk;

  pIter->iSttBlk += step;

  int32_t index = -1;
  int32_t numOfBlocks = (int32_t)taosArrayGetSize(pIter->pBlockLoadInfo->aSttBlk);

  // Compare as signed values: the index becomes -1 when stepping backward past block 0.
  for (int32_t i = pIter->iSttBlk; i >= 0 && i < numOfBlocks; i += step) {
    SSttBlk *p = taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, i);
    if ((!pIter->backward) && p->minUid > pIter->uid) {
      break;
    }

    if (pIter->backward && p->maxUid < pIter->uid) {
      break;
    }

    // check uid firstly
    if (p->minUid <= pIter->uid && p->maxUid >= pIter->uid) {
      if ((!pIter->backward) && p->minKey > pIter->timeWindow.ekey) {
        break;
      }

      if (pIter->backward && p->maxKey < pIter->timeWindow.skey) {
        break;
      }

      // check time range secondly
      if (p->minKey <= pIter->timeWindow.ekey && p->maxKey >= pIter->timeWindow.skey) {
        if ((!pIter->backward) && p->minVer > pIter->verRange.maxVer) {
          break;
        }

        if (pIter->backward && p->maxVer < pIter->verRange.minVer) {
          break;
        }

        if (p->minVer <= pIter->verRange.maxVer && p->maxVer >= pIter->verRange.minVer) {
          index = i;
          break;
        }
      }
    }
  }

  pIter->pSttBlk = NULL;
  if (index != -1) {
    pIter->iSttBlk = index;
    pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, pIter->iSttBlk);
    tsdbDebug("try next last file block:%d from %d, trigger by uid:%" PRIu64 ", file index:%d, %s", pIter->iSttBlk,
              oldIndex, pIter->uid, pIter->iStt, idStr);
  } else {
    // report the stt file index under "file index" (the old message printed the block index there)
    tsdbDebug("no more last block qualified, uid:%" PRIu64 ", file index:%d, last block index:%d, %s", pIter->uid,
              pIter->iStt, oldIndex, idStr);
  }
}

H
Hongze Cheng 已提交
409
// Starting at pIter->iRow, find the next row of the current block that belongs to
// pIter->uid and lies inside the iterator's time window and version range.
//
// On return pIter->iRow is the index of that row, or -1 when the block holds no further
// qualifying row (or the block could not be loaded).
static void findNextValidRow(SLDataIter *pIter, const char *idStr) {
  int32_t step = pIter->backward ? -1 : 1;

  bool    hasVal = false;
  int32_t i = pIter->iRow;

  SBlockData *pBlockData = loadLastBlock(pIter, idStr);
  if (pBlockData == NULL) {
    // nothing to scan; loadLastBlock() has set terrno if this was a real failure
    pIter->iRow = -1;
    return;
  }

  // mostly we only need to find the start position for a given table
  if ((((i == 0) && (!pIter->backward)) || (i == pBlockData->nRow - 1 && pIter->backward)) &&
      pBlockData->aUid != NULL) {
    i = binarySearchForStartRowIndex((uint64_t *)pBlockData->aUid, pBlockData->nRow, pIter->uid, pIter->backward);
    if (i == -1) {
      tsdbDebug("failed to find the data in pBlockData, uid:%"PRIu64" , %s", pIter->uid, idStr);
      pIter->iRow = -1;
      return;
    }
  }

  for (; i < pBlockData->nRow && i >= 0; i += step) {
    // rows are scanned until the uid column moves past the wanted table
    if (pBlockData->aUid != NULL) {
      if (!pIter->backward) {
        if (pBlockData->aUid[i] > pIter->uid) {
          break;
        }
      } else {
        if (pBlockData->aUid[i] < pIter->uid) {
          break;
        }
      }
    }

    int64_t ts = pBlockData->aTSKEY[i];
    if (!pIter->backward) {               // asc
      if (ts > pIter->timeWindow.ekey) {  // no more data
        break;
      } else if (ts < pIter->timeWindow.skey) {
        continue;
      }
    } else {
      if (ts < pIter->timeWindow.skey) {
        break;
      } else if (ts > pIter->timeWindow.ekey) {
        continue;
      }
    }

    int64_t ver = pBlockData->aVersion[i];
    if (ver < pIter->verRange.minVer) {
      continue;
    }

    // todo opt handle desc case
    if (ver > pIter->verRange.maxVer) {
      continue;
    }

    hasVal = true;
    break;
  }

  pIter->iRow = (hasVal) ? i : -1;
}

H
Hongze Cheng 已提交
473
// Move the iterator to its next qualifying row, crossing into following stt blocks when
// the current one is exhausted.
//
// Returns true when a row is available (exposed through pIter->rInfo / tLDataIterGet()).
// Returns false when the iterator is exhausted, when a block cannot be loaded, or when
// terrno was set along the way. terrno is reset to TSDB_CODE_SUCCESS on entry.
bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
  int32_t step = pIter->backward ? -1 : 1;
  terrno = TSDB_CODE_SUCCESS;

  // no qualified last file block in current file, no need to fetch row
  if (pIter->pSttBlk == NULL) {
    return false;
  }

  int32_t     iBlockL = pIter->iSttBlk;
  SBlockData *pBlockData = loadLastBlock(pIter, idStr);
  if (pBlockData == NULL) {
    // loadLastBlock() may return NULL without setting terrno (e.g. no schema); there is
    // no row to expose in that case, so never report success.
    return false;
  }
  if (terrno != TSDB_CODE_SUCCESS) {
    goto _exit;
  }

  pIter->iRow += step;

  while (1) {
    findNextValidRow(pIter, idStr);

    if (pIter->iRow >= pBlockData->nRow || pIter->iRow < 0) {
      tLDataIterNextBlock(pIter, idStr);
      if (pIter->pSttBlk == NULL) {  // no more data
        goto _exit;
      }
    } else {
      break;
    }

    if (iBlockL != pIter->iSttBlk) {
      pBlockData = loadLastBlock(pIter, idStr);
      if (pBlockData == NULL) {
        return false;
      }
      // start scanning the new block from its first row in iteration order
      pIter->iRow = pIter->backward ? pBlockData->nRow - 1 : 0;
    }
  }

  pIter->rInfo.suid = pBlockData->suid;
  pIter->rInfo.uid = pBlockData->uid;
  pIter->rInfo.row = tsdbRowFromBlockData(pBlockData, pIter->iRow);

_exit:
  return (terrno == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL);
}

H
Hongze Cheng 已提交
519
// Return a pointer to the iterator's current row info. The pointer refers to storage
// inside the iterator, so it is only usable while the iterator is alive.
SRowInfo *tLDataIterGet(SLDataIter *pIter) {
  SRowInfo *pRowInfo = &pIter->rInfo;
  return pRowInfo;
}
H
Hongze Cheng 已提交
520 521

// SMergeTree =================================================
H
Hongze Cheng 已提交
522 523 524
// Ascending red-black tree comparator for iterators: orders by the timestamp of each
// iterator's current row, then by its version. Returns -1, 0 or 1.
static FORCE_INLINE int32_t tLDataIterCmprFn(const SRBTreeNode *p1, const SRBTreeNode *p2) {
  // recover the enclosing iterators from their embedded tree nodes
  SLDataIter *pLhs = (SLDataIter *)(((uint8_t *)p1) - offsetof(SLDataIter, node));
  SLDataIter *pRhs = (SLDataIter *)(((uint8_t *)p2) - offsetof(SLDataIter, node));

  TSDBKEY lhs = TSDBROW_KEY(&pLhs->rInfo.row);
  TSDBKEY rhs = TSDBROW_KEY(&pRhs->rInfo.row);

  if (lhs.ts != rhs.ts) {
    return (lhs.ts < rhs.ts) ? -1 : 1;
  }

  if (lhs.version != rhs.version) {
    return (lhs.version < rhs.version) ? -1 : 1;
  }

  return 0;
}

544 545 546 547
// Descending comparator: the ascending result with its sign flipped.
static FORCE_INLINE int32_t tLDataIterDescCmprFn(const SRBTreeNode *p1, const SRBTreeNode *p2) {
  int32_t asc = tLDataIterCmprFn(p1, p2);
  return -asc;
}

548
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
549 550
                       STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
                       bool destroyLoadInfo, const char *idStr) {
H
Hongze Cheng 已提交
551
  pMTree->backward = backward;
552
  pMTree->pIter = NULL;
553
  pMTree->pIterList = taosArrayInit(4, POINTER_BYTES);
554 555 556
  if (pMTree->pIterList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
557

H
Haojun Liao 已提交
558
  pMTree->idStr = idStr;
559 560 561 562 563
  if (!pMTree->backward) { // asc
    tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
  } else { // desc
    tRBTreeCreate(&pMTree->rbt, tLDataIterDescCmprFn);
  }
564 565
  int32_t code = TSDB_CODE_SUCCESS;

566 567
  pMTree->pLoadInfo = pBlockLoadInfo;
  pMTree->destroyLoadInfo = destroyLoadInfo;
568
  ASSERT(pMTree->pLoadInfo != NULL);
569

H
Hongze Cheng 已提交
570
  for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) {  // open all last file
H
Hongze Cheng 已提交
571 572 573
    struct SLDataIter *pIter = NULL;
    code = tLDataIterOpen(&pIter, pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange,
                          &pMTree->pLoadInfo[i], pMTree->idStr);
574 575 576 577
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }

H
Haojun Liao 已提交
578
    bool hasVal = tLDataIterNextRow(pIter, pMTree->idStr);
579
    if (hasVal) {
580 581
      taosArrayPush(pMTree->pIterList, &pIter);
      tMergeTreeAddIter(pMTree, pIter);
582
    } else {
583
      tLDataIterClose(pIter);
584 585
    }
  }
586 587 588

  return code;

H
Hongze Cheng 已提交
589
_end:
590 591
  tMergeTreeClose(pMTree);
  return code;
H
Hongze Cheng 已提交
592
}
H
Hongze Cheng 已提交
593

594 595
// Insert an iterator into the merge tree's red-black tree. The iterator pointer is passed
// as a tree node, which relies on `node` being the first member of SLDataIter.
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter) {
  SRBTreeNode *pNode = (SRBTreeNode *)pIter;
  tRBTreePut(&pMTree->rbt, pNode);
}

H
Hongze Cheng 已提交
596
// Advance the merge tree to its next row.
//
// The current iterator (if any) is stepped first. If it still has a row but that row sorts
// after the tree's minimum, it is put back into the tree. Whenever there is no current
// iterator, the tree's minimum is removed from the tree and becomes the current one.
// Returns true while a current row exists.
bool tMergeTreeNext(SMergeTree *pMTree) {
  SLDataIter *pCur = pMTree->pIter;

  if (pCur != NULL) {
    if (!tLDataIterNextRow(pCur, pMTree->idStr)) {
      pMTree->pIter = NULL;
    }

    // compare with min in RB Tree
    SLDataIter *pMin = (SLDataIter *)tRBTreeMin(&pMTree->rbt);
    if (pMTree->pIter != NULL && pMin != NULL) {
      int32_t c = pMTree->rbt.cmprFn(&pMTree->pIter->node, &pMin->node);
      if (c > 0) {
        tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter);
        pMTree->pIter = NULL;
      } else {
        // two iterators are not expected to compare equal
        ASSERT(c);
      }
    }
  }

  if (pMTree->pIter == NULL) {
    SLDataIter *pNext = (SLDataIter *)tRBTreeMin(&pMTree->rbt);
    pMTree->pIter = pNext;
    if (pNext != NULL) {
      tRBTreeDrop(&pMTree->rbt, (SRBTreeNode *)pNext);
    }
  }

  return pMTree->pIter != NULL;
}

H
Hongze Cheng 已提交
629
// Return (by value) the row the merge tree is currently positioned on.
// pMTree->pIter is dereferenced unconditionally, so this is only valid while a current
// iterator exists.
TSDBROW tMergeTreeGetRow(SMergeTree *pMTree) {
  SLDataIter *pCur = pMTree->pIter;
  return pCur->rInfo.row;
}
630

H
Hongze Cheng 已提交
631
// Tear down a merge tree: close every iterator recorded in pIterList, destroy the list,
// clear the current iterator, and - only when the tree was opened with destroyLoadInfo -
// destroy the block load info as well.
void tMergeTreeClose(SMergeTree *pMTree) {
  const int32_t numOfIters = (int32_t)taosArrayGetSize(pMTree->pIterList);

  int32_t idx = 0;
  while (idx < numOfIters) {
    SLDataIter *pEach = taosArrayGetP(pMTree->pIterList, idx);
    tLDataIterClose(pEach);
    ++idx;
  }

  pMTree->pIterList = taosArrayDestroy(pMTree->pIterList);
  pMTree->pIter = NULL;

  if (!pMTree->destroyLoadInfo) {
    return;
  }

  pMTree->pLoadInfo = destroyLastBlockLoadInfo(pMTree->pLoadInfo);
  pMTree->destroyLoadInfo = false;
}