tsdbMergeTree.c 20.7 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"

18
// SLDataIter =================================================
H
Hongze Cheng 已提交
19
struct SLDataIter {
H
Hongze Cheng 已提交
20 21 22 23 24 25 26 27 28 29 30 31
  SRBTreeNode        node;
  SSttBlk           *pSttBlk;
  SDataFReader      *pReader;
  int32_t            iStt;
  int8_t             backward;
  int32_t            iSttBlk;
  int32_t            iRow;
  SRowInfo           rInfo;
  uint64_t           uid;
  STimeWindow        timeWindow;
  SVersionRange      verRange;
  SSttBlockLoadInfo *pBlockLoadInfo;
32
  bool               ignoreEarlierTs;
H
Hongze Cheng 已提交
33
};
H
Hongze Cheng 已提交
34

35 36
// Allocate and initialise an array of numOfSttTrigger SSttBlockLoadInfo entries.
//
// pSchema, colList and numOfCols are stored by reference in every entry (nothing is copied).
// The entry count is recorded in the first entry's numOfStt, which is the field the
// reset/get/destroy helpers in this file iterate over.
//
// Returns the array on success. On any failure everything created so far is released,
// terrno is set to the error code and NULL is returned, so a caller never receives a
// partially initialised array.
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols,
                                            int32_t numOfSttTrigger) {
  SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(numOfSttTrigger, sizeof(SSttBlockLoadInfo));
  if (pLoadInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pLoadInfo->numOfStt = numOfSttTrigger;

  int32_t code = 0;
  int32_t i = 0;
  for (; i < numOfSttTrigger; ++i) {
    SSttBlockLoadInfo *pEntry = &pLoadInfo[i];

    // no block cached in either slot yet
    pEntry->blockIndex[0] = -1;
    pEntry->blockIndex[1] = -1;
    pEntry->currentLoadBlockIndex = 1;

    code = tBlockDataCreate(&pEntry->blockData[0]);
    if (code) {
      goto _err;  // nothing of entry i has been created yet
    }

    code = tBlockDataCreate(&pEntry->blockData[1]);
    if (code) {
      tBlockDataDestroy(&pEntry->blockData[0]);
      goto _err;
    }

    pEntry->aSttBlk = taosArrayInit(4, sizeof(SSttBlk));
    if (pEntry->aSttBlk == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      tBlockDataDestroy(&pEntry->blockData[0]);
      tBlockDataDestroy(&pEntry->blockData[1]);
      goto _err;
    }

    pEntry->pSchema = pSchema;
    pEntry->colIds = colList;
    pEntry->numOfCols = numOfCols;
  }

  return pLoadInfo;

_err:
  // entries [0, i) were fully built; entry i has already been unwound above
  for (int32_t j = 0; j < i; ++j) {
    tBlockDataDestroy(&pLoadInfo[j].blockData[0]);
    tBlockDataDestroy(&pLoadInfo[j].blockData[1]);
    taosArrayDestroy(pLoadInfo[j].aSttBlk);
  }

  taosMemoryFree(pLoadInfo);
  terrno = code;
  return NULL;
}

H
Hongze Cheng 已提交
69
// Put every entry of the load-info array back into its "nothing loaded" state:
// both cache slots are invalidated, the stt block list is emptied, the statistics are
// zeroed and the block list is marked as not loaded. No memory is released.
void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
  for (int32_t i = 0; i < pLoadInfo->numOfStt; ++i) {
    SSttBlockLoadInfo *pEntry = pLoadInfo + i;

    // cache slots
    pEntry->currentLoadBlockIndex = 1;
    pEntry->blockIndex[0] = -1;
    pEntry->blockIndex[1] = -1;

    // stt block list
    taosArrayClear(pEntry->aSttBlk);

    // statistics and load flag
    pEntry->elapsedTime = 0;
    pEntry->loadBlocks = 0;
    pEntry->sttBlockLoaded = false;
  }
}

H
Hongze Cheng 已提交
83
// Accumulate the load statistics of all entries into the caller's counters.
// *blocks and *el are added to, not overwritten, so the caller decides the starting values.
void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el) {
  for (int32_t i = 0; i < pLoadInfo->numOfStt; ++i) {
    const SSttBlockLoadInfo *pEntry = pLoadInfo + i;

    *el += pEntry->elapsedTime;
    *blocks += pEntry->loadBlocks;
  }
}

