tsdbMergeTree.c 29.6 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"
17
#include "tsdbFSet2.h"
18
#include "tsdbMerge.h"
H
Haojun Liao 已提交
19
#include "tsdbReadUtil.h"
20
#include "tsdbSttFileRW.h"
H
Hongze Cheng 已提交
21

H
Haojun Liao 已提交
22 23
static void tLDataIterClose2(SLDataIter *pIter);

24
// SLDataIter =================================================
25 26
/*
 * Allocate and initialise an array of `numOfSttTrigger` stt block load-info slots.
 *
 * Each slot gets:
 * - an empty two-entry block cache (blockIndex[] = -1, currentLoadBlockIndex = 1),
 * - its own pair of SBlockData buffers,
 * - an empty SSttBlk array.
 * pSchema and colList are borrowed, not copied.
 *
 * Returns the array on success. On any failure all slots created so far are released, terrno is set
 * and NULL is returned (previously a half-initialised array was returned).
 */
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols,
                                            int32_t numOfSttTrigger) {
  SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(numOfSttTrigger, sizeof(SSttBlockLoadInfo));
  if (pLoadInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < numOfSttTrigger; ++i) {
    SSttBlockLoadInfo *pInfo = &pLoadInfo[i];

    pInfo->blockIndex[0] = -1;
    pInfo->blockIndex[1] = -1;
    pInfo->currentLoadBlockIndex = 1;
    pInfo->pSchema = pSchema;
    pInfo->colIds = colList;
    pInfo->numOfCols = numOfCols;

    int32_t code = tBlockDataCreate(&pInfo->blockData[0]);
    if (code == TSDB_CODE_SUCCESS) {
      code = tBlockDataCreate(&pInfo->blockData[1]);
    }

    if (code == TSDB_CODE_SUCCESS) {
      pInfo->aSttBlk = taosArrayInit(4, sizeof(SSttBlk));
      if (pInfo->aSttBlk == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
      }
    }

    if (code != TSDB_CODE_SUCCESS) {
      // Release slots 0..i. They come from calloc, so members that were never created are zeroed.
      // destroyLastBlockLoadInfo() likewise calls tBlockDataDestroy on buffers regardless of creation.
      for (int32_t k = 0; k <= i; ++k) {
        tBlockDataDestroy(&pLoadInfo[k].blockData[0]);
        tBlockDataDestroy(&pLoadInfo[k].blockData[1]);
        taosArrayDestroy(pLoadInfo[k].aSttBlk);
      }
      taosMemoryFree(pLoadInfo);
      terrno = code;
      return NULL;
    }
  }

  return pLoadInfo;
}

57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
/*
 * Allocate and initialise a single stt block load-info slot.
 *
 * The slot gets an empty two-entry block cache (blockIndex[] = -1, currentLoadBlockIndex = 1), two
 * SBlockData buffers and an empty SSttBlk array. pSchema and colList are borrowed, not copied.
 *
 * Returns the new slot on success. On any failure everything created so far is released, terrno is
 * set and NULL is returned (previously a half-initialised slot was returned).
 */
SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols) {
  SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(1, sizeof(SSttBlockLoadInfo));
  if (pLoadInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pLoadInfo->blockIndex[0] = -1;
  pLoadInfo->blockIndex[1] = -1;
  pLoadInfo->currentLoadBlockIndex = 1;
  pLoadInfo->pSchema = pSchema;
  pLoadInfo->colIds = colList;
  pLoadInfo->numOfCols = numOfCols;

  int32_t code = tBlockDataCreate(&pLoadInfo->blockData[0]);
  if (code == TSDB_CODE_SUCCESS) {
    code = tBlockDataCreate(&pLoadInfo->blockData[1]);
  }

  if (code == TSDB_CODE_SUCCESS) {
    pLoadInfo->aSttBlk = taosArrayInit(4, sizeof(SSttBlk));
    if (pLoadInfo->aSttBlk == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  if (code != TSDB_CODE_SUCCESS) {
    // the slot comes from calloc, so members that were never created are still zeroed
    tBlockDataDestroy(&pLoadInfo->blockData[0]);
    tBlockDataDestroy(&pLoadInfo->blockData[1]);
    taosArrayDestroy(pLoadInfo->aSttBlk);
    taosMemoryFree(pLoadInfo);
    terrno = code;
    return NULL;
  }

  return pLoadInfo;
}

H
Hongze Cheng 已提交
86
/*
 * Reset the first load-info slot so it can be reused.
 *
 * What is reset:
 * - the two-entry block cache is invalidated;
 * - the collected stt block list is cleared;
 * - all cost counters are zeroed;
 * - the cached statistics block (if any) is released;
 * - sttBlockLoaded is cleared, so the stt block metadata is loaded again on next open.
 *
 * NOTE(review): only slot 0 is reset, even for arrays created by tCreateLastBlockLoadInfo with
 * several slots; presumably callers only pass single-slot load infos here — confirm.
 */
void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
  if (pLoadInfo == NULL) {
    return;
  }

  pLoadInfo->currentLoadBlockIndex = 1;
  pLoadInfo->blockIndex[0] = -1;
  pLoadInfo->blockIndex[1] = -1;

  taosArrayClear(pLoadInfo->aSttBlk);

  pLoadInfo->cost.loadBlocks = 0;
  pLoadInfo->cost.blockElapsedTime = 0;
  pLoadInfo->cost.statisElapsedTime = 0;
  pLoadInfo->cost.loadStatisBlocks = 0;
  pLoadInfo->statisBlockIndex = -1;

  // Nothing in this file currently allocates statisBlock (its loader is commented out), so it is
  // normally NULL. Do not hand a NULL block to the destroy routine.
  if (pLoadInfo->statisBlock != NULL) {
    tStatisBlockDestroy(pLoadInfo->statisBlock);
  }

  pLoadInfo->sttBlockLoaded = false;
}

105
/*
 * Add the load-cost counters of the first load-info slot onto *pLoadCost.
 * The counters are accumulated, not overwritten.
 */
void getSttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, SSttBlockLoadCostInfo* pLoadCost) {
  const SSttBlockLoadInfo *pFirst = &pLoadInfo[0];

  pLoadCost->loadBlocks += pFirst->cost.loadBlocks;
  pLoadCost->blockElapsedTime += pFirst->cost.blockElapsedTime;
  pLoadCost->statisElapsedTime += pFirst->cost.statisElapsedTime;
  pLoadCost->loadStatisBlocks += pFirst->cost.loadStatisBlocks;
}

H
Hongze Cheng 已提交
114
/*
 * Release a single load-info slot: both block buffers, the stt block list and the slot itself.
 * Accepts NULL. Always returns NULL, so callers can write `p = destroyLastBlockLoadInfo(p);`.
 *
 * NOTE(review): statisBlock is not released here — confirm whether it can ever be owned by the slot.
 */
void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
  if (pLoadInfo != NULL) {
    // invalidate the block cache bookkeeping before tearing the slot down
    pLoadInfo->blockIndex[0] = -1;
    pLoadInfo->blockIndex[1] = -1;
    pLoadInfo->currentLoadBlockIndex = 1;

    for (int32_t k = 0; k < 2; ++k) {
      tBlockDataDestroy(&pLoadInfo->blockData[k]);
    }

    taosArrayDestroy(pLoadInfo->aSttBlk);
    taosMemoryFree(pLoadInfo);
  }

  return NULL;
}

