tsdbMergeTree.c 29.6 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"
17
#include "tsdbFSet2.h"
18
#include "tsdbMerge.h"
H
Haojun Liao 已提交
19
#include "tsdbReadUtil.h"
20
#include "tsdbSttFileRW.h"
H
Hongze Cheng 已提交
21

H
Haojun Liao 已提交
22 23
static void tLDataIterClose2(SLDataIter *pIter);

24
// SLDataIter =================================================
25 26
/*
 * Allocate an array of numOfSttTrigger load-info entries.
 *
 * Every entry starts with both cached block slots marked empty (-1), two created
 * block-data buffers and an empty SSttBlk array. pSchema and colList are stored by
 * pointer, they are not copied.
 *
 * Returns the array, or NULL with terrno set when an allocation or a
 * tBlockDataCreate() call fails. On failure everything built so far is released
 * instead of handing a half-initialised array back to the caller.
 */
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols,
                                            int32_t numOfSttTrigger) {
  SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(numOfSttTrigger, sizeof(SSttBlockLoadInfo));
  if (pLoadInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  int32_t code = 0;
  int32_t nBuilt = 0;  // number of completely initialised entries
  for (; nBuilt < numOfSttTrigger; ++nBuilt) {
    SSttBlockLoadInfo *p = &pLoadInfo[nBuilt];

    p->blockIndex[0] = -1;
    p->blockIndex[1] = -1;
    p->currentLoadBlockIndex = 1;
    p->pSchema = pSchema;
    p->colIds = colList;
    p->numOfCols = numOfCols;

    code = tBlockDataCreate(&p->blockData[0]);
    if (code) {
      break;
    }

    code = tBlockDataCreate(&p->blockData[1]);
    if (code) {
      tBlockDataDestroy(&p->blockData[0]);
      break;
    }

    p->aSttBlk = taosArrayInit(4, sizeof(SSttBlk));
    if (p->aSttBlk == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      tBlockDataDestroy(&p->blockData[1]);
      tBlockDataDestroy(&p->blockData[0]);
      break;
    }
  }

  if (code) {
    // the failing entry was unwound in place above; release the fully built ones
    for (int32_t i = 0; i < nBuilt; ++i) {
      tBlockDataDestroy(&pLoadInfo[i].blockData[0]);
      tBlockDataDestroy(&pLoadInfo[i].blockData[1]);
      taosArrayDestroy(pLoadInfo[i].aSttBlk);
    }

    taosMemoryFree(pLoadInfo);
    terrno = code;
    return NULL;
  }

  return pLoadInfo;
}

57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
/*
 * Allocate and initialise one load-info object.
 *
 * The object starts with both cached block slots marked empty (-1), two created
 * block-data buffers and an empty SSttBlk array. pSchema and colList are stored by
 * pointer, they are not copied.
 *
 * Returns the object, or NULL with terrno set when an allocation or a
 * tBlockDataCreate() call fails; nothing is leaked in that case.
 */
SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols) {
  SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(1, sizeof(SSttBlockLoadInfo));
  if (pLoadInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pLoadInfo->blockIndex[0] = -1;
  pLoadInfo->blockIndex[1] = -1;
  pLoadInfo->currentLoadBlockIndex = 1;
  pLoadInfo->pSchema = pSchema;
  pLoadInfo->colIds = colList;
  pLoadInfo->numOfCols = numOfCols;

  int32_t code = tBlockDataCreate(&pLoadInfo->blockData[0]);
  if (code) {
    goto _err_free;
  }

  code = tBlockDataCreate(&pLoadInfo->blockData[1]);
  if (code) {
    goto _err_block0;
  }

  pLoadInfo->aSttBlk = taosArrayInit(4, sizeof(SSttBlk));
  if (pLoadInfo->aSttBlk == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err_block1;
  }

  return pLoadInfo;

  // unwind in reverse order of construction
_err_block1:
  tBlockDataDestroy(&pLoadInfo->blockData[1]);
_err_block0:
  tBlockDataDestroy(&pLoadInfo->blockData[0]);
_err_free:
  taosMemoryFree(pLoadInfo);
  terrno = code;
  return NULL;
}

H
Hongze Cheng 已提交
86
/*
 * Put one load-info object back into its "nothing loaded" state so it can be reused:
 * both cached block slots are invalidated, the SSttBlk array is emptied, the cost
 * counters are zeroed and the stt-block-loaded flag is cleared.
 *
 * The statistics block buffer itself is kept (only its content is destroyed) and
 * statisBlockIndex is set to -1, so the next lookup reloads it.
 */
void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
  pLoadInfo->currentLoadBlockIndex = 1;
  pLoadInfo->blockIndex[0] = -1;
  pLoadInfo->blockIndex[1] = -1;

  taosArrayClear(pLoadInfo->aSttBlk);

  pLoadInfo->cost.loadBlocks = 0;
  pLoadInfo->cost.blockElapsedTime = 0;
  pLoadInfo->cost.statisElapsedTime = 0;
  pLoadInfo->cost.loadStatisBlocks = 0;
  pLoadInfo->statisBlockIndex = -1;

  // statisBlock is allocated lazily by existsFromSttBlkStatis(), so it is still NULL
  // when no statistics lookup has happened yet
  if (pLoadInfo->statisBlock != NULL) {
    tStatisBlockDestroy(pLoadInfo->statisBlock);
  }

  pLoadInfo->sttBlockLoaded = false;
}

105
/*
 * Add the cost counters of one load-info object to *pLoadCost.
 * The values are accumulated (+=); pLoadCost is not reset here.
 */
void getSttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, SSttBlockLoadCostInfo *pLoadCost) {
  pLoadCost->blockElapsedTime += pLoadInfo->cost.blockElapsedTime;
  pLoadCost->loadBlocks += pLoadInfo->cost.loadBlocks;
  pLoadCost->loadStatisBlocks += pLoadInfo->cost.loadStatisBlocks;
  pLoadCost->statisElapsedTime += pLoadInfo->cost.statisElapsedTime;
}

H
Hongze Cheng 已提交
114
/*
 * Release one load-info object together with everything it owns: the statistics
 * block, the statistics-block array, both block-data buffers and the SSttBlk array.
 *
 * Accepts NULL. Always returns NULL so callers can write `p = destroyLastBlockLoadInfo(p);`.
 */