H
Hongze Cheng 已提交
90
// Release a load-info array created by tCreateLastBlockLoadInfo(): both cached block
// buffers and the stt block list of every entry, then the array itself.
// Accepts NULL. Always returns NULL so callers can write `p = destroyLastBlockLoadInfo(p)`.
void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
  if (pLoadInfo != NULL) {
    for (int32_t i = 0; i < pLoadInfo->numOfStt; ++i) {
      SSttBlockLoadInfo *pEntry = pLoadInfo + i;

      pEntry->currentLoadBlockIndex = 1;
      pEntry->blockIndex[0] = -1;
      pEntry->blockIndex[1] = -1;

      tBlockDataDestroy(&pEntry->blockData[0]);
      tBlockDataDestroy(&pEntry->blockData[1]);

      taosArrayDestroy(pEntry->aSttBlk);
    }

    taosMemoryFree(pLoadInfo);
  }

  return NULL;
}

H
Hongze Cheng 已提交
110
// Return the data of the stt block the iterator currently points at (pIter->iSttBlk).
//
// pIter->pBlockLoadInfo keeps two block buffers (blockData[0..1]) together with the block
// index each one holds (blockIndex[0..1]). If either buffer already holds the requested
// block it is returned directly; otherwise the buffer that is not the current one is
// re-initialised and filled from the stt file.
//
// When a block is actually read, pIter->iRow is repositioned to just outside the block
// (nRow for backward iteration, -1 for forward) so the caller can step into it.
//
// Returns NULL when the iterator has no current block, when no schema is available, or
// when initialising/reading the block fails (terrno is set to the error code in that case).
static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
  int32_t code = 0;

  SSttBlockLoadInfo *pInfo = pIter->pBlockLoadInfo;

  // cache hit in slot 0
  if (pInfo->blockIndex[0] == pIter->iSttBlk) {
    if (pInfo->currentLoadBlockIndex != 0) {
      tsdbDebug("current load index is set to 0, block index:%d, file index:%d, due to uid:%" PRIu64 ", load data, %s",
                pIter->iSttBlk, pIter->iStt, pIter->uid, idStr);
      pInfo->currentLoadBlockIndex = 0;
    }
    return &pInfo->blockData[0];
  }

  // cache hit in slot 1
  if (pInfo->blockIndex[1] == pIter->iSttBlk) {
    if (pInfo->currentLoadBlockIndex != 1) {
      tsdbDebug("current load index is set to 1, block index:%d, file index:%d, due to uid:%" PRIu64 ", load data, %s",
                pIter->iSttBlk, pIter->iStt, pIter->uid, idStr);
      pInfo->currentLoadBlockIndex = 1;
    }
    return &pInfo->blockData[1];
  }

  if (pIter->pSttBlk == NULL || pInfo->pSchema == NULL) {
    return NULL;
  }

  // current block not loaded yet: switch to the other slot and load into it
  pInfo->currentLoadBlockIndex ^= 1;
  int64_t st = taosGetTimestampUs();

  SBlockData *pBlock = &pInfo->blockData[pInfo->currentLoadBlockIndex];

  // The slot is about to be overwritten. Forget which block it held, so that a failed
  // init/read below cannot leave a stale index that would later pass the cache checks
  // above and hand out clobbered data.
  pInfo->blockIndex[pInfo->currentLoadBlockIndex] = -1;

  // blocks that carry a suid are identified by suid, the others by the table uid
  TABLEID id = {0};
  if (pIter->pSttBlk->suid != 0) {
    id.suid = pIter->pSttBlk->suid;
  } else {
    id.uid = pIter->uid;
  }

  code = tBlockDataInit(pBlock, &id, pInfo->pSchema, pInfo->colIds, pInfo->numOfCols);
  if (code != TSDB_CODE_SUCCESS) {
    goto _exit;
  }

  code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlock);
  if (code != TSDB_CODE_SUCCESS) {
    goto _exit;
  }

  double el = (taosGetTimestampUs() - st) / 1000.0;
  pInfo->elapsedTime += el;
  pInfo->loadBlocks += 1;

  tsdbDebug("read last block, total load:%d, trigger by uid:%" PRIu64
            ", last file index:%d, last block index:%d, entry:%d, rows:%d, %p, elapsed time:%.2f ms, %s",
            pInfo->loadBlocks, pIter->uid, pIter->iStt, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock->nRow,
            pBlock, el, idStr);

  // the slot is valid again: remember which block it holds
  pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk;
  pIter->iRow = (pIter->backward) ? pInfo->blockData[pInfo->currentLoadBlockIndex].nRow : -1;

  tsdbDebug("last block index list:%d, %d, rowIndex:%d %s", pInfo->blockIndex[0], pInfo->blockIndex[1], pIter->iRow,
            idStr);
  return &pInfo->blockData[pInfo->currentLoadBlockIndex];

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
  }

  return NULL;
}

