tsdbMergeTree.c 23.7 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"
17 18
#include "tsdbFSet2.h"
#include "tsdbSttFileRW.h"
H
Hongze Cheng 已提交
19

H
Haojun Liao 已提交
20 21
static void tLDataIterClose2(SLDataIter *pIter);

22
// SLDataIter =================================================
23 24
// Allocates and initialises the array of per-stt-file block load infos.
//
// One SSttBlockLoadInfo entry is created for each of the numOfSttTrigger stt files. Every
// entry starts with an empty two-slot block cache (both cached block indexes are -1), two
// SBlockData buffers, an empty SSttBlk array, and the caller's schema / column id list
// (the pointers are stored, nothing is copied).
//
// Returns the array, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY when the array itself
// cannot be allocated. A failure while setting up an individual entry does not abort the
// construction: it is reported through terrno and the array is still returned.
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols,
                                            int32_t numOfSttTrigger) {
  SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(numOfSttTrigger, sizeof(SSttBlockLoadInfo));
  if (pLoadInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // the number of entries is recorded in the first entry only
  pLoadInfo->numOfStt = numOfSttTrigger;

  for (int32_t i = 0; i < numOfSttTrigger; ++i) {
    SSttBlockLoadInfo *pInfo = &pLoadInfo[i];

    pInfo->blockIndex[0] = -1;
    pInfo->blockIndex[1] = -1;
    pInfo->currentLoadBlockIndex = 1;

    int32_t code = tBlockDataCreate(&pInfo->blockData[0]);
    if (code) {
      terrno = code;
    }

    code = tBlockDataCreate(&pInfo->blockData[1]);
    if (code) {
      terrno = code;
    }

    pInfo->aSttBlk = taosArrayInit(4, sizeof(SSttBlk));
    if (pInfo->aSttBlk == NULL) {
      // report the failed allocation the same way the block data failures are reported
      terrno = TSDB_CODE_OUT_OF_MEMORY;
    }

    pInfo->pSchema = pSchema;
    pInfo->colIds = colList;
    pInfo->numOfCols = numOfCols;
  }

  return pLoadInfo;
}

H
Hongze Cheng 已提交
57
// Puts every entry of the load info array back into its "nothing loaded" state:
// both cache slots are invalidated, the SSttBlk list is emptied, the load statistics are
// zeroed and the "stt block index loaded" flag is cleared. The block data buffers
// themselves are left as they are.
void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
  int32_t idx = 0;
  while (idx < pLoadInfo->numOfStt) {
    SSttBlockLoadInfo *pEntry = &pLoadInfo[idx];

    // invalidate the two-slot block cache
    pEntry->currentLoadBlockIndex = 1;
    pEntry->blockIndex[0] = -1;
    pEntry->blockIndex[1] = -1;

    taosArrayClear(pEntry->aSttBlk);

    // restart the statistics
    pEntry->elapsedTime = 0;
    pEntry->loadBlocks = 0;
    pEntry->sttBlockLoaded = false;

    ++idx;
  }
}

H
Hongze Cheng 已提交
71
// Adds the load statistics of every entry to the caller's counters.
// *blocks and *el are accumulated into, not overwritten, so the caller decides the
// starting values.
void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el) {
  int32_t idx = 0;
  while (idx < pLoadInfo->numOfStt) {
    const SSttBlockLoadInfo *pEntry = &pLoadInfo[idx];
    *el = *el + pEntry->elapsedTime;
    *blocks = *blocks + pEntry->loadBlocks;
    ++idx;
  }
}

H
Hongze Cheng 已提交
78
// Releases a load info array created by tCreateLastBlockLoadInfo().
// For every entry the two block data buffers and the SSttBlk list are destroyed, then the
// array itself is freed. A NULL argument is accepted. Always returns NULL so the caller
// can write `p = destroyLastBlockLoadInfo(p)`.
// NOTE(review): pTombBlockArray is not released here — confirm it is freed elsewhere.
void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
  if (pLoadInfo != NULL) {
    for (int32_t idx = 0; idx < pLoadInfo->numOfStt; ++idx) {
      SSttBlockLoadInfo *pEntry = &pLoadInfo[idx];

      pEntry->currentLoadBlockIndex = 1;
      pEntry->blockIndex[0] = -1;
      pEntry->blockIndex[1] = -1;

      tBlockDataDestroy(&pEntry->blockData[0]);
      tBlockDataDestroy(&pEntry->blockData[1]);

      taosArrayDestroy(pEntry->aSttBlk);
    }

    taosMemoryFree(pLoadInfo);
  }

  return NULL;
}

H
Haojun Liao 已提交
98 99 100 101 102 103
// Closes the stt file reader held by each of the first numOfIter iterators.
void destroySttBlockReader(SLDataIter* pLDataIter, int32_t numOfIter) {
  int32_t idx = 0;
  while (idx < numOfIter) {
    tLDataIterClose2(pLDataIter + idx);
    ++idx;
  }
}