131
/*
 * Close the iterator's stt file reader, release its load info and free the iterator itself.
 * A NULL iterator is ignored. Iterator arrays may hold NULL slots after a failed setup, and
 * tLDataIterClose2 would otherwise dereference NULL.
 */
static void destroyLDataIter(SLDataIter *pIter) {
  if (pIter == NULL) {
    return;
  }

  tLDataIterClose2(pIter);
  destroyLastBlockLoadInfo(pIter->pBlockLoadInfo);
  taosMemoryFree(pIter);
}

137
/*
 * Destroy the per-level lists of stt iterators (an SArray of SArray* of SLDataIter*) and the outer
 * array itself.
 *
 * If pLoadCost is not NULL, the cost counters of every iterator's load info are added to it before
 * the iterator is destroyed.
 *
 * Iterators that were allocated but never opened have no load info, and a failed allocation can
 * leave a NULL slot; both are tolerated. Always returns NULL.
 */
void *destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo* pLoadCost) {
  if (pLDataIterArray == NULL) {
    return NULL;
  }

  int32_t numOfLevel = taosArrayGetSize(pLDataIterArray);
  for (int32_t i = 0; i < numOfLevel; ++i) {
    SArray *pList = taosArrayGetP(pLDataIterArray, i);
    int32_t numOfIter = taosArrayGetSize(pList);

    for (int32_t j = 0; j < numOfIter; ++j) {
      SLDataIter *pIter = taosArrayGetP(pList, j);
      if (pIter == NULL) {
        continue;
      }

      SSttBlockLoadInfo *pInfo = pIter->pBlockLoadInfo;
      if (pLoadCost != NULL && pInfo != NULL) {
        pLoadCost->loadBlocks += pInfo->cost.loadBlocks;
        pLoadCost->loadStatisBlocks += pInfo->cost.loadStatisBlocks;
        pLoadCost->blockElapsedTime += pInfo->cost.blockElapsedTime;
        pLoadCost->statisElapsedTime += pInfo->cost.statisElapsedTime;
      }

      destroyLDataIter(pIter);
    }

    taosArrayDestroy(pList);
  }

  taosArrayDestroy(pLDataIterArray);
  return NULL;
}

H
Hongze Cheng 已提交
163
/*
 * Return the decoded data of the stt block the iterator points at (pIter->iSttBlk).
 *
 * pIter->pBlockLoadInfo keeps a two-slot cache: blockData[0/1], tagged by blockIndex[0/1].
 * - On a hit, only currentLoadBlockIndex changes.
 * - On a miss, the slot not returned most recently is refilled by tsdbSttFileReadBlockDataByColumn().
 *   pIter->iRow is then set one step before the first row in scan direction (nRow for backward
 *   scans, -1 otherwise).
 *
 * Returns NULL when there is no current block or no schema, or when the read fails. In the last
 * case terrno is set to the read error.
 */
static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
  SSttBlockLoadInfo *pInfo = pIter->pBlockLoadInfo;

  if (pInfo->blockIndex[0] == pIter->iSttBlk) {
    if (pInfo->currentLoadBlockIndex != 0) {
      tsdbDebug("current load index is set to 0, block index:%d, file index:%d, due to uid:%" PRIu64 ", load data, %s",
                pIter->iSttBlk, pIter->iStt, pIter->uid, idStr);
      pInfo->currentLoadBlockIndex = 0;
    }
    return &pInfo->blockData[0];
  }

  if (pInfo->blockIndex[1] == pIter->iSttBlk) {
    if (pInfo->currentLoadBlockIndex != 1) {
      tsdbDebug("current load index is set to 1, block index:%d, file index:%d, due to uid:%" PRIu64 ", load data, %s",
                pIter->iSttBlk, pIter->iStt, pIter->uid, idStr);
      pInfo->currentLoadBlockIndex = 1;
    }
    return &pInfo->blockData[1];
  }

  if (pIter->pSttBlk == NULL || pInfo->pSchema == NULL) {
    return NULL;
  }

  // current block not loaded yet: refill the slot that was not returned most recently
  pInfo->currentLoadBlockIndex ^= 1;

  // The slot is about to be overwritten, so drop its tag first. If the read below fails half-way,
  // a later lookup of the block this slot used to hold must not be served from a clobbered buffer.
  pInfo->blockIndex[pInfo->currentLoadBlockIndex] = -1;

  int64_t st = taosGetTimestampUs();

  SBlockData *pBlock = &pInfo->blockData[pInfo->currentLoadBlockIndex];
  // colIds[0] is not requested explicitly (presumably the primary timestamp column — confirm)
  int32_t code = tsdbSttFileReadBlockDataByColumn(pIter->pReader, pIter->pSttBlk, pBlock, pInfo->pSchema,
                                                  &pInfo->colIds[1], pInfo->numOfCols - 1);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  double el = (taosGetTimestampUs() - st) / 1000.0;
  pInfo->cost.blockElapsedTime += el;
  pInfo->cost.loadBlocks += 1;

  tsdbDebug("read last block, total load:%"PRId64", trigger by uid:%" PRIu64
            ", last file index:%d, last block index:%d, entry:%d, rows:%d, %p, elapsed time:%.2f ms, %s",
            pInfo->cost.loadBlocks, pIter->uid, pIter->iStt, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock->nRow,
            pBlock, el, idStr);

  // only a fully loaded buffer gets tagged with its block index
  pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk;
  pIter->iRow = (pIter->backward) ? pInfo->blockData[pInfo->currentLoadBlockIndex].nRow : -1;

  tsdbDebug("last block index list:%d, %d, rowIndex:%d %s", pInfo->blockIndex[0], pInfo->blockIndex[1], pIter->iRow,
            idStr);

  return pBlock;
}