183
// find the earliest block that contains the required records
H
Hongze Cheng 已提交
184 185
static FORCE_INLINE int32_t findEarliestIndex(int32_t index, uint64_t uid, const SSttBlk *pBlockList, int32_t num,
                                              int32_t backward) {
186
  int32_t i = index;
H
Hongze Cheng 已提交
187
  int32_t step = backward ? 1 : -1;
H
Haojun Liao 已提交
188
  while (i >= 0 && i < num && uid >= pBlockList[i].minUid && uid <= pBlockList[i].maxUid) {
189 190 191 192 193
    i += step;
  }
  return i - step;
}

H
Hongze Cheng 已提交
194
// Binary-search pBlockList (num entries, compared through their [minUid, maxUid] ranges)
// for a block covering `uid`, then let findEarliestIndex() move to the outermost covering
// block for the scan direction. Returns that block index, or -1 when the list is empty or
// no block covers `uid`.
static int32_t binarySearchForStartBlock(SSttBlk *pBlockList, int32_t num, uint64_t uid, int32_t backward) {
  if (num <= 0) {
    return -1;
  }

  int32_t lo = 0;
  int32_t hi = num - 1;

  // uid lies outside the range spanned by the whole list
  if ((uid < pBlockList[lo].minUid) || (uid > pBlockList[hi].maxUid)) {
    return -1;
  }

  for (;;) {
    if (uid >= pBlockList[lo].minUid && uid <= pBlockList[lo].maxUid) {
      return findEarliestIndex(lo, uid, pBlockList, num, backward);
    }

    // the remaining window [lo, hi] can no longer contain uid
    if (uid < pBlockList[lo].minUid || uid > pBlockList[hi].maxUid) {
      return -1;
    }

    const int32_t span = hi - lo + 1;
    const int32_t mid = lo + (span >> 1u);

    if (uid < pBlockList[mid].minUid) {
      hi = mid - 1;
    } else if (uid > pBlockList[mid].maxUid) {
      lo = mid + 1;
    } else {
      return findEarliestIndex(mid, uid, pBlockList, num, backward);
    }
  }
}

H
Hongze Cheng 已提交
230 231
// Starting from a row at `index` whose uid equals `uid`, walk over the adjacent rows with
// the same uid and return the index of the last one reached: toward lower indices for a
// forward scan (backward == 0), toward higher indices for a backward scan.
static FORCE_INLINE int32_t findEarliestRow(int32_t index, uint64_t uid, const uint64_t *uidList, int32_t num,
                                            int32_t backward) {
  int32_t i = index;
  int32_t step = backward ? 1 : -1;
  while (i >= 0 && i < num && uid == uidList[i]) {
    i += step;
  }
  return i - step;
}

// Binary-search uidList (num entries, expected in ascending order) for `uid` and return the
// row a scan should start from: the first matching row for a forward scan, the last
// matching row for a backward scan. Returns -1 when the list is empty/NULL or `uid` does
// not occur in it.
static int32_t binarySearchForStartRowIndex(uint64_t *uidList, int32_t num, uint64_t uid, int32_t backward) {
  // An empty list would otherwise be indexed at [num - 1] == [-1] below; this mirrors the
  // guard binarySearchForStartBlock() already has.
  if (uidList == NULL || num <= 0) {
    return -1;
  }

  int32_t firstPos = 0;
  int32_t lastPos = num - 1;

  // uid lies outside the range spanned by the whole list
  if ((uid > uidList[lastPos]) || (uid < uidList[firstPos])) {
    return -1;
  }

  while (1) {
    if (uid == uidList[firstPos]) {
      return findEarliestRow(firstPos, uid, uidList, num, backward);
    }

    // the remaining window [firstPos, lastPos] can no longer contain uid
    if (uid > uidList[lastPos] || uid < uidList[firstPos]) {
      return -1;
    }

    int32_t numOfRows = lastPos - firstPos + 1;
    int32_t midPos = (numOfRows >> 1u) + firstPos;

    if (uid < uidList[midPos]) {
      lastPos = midPos - 1;
    } else if (uid > uidList[midPos]) {
      firstPos = midPos + 1;
    } else {
      return findEarliestRow(midPos, uid, uidList, num, backward);
    }
  }
}