H
Hongze Cheng 已提交
104
// Returns the decoded data of the stt block the iterator currently points at
// (pIter->iSttBlk), reading it from the stt file when it is not cached yet.
//
// The load info keeps a two-slot cache: blockData[k] holds the block whose index is
// blockIndex[k], and currentLoadBlockIndex names the slot that was used last.
//  - cache hit : the matching slot becomes the current one and is returned;
//                pIter->iRow is not touched.
//  - cache miss: the other slot is re-initialised and filled from the file, the load
//                statistics are updated and pIter->iRow is reset to "before the first
//                row" (-1, forward scan) or "after the last row" (nRow, backward scan).
//
// Returns NULL when the iterator has no current stt block or the load info has no schema,
// and when initialising or reading the block fails; in the failure case terrno holds the
// error code.
static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
  int32_t code = 0;

  SSttBlockLoadInfo *pInfo = pIter->pBlockLoadInfo;

  // cache hit in slot 0
  if (pInfo->blockIndex[0] == pIter->iSttBlk) {
    if (pInfo->currentLoadBlockIndex != 0) {
      tsdbDebug("current load index is set to 0, block index:%d, file index:%d, due to uid:%" PRIu64 ", load data, %s",
                pIter->iSttBlk, pIter->iStt, pIter->uid, idStr);
      pInfo->currentLoadBlockIndex = 0;
    }
    return &pInfo->blockData[0];
  }

  // cache hit in slot 1
  if (pInfo->blockIndex[1] == pIter->iSttBlk) {
    if (pInfo->currentLoadBlockIndex != 1) {
      tsdbDebug("current load index is set to 1, block index:%d, file index:%d, due to uid:%" PRIu64 ", load data, %s",
                pIter->iSttBlk, pIter->iStt, pIter->uid, idStr);
      pInfo->currentLoadBlockIndex = 1;
    }
    return &pInfo->blockData[1];
  }

  if (pIter->pSttBlk == NULL || pInfo->pSchema == NULL) {
    return NULL;
  }

  // current block not loaded yet: recycle the slot that was not used last
  pInfo->currentLoadBlockIndex ^= 1;
  int64_t st = taosGetTimestampUs();

  SBlockData *pBlock = &pInfo->blockData[pInfo->currentLoadBlockIndex];

  // The buffer is about to be overwritten, so it no longer holds the block it was cached
  // for. Invalidate the slot now; otherwise a failed load would leave a stale index that a
  // later lookup could match against a buffer with different or partial content.
  pInfo->blockIndex[pInfo->currentLoadBlockIndex] = -1;

  // a block that belongs to a super table is identified by suid, otherwise by the table uid
  TABLEID id = {0};
  if (pIter->pSttBlk->suid != 0) {
    id.suid = pIter->pSttBlk->suid;
  } else {
    id.uid = pIter->uid;
  }

  code = tBlockDataInit(pBlock, &id, pInfo->pSchema, pInfo->colIds, pInfo->numOfCols);
  if (code != TSDB_CODE_SUCCESS) {
    goto _exit;
  }

  code = tsdbSttFileReadBlockData(pIter->pReader, pIter->pSttBlk, pBlock);
  if (code != TSDB_CODE_SUCCESS) {
    goto _exit;
  }

  double el = (taosGetTimestampUs() - st) / 1000.0;  // microseconds -> milliseconds
  pInfo->elapsedTime += el;
  pInfo->loadBlocks += 1;

  tsdbDebug("read last block, total load:%d, trigger by uid:%" PRIu64
            ", last file index:%d, last block index:%d, entry:%d, rows:%d, %p, elapsed time:%.2f ms, %s",
            pInfo->loadBlocks, pIter->uid, pIter->iStt, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock->nRow,
            pBlock, el, idStr);

  // the slot is valid again: remember which block it holds
  pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk;
  pIter->iRow = (pIter->backward) ? pBlock->nRow : -1;

  tsdbDebug("last block index list:%d, %d, rowIndex:%d %s", pInfo->blockIndex[0], pInfo->blockIndex[1], pIter->iRow,
            idStr);
  return pBlock;

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
  }

  return NULL;
}

178
// find the earliest block that contains the required records
H
Hongze Cheng 已提交
179 180
// Starting at `index`, walks over the neighbouring blocks whose [minUid, maxUid] range
// still contains `uid` and returns the index of the last such block in walking direction.
// The walk goes towards lower indexes for a forward scan and towards higher indexes for a
// backward scan, i.e. it finds the block the scan has to start from.
// If the block at `index` itself does not contain `uid`, `index` minus one step is returned.
static FORCE_INLINE int32_t findEarliestIndex(int32_t index, uint64_t uid, const SSttBlk *pBlockList, int32_t num,
                                              int32_t backward) {
  const int32_t delta = backward ? 1 : -1;
  int32_t       pos = index;

  for (;;) {
    if (pos < 0 || pos >= num) {
      break;  // ran off the list
    }

    const SSttBlk *pBlk = &pBlockList[pos];
    if (uid < pBlk->minUid || uid > pBlk->maxUid) {
      break;  // this block does not cover the uid any more
    }

    pos += delta;
  }

  // step back onto the last block that was accepted
  return pos - delta;
}