void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
  if (pLoadInfo == NULL) {
    return NULL;
  }

  pLoadInfo->currentLoadBlockIndex = 1;
  pLoadInfo->blockIndex[0] = -1;
  pLoadInfo->blockIndex[1] = -1;

  // statisBlock is heap-allocated on first use by existsFromSttBlkStatis();
  // tStatisBlockDestroy() is also used there before the same buffer is refilled, so the
  // buffer itself has to be freed here.
  if (pLoadInfo->statisBlock != NULL) {
    tStatisBlockDestroy(pLoadInfo->statisBlock);
    taosMemoryFree(pLoadInfo->statisBlock);
    pLoadInfo->statisBlock = NULL;
  }

  // stays NULL when the stt file info was never loaded
  // NOTE(review): the array pointer is filled in by tsdbSttFileReadStatisBlk(); confirm that
  // this object, and not the stt file reader, is the owner of that array.
  if (pLoadInfo->pSttStatisBlkArray != NULL) {
    TARRAY2_DESTROY((TStatisBlkArray *)pLoadInfo->pSttStatisBlkArray, NULL);
  }

  tBlockDataDestroy(&pLoadInfo->blockData[0]);
  tBlockDataDestroy(&pLoadInfo->blockData[1]);

  taosArrayDestroy(pLoadInfo->aSttBlk);

  taosMemoryFree(pLoadInfo);
  return NULL;
}

135
/*
 * Close the stt file reader of an iterator, destroy its load info and free the
 * iterator itself. A NULL iterator is ignored.
 */
static void destroyLDataIter(SLDataIter *pIter) {
  if (pIter == NULL) {
    return;
  }

  tLDataIterClose2(pIter);
  destroyLastBlockLoadInfo(pIter->pBlockLoadInfo);
  taosMemoryFree(pIter);
}

141
/*
 * Destroy a two-level array of stt iterators (one inner list per stt level) and
 * optionally collect their load statistics.
 *
 * pLDataIterArray  array of SArray*, each holding SLDataIter* entries; may be NULL
 * pLoadCost        when not NULL, the cost counters of every iterator are added to it
 *
 * Always returns NULL.
 */
void *destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoadCost) {
  if (pLDataIterArray == NULL) {
    return NULL;
  }

  int32_t numOfLevel = taosArrayGetSize(pLDataIterArray);
  for (int32_t i = 0; i < numOfLevel; ++i) {
    SArray *pList = taosArrayGetP(pLDataIterArray, i);
    int32_t numOfIter = taosArrayGetSize(pList);

    for (int32_t j = 0; j < numOfIter; ++j) {
      SLDataIter *pIter = taosArrayGetP(pList, j);
      if (pIter == NULL) {
        continue;
      }

      // an iterator that was allocated by tMergeTreeOpen2() but never opened has no load info
      if (pLoadCost != NULL && pIter->pBlockLoadInfo != NULL) {
        pLoadCost->loadBlocks += pIter->pBlockLoadInfo->cost.loadBlocks;
        pLoadCost->loadStatisBlocks += pIter->pBlockLoadInfo->cost.loadStatisBlocks;
        pLoadCost->blockElapsedTime += pIter->pBlockLoadInfo->cost.blockElapsedTime;
        pLoadCost->statisElapsedTime += pIter->pBlockLoadInfo->cost.statisElapsedTime;
      }

      destroyLDataIter(pIter);
    }

    taosArrayDestroy(pList);
  }

  taosArrayDestroy(pLDataIterArray);
  return NULL;
}

H
Hongze Cheng 已提交
167
/*
 * Return the block data for the stt block the iterator currently points at
 * (pIter->iSttBlk), reading it from the stt file when it is not cached.
 *
 * The load info keeps two block buffers. blockIndex[k] records which stt block is held
 * in blockData[k]. On a cache miss the buffer that is not the current one is overwritten,
 * and pIter->iRow is placed just outside the new block (nRow for a backward scan, -1
 * otherwise).
 *
 * Returns NULL when there is no block/schema to load, or when the read fails (terrno is
 * set to the error code in that case).
 */
static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
  int32_t code = 0;

  SSttBlockLoadInfo *pInfo = pIter->pBlockLoadInfo;
  if (pInfo->blockIndex[0] == pIter->iSttBlk) {
    if (pInfo->currentLoadBlockIndex != 0) {
      tsdbDebug("current load index is set to 0, block index:%d, file index:%d, due to uid:%" PRIu64 ", load data, %s",
                pIter->iSttBlk, pIter->iStt, pIter->uid, idStr);
      pInfo->currentLoadBlockIndex = 0;
    }
    return &pInfo->blockData[0];
  }

  if (pInfo->blockIndex[1] == pIter->iSttBlk) {
    if (pInfo->currentLoadBlockIndex != 1) {
      tsdbDebug("current load index is set to 1, block index:%d, file index:%d, due to uid:%" PRIu64 ", load data, %s",
                pIter->iSttBlk, pIter->iStt, pIter->uid, idStr);
      pInfo->currentLoadBlockIndex = 1;
    }
    return &pInfo->blockData[1];
  }

  if (pIter->pSttBlk == NULL || pInfo->pSchema == NULL) {
    return NULL;
  }

  // current block not loaded yet: switch to the other buffer and fill it
  pInfo->currentLoadBlockIndex ^= 1;
  int64_t st = taosGetTimestampUs();

  SBlockData *pBlock = &pInfo->blockData[pInfo->currentLoadBlockIndex];
  // the first entry of colIds is skipped, hence numOfCols - 1
  code = tsdbSttFileReadBlockDataByColumn(pIter->pReader, pIter->pSttBlk, pBlock, pInfo->pSchema, &pInfo->colIds[1],
                                          pInfo->numOfCols - 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _exit;
  }

  double el = (taosGetTimestampUs() - st) / 1000.0;
  pInfo->cost.blockElapsedTime += el;
  pInfo->cost.loadBlocks += 1;

  tsdbDebug("read last block, total load:%" PRId64 ", trigger by uid:%" PRIu64
            ", last file index:%d, last block index:%d, entry:%d, rows:%d, %p, elapsed time:%.2f ms, %s",
            pInfo->cost.loadBlocks, pIter->uid, pIter->iStt, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock->nRow,
            pBlock, el, idStr);

  pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk;
  pIter->iRow = (pIter->backward) ? pInfo->blockData[pInfo->currentLoadBlockIndex].nRow : -1;

  tsdbDebug("last block index list:%d, %d, rowIndex:%d %s", pInfo->blockIndex[0], pInfo->blockIndex[1], pIter->iRow,
            idStr);
  return &pInfo->blockData[pInfo->currentLoadBlockIndex];