224
// find the earliest block that contains the required records
H
Hongze Cheng 已提交
225 226
/*
 * Starting at `index`, walk through consecutive blocks whose [minUid, maxUid] range contains uid.
 * Ascending scans walk towards lower indices; backward scans walk towards higher indices.
 * Returns the last position reached that still contained uid, i.e. the first candidate block for
 * the scan direction.
 */
static FORCE_INLINE int32_t findEarliestIndex(int32_t index, uint64_t uid, const SSttBlk *pBlockList, int32_t num,
                                              int32_t backward) {
  const int32_t dir = (backward != 0) ? 1 : -1;
  int32_t       pos = index;

  while (pos >= 0 && pos < num) {
    const SSttBlk *pBlk = &pBlockList[pos];
    if (uid < pBlk->minUid || uid > pBlk->maxUid) {
      break;
    }
    pos += dir;
  }

  return pos - dir;
}

H
Hongze Cheng 已提交
235
/*
 * Binary-search pBlockList[0..num) for a block whose [minUid, maxUid] range contains uid.
 * Once found, findEarliestIndex() moves to the first such block in the scan direction.
 * Returns that index, or -1 when num <= 0 or no block covers uid.
 */
static int32_t binarySearchForStartBlock(SSttBlk *pBlockList, int32_t num, uint64_t uid, int32_t backward) {
  if (num <= 0) {
    return -1;
  }

  int32_t lo = 0;
  int32_t hi = num - 1;

  // quick reject: uid lies outside [first block's minUid, last block's maxUid]
  if (uid < pBlockList[lo].minUid || uid > pBlockList[hi].maxUid) {
    return -1;
  }

  for (;;) {
    const SSttBlk *pLo = &pBlockList[lo];
    if (pLo->minUid <= uid && uid <= pLo->maxUid) {
      return findEarliestIndex(lo, uid, pBlockList, num, backward);
    }

    if (uid < pLo->minUid || uid > pBlockList[hi].maxUid) {
      return -1;
    }

    int32_t        mid = lo + ((hi - lo + 1) >> 1u);
    const SSttBlk *pMid = &pBlockList[mid];
    if (uid < pMid->minUid) {
      hi = mid - 1;
    } else if (uid > pMid->maxUid) {
      lo = mid + 1;
    } else {
      return findEarliestIndex(mid, uid, pBlockList, num, backward);
    }
  }
}

H
Hongze Cheng 已提交
271 272
/*
 * Starting at `index`, walk through consecutive entries of uidList equal to uid.
 * Ascending scans walk towards lower indices; backward scans walk towards higher indices.
 * Returns the last position reached that still held uid, i.e. the first row for the scan direction.
 */
static FORCE_INLINE int32_t findEarliestRow(int32_t index, uint64_t uid, const uint64_t *uidList, int32_t num,
                                            int32_t backward) {
  const int32_t dir = (backward != 0) ? 1 : -1;
  int32_t       pos = index;

  while (pos >= 0 && pos < num) {
    if (uidList[pos] != uid) {
      break;
    }
    pos += dir;
  }

  return pos - dir;
}

H
Hongze Cheng 已提交
281
/*
 * Binary-search the uid column uidList[0..num) of a block for uid. uidList is expected to be sorted
 * in ascending order.
 *
 * Returns the first row holding uid in the scan direction (see findEarliestRow), or -1 when uid is
 * absent. An empty or NULL list also yields -1; the previous code read uidList[-1] when num == 0.
 */
static int32_t binarySearchForStartRowIndex(uint64_t *uidList, int32_t num, uint64_t uid, int32_t backward) {
  if (uidList == NULL || num <= 0) {
    return -1;
  }

  int32_t firstPos = 0;
  int32_t lastPos = num - 1;

  // uid lies outside the range covered by this block
  if ((uid > uidList[lastPos]) || (uid < uidList[firstPos])) {
    return -1;
  }

  while (1) {
    if (uid == uidList[firstPos]) {
      return findEarliestRow(firstPos, uid, uidList, num, backward);
    }

    if (uid > uidList[lastPos] || uid < uidList[firstPos]) {
      return -1;
    }

    int32_t numOfRows = lastPos - firstPos + 1;
    int32_t midPos = (numOfRows >> 1u) + firstPos;

    if (uid < uidList[midPos]) {
      lastPos = midPos - 1;
    } else if (uid > uidList[midPos]) {
      firstPos = midPos + 1;
    } else {
      return findEarliestRow(midPos, uid, uidList, num, backward);
    }
  }
}

312
/*
 * Legacy open routine for the SDataFReader based stt iterator.
 *
 * It is a stub: it neither touches *pIter nor uses any argument, and always reports success. An
 * iterator the caller zeroed beforehand therefore keeps pSttBlk == NULL, and tLDataIterNextRow()
 * yields no rows from it.
 */
int32_t tLDataIterOpen(struct SLDataIter *pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
                       uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo,
                       const char *idStr, bool strictTimeRange) {
  (void)pIter;
  (void)pReader;
  (void)iStt;
  (void)backward;
  (void)suid;
  (void)uid;
  (void)pTimeWindow;
  (void)pRange;
  (void)pBlockLoadInfo;
  (void)idStr;
  (void)strictTimeRange;
  return TSDB_CODE_SUCCESS;
}

318 319
/*
 * Fill pBlockLoadInfo->aSttBlk with the stt blocks of pArray whose suid equals `suid`.
 *
 * The blocks are assumed to be ordered by suid; the first/last shortcut and the early `break` below
 * rely on it. When every block has another suid, aSttBlk is cleared and pIter->iSttBlk is set to -1.
 * An empty pArray leaves aSttBlk untouched.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the block list cannot be built. Allocation
 * failures used to be ignored.
 */