H
Hongze Cheng 已提交
189
// Binary search over a uid-ordered SSttBlk list for the block a scan for `uid` has to
// start with. Once a block whose [minUid, maxUid] range contains `uid` is found,
// findEarliestIndex() extends the hit to the outermost matching neighbour for the scan
// direction. Returns the block index, or -1 when the list is empty or no block covers `uid`.
static int32_t binarySearchForStartBlock(SSttBlk *pBlockList, int32_t num, uint64_t uid, int32_t backward) {
  if (num <= 0) {
    return -1;
  }

  int32_t lo = 0;
  int32_t hi = num - 1;

  // the uid lies outside of the range covered by the whole list
  if ((uid > pBlockList[hi].maxUid) || (uid < pBlockList[lo].minUid)) {
    return -1;
  }

  for (;;) {
    if (!(uid < pBlockList[lo].minUid || uid > pBlockList[lo].maxUid)) {
      return findEarliestIndex(lo, uid, pBlockList, num, backward);
    }

    // the remaining window cannot contain the uid
    if (uid > pBlockList[hi].maxUid || uid < pBlockList[lo].minUid) {
      return -1;
    }

    int32_t span = hi - lo + 1;
    int32_t mid = (span >> 1u) + lo;

    if (uid < pBlockList[mid].minUid) {
      hi = mid - 1;
      continue;
    }

    if (uid > pBlockList[mid].maxUid) {
      lo = mid + 1;
      continue;
    }

    return findEarliestIndex(mid, uid, pBlockList, num, backward);
  }
}

H
Hongze Cheng 已提交
225 226
// Starting at `index`, walks over the neighbouring rows that carry the same `uid` and
// returns the index of the last such row in walking direction: towards lower indexes for a
// forward scan, towards higher indexes for a backward scan.
// If the row at `index` itself has a different uid, `index` minus one step is returned.
static FORCE_INLINE int32_t findEarliestRow(int32_t index, uint64_t uid, const uint64_t *uidList, int32_t num,
                                            int32_t backward) {
  const int32_t delta = backward ? 1 : -1;
  int32_t       pos = index;

  for (;;) {
    if (pos < 0 || pos >= num) {
      break;  // ran off the list
    }

    if (uidList[pos] != uid) {
      break;  // left the run of rows that belong to the uid
    }

    pos += delta;
  }

  // step back onto the last row that was accepted
  return pos - delta;
}

H
Hongze Cheng 已提交
235
// Binary search over an ascending uid list for the row a scan for `uid` has to start with.
// Once a row with the requested uid is found, findEarliestRow() extends the hit to the
// outermost row with the same uid for the scan direction.
// Returns the row index, or -1 when the list is empty or does not contain `uid`.
static int32_t binarySearchForStartRowIndex(uint64_t *uidList, int32_t num, uint64_t uid, int32_t backward) {
  // an empty list would otherwise be probed at index -1 (same guard as binarySearchForStartBlock)
  if (uidList == NULL || num <= 0) {
    return -1;
  }

  int32_t firstPos = 0;
  int32_t lastPos = num - 1;

  // the uid lies outside of the range covered by the whole list
  if ((uid > uidList[lastPos]) || (uid < uidList[firstPos])) {
    return -1;
  }

  while (1) {
    if (uid == uidList[firstPos]) {
      return findEarliestRow(firstPos, uid, uidList, num, backward);
    }

    // the remaining window cannot contain the uid
    if (uid > uidList[lastPos] || uid < uidList[firstPos]) {
      return -1;
    }

    int32_t numOfRows = lastPos - firstPos + 1;
    int32_t midPos = (numOfRows >> 1u) + firstPos;

    if (uid < uidList[midPos]) {
      lastPos = midPos - 1;
    } else if (uid > uidList[midPos]) {
      firstPos = midPos + 1;
    } else {
      return findEarliestRow(midPos, uid, uidList, num, backward);
    }
  }
}

266
// Open routine for the SDataFReader based read path. In this file it is a stub: none of
// the arguments is used, the iterator is left untouched and 0 is returned.
int32_t tLDataIterOpen(struct SLDataIter *pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
                        uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo,
                        const char *idStr, bool strictTimeRange) {
  const int32_t code = 0;  // nothing to do
  return code;
}