_exit:
  // The buffer may have been partially overwritten by the failed read, so it must no longer
  // be reported as holding the block it cached before.
  pInfo->blockIndex[pInfo->currentLoadBlockIndex] = -1;
  terrno = code;
  return NULL;
}

228
// find the earliest block that contains the required records
H
Hongze Cheng 已提交
229 230
/*
 * Starting at `index`, keep stepping while the block at the current position still
 * covers `uid` (minUid <= uid <= maxUid) and the position is inside [0, num), then return
 * the last position that did.
 *
 * The walk goes towards index 0 for a forward scan and towards num - 1 for a backward
 * scan, i.e. towards the block the scan has to start from. If the block at `index` does
 * not cover uid, the result is `index - step`.
 */
static FORCE_INLINE int32_t findEarliestIndex(int32_t index, uint64_t uid, const SSttBlk *pBlockList, int32_t num,
                                              int32_t backward) {
  int32_t i = index;
  int32_t step = backward ? 1 : -1;
  while (i >= 0 && i < num && uid >= pBlockList[i].minUid && uid <= pBlockList[i].maxUid) {
    i += step;
  }
  return i - step;
}

H
Hongze Cheng 已提交
239
/*
 * Binary-search pBlockList for a block whose [minUid, maxUid] range contains uid and
 * return the index of the block a scan in the given direction has to start from
 * (see findEarliestIndex). Returns -1 when the list is empty or no block covers uid.
 *
 * NOTE(review): the search assumes the blocks are ordered by uid range - confirm with the
 * producer of the list.
 */
static int32_t binarySearchForStartBlock(SSttBlk *pBlockList, int32_t num, uint64_t uid, int32_t backward) {
  if (num <= 0) {
    return -1;
  }

  int32_t firstPos = 0;
  int32_t lastPos = num - 1;

  // uid lies outside the range spanned by the whole list
  if ((uid > pBlockList[lastPos].maxUid) || (uid < pBlockList[firstPos].minUid)) {
    return -1;
  }

  while (1) {
    if (uid >= pBlockList[firstPos].minUid && uid <= pBlockList[firstPos].maxUid) {
      return findEarliestIndex(firstPos, uid, pBlockList, num, backward);
    }

    // uid falls outside the remaining window, e.g. into a gap between two blocks
    if (uid > pBlockList[lastPos].maxUid || uid < pBlockList[firstPos].minUid) {
      return -1;
    }

    int32_t numOfRows = lastPos - firstPos + 1;
    int32_t midPos = (numOfRows >> 1u) + firstPos;

    if (uid < pBlockList[midPos].minUid) {
      lastPos = midPos - 1;
    } else if (uid > pBlockList[midPos].maxUid) {
      firstPos = midPos + 1;
    } else {
      return findEarliestIndex(midPos, uid, pBlockList, num, backward);
    }
  }
}

H
Hongze Cheng 已提交
275 276
/*
 * Starting at `index`, keep stepping while uidList[i] == uid and i is inside [0, num),
 * then return the last position that matched.
 *
 * The walk goes towards index 0 for a forward scan and towards num - 1 for a backward
 * scan, so the result is the row the scan has to start from within the run of equal uids.
 */
static FORCE_INLINE int32_t findEarliestRow(int32_t index, uint64_t uid, const uint64_t *uidList, int32_t num,
                                            int32_t backward) {
  int32_t i = index;
  int32_t step = backward ? 1 : -1;
  while (i >= 0 && i < num && uid == uidList[i]) {
    i += step;
  }
  return i - step;
}

H
Hongze Cheng 已提交
285
/*
 * Binary-search uidList for uid and return the index of the row a scan in the given
 * direction has to start from (see findEarliestRow). Returns -1 when the list is empty
 * or uid does not occur in it.
 *
 * NOTE(review): the search assumes uidList is sorted in ascending order - confirm with
 * the producer of the block data.
 */
static int32_t binarySearchForStartRowIndex(uint64_t *uidList, int32_t num, uint64_t uid, int32_t backward) {
  // same guard as binarySearchForStartBlock(); without it uidList[num - 1] is read out of bounds
  if (num <= 0) {
    return -1;
  }

  int32_t firstPos = 0;
  int32_t lastPos = num - 1;

  // uid lies outside the range spanned by the whole list
  if ((uid > uidList[lastPos]) || (uid < uidList[firstPos])) {
    return -1;
  }

  while (1) {
    if (uid == uidList[firstPos]) {
      return findEarliestRow(firstPos, uid, uidList, num, backward);
    }

    // uid falls outside the remaining window, so it is not present
    if (uid > uidList[lastPos] || uid < uidList[firstPos]) {
      return -1;
    }

    int32_t numOfRows = lastPos - firstPos + 1;
    int32_t midPos = (numOfRows >> 1u) + firstPos;

    if (uid < uidList[midPos]) {
      lastPos = midPos - 1;
    } else if (uid > uidList[midPos]) {
      firstPos = midPos + 1;
    } else {
      return findEarliestRow(midPos, uid, uidList, num, backward);
    }
  }
}

316
/*
 * Stub: none of the arguments is read, the iterator is left untouched and the function
 * always returns 0. tMergeTreeOpen() still calls it for every stt file.
 */
int32_t tLDataIterOpen(struct SLDataIter *pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
                       uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo,
                       const char *idStr, bool strictTimeRange) {
  return 0;
}

322 323
/*
 * Copy the stt blocks that belong to super table `suid` from pArray into
 * pBlockLoadInfo->aSttBlk, replacing its previous content.
 *
 * - empty pArray: nothing is changed
 * - first and last block carry the same suid: either all blocks are copied, or (suid
 *   differs) the list is emptied and pIter->iSttBlk is set to -1
 * - otherwise the blocks are filtered one by one into a new array that replaces aSttBlk
 *
 * NOTE(review): the "first == last means all identical" shortcut and the early `break`
 * assume pArray is ordered by suid - confirm.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the new array cannot be
 * allocated (aSttBlk is left untouched in that case).
 */