static int32_t extractSttBlockInfo(SLDataIter *pIter, const TSttBlkArray *pArray, SSttBlockLoadInfo *pBlockLoadInfo,
                                   uint64_t suid) {
  if (TARRAY2_SIZE(pArray) <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  SSttBlk *pStart = &pArray->data[0];
  SSttBlk *pEnd = &pArray->data[TARRAY2_SIZE(pArray) - 1];

  // all blocks belong to one super table
  if (pStart->suid == pEnd->suid) {
    taosArrayClear(pBlockLoadInfo->aSttBlk);

    if (pStart->suid != suid) {  // no qualified stt block existed
      pIter->iSttBlk = -1;
      return TSDB_CODE_SUCCESS;
    }

    // all blocks are qualified
    if (taosArrayAddBatch(pBlockLoadInfo->aSttBlk, pArray->data, pArray->size) == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    return TSDB_CODE_SUCCESS;
  }

  SArray *pTmp = taosArrayInit(TARRAY2_SIZE(pArray), sizeof(SSttBlk));
  if (pTmp == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t i = 0; i < TARRAY2_SIZE(pArray); ++i) {
    SSttBlk *p = &pArray->data[i];
    if (p->suid < suid) {
      continue;
    }

    if (p->suid > suid) {  // ordered by suid: nothing further can match
      break;
    }

    if (taosArrayPush(pTmp, p) == NULL) {
      taosArrayDestroy(pTmp);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  taosArrayDestroy(pBlockLoadInfo->aSttBlk);
  pBlockLoadInfo->aSttBlk = pTmp;
  return TSDB_CODE_SUCCESS;
}

359 360
/*
 * Three-way comparison of two uint64_t values passed by address.
 * Returns -1, 0 or 1. Explicit comparisons are used instead of subtraction so that large unsigned
 * values cannot overflow.
 */
static int32_t suidComparFn(const void *target, const void *p2) {
  const uint64_t lhs = *(const uint64_t *)target;
  const uint64_t rhs = *(const uint64_t *)p2;

  if (lhs < rhs) {
    return -1;
  }
  return (lhs > rhs) ? 1 : 0;
}
368

369
static bool existsFromSttBlkStatis(SSttBlockLoadInfo *pBlockLoadInfo, uint64_t suid, uint64_t uid,
370
                                   SSttFileReader *pReader) {
371
  const TStatisBlkArray *pStatisBlkArray = pBlockLoadInfo->pSttStatisBlkArray;
372 373
  if (TARRAY2_SIZE(pStatisBlkArray) <= 0) {
    return true;
374 375
  }

376 377 378
  int32_t i = 0;
  for (i = 0; i < TARRAY2_SIZE(pStatisBlkArray); ++i) {
    SStatisBlk *p = &pStatisBlkArray->data[i];
H
Haojun Liao 已提交
379
    if (p->minTbid.suid <= suid && p->maxTbid.suid >= suid) {
380
      break;
381
    }
382
  }
383

384 385
  if (i >= TARRAY2_SIZE(pStatisBlkArray)) {
    return false;
386 387
  }

H
Hongze Cheng 已提交
388
  while (i < TARRAY2_SIZE(pStatisBlkArray)) {
389 390 391 392
    SStatisBlk *p = &pStatisBlkArray->data[i];
    if (p->minTbid.suid > suid) {
      return false;
    }
393

394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414
//    if (pBlockLoadInfo->statisBlock == NULL) {
//      pBlockLoadInfo->statisBlock = taosMemoryCalloc(1, sizeof(STbStatisBlock));
//
//      int64_t st = taosGetTimestampMs();
//      tsdbSttFileReadStatisBlock(pReader, p, pBlockLoadInfo->statisBlock);
//      pBlockLoadInfo->statisBlockIndex = i;
//
//      double el = (taosGetTimestampMs() - st) / 1000.0;
//      pBlockLoadInfo->cost.loadStatisBlocks += 1;
//      pBlockLoadInfo->cost.statisElapsedTime += el;
//    } else if (pBlockLoadInfo->statisBlockIndex != i) {
//      tStatisBlockDestroy(pBlockLoadInfo->statisBlock);
//
//      int64_t st = taosGetTimestampMs();
//      tsdbSttFileReadStatisBlock(pReader, p, pBlockLoadInfo->statisBlock);
//      pBlockLoadInfo->statisBlockIndex = i;
//
//      double el = (taosGetTimestampMs() - st) / 1000.0;
//      pBlockLoadInfo->cost.loadStatisBlocks += 1;
//      pBlockLoadInfo->cost.statisElapsedTime += el;
//    }
415

416 417
    STbStatisBlock* pBlock = pBlockLoadInfo->statisBlock;
    int32_t index = tarray2SearchIdx(pBlock->suid, &suid, sizeof(int64_t), suidComparFn, TD_EQ);
418 419
    if (index == -1) {
      return false;
420
    }
421 422

    int32_t j = index;
423
    if (pBlock->uid->data[j] == uid) {
424
      return true;
425 426 427
    } else if (pBlock->uid->data[j] > uid) {
      while (j >= 0 && pBlock->suid->data[j] == suid) {
        if (pBlock->uid->data[j] == uid) {
428 429 430 431 432 433 434
          return true;
        } else {
          j -= 1;
        }
      }
    } else {
      j = index + 1;
435 436
      while (j < pBlock->suid->size && pBlock->suid->data[j] == suid) {
        if (pBlock->uid->data[j] == uid) {
437 438 439 440
          return true;
        } else {
          j += 1;
        }
441 442
      }
    }
443 444

    i += 1;
445
  }
446

447
  return false;
448 449
}

450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484
/*
 * Load the per-file metadata needed by the stt iterators into pBlockLoadInfo:
 * - the stt block list of the file, filtered to `suid` by extractSttBlockInfo;
 * - the statistics block list;
 * - tomb (delete) data, via loadTombFn.
 *
 * sttBlockLoaded is set only when every step succeeded. A failed load is therefore retried on the
 * next open, instead of the partially filled info being reused as if it were complete.
 *
 * Returns the first error encountered, or TSDB_CODE_SUCCESS.
 */
static int32_t doLoadSttFilesBlk(SSttBlockLoadInfo *pBlockLoadInfo, SLDataIter *pIter, int64_t suid,
                                 _load_tomb_fn loadTombFn, void *pReader1, const char *idStr) {
  int64_t st = taosGetTimestampUs();

  const TSttBlkArray *pSttBlkArray = NULL;

  // load the stt block info for each stt-block
  int32_t code = tsdbSttFileReadSttBlk(pIter->pReader, &pSttBlkArray);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("load stt blk failed, code:%s, %s", tstrerror(code), idStr);
    return code;
  }

  code = extractSttBlockInfo(pIter, pSttBlkArray, pBlockLoadInfo, suid);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("load stt block info failed, code:%s, %s", tstrerror(code), idStr);
    return code;
  }

  // load stt blocks statis for all stt-blocks, to decide if the data of queried table exists in current stt file
  code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray **)&pBlockLoadInfo->pSttStatisBlkArray);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("failed to load stt block statistics, code:%s, %s", tstrerror(code), idStr);
    return code;
  }

  code = loadTombFn(pReader1, pIter->pReader, pIter->pBlockLoadInfo);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // everything is in place: later opens may reuse this info
  pBlockLoadInfo->sttBlockLoaded = true;

  double el = (taosGetTimestampUs() - st) / 1000.0;
  tsdbDebug("load the stt file info completed, elapsed time:%.2fms, %s", el, idStr);
  return code;
}