272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348
// Fills pBlockLoadInfo->aSttBlk with the stt blocks of pBlockLoadInfo->pBlockArray that
// belong to the super table `suid`.
//
//  - empty block array: nothing is changed.
//  - all blocks carry the same suid: either all of them are taken over (suid matches) or
//    the list is emptied and pIter->iSttBlk is set to -1 (suid does not match).
//  - mixed suids: the blocks are scanned in order, blocks with a smaller suid are
//    skipped and the scan stops at the first block with a larger suid.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the filtered list cannot be
// built; in that case the list is emptied and pIter->iSttBlk is set to -1 so that no block
// of a previous selection is used by mistake.
static int32_t loadSttBlockInfo(SLDataIter *pIter, SSttBlockLoadInfo* pBlockLoadInfo, uint64_t suid) {
  TSttBlkArray* pArray = pBlockLoadInfo->pBlockArray;
  if (TARRAY2_SIZE(pArray) <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  SSttBlk *pStart = &pArray->data[0];
  SSttBlk *pEnd = &pArray->data[TARRAY2_SIZE(pArray) - 1];

  // all identical
  if (pStart->suid == pEnd->suid) {
    taosArrayClear(pBlockLoadInfo->aSttBlk);

    if (pStart->suid != suid) {  // no qualified stt block existed
      pIter->iSttBlk = -1;
    } else {  // all blocks are qualified
      taosArrayAddBatch(pBlockLoadInfo->aSttBlk, pArray->data, pArray->size);
    }

    return TSDB_CODE_SUCCESS;
  }

  SArray *pTmp = taosArrayInit(TARRAY2_SIZE(pArray), sizeof(SSttBlk));
  if (pTmp == NULL) {
    taosArrayClear(pBlockLoadInfo->aSttBlk);
    pIter->iSttBlk = -1;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t i = 0; i < TARRAY2_SIZE(pArray); ++i) {
    SSttBlk* p = &pArray->data[i];
    if (p->suid < suid) {
      continue;
    }

    if (p->suid > suid) {
      break;  // the scan stops at the first block past the requested suid
    }

    if (taosArrayPush(pTmp, p) == NULL) {
      taosArrayDestroy(pTmp);
      taosArrayClear(pBlockLoadInfo->aSttBlk);
      pIter->iSttBlk = -1;
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  // replace the previous selection with the freshly filtered one
  taosArrayDestroy(pBlockLoadInfo->aSttBlk);
  pBlockLoadInfo->aSttBlk = pTmp;

  return TSDB_CODE_SUCCESS;
}

// Reads the tomb blocks of one stt file whose [minTbid.suid, maxTbid.suid] range covers
// `suid` and appends a heap-allocated STombBlock pointer for each of them to
// pLoadInfo->pTombBlockArray (the array is created on first use).
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when an allocation fails, or the error
// code of the failing read. Blocks appended before the failure stay in the array.
static int32_t loadSttTombBlockData(SSttFileReader* pSttFileReader, uint64_t suid, SSttBlockLoadInfo* pLoadInfo) {
  if (pLoadInfo->pTombBlockArray == NULL) {
    pLoadInfo->pTombBlockArray = taosArrayInit(4, POINTER_BYTES);
    if (pLoadInfo->pTombBlockArray == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  const TTombBlkArray* pBlkArray = NULL;
  int32_t code = tsdbSttFileReadTombBlk(pSttFileReader, &pBlkArray);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  for(int32_t j = 0; j < pBlkArray->size; ++j) {
    STombBlk* pTombBlk = &pBlkArray->data[j];
    if (pTombBlk->maxTbid.suid < suid) {
      continue;  // todo use binary search instead
    }

    if (pTombBlk->minTbid.suid > suid) {
      break;
    }

    STombBlock* pTombBlock = taosMemoryCalloc(1, sizeof(STombBlock));
    if (pTombBlock == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    code = tsdbSttFileReadTombBlock(pSttFileReader, pTombBlk, pTombBlock);
    if (code != TSDB_CODE_SUCCESS) {
      // NOTE(review): if a failed read can leave buffers attached to the block, release
      // them with the matching STombBlock cleanup routine before freeing the block.
      taosMemoryFree(pTombBlock);
      return code;
    }

    void* p = taosArrayPush(pLoadInfo->pTombBlockArray, &pTombBlock);
    if (p == NULL) {
      // the array did not take ownership of the block
      // NOTE(review): a successfully read block probably owns buffers as well — confirm
      // and release them here.
      taosMemoryFree(pTombBlock);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
}

349
// Prepares an iterator over the rows of table `uid` in one stt file.
//
// The query parameters (uid, scan direction, version range, time window) and the reader
// are stored in the iterator. On the first use of pBlockLoadInfo the stt block index and
// the tomb blocks of the file are loaded; later calls reuse them. Afterwards the block
// the scan has to start with is located:
//  - pIter->iSttBlk is the index of that block, or -1 when no block covers `uid`.
//  - pIter->pSttBlk points at the block, or is reset to NULL when the block lies
//    completely outside of the time window. With strictTimeRange a block that only
//    touches the window border (minKey == ekey for a forward scan, maxKey == skey for a
//    backward scan) is excluded as well.
//  - for a backward scan that excludes the block this way, pIter->ignoreEarlierTs is set.
//
// Returns TSDB_CODE_SUCCESS or the error code of the failing load step.
int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
                       uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo,
                       const char *idStr, bool strictTimeRange) {
  int32_t code = TSDB_CODE_SUCCESS;

  pIter->uid = uid;
  pIter->iStt = iStt;
  pIter->backward = backward;
  pIter->verRange.minVer = pRange->minVer;
  pIter->verRange.maxVer = pRange->maxVer;
  pIter->timeWindow.skey = pTimeWindow->skey;
  pIter->timeWindow.ekey = pTimeWindow->ekey;
  pIter->pReader = pReader;

  pIter->pBlockLoadInfo = pBlockLoadInfo;

  if (!pBlockLoadInfo->sttBlockLoaded) {
    int64_t st = taosGetTimestampUs();
    pBlockLoadInfo->sttBlockLoaded = true;

    code = tsdbSttFileReadSttBlk(pIter->pReader, (const TSttBlkArray **)&pBlockLoadInfo->pBlockArray);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // do not let the next step overwrite a failure of this one
    code = loadSttBlockInfo(pIter, pBlockLoadInfo, suid);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    code = loadSttTombBlockData(pReader, suid, pBlockLoadInfo);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    double el = (taosGetTimestampUs() - st) / 1000.0;  // microseconds -> milliseconds
    tsdbDebug("load the last file info completed, elapsed time:%.2fms, %s", el, idStr);
  }

  // find the start block
  size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
  pIter->iSttBlk = binarySearchForStartBlock(pBlockLoadInfo->aSttBlk->pData, size, uid, backward);
  if (pIter->iSttBlk != -1) {
    pIter->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, pIter->iSttBlk);
    pIter->iRow = (pIter->backward) ? pIter->pSttBlk->nRow : -1;

    if (!backward) {
      // forward scan: the block starts behind the end of the time window
      if ((strictTimeRange && pIter->pSttBlk->minKey >= pIter->timeWindow.ekey) ||
          (!strictTimeRange && pIter->pSttBlk->minKey > pIter->timeWindow.ekey)) {
        pIter->pSttBlk = NULL;
      }
    } else {
      // backward scan: the block ends before the start of the time window
      if ((strictTimeRange && pIter->pSttBlk->maxKey <= pIter->timeWindow.skey) ||
          (!strictTimeRange && pIter->pSttBlk->maxKey < pIter->timeWindow.skey)) {
        pIter->pSttBlk = NULL;
        pIter->ignoreEarlierTs = true;
      }
    }
  }

  return code;
}

H
Haojun Liao 已提交
407 408
// Closes the stt file reader held by the iterator and clears the iterator's reference to it.
void tLDataIterClose2(SLDataIter *pIter) {
  SSttFileReader **ppReader = &pIter->pReader;

  tsdbSttFileReaderClose(ppReader);
  *ppReader = NULL;
}
H
Hongze Cheng 已提交
411

H
Hongze Cheng 已提交
412
// Moves the iterator to the next stt block, in scan direction, that can contain rows of
// the requested table: its uid range has to cover pIter->uid and its key and version ranges
// have to overlap the iterator's time window and version range.
//
// On success pIter->iSttBlk / pIter->pSttBlk identify the block that was found. When no
// block qualifies, pIter->pSttBlk is set to NULL and pIter->iSttBlk is left one step past
// the block the search started from.
void tLDataIterNextBlock(SLDataIter *pIter, const char *idStr) {
  int32_t step = pIter->backward ? -1 : 1;
  int32_t oldIndex = pIter->iSttBlk;

  pIter->iSttBlk += step;

  int32_t index = -1;
  // keep the bound signed so that the "i >= 0" test below is not defeated by an unsigned comparison
  int32_t numOfBlocks = (int32_t)pIter->pBlockLoadInfo->aSttBlk->size;
  for (int32_t i = pIter->iSttBlk; i >= 0 && i < numOfBlocks; i += step) {
    SSttBlk *p = taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, i);

    // the scan left the uid range: no later block can match
    if ((!pIter->backward) && p->minUid > pIter->uid) {
      break;
    }

    if (pIter->backward && p->maxUid < pIter->uid) {
      break;
    }

    // check uid firstly
    if (p->minUid <= pIter->uid && p->maxUid >= pIter->uid) {
      if ((!pIter->backward) && p->minKey > pIter->timeWindow.ekey) {
        break;
      }

      if (pIter->backward && p->maxKey < pIter->timeWindow.skey) {
        break;
      }

      // check time range secondly
      if (p->minKey <= pIter->timeWindow.ekey && p->maxKey >= pIter->timeWindow.skey) {
        if ((!pIter->backward) && p->minVer > pIter->verRange.maxVer) {
          break;
        }

        if (pIter->backward && p->maxVer < pIter->verRange.minVer) {
          break;
        }

        // check version range at last
        if (p->minVer <= pIter->verRange.maxVer && p->maxVer >= pIter->verRange.minVer) {
          index = i;
          break;
        }
      }
    }
  }

  pIter->pSttBlk = NULL;
  if (index != -1) {
    pIter->iSttBlk = index;
    pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, pIter->iSttBlk);
    tsdbDebug("try next last file block:%d from %d, trigger by uid:%" PRIu64 ", file index:%d, %s", pIter->iSttBlk,
              oldIndex, pIter->uid, pIter->iStt, idStr);
  } else {
    // report the stt file index here, as the message says (not the previous block index)
    tsdbDebug("no more last block qualified, uid:%" PRIu64 ", file index:%d, %s", pIter->uid, pIter->iStt, idStr);
  }
}

H
Hongze Cheng 已提交
469 470
// Starting at pIter->iRow, searches the current stt block in scan direction for the next
// row that belongs to pIter->uid and lies inside the iterator's time window and version
// range. pIter->iRow is set to the index of that row, or to -1 when the block holds no
// further qualifying row or the block data is not available.
static void findNextValidRow(SLDataIter *pIter, const char *idStr) {
  bool    hasVal = false;
  int32_t step = pIter->backward ? -1 : 1;
  int32_t i = pIter->iRow;  // read before loading: a cache miss in loadLastBlock() resets pIter->iRow

  SBlockData *pData = loadLastBlock(pIter, idStr);
  if (pData == NULL) {
    // nothing to search in (no block to load, or the load failed and terrno is set)
    pIter->iRow = -1;
    return;
  }

  // mostly we only need to find the start position for a given table
  if ((((i == 0) && (!pIter->backward)) || (i == pData->nRow - 1 && pIter->backward)) && pData->aUid != NULL) {
    i = binarySearchForStartRowIndex((uint64_t *)pData->aUid, pData->nRow, pIter->uid, pIter->backward);
    if (i == -1) {
      tsdbDebug("failed to find the data in pBlockData, uid:%" PRIu64 " , %s", pIter->uid, idStr);
      pIter->iRow = -1;
      return;
    }
  }

  for (; i < pData->nRow && i >= 0; i += step) {
    // a block with a uid column holds rows of several tables: stop once the scan has
    // passed the rows of the requested table
    if (pData->aUid != NULL) {
      if (!pIter->backward) {
        if (pData->aUid[i] > pIter->uid) {
          break;
        }
      } else {
        if (pData->aUid[i] < pIter->uid) {
          break;
        }
      }
    }

    int64_t ts = pData->aTSKEY[i];
    if (!pIter->backward) {               // asc
      if (ts > pIter->timeWindow.ekey) {  // no more data
        break;
      } else if (ts < pIter->timeWindow.skey) {
        continue;
      }
    } else {                              // desc
      if (ts < pIter->timeWindow.skey) {  // no more data
        break;
      } else if (ts > pIter->timeWindow.ekey) {
        continue;
      }
    }

    int64_t ver = pData->aVersion[i];
    if (ver < pIter->verRange.minVer) {
      continue;
    }

    // todo opt handle desc case
    if (ver > pIter->verRange.maxVer) {
      continue;
    }

    hasVal = true;
    break;
  }

  pIter->iRow = (hasVal) ? i : -1;
}

H
Hongze Cheng 已提交
531
// Advances the iterator to the next qualifying row, moving on to following stt blocks
// when the current one is exhausted.
//
// When the load info has checkRemainingRow set, a block is additionally skipped if none
// of the requested columns (colIds) is present in it with a HAS_VALUE or HAS_NULL flag.
//
// On success pIter->rInfo describes the row (suid, uid, row) and true is returned.
// false is returned when there is no current stt block, no further row exists, or a block
// could not be loaded (terrno holds the error in that case). terrno is reset to
// TSDB_CODE_SUCCESS on entry.
bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
  const int32_t delta = pIter->backward ? -1 : 1;
  terrno = TSDB_CODE_SUCCESS;

  // no qualified last file block in current file, no need to fetch row
  if (pIter->pSttBlk == NULL) {
    return false;
  }

  const int32_t startBlk = pIter->iSttBlk;
  SBlockData   *pBlockData = loadLastBlock(pIter, idStr);
  if (pBlockData == NULL || terrno != TSDB_CODE_SUCCESS) {
    goto _exit;
  }

  pIter->iRow += delta;

  for (;;) {
    findNextValidRow(pIter, idStr);

    bool               skipBlock = false;
    SSttBlockLoadInfo *pInfo = pIter->pBlockLoadInfo;

    if (pInfo->checkRemainingRow) {
      // assume the block is useless until one requested column proves otherwise
      skipBlock = true;

      int16_t *wanted = pInfo->colIds;
      int      nWanted = pInfo->numOfCols;
      bool     lastOnly = pInfo->isLast;

      for (int w = 0; w < nWanted; ++w) {
        for (int c = 0; c < pBlockData->nColData; ++c) {
          SColData *pCol = &pBlockData->aColData[c];
          int16_t   cid = pCol->cid;
          if (cid != wanted[w]) {
            continue;
          }

          if (lastOnly && (pCol->flag & HAS_VALUE)) {
            skipBlock = false;
            break;
          }

          if (pCol->flag & (HAS_VALUE | HAS_NULL)) {
            skipBlock = false;
            break;
          }
        }
      }
    }

    const bool rowUsable = !skipBlock && pIter->iRow >= 0 && pIter->iRow < pBlockData->nRow;
    if (rowUsable) {
      break;
    }

    // the current block has nothing more to offer, try the next one
    tLDataIterNextBlock(pIter, idStr);
    if (pIter->pSttBlk == NULL) {  // no more data
      goto _exit;
    }

    if (startBlk != pIter->iSttBlk) {
      pBlockData = loadLastBlock(pIter, idStr);
      if (pBlockData == NULL) {
        goto _exit;
      }

      // set start row index
      if (pIter->backward) {
        pIter->iRow = pBlockData->nRow - 1;
      } else {
        pIter->iRow = 0;
      }
    }
  }

  pIter->rInfo.suid = pBlockData->suid;
  pIter->rInfo.uid = pBlockData->uid;
  pIter->rInfo.row = tsdbRowFromBlockData(pBlockData, pIter->iRow);

_exit:
  return (terrno == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL) && (pBlockData != NULL);
}

H
Hongze Cheng 已提交
603
// Returns the row info the iterator currently points at. The pointer refers to storage
// inside the iterator itself.
SRowInfo *tLDataIterGet(SLDataIter *pIter) {
  SRowInfo *pRowInfo = &pIter->rInfo;
  return pRowInfo;
}
H
Hongze Cheng 已提交
604 605

// SMergeTree =================================================
H
Hongze Cheng 已提交
606 607 608
// Ascending comparator for the merge tree: orders two iterators by the key of their
// current row, first by timestamp and then by version. Returns -1, 0 or 1.
static FORCE_INLINE int32_t tLDataIterCmprFn(const SRBTreeNode *p1, const SRBTreeNode *p2) {
  // recover the iterators that embed the two tree nodes
  SLDataIter *pLeft = (SLDataIter *)(((uint8_t *)p1) - offsetof(SLDataIter, node));
  SLDataIter *pRight = (SLDataIter *)(((uint8_t *)p2) - offsetof(SLDataIter, node));

  TSDBKEY lhs = TSDBROW_KEY(&pLeft->rInfo.row);
  TSDBKEY rhs = TSDBROW_KEY(&pRight->rInfo.row);

  if (lhs.ts != rhs.ts) {
    return (lhs.ts < rhs.ts) ? -1 : 1;
  }

  // same timestamp: the version decides
  if (lhs.version != rhs.version) {
    return (lhs.version < rhs.version) ? -1 : 1;
  }

  return 0;
}

628 629 630 631
// Descending comparator for the merge tree: the result of tLDataIterCmprFn() with the sign flipped.
static FORCE_INLINE int32_t tLDataIterDescCmprFn(const SRBTreeNode *p1, const SRBTreeNode *p2) {
  const int32_t ascending = tLDataIterCmprFn(p1, p2);
  return -ascending;
}

632
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
633
                       STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
634 635 636
                       bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter* pLDataIter) {
  int32_t code = TSDB_CODE_SUCCESS;

H
Hongze Cheng 已提交
637
  pMTree->backward = backward;
638
  pMTree->pIter = NULL;
H
Haojun Liao 已提交
639
  pMTree->idStr = idStr;
640

dengyihao's avatar
dengyihao 已提交
641
  if (!pMTree->backward) {  // asc
642
    tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
dengyihao's avatar
dengyihao 已提交
643
  } else {  // desc
644 645
    tRBTreeCreate(&pMTree->rbt, tLDataIterDescCmprFn);
  }
646

647 648
  pMTree->pLoadInfo = pBlockLoadInfo;
  pMTree->destroyLoadInfo = destroyLoadInfo;
649
  pMTree->ignoreEarlierTs = false;
650

H
Hongze Cheng 已提交
651
  for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) {  // open all last file
652 653
    memset(&pLDataIter[i], 0, sizeof(SLDataIter));
    code = tLDataIterOpen(&pLDataIter[i], pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange,
654
                          &pMTree->pLoadInfo[i], pMTree->idStr, strictTimeRange);
655 656 657 658
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }

659
    bool hasVal = tLDataIterNextRow(&pLDataIter[i], pMTree->idStr);
660
    if (hasVal) {
661
      tMergeTreeAddIter(pMTree, &pLDataIter[i]);
662
    } else {
663
      if (!pMTree->ignoreEarlierTs) {
664
        pMTree->ignoreEarlierTs = pLDataIter[i].ignoreEarlierTs;
665
      }
666 667
    }
  }
668 669 670

  return code;

H
Hongze Cheng 已提交
671
_end:
672 673
  tMergeTreeClose(pMTree);
  return code;
H
Hongze Cheng 已提交
674
}
H
Hongze Cheng 已提交
675