static int32_t extractSttBlockInfo(SLDataIter *pIter, const TSttBlkArray *pArray, SSttBlockLoadInfo *pBlockLoadInfo,
                                   uint64_t suid) {
  if (TARRAY2_SIZE(pArray) <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  SSttBlk *pStart = &pArray->data[0];
  SSttBlk *pEnd = &pArray->data[TARRAY2_SIZE(pArray) - 1];

  // all identical
  if (pStart->suid == pEnd->suid) {
    taosArrayClear(pBlockLoadInfo->aSttBlk);

    if (pStart->suid != suid) {  // no qualified stt block existed
      pIter->iSttBlk = -1;
      return TSDB_CODE_SUCCESS;
    }

    // all blocks are qualified
    taosArrayAddBatch(pBlockLoadInfo->aSttBlk, pArray->data, pArray->size);
    return TSDB_CODE_SUCCESS;
  }

  SArray *pTmp = taosArrayInit(TARRAY2_SIZE(pArray), sizeof(SSttBlk));
  if (pTmp == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t i = 0; i < TARRAY2_SIZE(pArray); ++i) {
    SSttBlk *p = &pArray->data[i];
    if (p->suid < suid) {
      continue;
    }

    if (p->suid > suid) {
      break;
    }

    taosArrayPush(pTmp, p);
  }

  taosArrayDestroy(pBlockLoadInfo->aSttBlk);
  pBlockLoadInfo->aSttBlk = pTmp;

  return TSDB_CODE_SUCCESS;
}

363 364
/*
 * Three-way comparison of two uint64_t values passed by address.
 * Returns 0 when they are equal, -1 when *target < *p2 and 1 when *target > *p2.
 */
static int32_t suidComparFn(const void *target, const void *p2) {
  const uint64_t *targetUid = target;
  const uint64_t *uid2 = p2;
  if (*uid2 == (*targetUid)) {
    return 0;
  } else {
    return (*targetUid) < (*uid2) ? -1 : 1;
  }
}
372

373
static bool existsFromSttBlkStatis(SSttBlockLoadInfo *pBlockLoadInfo, uint64_t suid, uint64_t uid,
374
                                   SSttFileReader *pReader) {
375
  const TStatisBlkArray *pStatisBlkArray = pBlockLoadInfo->pSttStatisBlkArray;
376 377
  if (TARRAY2_SIZE(pStatisBlkArray) <= 0) {
    return true;
378 379
  }

380 381 382
  int32_t i = 0;
  for (i = 0; i < TARRAY2_SIZE(pStatisBlkArray); ++i) {
    SStatisBlk *p = &pStatisBlkArray->data[i];
H
Haojun Liao 已提交
383
    if (p->minTbid.suid <= suid && p->maxTbid.suid >= suid) {
384
      break;
385
    }
386
  }
387

388 389
  if (i >= TARRAY2_SIZE(pStatisBlkArray)) {
    return false;
390 391
  }

H
Hongze Cheng 已提交
392
  while (i < TARRAY2_SIZE(pStatisBlkArray)) {
393 394 395 396
    SStatisBlk *p = &pStatisBlkArray->data[i];
    if (p->minTbid.suid > suid) {
      return false;
    }
397

398 399
    if (pBlockLoadInfo->statisBlock == NULL) {
      pBlockLoadInfo->statisBlock = taosMemoryCalloc(1, sizeof(STbStatisBlock));
400 401

      int64_t st = taosGetTimestampMs();
402 403
      tsdbSttFileReadStatisBlock(pReader, p, pBlockLoadInfo->statisBlock);
      pBlockLoadInfo->statisBlockIndex = i;
404 405

      double el = (taosGetTimestampMs() - st) / 1000.0;
406
      pBlockLoadInfo->cost.loadStatisBlocks += 1;
407
      pBlockLoadInfo->cost.statisElapsedTime += el;
408 409
    } else if (pBlockLoadInfo->statisBlockIndex != i) {
      tStatisBlockDestroy(pBlockLoadInfo->statisBlock);
410 411

      int64_t st = taosGetTimestampMs();
412 413
      tsdbSttFileReadStatisBlock(pReader, p, pBlockLoadInfo->statisBlock);
      pBlockLoadInfo->statisBlockIndex = i;
414 415 416


      double el = (taosGetTimestampMs() - st) / 1000.0;
417
      pBlockLoadInfo->cost.loadStatisBlocks += 1;
418
      pBlockLoadInfo->cost.statisElapsedTime += el;
419
    }
420

421 422
    STbStatisBlock* pBlock = pBlockLoadInfo->statisBlock;
    int32_t index = tarray2SearchIdx(pBlock->suid, &suid, sizeof(int64_t), suidComparFn, TD_EQ);
423 424
    if (index == -1) {
      return false;
425
    }
426 427

    int32_t j = index;
428
    if (pBlock->uid->data[j] == uid) {
429
      return true;
430 431 432
    } else if (pBlock->uid->data[j] > uid) {
      while (j >= 0 && pBlock->suid->data[j] == suid) {
        if (pBlock->uid->data[j] == uid) {
433 434 435 436 437 438 439
          return true;
        } else {
          j -= 1;
        }
      }
    } else {
      j = index + 1;
440 441
      while (j < pBlock->suid->size && pBlock->suid->data[j] == suid) {
        if (pBlock->uid->data[j] == uid) {
442 443 444 445
          return true;
        } else {
          j += 1;
        }
446 447
      }
    }
448 449

    i += 1;
450
  }
451

452
  return false;
453 454
}

455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489
/*
 * Load the per-file metadata of an stt file into pBlockLoadInfo: the stt block list
 * filtered by suid, the statistics-block array, and finally the tombstone data through
 * loadTombFn. The "loaded" flag is raised before anything is read.
 *
 * Returns the first error encountered, otherwise the result of loadTombFn.
 */
static int32_t doLoadSttFilesBlk(SSttBlockLoadInfo *pBlockLoadInfo, SLDataIter *pIter, int64_t suid,
                                 _load_tomb_fn loadTombFn, void *pReader1, const char *idStr) {
  const int64_t       startUs = taosGetTimestampUs();
  const TSttBlkArray *pBlkArr = NULL;
  int32_t             rc = TSDB_CODE_SUCCESS;

  pBlockLoadInfo->sttBlockLoaded = true;

  // step 1: the stt block index of the file
  rc = tsdbSttFileReadSttBlk(pIter->pReader, &pBlkArr);
  if (rc != TSDB_CODE_SUCCESS) {
    tsdbError("load stt blk failed, code:%s, %s", tstrerror(rc), idStr);
    return rc;
  }

  // step 2: keep only the blocks of the requested super table
  rc = extractSttBlockInfo(pIter, pBlkArr, pBlockLoadInfo, suid);
  if (rc != TSDB_CODE_SUCCESS) {
    tsdbError("load stt block info failed, code:%s, %s", tstrerror(rc), idStr);
    return rc;
  }

  // step 3: statistics blocks, used to decide if the queried table exists in this stt file
  rc = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray **)&pBlockLoadInfo->pSttStatisBlkArray);
  if (rc != TSDB_CODE_SUCCESS) {
    tsdbError("failed to load stt block statistics, code:%s, %s", tstrerror(rc), idStr);
    return rc;
  }

  // step 4: tombstones
  rc = loadTombFn(pReader1, pIter->pReader, pIter->pBlockLoadInfo);

  double elapsedMs = (taosGetTimestampUs() - startUs) / 1000.0;
  tsdbDebug("load the stt file info completed, elapsed time:%.2fms, %s", elapsedMs, idStr);
  return rc;
}