H
Haojun Liao 已提交
271
// Create an iterator over the rows of table `uid` in stt file `iStt`.
//
//   pIter            out: receives the new iterator; set to NULL when the call fails
//   pReader          file reader used for all stt reads
//   backward         non-zero for a descending scan
//   suid             super-table uid used to filter the stt block list
//   pTimeWindow      time window copied into the iterator
//   pRange           version range copied into the iterator
//   pBlockLoadInfo   block list / block cache of this stt file; referenced, not owned
//   idStr            identifier appended to debug logs
//   strictTimeRange  when true, a start block whose key range merely touches the window
//                    boundary (minKey == ekey forward, maxKey == skey backward) is rejected
//
// On the first open for a given pBlockLoadInfo the stt block list is read from file and
// reduced to the blocks whose suid equals `suid`; the list is marked as loaded only after
// that succeeded, so a failed attempt is retried by the next open.
//
// Returns TSDB_CODE_SUCCESS (the iterator may still have no qualifying block, in which case
// its pSttBlk is NULL) or an error code.
int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
                       uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo,
                       const char *idStr, bool strictTimeRange) {
  int32_t     code = TSDB_CODE_SUCCESS;
  SLDataIter *pNew = taosMemoryCalloc(1, sizeof(SLDataIter));

  *pIter = pNew;
  if (pNew == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  pNew->uid = uid;
  pNew->pReader = pReader;
  pNew->iStt = iStt;
  pNew->backward = backward;
  pNew->verRange = *pRange;
  pNew->timeWindow = *pTimeWindow;
  pNew->pBlockLoadInfo = pBlockLoadInfo;

  if (!pBlockLoadInfo->sttBlockLoaded) {
    int64_t st = taosGetTimestampUs();

    code = tsdbReadSttBlk(pReader, iStt, pBlockLoadInfo->aSttBlk);
    if (code) {
      // leave the list empty and unloaded so that a later open starts from scratch
      taosArrayClear(pBlockLoadInfo->aSttBlk);
      goto _exit;
    }

    // only apply to the child tables, ordinary tables will not incur this filter procedure.
    size_t numOfBlks = taosArrayGetSize(pBlockLoadInfo->aSttBlk);

    if (numOfBlks >= 1) {
      SSttBlk *pStart = taosArrayGet(pBlockLoadInfo->aSttBlk, 0);
      SSttBlk *pEnd = taosArrayGet(pBlockLoadInfo->aSttBlk, numOfBlks - 1);

      if (pStart->suid == pEnd->suid) {
        // first and last block carry the same suid: keep the list or drop it as a whole
        if (pStart->suid != suid) {
          // no qualified stt block existed
          taosArrayClear(pBlockLoadInfo->aSttBlk);
          pBlockLoadInfo->sttBlockLoaded = true;

          pNew->iSttBlk = -1;
          double el = (taosGetTimestampUs() - st) / 1000.0;
          tsdbDebug("load the last file info completed, elapsed time:%.2fms, %s", el, idStr);
          return code;
        }
      } else {
        // mixed suids: keep only the blocks that belong to the requested suid
        SArray *pTmp = taosArrayInit(numOfBlks, sizeof(SSttBlk));
        if (pTmp == NULL) {
          taosArrayClear(pBlockLoadInfo->aSttBlk);
          code = TSDB_CODE_OUT_OF_MEMORY;
          goto _exit;
        }

        for (size_t i = 0; i < numOfBlks; ++i) {
          SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i);
          uint64_t s = p->suid;
          if (s < suid) {
            continue;
          }

          if (s > suid) {
            break;
          }

          if (taosArrayPush(pTmp, p) == NULL) {
            taosArrayDestroy(pTmp);
            taosArrayClear(pBlockLoadInfo->aSttBlk);
            code = TSDB_CODE_OUT_OF_MEMORY;
            goto _exit;
          }
        }

        taosArrayDestroy(pBlockLoadInfo->aSttBlk);
        pBlockLoadInfo->aSttBlk = pTmp;
      }
    }

    // the list is complete and filtered: later opens can reuse it
    pBlockLoadInfo->sttBlockLoaded = true;

    double el = (taosGetTimestampUs() - st) / 1000.0;
    tsdbDebug("load the last file info completed, elapsed time:%.2fms, %s", el, idStr);
  }

  size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);

  // find the start block; the element pointer is only fetched for a non-empty list
  SSttBlk *pBlkList = (size > 0) ? (SSttBlk *)taosArrayGet(pBlockLoadInfo->aSttBlk, 0) : NULL;
  pNew->iSttBlk = binarySearchForStartBlock(pBlkList, size, uid, backward);
  if (pNew->iSttBlk != -1) {
    pNew->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, pNew->iSttBlk);
    pNew->iRow = (pNew->backward) ? pNew->pSttBlk->nRow : -1;

    // drop the start block when it lies entirely outside the time window for this direction
    if (!backward) {
      bool afterWindow = strictTimeRange ? (pNew->pSttBlk->minKey >= pNew->timeWindow.ekey)
                                         : (pNew->pSttBlk->minKey > pNew->timeWindow.ekey);
      if (afterWindow) {
        pNew->pSttBlk = NULL;
      }
    } else {
      bool beforeWindow = strictTimeRange ? (pNew->pSttBlk->maxKey <= pNew->timeWindow.skey)
                                          : (pNew->pSttBlk->maxKey < pNew->timeWindow.skey);
      if (beforeWindow) {
        pNew->pSttBlk = NULL;
        pNew->ignoreEarlierTs = true;
      }
    }
  }

  return code;