H
Haojun Liao 已提交
676 677 678 679 680
// Sets up a merge tree over the stt files of the first stt level of pCurrentFileSet
// (an STFileSet).
//
// For every stt file one iterator of pLDataIter is (re)initialised. A reader that is
// already attached to the iterator slot is reused, otherwise a new one is opened. Every
// iterator that yields a first row is inserted into the tree, which is ordered ascending
// or descending according to `backward`. For an iterator without a row, its
// ignoreEarlierTs flag is taken over into the tree unless the tree flag is already set.
//
// When the file set has no stt level, the tree is closed and TSDB_CODE_SUCCESS is
// returned. On any failure the tree is closed as well and the error code is returned.
int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb *pTsdb, uint64_t suid, uint64_t uid,
                        STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
                        bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter *pLDataIter,
                        void *pCurrentFileSet) {
  int32_t code = TSDB_CODE_SUCCESS;

  pMTree->backward = backward;
  pMTree->pIter = NULL;
  pMTree->idStr = idStr;

  if (!pMTree->backward) {  // asc
    tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
  } else {  // desc
    tRBTreeCreate(&pMTree->rbt, tLDataIterDescCmprFn);
  }

  pMTree->pLoadInfo = pBlockLoadInfo;
  pMTree->destroyLoadInfo = destroyLoadInfo;
  pMTree->ignoreEarlierTs = false;

  // todo handle other level of stt files, here only deal with the first level stt
  int32_t size = ((STFileSet *)pCurrentFileSet)->lvlArr[0].size;
  if (size == 0) {
    goto _end;
  }

  SSttLvl *pSttLevel = ((STFileSet *)pCurrentFileSet)->lvlArr[0].data[0];
  ASSERT(pSttLevel->level == 0);

  for (int32_t i = 0; i < pSttLevel->fobjArr[0].size; ++i) {  // open all last file
    // keep a reader that is already open for this slot across the reset of the iterator
    SSttFileReader* pSttFileReader = pLDataIter[i].pReader;
    memset(&pLDataIter[i], 0, sizeof(SLDataIter));

    if (pSttFileReader == NULL) {
      SSttFileReaderConfig conf = {.tsdb = pTsdb, .szPage = pTsdb->pVnode->config.szPage};
      conf.file[0] = *pSttLevel->fobjArr[0].data[i]->f;

      code = tsdbSttFileReaderOpen(pSttLevel->fobjArr[0].data[i]->fname, &conf, &pSttFileReader);
      if (code != TSDB_CODE_SUCCESS) {
        // take the same cleanup path as every other failure in this function
        goto _end;
      }
    }

    code = tLDataIterOpen2(&pLDataIter[i], pSttFileReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange,
                           &pMTree->pLoadInfo[i], pMTree->idStr, strictTimeRange);
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }

    bool hasVal = tLDataIterNextRow(&pLDataIter[i], pMTree->idStr);
    if (hasVal) {
      tMergeTreeAddIter(pMTree, &pLDataIter[i]);
    } else {
      if (!pMTree->ignoreEarlierTs) {
        pMTree->ignoreEarlierTs = pLDataIter[i].ignoreEarlierTs;
      }
    }
  }

  return code;