/*
 * Prepare an stt iterator for table `uid` on one stt file.
 *
 * The scan parameters are copied into pIter, the stt file metadata is loaded on first use
 * of pBlockLoadInfo, and the iterator is positioned on the first candidate stt block.
 *
 * After a successful return pIter->pSttBlk is NULL when there is nothing to read from this
 * file: no reader, table absent according to the statistics, no block covering uid, or
 * the start block lying entirely outside the time window. In the backward case the latter
 * also sets pIter->ignoreEarlierTs.
 *
 * Returns TSDB_CODE_SUCCESS or the error reported while loading the file metadata.
 */
int32_t tLDataIterOpen2(SLDataIter *pIter, SSttFileReader *pSttFileReader, int32_t iStt, int8_t backward,
                        uint64_t suid, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange,
                        SSttBlockLoadInfo *pBlockLoadInfo, const char *idStr, bool strictTimeRange,
                        _load_tomb_fn loadTombFn, void *pReader1) {
  int32_t code = TSDB_CODE_SUCCESS;

  pIter->uid = uid;
  pIter->iStt = iStt;
  pIter->backward = backward;
  pIter->verRange.minVer = pRange->minVer;
  pIter->verRange.maxVer = pRange->maxVer;
  pIter->timeWindow.skey = pTimeWindow->skey;
  pIter->timeWindow.ekey = pTimeWindow->ekey;
  pIter->pReader = pSttFileReader;
  pIter->pBlockLoadInfo = pBlockLoadInfo;

  // open stt file failed, ignore and continue
  if (pIter->pReader == NULL) {
    tsdbError("stt file reader is null, %s", idStr);
    pIter->pSttBlk = NULL;
    pIter->iSttBlk = -1;
    return TSDB_CODE_SUCCESS;
  }

  if (!pBlockLoadInfo->sttBlockLoaded) {
    code = doLoadSttFilesBlk(pBlockLoadInfo, pIter, suid, loadTombFn, pReader1, idStr);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  bool exists = existsFromSttBlkStatis(pBlockLoadInfo, suid, uid, pIter->pReader);
  if (!exists) {
    pIter->iSttBlk = -1;
    pIter->pSttBlk = NULL;
    return TSDB_CODE_SUCCESS;
  }

  // find the start block, actually we could load the position to avoid repeatedly searching for the start position
  // when the skey is updated.
  size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
  pIter->iSttBlk = binarySearchForStartBlock(pBlockLoadInfo->aSttBlk->pData, size, uid, backward);
  if (pIter->iSttBlk == -1) {
    // no block covers uid: report "nothing to read" the same way as the early exits above
    pIter->pSttBlk = NULL;
    return code;
  }

  pIter->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, pIter->iSttBlk);
  pIter->iRow = (pIter->backward) ? pIter->pSttBlk->nRow : -1;

  // forward scan: the start block begins after the end of the time window
  if ((!backward) && ((strictTimeRange && pIter->pSttBlk->minKey >= pIter->timeWindow.ekey) ||
                      (!strictTimeRange && pIter->pSttBlk->minKey > pIter->timeWindow.ekey))) {
    pIter->pSttBlk = NULL;
  }

  // backward scan: the start block ends before the start of the time window
  // (pSttBlk cannot have been cleared above, since that only happens when !backward)
  if (backward && ((strictTimeRange && pIter->pSttBlk->maxKey <= pIter->timeWindow.skey) ||
                   (!strictTimeRange && pIter->pSttBlk->maxKey < pIter->timeWindow.skey))) {
    pIter->pSttBlk = NULL;
    pIter->ignoreEarlierTs = true;
  }

  return code;
}

H
Haojun Liao 已提交
550 551
/*
 * Close the stt file reader attached to the iterator and clear the pointer.
 * The load info of the iterator is not touched.
 */
void tLDataIterClose2(SLDataIter *pIter) {
  tsdbSttFileReaderClose(&pIter->pReader);
  pIter->pReader = NULL;
}
H
Hongze Cheng 已提交
554

H
Hongze Cheng 已提交
555
/*
 * Advance the iterator to the next stt block (in scan direction) that overlaps the
 * requested uid, time window and version range.
 *
 * On success pIter->iSttBlk / pIter->pSttBlk point at that block. When no such block is
 * left, pIter->pSttBlk is set to NULL and pIter->iSttBlk stays one step past the block the
 * iterator was on.
 */