_exit:
  taosMemoryFree(*pIter);
  *pIter = NULL;  // never hand a dangling pointer back to the caller
  return code;
}

H
Hongze Cheng 已提交
370
// Release an iterator created by tLDataIterOpen(). Only the iterator allocation itself is
// freed; the reader and the block-load info it points to are left untouched.
void tLDataIterClose(SLDataIter *pIter) {
  taosMemoryFree(pIter);
}
H
Hongze Cheng 已提交
371

H
Hongze Cheng 已提交
372
// Advance the iterator to the next stt block, in scan direction, that may contain rows of
// pIter->uid inside the iterator's time window and version range.
//
// Blocks are checked in three stages (uid range, then key range, then version range). The
// search stops as soon as a block lies entirely past the requested uid, or covers the uid
// but lies entirely past the time window / version range in scan direction.
//
// On success pIter->iSttBlk / pIter->pSttBlk refer to the block found. When no block
// qualifies pIter->pSttBlk is set to NULL.
void tLDataIterNextBlock(SLDataIter *pIter, const char *idStr) {
  int32_t step = pIter->backward ? -1 : 1;
  int32_t oldIndex = pIter->iSttBlk;

  pIter->iSttBlk += step;

  int32_t index = -1;
  // signed bound: the loop condition no longer depends on -1 wrapping to a huge size_t
  int32_t numOfBlks = (int32_t)taosArrayGetSize(pIter->pBlockLoadInfo->aSttBlk);
  for (int32_t i = pIter->iSttBlk; i >= 0 && i < numOfBlks; i += step) {
    SSttBlk *p = taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, i);

    // the block lies entirely past the requested uid
    if ((!pIter->backward) && p->minUid > pIter->uid) {
      break;
    }

    if (pIter->backward && p->maxUid < pIter->uid) {
      break;
    }

    // check uid firstly
    if (p->minUid <= pIter->uid && p->maxUid >= pIter->uid) {
      if ((!pIter->backward) && p->minKey > pIter->timeWindow.ekey) {
        break;
      }

      if (pIter->backward && p->maxKey < pIter->timeWindow.skey) {
        break;
      }

      // check time range secondly
      if (p->minKey <= pIter->timeWindow.ekey && p->maxKey >= pIter->timeWindow.skey) {
        if ((!pIter->backward) && p->minVer > pIter->verRange.maxVer) {
          break;
        }

        if (pIter->backward && p->maxVer < pIter->verRange.minVer) {
          break;
        }

        // check version range lastly
        if (p->minVer <= pIter->verRange.maxVer && p->maxVer >= pIter->verRange.minVer) {
          index = i;
          break;
        }
      }
    }
  }

  pIter->pSttBlk = NULL;
  if (index != -1) {
    pIter->iSttBlk = index;
    pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, pIter->iSttBlk);
    tsdbDebug("try next last file block:%d from %d, trigger by uid:%" PRIu64 ", file index:%d, %s", pIter->iSttBlk,
              oldIndex, pIter->uid, pIter->iStt, idStr);
  } else {
    // report the stt file index, as the message says (this used to print the old block index)
    tsdbDebug("no more last block qualified, uid:%" PRIu64 ", file index:%d, %s", pIter->uid, pIter->iStt, idStr);
  }
}