_end:
  tMergeTreeClose(pMTree);
  return code;
}

742 743
// Inserts an iterator into the merge tree. The iterator pointer is handed to the tree as
// the tree node.
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter) {
  SRBTreeNode *pNode = (SRBTreeNode *)pIter;
  tRBTreePut(&pMTree->rbt, pNode);
}

744 745
// Returns the ignoreEarlierTs flag that was collected while the merge tree was opened.
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree) {
  const bool ignore = pMTree->ignoreEarlierTs;
  return ignore;
}

H
Hongze Cheng 已提交
746
// Advances the merge tree to the next row in merge order.
//
// The current iterator (pMTree->pIter) is kept outside of the tree. It is advanced by one
// row; if it is exhausted it is dropped, and if its new row sorts after the smallest
// entry of the tree it is put back into the tree. Whenever there is no current iterator
// afterwards, the smallest entry is taken out of the tree and becomes the current one.
//
// Returns true when a current row is available, false when all iterators are exhausted.
bool tMergeTreeNext(SMergeTree *pMTree) {
  if (pMTree->pIter) {
    SLDataIter *pIter = pMTree->pIter;

    bool hasVal = tLDataIterNextRow(pIter, pMTree->idStr);
    if (!hasVal) {
      pMTree->pIter = NULL;
    }

    // compare with min in RB Tree
    pIter = (SLDataIter *)tRBTreeMin(&pMTree->rbt);
    if (pMTree->pIter && pIter) {
      int32_t c = pMTree->rbt.cmprFn(&pMTree->pIter->node, &pIter->node);
      if (c > 0) {
        // another iterator is ahead now: give the current one back to the tree
        tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter);
        pMTree->pIter = NULL;
      } else {
        // two iterators are not expected to expose the same (ts, version) key
        ASSERT(c);
      }
    }
  }

  if (pMTree->pIter == NULL) {
    pMTree->pIter = (SLDataIter *)tRBTreeMin(&pMTree->rbt);
    if (pMTree->pIter) {
      tRBTreeDrop(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter);
    }
  }

  return pMTree->pIter != NULL;
}

H
Hongze Cheng 已提交
779
// Detaches the current iterator from the merge tree and, when the tree owns the block load
// info (destroyLoadInfo), destroys it. Ownership is cleared afterwards, so calling this
// function again does not destroy the load info a second time.
void tMergeTreeClose(SMergeTree *pMTree) {
  pMTree->pIter = NULL;

  if (!pMTree->destroyLoadInfo) {
    return;
  }

  pMTree->pLoadInfo = destroyLastBlockLoadInfo(pMTree->pLoadInfo);
  pMTree->destroyLoadInfo = false;
}