void tLDataIterNextBlock(SLDataIter *pIter, const char *idStr) {
  int32_t step = pIter->backward ? -1 : 1;
  int32_t oldIndex = pIter->iSttBlk;

  pIter->iSttBlk += step;

  int32_t index = -1;
  // signed bound: the index runs down to -1 in a backward scan
  int32_t size = (int32_t)pIter->pBlockLoadInfo->aSttBlk->size;
  for (int32_t i = pIter->iSttBlk; i >= 0 && i < size; i += step) {
    SSttBlk *p = taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, i);

    // the scan has moved past every block that could contain uid
    if ((!pIter->backward) && p->minUid > pIter->uid) {
      break;
    }

    if (pIter->backward && p->maxUid < pIter->uid) {
      break;
    }

    // check uid firstly
    if (p->minUid <= pIter->uid && p->maxUid >= pIter->uid) {
      if ((!pIter->backward) && p->minKey > pIter->timeWindow.ekey) {
        break;
      }

      if (pIter->backward && p->maxKey < pIter->timeWindow.skey) {
        break;
      }

      // check time range secondly
      if (p->minKey <= pIter->timeWindow.ekey && p->maxKey >= pIter->timeWindow.skey) {
        if ((!pIter->backward) && p->minVer > pIter->verRange.maxVer) {
          break;
        }

        if (pIter->backward && p->maxVer < pIter->verRange.minVer) {
          break;
        }

        // version range checked last
        if (p->minVer <= pIter->verRange.maxVer && p->maxVer >= pIter->verRange.minVer) {
          index = i;
          break;
        }
      }
    }
  }

  pIter->pSttBlk = NULL;
  if (index != -1) {
    pIter->iSttBlk = index;
    pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, pIter->iSttBlk);
    // NOTE(review): the value logged as "stt fileIdx" is the previous block index (oldIndex)
    tsdbDebug("try next last file block:%d from stt fileIdx:%d, trigger by uid:%" PRIu64 ", file index:%d, %s",
              pIter->iSttBlk, oldIndex, pIter->uid, pIter->iStt, idStr);
  } else {
    // NOTE(review): the value logged as "file index" is the previous block index (oldIndex)
    tsdbDebug("no more last block qualified, uid:%" PRIu64 ", file index:%d, %s", pIter->uid, oldIndex, idStr);
  }
}

H
Hongze Cheng 已提交
612 613
/*
 * Starting at pIter->iRow, move in scan direction to the first row of the current block
 * that belongs to pIter->uid and lies inside the time window and the version range.
 *
 * pIter->iRow is set to the index of that row, or to -1 when the block holds no such row
 * (or the block cannot be obtained).
 */
static void findNextValidRow(SLDataIter *pIter, const char *idStr) {
  bool    hasVal = false;
  int32_t step = pIter->backward ? -1 : 1;
  int32_t i = pIter->iRow;

  SBlockData *pData = loadLastBlock(pIter, idStr);
  if (pData == NULL) {
    // nothing to scan; loadLastBlock() has recorded the reason in terrno if it was a read error
    pIter->iRow = -1;
    return;
  }

  // mostly we only need to find the start position for a given table
  if ((((i == 0) && (!pIter->backward)) || (i == pData->nRow - 1 && pIter->backward)) && pData->aUid != NULL) {
    i = binarySearchForStartRowIndex((uint64_t *)pData->aUid, pData->nRow, pIter->uid, pIter->backward);
    if (i == -1) {
      tsdbDebug("failed to find the data in pBlockData, uid:%" PRIu64 " , %s", pIter->uid, idStr);
      pIter->iRow = -1;
      return;
    }
  }

  for (; i < pData->nRow && i >= 0; i += step) {
    // stop as soon as the scan leaves the rows of the requested table
    if (pData->aUid != NULL) {
      if (!pIter->backward) {
        if (pData->aUid[i] > pIter->uid) {
          break;
        }
      } else {
        if (pData->aUid[i] < pIter->uid) {
          break;
        }
      }
    }

    int64_t ts = pData->aTSKEY[i];
    if (!pIter->backward) {               // asc
      if (ts > pIter->timeWindow.ekey) {  // no more data
        break;
      } else if (ts < pIter->timeWindow.skey) {
        continue;
      }
    } else {
      if (ts < pIter->timeWindow.skey) {
        break;
      } else if (ts > pIter->timeWindow.ekey) {
        continue;
      }
    }

    int64_t ver = pData->aVersion[i];
    if (ver < pIter->verRange.minVer) {
      continue;
    }

    // todo opt handle desc case
    if (ver > pIter->verRange.maxVer) {
      continue;
    }

    hasVal = true;
    break;
  }

  pIter->iRow = (hasVal) ? i : -1;
}

H
Hongze Cheng 已提交
674
/*
 * Move the iterator to its next qualifying row, crossing into following stt blocks when
 * the current one is exhausted (or skipped, see below).
 *
 * On success the row is published in pIter->rInfo and true is returned. false means that
 * no row is left or that loading a block failed; terrno is reset to TSDB_CODE_SUCCESS on
 * entry and holds the error afterwards in the failure case.
 *
 * When pBlockLoadInfo->checkRemainingRow is set, a block is skipped unless at least one of
 * the requested columns is present in it with a flag of HAS_VALUE or HAS_NULL.
 */
bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
  int32_t step = pIter->backward ? -1 : 1;
  terrno = TSDB_CODE_SUCCESS;

  // no qualified last file block in current file, no need to fetch row
  if (pIter->pSttBlk == NULL) {
    return false;
  }

  int32_t     iBlockL = pIter->iSttBlk;
  SBlockData *pBlockData = loadLastBlock(pIter, idStr);
  if (pBlockData == NULL || terrno != TSDB_CODE_SUCCESS) {
    goto _exit;
  }

  pIter->iRow += step;

  while (1) {
    bool skipBlock = false;
    findNextValidRow(pIter, idStr);

    if (pIter->pBlockLoadInfo->checkRemainingRow) {
      skipBlock = true;
      int16_t *aCols = pIter->pBlockLoadInfo->colIds;
      int      nCols = pIter->pBlockLoadInfo->numOfCols;
      bool     isLast = pIter->pBlockLoadInfo->isLast;
      for (int inputColIndex = 0; inputColIndex < nCols; ++inputColIndex) {
        for (int colIndex = 0; colIndex < pBlockData->nColData; ++colIndex) {
          SColData *pColData = &pBlockData->aColData[colIndex];
          int16_t   cid = pColData->cid;

          if (cid == aCols[inputColIndex]) {
            if (isLast && (pColData->flag & HAS_VALUE)) {
              skipBlock = false;
              break;
            } else if (pColData->flag & (HAS_VALUE | HAS_NULL)) {
              skipBlock = false;
              break;
            }
          }
        }
      }
    }

    if (skipBlock || pIter->iRow >= pBlockData->nRow || pIter->iRow < 0) {
      tLDataIterNextBlock(pIter, idStr);
      if (pIter->pSttBlk == NULL) {  // no more data
        goto _exit;
      }
    } else {
      break;
    }

    // iBlockL keeps the index of the block the call started on
    if (iBlockL != pIter->iSttBlk) {
      pBlockData = loadLastBlock(pIter, idStr);
      if (pBlockData == NULL) {
        goto _exit;
      }

      // set start row index
      pIter->iRow = pIter->backward ? pBlockData->nRow - 1 : 0;
    }
  }

  pIter->rInfo.suid = pBlockData->suid;
  pIter->rInfo.uid = pBlockData->uid;
  pIter->rInfo.row = tsdbRowFromBlockData(pBlockData, pIter->iRow);