/*
 * Position an stt iterator on the first block of one stt file that may hold rows of table `uid`.
 *
 * The iterator records the scan direction, time window, version range, reader and load info. On the
 * first use of pBlockLoadInfo the file metadata is loaded (see doLoadSttFilesBlk).
 *
 * A NULL pSttFileReader (the file could not be opened) is not an error: the iterator is simply left
 * without a block.
 *
 * pIter->pSttBlk ends up NULL in any of these cases:
 * - no block covers uid;
 * - the start block lies completely outside the time window. In a backward scan this also sets
 *   pIter->ignoreEarlierTs.
 * pSttBlk is cleared up front, so a block pointer from an earlier use of the iterator never survives.
 *
 * Returns the error from loading the file metadata, or TSDB_CODE_SUCCESS.
 */
int32_t tLDataIterOpen2(SLDataIter *pIter, SSttFileReader *pSttFileReader, int32_t iStt, int8_t backward,
                        uint64_t suid, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange,
                        SSttBlockLoadInfo *pBlockLoadInfo, const char *idStr, bool strictTimeRange,
                        _load_tomb_fn loadTombFn, void *pReader1) {
  int32_t code = TSDB_CODE_SUCCESS;

  pIter->uid = uid;
  pIter->iStt = iStt;
  pIter->backward = backward;
  pIter->verRange.minVer = pRange->minVer;
  pIter->verRange.maxVer = pRange->maxVer;
  pIter->timeWindow.skey = pTimeWindow->skey;
  pIter->timeWindow.ekey = pTimeWindow->ekey;
  pIter->pReader = pSttFileReader;
  pIter->pBlockLoadInfo = pBlockLoadInfo;
  pIter->pSttBlk = NULL;

  // open stt file failed, ignore and continue
  if (pIter->pReader == NULL) {
    tsdbError("stt file reader is null, %s", idStr);
    pIter->iSttBlk = -1;
    return TSDB_CODE_SUCCESS;
  }

  if (!pBlockLoadInfo->sttBlockLoaded) {
    code = doLoadSttFilesBlk(pBlockLoadInfo, pIter, suid, loadTombFn, pReader1, idStr);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  // NOTE: the statistics-based existence pre-check (existsFromSttBlkStatis) is currently disabled.

  // find the start block, actually we could load the position to avoid repeatly searching for the start position when
  // the skey is updated.
  size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
  pIter->iSttBlk = binarySearchForStartBlock(pBlockLoadInfo->aSttBlk->pData, size, uid, backward);
  if (pIter->iSttBlk == -1) {
    return code;
  }

  pIter->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, pIter->iSttBlk);
  pIter->iRow = (pIter->backward) ? pIter->pSttBlk->nRow : -1;

  if (!backward) {
    // ascending: the start block already begins after the window end
    bool beyondEnd = strictTimeRange ? (pIter->pSttBlk->minKey >= pIter->timeWindow.ekey)
                                     : (pIter->pSttBlk->minKey > pIter->timeWindow.ekey);
    if (beyondEnd) {
      pIter->pSttBlk = NULL;
    }
  } else {
    // descending: the start block already ends before the window start
    bool beforeStart = strictTimeRange ? (pIter->pSttBlk->maxKey <= pIter->timeWindow.skey)
                                       : (pIter->pSttBlk->maxKey < pIter->timeWindow.skey);
    if (beforeStart) {
      pIter->pSttBlk = NULL;
      pIter->ignoreEarlierTs = true;
    }
  }

  return code;
}

H
Haojun Liao 已提交
545 546
/*
 * Close the stt file reader owned by the iterator and clear the reader pointer, so repeated closes
 * are harmless.
 */
void tLDataIterClose2(SLDataIter *pIter) {
  SSttFileReader **ppReader = &pIter->pReader;

  tsdbSttFileReaderClose(ppReader);
  *ppReader = NULL;
}
H
Hongze Cheng 已提交
549

H
Hongze Cheng 已提交
550
/*
 * Advance the iterator to the next stt block, in scan direction, that may hold rows of pIter->uid
 * inside pIter->timeWindow and pIter->verRange.
 *
 * pIter->pSttBlk is NULL when no such block remains; pIter->iSttBlk has still been moved one step.
 *
 * Fix: the debug logs used to report the previous block index as "stt fileIdx" / "file index".
 * They now print the previous block index under a block label, and pIter->iStt as the file index.
 */