H
Hongze Cheng 已提交
429
// Starting at pIter->iRow, move in scan direction to the first row of the current block
// that belongs to pIter->uid and lies inside the iterator's time window and version range.
//
// The result is stored in pIter->iRow; -1 means the current block has no (further)
// qualifying row, or the block could not be loaded.
static void findNextValidRow(SLDataIter *pIter, const char *idStr) {
  int32_t step = pIter->backward ? -1 : 1;

  bool    hasVal = false;
  int32_t i = pIter->iRow;

  SBlockData *pBlockData = loadLastBlock(pIter, idStr);
  if (pBlockData == NULL) {
    // loadLastBlock() returns NULL on load failure or when no block is available;
    // report "no row" instead of dereferencing it
    pIter->iRow = -1;
    return;
  }

  // mostly we only need to find the start position for a given table
  if ((((i == 0) && (!pIter->backward)) || (i == pBlockData->nRow - 1 && pIter->backward)) &&
      pBlockData->aUid != NULL) {
    i = binarySearchForStartRowIndex((uint64_t *)pBlockData->aUid, pBlockData->nRow, pIter->uid, pIter->backward);
    if (i == -1) {
      tsdbDebug("failed to find the data in pBlockData, uid:%" PRIu64 " , %s", pIter->uid, idStr);
      pIter->iRow = -1;
      return;
    }
  }

  for (; i < pBlockData->nRow && i >= 0; i += step) {
    // a per-row uid array is present: stop once the rows of the requested uid are passed
    if (pBlockData->aUid != NULL) {
      if (!pIter->backward) {
        if (pBlockData->aUid[i] > pIter->uid) {
          break;
        }
      } else {
        if (pBlockData->aUid[i] < pIter->uid) {
          break;
        }
      }
    }

    int64_t ts = pBlockData->aTSKEY[i];
    if (!pIter->backward) {               // asc
      if (ts > pIter->timeWindow.ekey) {  // no more data
        break;
      } else if (ts < pIter->timeWindow.skey) {
        continue;
      }
    } else {  // desc
      if (ts < pIter->timeWindow.skey) {  // no more data
        break;
      } else if (ts > pIter->timeWindow.ekey) {
        continue;
      }
    }

    int64_t ver = pBlockData->aVersion[i];
    if (ver < pIter->verRange.minVer) {
      continue;
    }

    // todo opt handle desc case
    if (ver > pIter->verRange.maxVer) {
      continue;
    }

    hasVal = true;
    break;
  }

  pIter->iRow = (hasVal) ? i : -1;
}

H
Hongze Cheng 已提交
493
// Advance the iterator to its next qualifying row, moving on to following stt blocks when
// the current one is exhausted (or is skipped by the column check below).
//
// Resets terrno to TSDB_CODE_SUCCESS on entry. Returns true when a row is available, in
// which case it can be fetched with tLDataIterGet(); returns false when there is no more
// data or a block could not be loaded.
bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
  const int32_t step = pIter->backward ? -1 : 1;
  terrno = TSDB_CODE_SUCCESS;

  // no qualified last file block in current file, no need to fetch row
  if (pIter->pSttBlk == NULL) {
    return false;
  }

  const int32_t startBlkIndex = pIter->iSttBlk;
  SBlockData   *pBlockData = loadLastBlock(pIter, idStr);
  if (pBlockData == NULL || terrno != TSDB_CODE_SUCCESS) {
    goto _exit;
  }

  pIter->iRow += step;

  for (;;) {
    findNextValidRow(pIter, idStr);

    // Optional column check: the block is skipped unless at least one requested column has
    // data in it.
    bool               skipBlock = false;
    SSttBlockLoadInfo *pLoadInfo = pIter->pBlockLoadInfo;
    if (pLoadInfo->checkRemainingRow) {
      skipBlock = true;

      int16_t *aCols = pLoadInfo->colIds;
      int      nCols = pLoadInfo->numOfCols;
      bool     isLast = pLoadInfo->isLast;

      // NOTE(review): the second branch below also matches whenever the first one would,
      // so isLast currently has no effect on the outcome - confirm whether that is intended.
      for (int inputColIndex = 0; skipBlock && inputColIndex < nCols; ++inputColIndex) {
        for (int colIndex = 0; colIndex < pBlockData->nColData; ++colIndex) {
          SColData *pColData = &pBlockData->aColData[colIndex];
          if (pColData->cid != aCols[inputColIndex]) {
            continue;
          }

          if (isLast && (pColData->flag & HAS_VALUE)) {
            skipBlock = false;
            break;
          } else if (pColData->flag & (HAS_VALUE | HAS_NULL)) {
            skipBlock = false;
            break;
          }
        }
      }
    }

    const bool rowInBlock = (pIter->iRow >= 0) && (pIter->iRow < pBlockData->nRow);
    if (!skipBlock && rowInBlock) {
      break;  // a qualifying row was found in the current block
    }

    tLDataIterNextBlock(pIter, idStr);
    if (pIter->pSttBlk == NULL) {  // no more data
      goto _exit;
    }

    if (startBlkIndex != pIter->iSttBlk) {
      pBlockData = loadLastBlock(pIter, idStr);
      if (pBlockData == NULL) {
        goto _exit;
      }

      // set start row index
      pIter->iRow = pIter->backward ? pBlockData->nRow - 1 : 0;
    }
  }

  pIter->rInfo.suid = pBlockData->suid;
  pIter->rInfo.uid = pBlockData->uid;
  pIter->rInfo.row = tsdbRowFromBlockData(pBlockData, pIter->iRow);