_exit:
  return (terrno == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL) && (pBlockData != NULL);
}

H
Hongze Cheng 已提交
746
/* Return the address of the row info stored inside the iterator (no copy is made). */
SRowInfo *tLDataIterGet(SLDataIter *pIter) {
  SRowInfo *pRowInfo = &pIter->rInfo;
  return pRowInfo;
}
H
Hongze Cheng 已提交
747 748

// SMergeTree =================================================
H
Hongze Cheng 已提交
749 750 751
/*
 * Ascending red-black-tree comparator for stt iterators.
 *
 * p1 / p2 are the `node` members embedded in two SLDataIter objects; the owning iterators
 * are recovered with offsetof. Iterators are ordered by the key of their current row:
 * timestamp first, version second. Returns -1, 0 or 1.
 */
static FORCE_INLINE int32_t tLDataIterCmprFn(const SRBTreeNode *p1, const SRBTreeNode *p2) {
  SLDataIter *pIter1 = (SLDataIter *)(((uint8_t *)p1) - offsetof(SLDataIter, node));
  SLDataIter *pIter2 = (SLDataIter *)(((uint8_t *)p2) - offsetof(SLDataIter, node));

  TSDBKEY key1 = TSDBROW_KEY(&pIter1->rInfo.row);
  TSDBKEY key2 = TSDBROW_KEY(&pIter2->rInfo.row);

  if (key1.ts < key2.ts) {
    return -1;
  } else if (key1.ts > key2.ts) {
    return 1;
  } else {
    if (key1.version < key2.version) {
      return -1;
    } else if (key1.version > key2.version) {
      return 1;
    } else {
      return 0;
    }
  }
}

771 772 773 774
/* Descending comparator: the sign-flipped result of tLDataIterCmprFn(). */
static FORCE_INLINE int32_t tLDataIterDescCmprFn(const SRBTreeNode *p1, const SRBTreeNode *p2) {
  int32_t ascending = tLDataIterCmprFn(p1, p2);
  return -ascending;
}

775
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
776
                       STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
777
                       bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter *pLDataIter) {
778 779
  int32_t code = TSDB_CODE_SUCCESS;

H
Hongze Cheng 已提交
780
  pMTree->backward = backward;
781
  pMTree->pIter = NULL;
H
Haojun Liao 已提交
782
  pMTree->idStr = idStr;
783

dengyihao's avatar
dengyihao 已提交
784
  if (!pMTree->backward) {  // asc
785
    tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
dengyihao's avatar
dengyihao 已提交
786
  } else {  // desc
787 788
    tRBTreeCreate(&pMTree->rbt, tLDataIterDescCmprFn);
  }
789

790 791
  pMTree->pLoadInfo = pBlockLoadInfo;
  pMTree->destroyLoadInfo = destroyLoadInfo;
792
  pMTree->ignoreEarlierTs = false;
793

H
Hongze Cheng 已提交
794
  for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) {  // open all last file
795 796
    memset(&pLDataIter[i], 0, sizeof(SLDataIter));
    code = tLDataIterOpen(&pLDataIter[i], pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange,
797
                          &pMTree->pLoadInfo[i], pMTree->idStr, strictTimeRange);
798 799 800 801
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }

802
    bool hasVal = tLDataIterNextRow(&pLDataIter[i], pMTree->idStr);
803
    if (hasVal) {
804
      tMergeTreeAddIter(pMTree, &pLDataIter[i]);
805
    } else {
806
      if (!pMTree->ignoreEarlierTs) {
807
        pMTree->ignoreEarlierTs = pLDataIter[i].ignoreEarlierTs;
808
      }
809 810
    }
  }
811 812 813

  return code;

H
Hongze Cheng 已提交
814
_end:
815 816
  tMergeTreeClose(pMTree);
  return code;
H
Hongze Cheng 已提交
817
}
H
Hongze Cheng 已提交
818

819
/*
 * Initialise a merge tree over all stt files of the current file set described by pConf.
 *
 * pConf->pSttFileBlockIterArray holds one list of SLDataIter* per stt level. The lists are
 * grown or shrunk to match the number of files of each level, so that iterators (with
 * their readers and load infos) are reused across calls. Every iterator is then
 * (re)opened for pConf->uid and, when it yields a first row, inserted into the tree.
 *
 * A file whose reader cannot be opened is logged and skipped (tLDataIterOpen2() treats a
 * NULL reader as "no data").
 *
 * Returns TSDB_CODE_SUCCESS, or an error code after closing the tree. Allocation failures
 * are reported as errors instead of being dereferenced.
 *
 * NOTE(review): pMTree->pLoadInfo / destroyLoadInfo are not initialised here although
 * tMergeTreeClose() reads them on the _end path - confirm callers zero the tree first.
 */