void tLDataIterNextBlock(SLDataIter *pIter, const char *idStr) {
  int32_t step = pIter->backward ? -1 : 1;
  int32_t oldIndex = pIter->iSttBlk;

  pIter->iSttBlk += step;

  int32_t index = -1;
  int32_t size = (int32_t)taosArrayGetSize(pIter->pBlockLoadInfo->aSttBlk);
  for (int32_t i = pIter->iSttBlk; i >= 0 && i < size; i += step) {
    SSttBlk *p = taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, i);

    // blocks are visited in uid order: stop once the scan has moved past the target uid
    if ((!pIter->backward) && p->minUid > pIter->uid) {
      break;
    }

    if (pIter->backward && p->maxUid < pIter->uid) {
      break;
    }

    // check uid firstly
    if (p->minUid <= pIter->uid && p->maxUid >= pIter->uid) {
      if ((!pIter->backward) && p->minKey > pIter->timeWindow.ekey) {
        break;
      }

      if (pIter->backward && p->maxKey < pIter->timeWindow.skey) {
        break;
      }

      // check time range secondly
      if (p->minKey <= pIter->timeWindow.ekey && p->maxKey >= pIter->timeWindow.skey) {
        if ((!pIter->backward) && p->minVer > pIter->verRange.maxVer) {
          break;
        }

        if (pIter->backward && p->maxVer < pIter->verRange.minVer) {
          break;
        }

        if (p->minVer <= pIter->verRange.maxVer && p->maxVer >= pIter->verRange.minVer) {
          index = i;
          break;
        }
      }
    }
  }

  pIter->pSttBlk = NULL;
  if (index != -1) {
    pIter->iSttBlk = index;
    pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, pIter->iSttBlk);
    tsdbDebug("try next last file block:%d from last file block:%d, trigger by uid:%" PRIu64 ", file index:%d, %s",
              pIter->iSttBlk, oldIndex, pIter->uid, pIter->iStt, idStr);
  } else {
    tsdbDebug("no more last block qualified, uid:%" PRIu64 ", file index:%d, %s", pIter->uid, pIter->iStt, idStr);
  }
}

H
Hongze Cheng 已提交
607 608
/*
 * Starting from pIter->iRow, find the next row of the current stt block, in scan direction, that
 * satisfies all of:
 * - it belongs to pIter->uid;
 * - it lies inside pIter->timeWindow;
 * - it lies inside pIter->verRange.
 *
 * On success pIter->iRow is that row's index. Otherwise pIter->iRow is -1, which also covers a block
 * that cannot be obtained or has no rows. Previously a NULL block was dereferenced, and an empty
 * block reached the uid binary search.
 */
static void findNextValidRow(SLDataIter *pIter, const char *idStr) {
  bool    hasVal = false;
  int32_t step = pIter->backward ? -1 : 1;
  int32_t i = pIter->iRow;

  SBlockData *pData = loadLastBlock(pIter, idStr);
  if (pData == NULL || pData->nRow <= 0) {
    pIter->iRow = -1;
    return;
  }

  // mostly we only need to find the start position for a given table
  if ((((i == 0) && (!pIter->backward)) || (i == pData->nRow - 1 && pIter->backward)) && pData->aUid != NULL) {
    i = binarySearchForStartRowIndex((uint64_t *)pData->aUid, pData->nRow, pIter->uid, pIter->backward);
    if (i == -1) {
      tsdbDebug("failed to find the data in pBlockData, uid:%" PRIu64 " , %s", pIter->uid, idStr);
      pIter->iRow = -1;
      return;
    }
  }

  for (; i < pData->nRow && i >= 0; i += step) {
    if (pData->aUid != NULL) {
      // rows appear to be ordered by uid (the binary search above relies on it): past the target uid
      // nothing further can match
      if (!pIter->backward) {
        if (pData->aUid[i] > pIter->uid) {
          break;
        }
      } else {
        if (pData->aUid[i] < pIter->uid) {
          break;
        }
      }
    }

    int64_t ts = pData->aTSKEY[i];
    if (!pIter->backward) {               // asc
      if (ts > pIter->timeWindow.ekey) {  // no more data
        break;
      } else if (ts < pIter->timeWindow.skey) {
        continue;
      }
    } else {
      if (ts < pIter->timeWindow.skey) {
        break;
      } else if (ts > pIter->timeWindow.ekey) {
        continue;
      }
    }

    int64_t ver = pData->aVersion[i];
    if (ver < pIter->verRange.minVer) {
      continue;
    }

    // todo opt handle desc case
    if (ver > pIter->verRange.maxVer) {
      continue;
    }

    hasVal = true;
    break;
  }

  pIter->iRow = (hasVal) ? i : -1;
}

H
Hongze Cheng 已提交
669
bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
H
Hongze Cheng 已提交
670
  int32_t step = pIter->backward ? -1 : 1;
H
Haojun Liao 已提交
671
  terrno = TSDB_CODE_SUCCESS;
672 673

  // no qualified last file block in current file, no need to fetch row
H
Hongze Cheng 已提交
674
  if (pIter->pSttBlk == NULL) {
675 676
    return false;
  }
H
Hongze Cheng 已提交
677

H
Hongze Cheng 已提交
678
  int32_t     iBlockL = pIter->iSttBlk;
H
Haojun Liao 已提交
679
  SBlockData *pBlockData = loadLastBlock(pIter, idStr);
H
Haojun Liao 已提交
680
  if (pBlockData == NULL || terrno != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
681 682 683
    goto _exit;
  }

684 685
  pIter->iRow += step;

H
Hongze Cheng 已提交
686
  while (1) {
687
    bool skipBlock = false;
H
Haojun Liao 已提交
688
    findNextValidRow(pIter, idStr);
689

690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713
    if (pIter->pBlockLoadInfo->checkRemainingRow) {
      skipBlock = true;
      int16_t *aCols = pIter->pBlockLoadInfo->colIds;
      int      nCols = pIter->pBlockLoadInfo->numOfCols;
      bool     isLast = pIter->pBlockLoadInfo->isLast;
      for (int inputColIndex = 0; inputColIndex < nCols; ++inputColIndex) {
        for (int colIndex = 0; colIndex < pBlockData->nColData; ++colIndex) {
          SColData *pColData = &pBlockData->aColData[colIndex];
          int16_t   cid = pColData->cid;

          if (cid == aCols[inputColIndex]) {
            if (isLast && (pColData->flag & HAS_VALUE)) {
              skipBlock = false;
              break;
            } else if (pColData->flag & (HAS_VALUE | HAS_NULL)) {
              skipBlock = false;
              break;
            }
          }
        }
      }
    }

    if (skipBlock || pIter->iRow >= pBlockData->nRow || pIter->iRow < 0) {
714
      tLDataIterNextBlock(pIter, idStr);
H
Hongze Cheng 已提交
715
      if (pIter->pSttBlk == NULL) {  // no more data
716 717 718 719
        goto _exit;
      }
    } else {
      break;
H
Hongze Cheng 已提交
720 721
    }

H
Hongze Cheng 已提交
722
    if (iBlockL != pIter->iSttBlk) {
H
Haojun Liao 已提交
723
      pBlockData = loadLastBlock(pIter, idStr);
724 725 726
      if (pBlockData == NULL) {
        goto _exit;
      }
H
Haojun Liao 已提交
727 728

      // set start row index
729
      pIter->iRow = pIter->backward ? pBlockData->nRow - 1 : 0;
H
Hongze Cheng 已提交
730 731 732
    }
  }