_exit:
  return (terrno == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL) && (pBlockData != NULL);
}

H
Hongze Cheng 已提交
566
// Return the row information stored in the iterator (filled in by tLDataIterNextRow()).
// The returned pointer refers to storage inside the iterator itself.
SRowInfo *tLDataIterGet(SLDataIter *pIter) {
  SRowInfo *pRowInfo = &pIter->rInfo;
  return pRowInfo;
}
H
Hongze Cheng 已提交
567 568

// SMergeTree =================================================
H
Hongze Cheng 已提交
569 570 571
// Ascending comparator for the merge tree: orders two iterators by the key of their current
// row, timestamp first and version second. Returns -1, 0 or 1.
static FORCE_INLINE int32_t tLDataIterCmprFn(const SRBTreeNode *p1, const SRBTreeNode *p2) {
  // recover the enclosing iterators from their embedded tree nodes
  SLDataIter *pLhs = (SLDataIter *)(((uint8_t *)p1) - offsetof(SLDataIter, node));
  SLDataIter *pRhs = (SLDataIter *)(((uint8_t *)p2) - offsetof(SLDataIter, node));

  TSDBKEY lhsKey = TSDBROW_KEY(&pLhs->rInfo.row);
  TSDBKEY rhsKey = TSDBROW_KEY(&pRhs->rInfo.row);

  if (lhsKey.ts != rhsKey.ts) {
    return (lhsKey.ts < rhsKey.ts) ? -1 : 1;
  }

  if (lhsKey.version != rhsKey.version) {
    return (lhsKey.version < rhsKey.version) ? -1 : 1;
  }

  return 0;
}

591 592 593 594
// Descending comparator for the merge tree: the exact inverse of tLDataIterCmprFn().
static FORCE_INLINE int32_t tLDataIterDescCmprFn(const SRBTreeNode *p1, const SRBTreeNode *p2) {
  const int32_t ascOrder = tLDataIterCmprFn(p1, p2);
  return -ascOrder;
}

595
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
596
                       STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
597
                       bool destroyLoadInfo, const char *idStr, bool strictTimeRange) {
H
Hongze Cheng 已提交
598
  pMTree->backward = backward;
599
  pMTree->pIter = NULL;
600
  pMTree->pIterList = taosArrayInit(4, POINTER_BYTES);
601 602 603
  if (pMTree->pIterList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
604

H
Haojun Liao 已提交
605
  pMTree->idStr = idStr;
dengyihao's avatar
dengyihao 已提交
606
  if (!pMTree->backward) {  // asc
607
    tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
dengyihao's avatar
dengyihao 已提交
608
  } else {  // desc
609 610
    tRBTreeCreate(&pMTree->rbt, tLDataIterDescCmprFn);
  }
611 612
  int32_t code = TSDB_CODE_SUCCESS;

613 614
  pMTree->pLoadInfo = pBlockLoadInfo;
  pMTree->destroyLoadInfo = destroyLoadInfo;
615
  pMTree->ignoreEarlierTs = false;
616

H
Hongze Cheng 已提交
617
  for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) {  // open all last file
H
Hongze Cheng 已提交
618 619
    struct SLDataIter *pIter = NULL;
    code = tLDataIterOpen(&pIter, pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange,
620
                          &pMTree->pLoadInfo[i], pMTree->idStr, strictTimeRange);
621 622 623 624
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }

H
Haojun Liao 已提交
625
    bool hasVal = tLDataIterNextRow(pIter, pMTree->idStr);
626
    if (hasVal) {
627 628
      taosArrayPush(pMTree->pIterList, &pIter);
      tMergeTreeAddIter(pMTree, pIter);
629
    } else {
630 631 632
      if (!pMTree->ignoreEarlierTs) {
        pMTree->ignoreEarlierTs = pIter->ignoreEarlierTs;
      }
633
      tLDataIterClose(pIter);
634 635
    }
  }