int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) {
  int32_t code = TSDB_CODE_SUCCESS;

  pMTree->pIter = NULL;
  pMTree->backward = pConf->backward;
  pMTree->idStr = pConf->idstr;

  if (!pMTree->backward) {  // asc
    tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
  } else {  // desc
    tRBTreeCreate(&pMTree->rbt, tLDataIterDescCmprFn);
  }

  pMTree->ignoreEarlierTs = false;

  int32_t size = ((STFileSet *)pConf->pCurrentFileset)->lvlArr->size;
  if (size == 0) {
    goto _end;
  }

  // add the list/iter placeholder
  while (taosArrayGetSize(pConf->pSttFileBlockIterArray) < size) {
    SArray *pList = taosArrayInit(4, POINTER_BYTES);
    if (pList == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }

    // a failed push would otherwise leave the array size unchanged and loop forever
    if (taosArrayPush(pConf->pSttFileBlockIterArray, &pList) == NULL) {
      taosArrayDestroy(pList);
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }
  }

  for (int32_t j = 0; j < size; ++j) {
    SSttLvl *pSttLevel = ((STFileSet *)pConf->pCurrentFileset)->lvlArr->data[j];
    ASSERT(pSttLevel->level == j);

    SArray *pList = taosArrayGetP(pConf->pSttFileBlockIterArray, j);
    int32_t numOfIter = taosArrayGetSize(pList);

    if (numOfIter < TARRAY2_SIZE(pSttLevel->fobjArr)) {
      int32_t inc = TARRAY2_SIZE(pSttLevel->fobjArr) - numOfIter;
      for (int32_t k = 0; k < inc; ++k) {
        SLDataIter *pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
        if (pIter == NULL) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          goto _end;
        }

        if (taosArrayPush(pList, &pIter) == NULL) {
          taosMemoryFree(pIter);
          code = TSDB_CODE_OUT_OF_MEMORY;
          goto _end;
        }
      }
    } else if (numOfIter > TARRAY2_SIZE(pSttLevel->fobjArr)) {
      int32_t inc = numOfIter - TARRAY2_SIZE(pSttLevel->fobjArr);
      for (int i = 0; i < inc; ++i) {
        // NOTE(review): verify that taosArrayPop() yields the stored SLDataIter* itself and
        // not the address of the array slot holding it.
        SLDataIter *pIter = taosArrayPop(pList);
        destroyLDataIter(pIter);
      }
    }

    for (int32_t i = 0; i < TARRAY2_SIZE(pSttLevel->fobjArr); ++i) {  // open all last file
      SLDataIter *pIter = taosArrayGetP(pList, i);

      // carried over from a previous open of this iterator, NULL for a fresh one
      SSttFileReader    *pSttFileReader = pIter->pReader;
      SSttBlockLoadInfo *pLoadInfo = pIter->pBlockLoadInfo;

      // open stt file reader if not
      if (pSttFileReader == NULL) {
        SSttFileReaderConfig conf = {.tsdb = pConf->pTsdb, .szPage = pConf->pTsdb->pVnode->config.tsdbPageSize};
        conf.file[0] = *pSttLevel->fobjArr->data[i]->f;

        code = tsdbSttFileReaderOpen(pSttLevel->fobjArr->data[i]->fname, &conf, &pSttFileReader);
        if (code != TSDB_CODE_SUCCESS) {
          tsdbError("open stt file reader error. file name %s, code %s, %s", pSttLevel->fobjArr->data[i]->fname,
                    tstrerror(code), pMTree->idStr);
        }
      }

      if (pLoadInfo == NULL) {
        pLoadInfo = tCreateOneLastBlockLoadInfo(pConf->pSchema, pConf->pCols, pConf->numOfCols);
        if (pLoadInfo == NULL) {
          // keep a reader opened above attached to the iterator so it is closed together with it
          pIter->pReader = pSttFileReader;
          code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
          goto _end;
        }
      }

      // reader and load info are handed back to the iterator by tLDataIterOpen2()
      memset(pIter, 0, sizeof(SLDataIter));
      code = tLDataIterOpen2(pIter, pSttFileReader, i, pMTree->backward, pConf->suid, pConf->uid, &pConf->timewindow,
                             &pConf->verRange, pLoadInfo, pMTree->idStr, pConf->strictTimeRange, pConf->loadTombFn,
                             pConf->pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }

      bool hasVal = tLDataIterNextRow(pIter, pMTree->idStr);
      if (hasVal) {
        tMergeTreeAddIter(pMTree, pIter);
      } else {
        if (!pMTree->ignoreEarlierTs) {
          pMTree->ignoreEarlierTs = pIter->ignoreEarlierTs;
        }
      }
    }
  }

  return code;

_end:
  tMergeTreeClose(pMTree);
  return code;
}

914 915
/* Insert an iterator into the red-black tree of the merge tree (the iterator is passed as its tree node). */
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter) {
  SRBTreeNode *pNode = (SRBTreeNode *)pIter;
  tRBTreePut(&pMTree->rbt, pNode);
}

916 917
/* Report the ignoreEarlierTs flag recorded while the merge tree was opened. */
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree) {
  bool ignore = pMTree->ignoreEarlierTs;
  return ignore;
}

H
Hongze Cheng 已提交
918
/*
 * Advance the merge tree to its next row.
 *
 * The current iterator (pMTree->pIter, kept outside the tree) is moved to its next row.
 * If it is exhausted it is dropped; if the smallest iterator in the tree now sorts before
 * it, it is put back into the tree. Whenever there is no current iterator afterwards, the
 * smallest one is taken out of the tree and becomes current.
 *
 * Returns true when a current row is available.
 */
bool tMergeTreeNext(SMergeTree *pMTree) {
  if (pMTree->pIter) {
    SLDataIter *pIter = pMTree->pIter;

    bool hasVal = tLDataIterNextRow(pIter, pMTree->idStr);
    if (!hasVal) {
      pMTree->pIter = NULL;
    }

    // compare with min in RB Tree
    pIter = (SLDataIter *)tRBTreeMin(&pMTree->rbt);
    if (pMTree->pIter && pIter) {
      int32_t c = pMTree->rbt.cmprFn(&pMTree->pIter->node, &pIter->node);
      if (c > 0) {
        tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter);
        pMTree->pIter = NULL;
      } else {
        // two iterators are not expected to compare equal
        ASSERT(c);
      }
    }
  }

  if (pMTree->pIter == NULL) {
    pMTree->pIter = (SLDataIter *)tRBTreeMin(&pMTree->rbt);
    if (pMTree->pIter) {
      tRBTreeDrop(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter);
    }
  }

  return pMTree->pIter != NULL;
}

H
Hongze Cheng 已提交
951
/*
 * Detach the current iterator from the merge tree and, when the tree was opened with
 * destroyLoadInfo set, destroy the load info it refers to. The flag is cleared afterwards,
 * so a second call does not destroy again.
 */
void tMergeTreeClose(SMergeTree *pMTree) {
  pMTree->pIter = NULL;
  if (pMTree->destroyLoadInfo) {
    pMTree->pLoadInfo = destroyLastBlockLoadInfo(pMTree->pLoadInfo);
    pMTree->destroyLoadInfo = false;
  }
}