733 734 735
  pIter->rInfo.suid = pBlockData->suid;
  pIter->rInfo.uid = pBlockData->uid;
  pIter->rInfo.row = tsdbRowFromBlockData(pBlockData, pIter->iRow);
H
Hongze Cheng 已提交
736 737

_exit:
738
  return (terrno == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL) && (pBlockData != NULL);
H
Hongze Cheng 已提交
739 740
}

H
Hongze Cheng 已提交
741
/* Return the row info filled by the last successful tLDataIterNextRow() call. */
SRowInfo *tLDataIterGet(SLDataIter *pIter) {
  SRowInfo *pRowInfo = &pIter->rInfo;
  return pRowInfo;
}
H
Hongze Cheng 已提交
742 743

// SMergeTree =================================================
H
Hongze Cheng 已提交
744 745 746
/*
 * Red-black tree comparator for ascending merges.
 * Orders the iterators owning the two nodes by their current row key: timestamp first, then version.
 * Returns -1, 0 or 1.
 */
static FORCE_INLINE int32_t tLDataIterCmprFn(const SRBTreeNode *p1, const SRBTreeNode *p2) {
  // recover the owning iterators from their embedded tree nodes
  SLDataIter *pLeft = (SLDataIter *)(((uint8_t *)p1) - offsetof(SLDataIter, node));
  SLDataIter *pRight = (SLDataIter *)(((uint8_t *)p2) - offsetof(SLDataIter, node));

  TSDBKEY kl = TSDBROW_KEY(&pLeft->rInfo.row);
  TSDBKEY kr = TSDBROW_KEY(&pRight->rInfo.row);

  if (kl.ts != kr.ts) {
    return (kl.ts < kr.ts) ? -1 : 1;
  }

  if (kl.version != kr.version) {
    return (kl.version < kr.version) ? -1 : 1;
  }

  return 0;
}

766 767 768 769
/*
 * Red-black tree comparator for descending merges: the ascending order with its operands swapped.
 * This equals negating the ascending result, since that comparator is antisymmetric.
 */
static FORCE_INLINE int32_t tLDataIterDescCmprFn(const SRBTreeNode *p1, const SRBTreeNode *p2) {
  return tLDataIterCmprFn(p2, p1);
}

770
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
771
                       STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
772
                       bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter *pLDataIter) {
773 774
  int32_t code = TSDB_CODE_SUCCESS;

H
Hongze Cheng 已提交
775
  pMTree->backward = backward;
776
  pMTree->pIter = NULL;
H
Haojun Liao 已提交
777
  pMTree->idStr = idStr;
778

dengyihao's avatar
dengyihao 已提交
779
  if (!pMTree->backward) {  // asc
780
    tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
dengyihao's avatar
dengyihao 已提交
781
  } else {  // desc
782 783
    tRBTreeCreate(&pMTree->rbt, tLDataIterDescCmprFn);
  }
784

785 786
  pMTree->pLoadInfo = pBlockLoadInfo;
  pMTree->destroyLoadInfo = destroyLoadInfo;
787
  pMTree->ignoreEarlierTs = false;
788

H
Hongze Cheng 已提交
789
  for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) {  // open all last file
790 791
    memset(&pLDataIter[i], 0, sizeof(SLDataIter));
    code = tLDataIterOpen(&pLDataIter[i], pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange,
792
                          &pMTree->pLoadInfo[i], pMTree->idStr, strictTimeRange);
793 794 795 796
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }

797
    bool hasVal = tLDataIterNextRow(&pLDataIter[i], pMTree->idStr);
798
    if (hasVal) {
799
      tMergeTreeAddIter(pMTree, &pLDataIter[i]);
800
    } else {
801
      if (!pMTree->ignoreEarlierTs) {
802
        pMTree->ignoreEarlierTs = pLDataIter[i].ignoreEarlierTs;
803
      }
804 805
    }
  }
806 807 808

  return code;

H
Hongze Cheng 已提交
809
_end:
810 811
  tMergeTreeClose(pMTree);
  return code;
H
Hongze Cheng 已提交
812
}
H
Hongze Cheng 已提交
813

814
/*
 * Set up the merge tree over every stt file of pConf->pCurrentFileset.
 *
 * pConf->pSttFileBlockIterArray keeps one SArray of SLDataIter* per stt level and is reused across
 * calls. For each level the list is grown or shrunk to match the number of files. For each file:
 * - its reader is opened if missing;
 * - its load info is created if missing;
 * - the iterator is (re)opened;
 * - the iterator is added to the tree when it has a first row. Iterators without rows contribute
 *   their ignoreEarlierTs flag instead.
 *
 * Fixes:
 * - taosArrayPop() returns the address of the popped slot, not the stored SLDataIter*. The slot
 *   address used to be destroyed as if it were the iterator.
 * - Allocation failures are now reported instead of storing/passing NULL.
 *
 * On failure the tree is closed and the error returned. Iterators already in the lists stay there
 * and are owned by pSttFileBlockIterArray.
 */