636 637 638

  return code;

H
Hongze Cheng 已提交
639
_end:
640 641
  tMergeTreeClose(pMTree);
  return code;
H
Hongze Cheng 已提交
642
}
H
Hongze Cheng 已提交
643

644 645
// Insert an iterator into the merge tree's red-black tree through its embedded node, which
// is the first member of SLDataIter.
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter) {
  tRBTreePut(&pMTree->rbt, &pIter->node);
}

646 647
// Report the ignoreEarlierTs flag that tMergeTreeOpen() collected from iterators which
// produced no row.
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree) {
  const bool ignore = pMTree->ignoreEarlierTs;
  return ignore;
}

H
Hongze Cheng 已提交
648
// Advance the merge tree to the next row in merge order.
//
// The current iterator (pMTree->pIter) is kept outside the red-black tree. It is advanced
// first; if it is exhausted it is dropped, and if its new row sorts after the tree minimum
// it is put back into the tree. When no current iterator remains, the tree minimum is
// removed from the tree and becomes the current iterator.
//
// Returns true when a current row exists (fetch it with tMergeTreeGetRow()), false when
// all iterators are exhausted.
bool tMergeTreeNext(SMergeTree *pMTree) {
  if (pMTree->pIter) {
    SLDataIter *pIter = pMTree->pIter;

    bool hasVal = tLDataIterNextRow(pIter, pMTree->idStr);
    if (!hasVal) {
      pMTree->pIter = NULL;
    }

    // compare with min in RB Tree
    pIter = (SLDataIter *)tRBTreeMin(&pMTree->rbt);
    if (pMTree->pIter && pIter) {
      int32_t c = pMTree->rbt.cmprFn(&pMTree->pIter->node, &pIter->node);
      if (c > 0) {
        // the tree holds a row that sorts first: give the current iterator back to the tree
        tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter);
        pMTree->pIter = NULL;
      } else {
        // two iterators are not expected to sit on an identical (ts, version) key
        ASSERT(c);
      }
    }
  }

  if (pMTree->pIter == NULL) {
    pMTree->pIter = (SLDataIter *)tRBTreeMin(&pMTree->rbt);
    if (pMTree->pIter) {
      tRBTreeDrop(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter);
    }
  }

  return pMTree->pIter != NULL;
}

H
Hongze Cheng 已提交
681
// Return (by value) the row the current iterator is positioned on. pMTree->pIter is
// dereferenced unconditionally, so this must only be called while a current iterator exists.
TSDBROW tMergeTreeGetRow(SMergeTree *pMTree) {
  SLDataIter *pCurrent = pMTree->pIter;
  return pCurrent->rInfo.row;
}
682

H
Hongze Cheng 已提交
683
// Tear down a merge tree: close every iterator recorded in pIterList, destroy the list,
// clear the current iterator and, if the tree owns the block-load info (destroyLoadInfo),
// destroy that as well and drop the ownership flag.
void tMergeTreeClose(SMergeTree *pMTree) {
  size_t  numOfIters = taosArrayGetSize(pMTree->pIterList);
  int32_t idx = 0;
  while (idx < numOfIters) {
    SLDataIter *pIter = taosArrayGetP(pMTree->pIterList, idx);
    tLDataIterClose(pIter);
    ++idx;
  }

  pMTree->pIterList = taosArrayDestroy(pMTree->pIterList);
  pMTree->pIter = NULL;

  if (!pMTree->destroyLoadInfo) {
    return;
  }

  pMTree->pLoadInfo = destroyLastBlockLoadInfo(pMTree->pLoadInfo);
  pMTree->destroyLoadInfo = false;
}