int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) {
  int32_t code = TSDB_CODE_SUCCESS;

  pMTree->pIter = NULL;
  pMTree->backward = pConf->backward;
  pMTree->idStr = pConf->idstr;

  if (!pMTree->backward) {  // asc
    tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
  } else {  // desc
    tRBTreeCreate(&pMTree->rbt, tLDataIterDescCmprFn);
  }

  pMTree->ignoreEarlierTs = false;

  STFileSet *pFileSet = (STFileSet *)pConf->pCurrentFileset;
  int32_t    size = pFileSet->lvlArr->size;
  if (size == 0) {
    goto _end;
  }

  // add the list/iter placeholder
  while (taosArrayGetSize(pConf->pSttFileBlockIterArray) < size) {
    SArray *pList = taosArrayInit(4, POINTER_BYTES);
    if (pList == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }

    if (taosArrayPush(pConf->pSttFileBlockIterArray, &pList) == NULL) {
      taosArrayDestroy(pList);
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }
  }

  for (int32_t j = 0; j < size; ++j) {
    SSttLvl *pSttLevel = pFileSet->lvlArr->data[j];
    ASSERT(pSttLevel->level == j);

    SArray *pList = taosArrayGetP(pConf->pSttFileBlockIterArray, j);
    int32_t numOfIter = taosArrayGetSize(pList);
    int32_t numOfFile = TARRAY2_SIZE(pSttLevel->fobjArr);

    // grow: one iterator per stt file of this level
    for (int32_t k = numOfIter; k < numOfFile; ++k) {
      SLDataIter *pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
      if (pIter == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _end;
      }

      if (taosArrayPush(pList, &pIter) == NULL) {
        taosMemoryFree(pIter);
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _end;
      }
    }

    // shrink: drop the iterators of files that no longer exist in this level
    for (int32_t k = numOfFile; k < numOfIter; ++k) {
      // taosArrayPop() yields the address of the popped slot; the slot stores the iterator pointer
      SLDataIter **ppIter = taosArrayPop(pList);
      if (ppIter != NULL) {
        destroyLDataIter(*ppIter);
      }
    }

    for (int32_t i = 0; i < numOfFile; ++i) {  // open all last file
      SLDataIter *pIter = taosArrayGetP(pList, i);

      SSttFileReader    *pSttFileReader = pIter->pReader;
      SSttBlockLoadInfo *pLoadInfo = pIter->pBlockLoadInfo;

      // open stt file reader if not
      if (pSttFileReader == NULL) {
        SSttFileReaderConfig conf = {.tsdb = pConf->pTsdb, .szPage = pConf->pTsdb->pVnode->config.tsdbPageSize};
        conf.file[0] = *pSttLevel->fobjArr->data[i]->f;

        code = tsdbSttFileReaderOpen(pSttLevel->fobjArr->data[i]->fname, &conf, &pSttFileReader);
        if (code != TSDB_CODE_SUCCESS) {
          // deliberately not fatal: tLDataIterOpen2 treats a missing reader as "no data in this file"
          tsdbError("open stt file reader error. file name %s, code %s, %s", pSttLevel->fobjArr->data[i]->fname,
                    tstrerror(code), pMTree->idStr);
        }
      }

      if (pLoadInfo == NULL) {
        pLoadInfo = tCreateOneLastBlockLoadInfo(pConf->pSchema, pConf->pCols, pConf->numOfCols);
        if (pLoadInfo == NULL) {
          // keep the reader reachable through the iterator so that it is closed together with it
          pIter->pReader = pSttFileReader;
          code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
          goto _end;
        }
      }

      memset(pIter, 0, sizeof(SLDataIter));
      code = tLDataIterOpen2(pIter, pSttFileReader, i, pMTree->backward, pConf->suid, pConf->uid, &pConf->timewindow,
                             &pConf->verRange, pLoadInfo, pMTree->idStr, pConf->strictTimeRange, pConf->loadTombFn,
                             pConf->pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }

      bool hasVal = tLDataIterNextRow(pIter, pMTree->idStr);
      if (hasVal) {
        tMergeTreeAddIter(pMTree, pIter);
      } else {
        if (!pMTree->ignoreEarlierTs) {
          pMTree->ignoreEarlierTs = pIter->ignoreEarlierTs;
        }
      }
    }
  }

  return code;

_end:
  tMergeTreeClose(pMTree);
  return code;
}

909 910
/*
 * Insert an iterator into the merge tree, ordered by its current row key.
 * The iterator is handed over as an SRBTreeNode pointer; this relies on the embedded `node` being its
 * first member.
 */
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter) {
  SRBTreeNode *pNode = (SRBTreeNode *)pIter;
  tRBTreePut(&pMTree->rbt, pNode);
}

911 912
/*
 * Report whether any stt iterator of the tree found its start block lying entirely before the time
 * window during a backward scan.
 */
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree) {
  bool ignore = pMTree->ignoreEarlierTs;
  return ignore;
}

H
Hongze Cheng 已提交
913
/*
 * Advance the merge to the next row across all stt iterators.
 *
 * The iterator that produced the previous row is advanced. If the smallest iterator in the tree now
 * orders before it, the two are swapped. Otherwise the current iterator is kept; ties are asserted
 * never to happen. The previous version declared an unused `code` local, which has been removed.
 *
 * Returns true while an iterator with a current row exists (available via pMTree->pIter).
 */
bool tMergeTreeNext(SMergeTree *pMTree) {
  if (pMTree->pIter != NULL) {
    if (!tLDataIterNextRow(pMTree->pIter, pMTree->idStr)) {
      pMTree->pIter = NULL;
    }

    // compare with min in RB Tree
    SLDataIter *pMin = (SLDataIter *)tRBTreeMin(&pMTree->rbt);
    if (pMTree->pIter != NULL && pMin != NULL) {
      int32_t c = pMTree->rbt.cmprFn(&pMTree->pIter->node, &pMin->node);
      if (c > 0) {
        tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter);
        pMTree->pIter = NULL;
      } else {
        ASSERT(c);
      }
    }
  }

  if (pMTree->pIter == NULL) {
    pMTree->pIter = (SLDataIter *)tRBTreeMin(&pMTree->rbt);
    if (pMTree->pIter != NULL) {
      tRBTreeDrop(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter);
    }
  }

  return pMTree->pIter != NULL;
}

H
Hongze Cheng 已提交
946
/*
 * Detach the current iterator. If the tree owns its load info (destroyLoadInfo), destroy that info
 * and clear the ownership flag.
 */
void tMergeTreeClose(SMergeTree *pMTree) {
  pMTree->pIter = NULL;

  if (!pMTree->destroyLoadInfo) {
    return;
  }

  pMTree->pLoadInfo = destroyLastBlockLoadInfo(pMTree->pLoadInfo);
  pMTree->destroyLoadInfo = false;
}