tsdbMergeTree.c 29.3 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"
17
#include "tsdbFSet2.h"
18
#include "tsdbMerge.h"
H
Haojun Liao 已提交
19
#include "tsdbReadUtil.h"
20
#include "tsdbSttFileRW.h"
H
Hongze Cheng 已提交
21

H
Haojun Liao 已提交
22 23
static void tLDataIterClose2(SLDataIter *pIter);

24
// SLDataIter =================================================
25 26
// Allocate and initialize an array of `numOfSttTrigger` stt block load-info entries.
//
// Each entry starts with an empty two-slot block cache (blockIndex[0] == blockIndex[1] == -1, current slot 1), an
// empty stt block list, and borrowed references to the caller's schema and column list (they are not copied).
//
// Returns the array, or NULL with terrno set when an allocation or block-data initialization fails. On failure
// everything allocated so far is released, so the caller never receives a half-initialized object.
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols,
                                            int32_t numOfSttTrigger) {
  SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(numOfSttTrigger, sizeof(SSttBlockLoadInfo));
  if (pLoadInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  for (int32_t i = 0; i < numOfSttTrigger; ++i) {
    SSttBlockLoadInfo *pInfo = &pLoadInfo[i];

    pInfo->blockIndex[0] = -1;
    pInfo->blockIndex[1] = -1;
    pInfo->currentLoadBlockIndex = 1;
    pInfo->pSchema = pSchema;
    pInfo->colIds = colList;
    pInfo->numOfCols = numOfCols;

    code = tBlockDataCreate(&pInfo->blockData[0]);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    code = tBlockDataCreate(&pInfo->blockData[1]);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    pInfo->aSttBlk = taosArrayInit(4, sizeof(SSttBlk));
    if (pInfo->aSttBlk == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
  }

  return pLoadInfo;

_err:
  // The array came from calloc, so entries that were never (fully) initialized are zero-filled.
  // NOTE(review): assumes tBlockDataDestroy/taosArrayDestroy tolerate zero-filled/NULL members — confirm.
  for (int32_t i = 0; i < numOfSttTrigger; ++i) {
    tBlockDataDestroy(&pLoadInfo[i].blockData[0]);
    tBlockDataDestroy(&pLoadInfo[i].blockData[1]);
    taosArrayDestroy(pLoadInfo[i].aSttBlk);
  }
  taosMemoryFree(pLoadInfo);
  terrno = code;
  return NULL;
}

57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
// Allocate and initialize a single stt block load-info entry.
//
// The entry starts with an empty two-slot block cache (blockIndex[0] == blockIndex[1] == -1, current slot 1), an
// empty stt block list, and borrowed references to the caller's schema and column list (they are not copied).
//
// Returns the entry, or NULL with terrno set when an allocation or block-data initialization fails. On failure the
// partially built entry is released instead of being handed to the caller.
SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols) {
  SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(1, sizeof(SSttBlockLoadInfo));
  if (pLoadInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pLoadInfo->blockIndex[0] = -1;
  pLoadInfo->blockIndex[1] = -1;
  pLoadInfo->currentLoadBlockIndex = 1;
  pLoadInfo->pSchema = pSchema;
  pLoadInfo->colIds = colList;
  pLoadInfo->numOfCols = numOfCols;

  int32_t code = tBlockDataCreate(&pLoadInfo->blockData[0]);
  if (code == TSDB_CODE_SUCCESS) {
    code = tBlockDataCreate(&pLoadInfo->blockData[1]);
  }

  if (code == TSDB_CODE_SUCCESS) {
    pLoadInfo->aSttBlk = taosArrayInit(4, sizeof(SSttBlk));
    if (pLoadInfo->aSttBlk == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  if (code != TSDB_CODE_SUCCESS) {
    // The struct came from calloc, so members that were never initialized are zero/NULL.
    // NOTE(review): assumes tBlockDataDestroy/taosArrayDestroy tolerate zero-filled/NULL members — confirm.
    tBlockDataDestroy(&pLoadInfo->blockData[0]);
    tBlockDataDestroy(&pLoadInfo->blockData[1]);
    taosArrayDestroy(pLoadInfo->aSttBlk);
    taosMemoryFree(pLoadInfo);
    terrno = code;
    return NULL;
  }

  return pLoadInfo;
}

H
Hongze Cheng 已提交
86
// Reset the first load-info entry so it can be reused: empty the block cache and the stt block list, zero the cost
// counters, drop the cached statistics block contents and mark the stt file metadata as not loaded.
//
// Only entry 0 is touched, like getSttBlockLoadInfo() and destroyLastBlockLoadInfo().
void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
  if (pLoadInfo == NULL) {
    return;
  }

  SSttBlockLoadInfo *pInfo = &pLoadInfo[0];

  pInfo->currentLoadBlockIndex = 1;
  pInfo->blockIndex[0] = -1;
  pInfo->blockIndex[1] = -1;

  taosArrayClear(pInfo->aSttBlk);

  pInfo->cost.loadBlocks = 0;
  pInfo->cost.blockElapsedTime = 0;
  pInfo->cost.statisElapsedTime = 0;
  pInfo->cost.loadStatisBlocks = 0;

  // statisBlock is allocated lazily by existsFromSttBlkStatis(), so it may still be NULL here. Its contents are
  // released but the struct is kept; statisBlockIndex == -1 makes the next lookup reload it.
  pInfo->statisBlockIndex = -1;
  if (pInfo->statisBlock != NULL) {
    tStatisBlockDestroy(pInfo->statisBlock);
  }

  pInfo->sttBlockLoaded = false;
}

105
// Accumulate (not overwrite) the load costs recorded in the first load-info entry into *pLoadCost.
void getSttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, SSttBlockLoadCostInfo* pLoadCost) {
  // only entry 0 is consulted
  SSttBlockLoadInfo *pFirst = &pLoadInfo[0];

  pLoadCost->loadBlocks += pFirst->cost.loadBlocks;
  pLoadCost->loadStatisBlocks += pFirst->cost.loadStatisBlocks;
  pLoadCost->blockElapsedTime += pFirst->cost.blockElapsedTime;
  pLoadCost->statisElapsedTime += pFirst->cost.statisElapsedTime;
}

H
Hongze Cheng 已提交
114
// Release a load-info allocation: the two cached block buffers, the stt block list and the lazily allocated
// statistics block of entry 0, then the allocation itself. Accepts NULL.
//
// Always returns NULL so callers can write `p = destroyLastBlockLoadInfo(p);`.
//
// NOTE(review): only entry 0 is released. tMergeTreeOpen() indexes pLoadInfo[i] for every stt file and
// tMergeTreeClose() frees it through this function, so entries beyond the first of a multi-entry array from
// tCreateLastBlockLoadInfo() appear to leak their block data and stt block lists — confirm the intended ownership.
void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
  if (pLoadInfo == NULL) {
    return NULL;
  }

  SSttBlockLoadInfo *pInfo = &pLoadInfo[0];

  pInfo->currentLoadBlockIndex = 1;
  pInfo->blockIndex[0] = -1;
  pInfo->blockIndex[1] = -1;

  tBlockDataDestroy(&pInfo->blockData[0]);
  tBlockDataDestroy(&pInfo->blockData[1]);

  taosArrayDestroy(pInfo->aSttBlk);
  pInfo->aSttBlk = NULL;

  // existsFromSttBlkStatis() heap-allocates statisBlock on first use; without this it leaked on every destroy.
  // NOTE(review): the block may already have been destroyed by resetLastBlockLoadInfo(); this relies on
  // tStatisBlockDestroy being safe to repeat, as the reload path in existsFromSttBlkStatis() already does.
  if (pInfo->statisBlock != NULL) {
    tStatisBlockDestroy(pInfo->statisBlock);
    taosMemoryFree(pInfo->statisBlock);
    pInfo->statisBlock = NULL;
  }

  taosMemoryFree(pLoadInfo);
  return NULL;
}

134
// Dispose of a heap-allocated stt iterator: close its file reader, free the load info it holds, then free the
// iterator itself.
static void destroyLDataIter(SLDataIter *pIter) {
  SSttBlockLoadInfo *pOwnedInfo = pIter->pBlockLoadInfo;  // closing the reader does not touch this field

  tLDataIterClose2(pIter);
  destroyLastBlockLoadInfo(pOwnedInfo);
  taosMemoryFree(pIter);
}

140
// Destroy the per-level iterator lists built by tMergeTreeOpen2() and the outer array that holds them.
//
// pLDataIterArray is an array (one entry per stt level) of SArray* lists of SLDataIter*. When pLoadCost is not NULL,
// each iterator's load costs are added to it before the iterator is destroyed. Accepts NULL. Always returns NULL.
void *destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo* pLoadCost) {
  if (pLDataIterArray == NULL) {
    return NULL;
  }

  int32_t numOfLevel = taosArrayGetSize(pLDataIterArray);
  for (int32_t i = 0; i < numOfLevel; ++i) {
    SArray *pList = taosArrayGetP(pLDataIterArray, i);
    int32_t numOfIter = taosArrayGetSize(pList);

    for (int32_t j = 0; j < numOfIter; ++j) {
      SLDataIter *pIter = taosArrayGetP(pList, j);
      if (pIter == NULL) {
        continue;
      }

      // An iterator that was allocated but never opened (e.g. tMergeTreeOpen2() failed before reaching it) has no
      // load info; it has no costs to report and must not be dereferenced.
      SSttBlockLoadInfo *pInfo = pIter->pBlockLoadInfo;
      if (pLoadCost != NULL && pInfo != NULL) {
        pLoadCost->loadBlocks += pInfo->cost.loadBlocks;
        pLoadCost->loadStatisBlocks += pInfo->cost.loadStatisBlocks;
        pLoadCost->blockElapsedTime += pInfo->cost.blockElapsedTime;
        pLoadCost->statisElapsedTime += pInfo->cost.statisElapsedTime;
      }

      destroyLDataIter(pIter);
    }

    taosArrayDestroy(pList);
  }

  taosArrayDestroy(pLDataIterArray);
  return NULL;
}

H
Hongze Cheng 已提交
166
// Return the SBlockData of the stt block the iterator points at (pIter->iSttBlk), using the two-slot cache in
// pIter->pBlockLoadInfo.
//
// On a cache hit, that slot becomes the current slot and is returned without I/O. On a miss, the non-current slot is
// overwritten with pIter->pSttBlk read from the stt file (columns colIds[1..numOfCols-1]). pIter->iRow is then set
// one step before the first row in scan direction, and the freshly loaded slot is returned.
//
// Returns NULL when there is nothing to load (no current stt block or no schema), or when the read fails; in the
// latter case terrno holds the error code.
static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
  SSttBlockLoadInfo *pInfo = pIter->pBlockLoadInfo;

  // cache hit in slot 0
  if (pInfo->blockIndex[0] == pIter->iSttBlk) {
    if (pInfo->currentLoadBlockIndex != 0) {
      tsdbDebug("current load index is set to 0, block index:%d, file index:%d, due to uid:%" PRIu64 ", load data, %s",
                pIter->iSttBlk, pIter->iStt, pIter->uid, idStr);
      pInfo->currentLoadBlockIndex = 0;
    }
    return &pInfo->blockData[0];
  }

  // cache hit in slot 1
  if (pInfo->blockIndex[1] == pIter->iSttBlk) {
    if (pInfo->currentLoadBlockIndex != 1) {
      tsdbDebug("current load index is set to 1, block index:%d, file index:%d, due to uid:%" PRIu64 ", load data, %s",
                pIter->iSttBlk, pIter->iStt, pIter->uid, idStr);
      pInfo->currentLoadBlockIndex = 1;
    }
    return &pInfo->blockData[1];
  }

  if (pIter->pSttBlk == NULL || pInfo->pSchema == NULL) {
    return NULL;
  }

  // current block not loaded yet: evict the non-current slot
  pInfo->currentLoadBlockIndex ^= 1;
  int32_t     slot = pInfo->currentLoadBlockIndex;
  SBlockData *pBlock = &pInfo->blockData[slot];

  // The read below writes into this slot, so it no longer holds the block recorded in blockIndex[slot]. Invalidate
  // the mapping first; otherwise a failed read would leave a stale entry that a later lookup of the old block index
  // would be served from.
  pInfo->blockIndex[slot] = -1;

  int64_t st = taosGetTimestampUs();
  int32_t code = tsdbSttFileReadBlockDataByColumn(pIter->pReader, pIter->pSttBlk, pBlock, pInfo->pSchema,
                                                  &pInfo->colIds[1], pInfo->numOfCols - 1);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  double el = (taosGetTimestampUs() - st) / 1000.0;
  pInfo->cost.blockElapsedTime += el;
  pInfo->cost.loadBlocks += 1;

  tsdbDebug("read last block, total load:%"PRId64", trigger by uid:%" PRIu64
            ", last file index:%d, last block index:%d, entry:%d, rows:%d, %p, elapsed time:%.2f ms, %s",
            pInfo->cost.loadBlocks, pIter->uid, pIter->iStt, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock->nRow,
            pBlock, el, idStr);

  pInfo->blockIndex[slot] = pIter->iSttBlk;
  pIter->iRow = (pIter->backward) ? pBlock->nRow : -1;

  tsdbDebug("last block index list:%d, %d, rowIndex:%d %s", pInfo->blockIndex[0], pInfo->blockIndex[1], pIter->iRow,
            idStr);
  return pBlock;
}

227
// find the earliest block that contains the required records
H
Hongze Cheng 已提交
228 229
static FORCE_INLINE int32_t findEarliestIndex(int32_t index, uint64_t uid, const SSttBlk *pBlockList, int32_t num,
                                              int32_t backward) {
  // Starting at `index`, walk towards the front of the list (ascending scan) or the back (descending scan) for as
  // long as the block's [minUid, maxUid] range still covers uid. Returns the outermost covering index in that
  // direction; callers pass an index that covers uid (if it does not, the result is index minus one step).
  const int32_t dir = backward ? 1 : -1;
  int32_t       pos = index;

  for (; pos >= 0 && pos < num; pos += dir) {
    const SSttBlk *pBlk = &pBlockList[pos];
    if (uid < pBlk->minUid || uid > pBlk->maxUid) {
      break;
    }
  }

  // pos is one step past the last covering block
  return pos - dir;
}

H
Hongze Cheng 已提交
238
// Binary-search a list of stt blocks (ordered by uid range) for one whose [minUid, maxUid] covers uid, then widen
// to the first covering block for an ascending scan (backward == 0) or the last one for a descending scan.
// Returns that block index, or -1 when the list is empty or no block covers uid.
static int32_t binarySearchForStartBlock(SSttBlk *pBlockList, int32_t num, uint64_t uid, int32_t backward) {
  if (num <= 0) {
    return -1;
  }

  int32_t lo = 0;
  int32_t hi = num - 1;

  // quick reject: uid lies outside the whole list's uid span
  if (uid < pBlockList[lo].minUid || uid > pBlockList[hi].maxUid) {
    return -1;
  }

  for (;;) {
    const SSttBlk *pLo = &pBlockList[lo];
    if (uid >= pLo->minUid && uid <= pLo->maxUid) {
      return findEarliestIndex(lo, uid, pBlockList, num, backward);
    }

    if (uid < pLo->minUid || uid > pBlockList[hi].maxUid) {
      return -1;
    }

    int32_t        mid = lo + ((hi - lo + 1) >> 1u);
    const SSttBlk *pMid = &pBlockList[mid];
    if (uid < pMid->minUid) {
      hi = mid - 1;
    } else if (uid > pMid->maxUid) {
      lo = mid + 1;
    } else {
      return findEarliestIndex(mid, uid, pBlockList, num, backward);
    }
  }
}

H
Hongze Cheng 已提交
274 275
static FORCE_INLINE int32_t findEarliestRow(int32_t index, uint64_t uid, const uint64_t *uidList, int32_t num,
                                            int32_t backward) {
  // Starting at `index`, move towards the front (ascending scan) or the back (descending scan) while the row still
  // belongs to uid; return the outermost such row. If uidList[index] != uid the result is index minus one step.
  const int32_t dir = backward ? 1 : -1;
  int32_t       pos = index;

  while (pos >= 0 && pos < num) {
    if (uidList[pos] != uid) {
      break;
    }
    pos += dir;
  }

  return pos - dir;
}

H
Hongze Cheng 已提交
284
// Binary-search the uid column of a loaded stt block (assumed sorted ascending by uid, as the search requires) for
// `uid`, then widen to the first row of that uid for an ascending scan (backward == 0) or its last row for a
// descending scan. Returns that row index, or -1 when the block is empty or does not contain uid.
static int32_t binarySearchForStartRowIndex(uint64_t *uidList, int32_t num, uint64_t uid, int32_t backward) {
  // With num <= 0, lastPos would be -1 and uidList[lastPos] an out-of-bounds read. findNextValidRow can reach this
  // with an empty block.
  if (uidList == NULL || num <= 0) {
    return -1;
  }

  int32_t firstPos = 0;
  int32_t lastPos = num - 1;

  // quick reject: uid outside [uidList[0], uidList[num - 1]]
  if ((uid > uidList[lastPos]) || (uid < uidList[firstPos])) {
    return -1;
  }

  while (1) {
    if (uid == uidList[firstPos]) {
      return findEarliestRow(firstPos, uid, uidList, num, backward);
    }

    if (uid > uidList[lastPos] || uid < uidList[firstPos]) {
      return -1;
    }

    int32_t numOfRows = lastPos - firstPos + 1;
    int32_t midPos = (numOfRows >> 1u) + firstPos;

    if (uid < uidList[midPos]) {
      lastPos = midPos - 1;
    } else if (uid > uidList[midPos]) {
      firstPos = midPos + 1;
    } else {
      return findEarliestRow(midPos, uid, uidList, num, backward);
    }
  }
}

315
// Legacy entry point still called by tMergeTreeOpen(). It opens nothing and always reports success.
//
// The iterator is left exactly as passed in. tMergeTreeOpen() zero-fills it first, so the following
// tLDataIterNextRow() sees no current stt block and yields no rows.
int32_t tLDataIterOpen(struct SLDataIter *pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
                       uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo,
                       const char *idStr, bool strictTimeRange) {
  // every parameter is intentionally unused
  (void)pIter;
  (void)pReader;
  (void)iStt;
  (void)backward;
  (void)suid;
  (void)uid;
  (void)pTimeWindow;
  (void)pRange;
  (void)pBlockLoadInfo;
  (void)idStr;
  (void)strictTimeRange;

  return 0;
}

321 322
// Fill pBlockLoadInfo->aSttBlk with the stt blocks of pArray that belong to super table `suid`.
//
// If every block has the same suid, the list becomes either all of pArray or empty; in the empty case
// pIter->iSttBlk is set to -1. Otherwise the contiguous run of blocks with the requested suid is copied into a new
// list that replaces the old one.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the list cannot be built. On failure,
// pBlockLoadInfo->aSttBlk is left as a valid (possibly empty) array.
static int32_t extractSttBlockInfo(SLDataIter *pIter, const TSttBlkArray *pArray, SSttBlockLoadInfo *pBlockLoadInfo,
                                   uint64_t suid) {
  if (TARRAY2_SIZE(pArray) <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  SSttBlk *pStart = &pArray->data[0];
  SSttBlk *pEnd = &pArray->data[TARRAY2_SIZE(pArray) - 1];

  // all blocks belong to the same super table
  if (pStart->suid == pEnd->suid) {
    taosArrayClear(pBlockLoadInfo->aSttBlk);

    if (pStart->suid != suid) {  // no qualified stt block exists
      pIter->iSttBlk = -1;
      return TSDB_CODE_SUCCESS;
    }

    // all blocks are qualified
    if (taosArrayAddBatch(pBlockLoadInfo->aSttBlk, pArray->data, TARRAY2_SIZE(pArray)) == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    return TSDB_CODE_SUCCESS;
  }

  // blocks of several super tables: copy out those of `suid`
  SArray *pTmp = taosArrayInit(TARRAY2_SIZE(pArray), sizeof(SSttBlk));
  if (pTmp == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t i = 0; i < TARRAY2_SIZE(pArray); ++i) {
    SSttBlk *p = &pArray->data[i];
    if (p->suid < suid) {
      continue;
    }

    // the early exit relies on the blocks being ordered by suid
    if (p->suid > suid) {
      break;
    }

    if (taosArrayPush(pTmp, p) == NULL) {
      taosArrayDestroy(pTmp);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  taosArrayDestroy(pBlockLoadInfo->aSttBlk);
  pBlockLoadInfo->aSttBlk = pTmp;
  return TSDB_CODE_SUCCESS;
}

362 363
// Three-way comparison of two uint64_t super-table ids passed by pointer.
// Returns -1, 0 or 1 as *target is less than, equal to or greater than *p2.
static int32_t suidComparFn(const void *target, const void *p2) {
  uint64_t lhs = *(const uint64_t *)target;
  uint64_t rhs = *(const uint64_t *)p2;

  if (lhs < rhs) {
    return -1;
  }
  return (lhs > rhs) ? 1 : 0;
}
371

372
static bool existsFromSttBlkStatis(SSttBlockLoadInfo *pBlockLoadInfo, uint64_t suid, uint64_t uid,
373
                                   SSttFileReader *pReader) {
374
  const TStatisBlkArray *pStatisBlkArray = pBlockLoadInfo->pSttStatisBlkArray;
375 376
  if (TARRAY2_SIZE(pStatisBlkArray) <= 0) {
    return true;
377 378
  }

379 380 381
  int32_t i = 0;
  for (i = 0; i < TARRAY2_SIZE(pStatisBlkArray); ++i) {
    SStatisBlk *p = &pStatisBlkArray->data[i];
H
Haojun Liao 已提交
382
    if (p->minTbid.suid <= suid && p->maxTbid.suid >= suid) {
383
      break;
384
    }
385
  }
386

387 388
  if (i >= TARRAY2_SIZE(pStatisBlkArray)) {
    return false;
389 390
  }

H
Hongze Cheng 已提交
391
  while (i < TARRAY2_SIZE(pStatisBlkArray)) {
392 393 394 395
    SStatisBlk *p = &pStatisBlkArray->data[i];
    if (p->minTbid.suid > suid) {
      return false;
    }
396

397 398 399 400 401 402 403 404 405 406 407
    if (pBlockLoadInfo->statisBlock == NULL) {
      pBlockLoadInfo->statisBlock = taosMemoryCalloc(1, sizeof(STbStatisBlock));
      tsdbSttFileReadStatisBlock(pReader, p, pBlockLoadInfo->statisBlock);
      pBlockLoadInfo->statisBlockIndex = i;
      pBlockLoadInfo->cost.loadStatisBlocks += 1;
    } else if (pBlockLoadInfo->statisBlockIndex != i) {
      tStatisBlockDestroy(pBlockLoadInfo->statisBlock);
      tsdbSttFileReadStatisBlock(pReader, p, pBlockLoadInfo->statisBlock);
      pBlockLoadInfo->statisBlockIndex = i;
      pBlockLoadInfo->cost.loadStatisBlocks += 1;
    }
408

409 410
    STbStatisBlock* pBlock = pBlockLoadInfo->statisBlock;
    int32_t index = tarray2SearchIdx(pBlock->suid, &suid, sizeof(int64_t), suidComparFn, TD_EQ);
411 412
    if (index == -1) {
      return false;
413
    }
414 415

    int32_t j = index;
416
    if (pBlock->uid->data[j] == uid) {
417
      return true;
418 419 420
    } else if (pBlock->uid->data[j] > uid) {
      while (j >= 0 && pBlock->suid->data[j] == suid) {
        if (pBlock->uid->data[j] == uid) {
421 422 423 424 425 426 427
          return true;
        } else {
          j -= 1;
        }
      }
    } else {
      j = index + 1;
428 429
      while (j < pBlock->suid->size && pBlock->suid->data[j] == suid) {
        if (pBlock->uid->data[j] == uid) {
430 431 432 433
          return true;
        } else {
          j += 1;
        }
434 435
      }
    }
436 437

    i += 1;
438
  }
439

440
  return false;
441 442
}

443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477
// Load the metadata of the stt file behind pIter->pReader into pBlockLoadInfo:
//   1. the stt block index, filtered to super table `suid` by extractSttBlockInfo();
//   2. the statistics block index (pBlockLoadInfo->pSttStatisBlkArray);
//   3. the tomb data, through the caller-supplied loadTombFn.
//
// pBlockLoadInfo->sttBlockLoaded is set only after steps 1-2 succeed. A failure there is therefore retried by the
// next tLDataIterOpen2(), instead of leaving a load info that claims to be loaded but lacks the statistics index
// that existsFromSttBlkStatis() reads. The flag is set before step 3, so tomb data is requested at most once.
//
// Returns TSDB_CODE_SUCCESS or the first error encountered.
static int32_t doLoadSttFilesBlk(SSttBlockLoadInfo *pBlockLoadInfo, SLDataIter *pIter, int64_t suid,
                                 _load_tomb_fn loadTombFn, void *pReader1, const char *idStr) {
  int64_t st = taosGetTimestampUs();

  const TSttBlkArray *pSttBlkArray = NULL;

  // load the stt block info for each stt-block
  int32_t code = tsdbSttFileReadSttBlk(pIter->pReader, &pSttBlkArray);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("load stt blk failed, code:%s, %s", tstrerror(code), idStr);
    return code;
  }

  code = extractSttBlockInfo(pIter, pSttBlkArray, pBlockLoadInfo, suid);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("load stt block info failed, code:%s, %s", tstrerror(code), idStr);
    return code;
  }

  // load stt blocks statis for all stt-blocks, to decide if the data of queried table exists in current stt file
  code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray **)&pBlockLoadInfo->pSttStatisBlkArray);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("failed to load stt block statistics, code:%s, %s", tstrerror(code), idStr);
    return code;
  }

  pBlockLoadInfo->sttBlockLoaded = true;

  code = loadTombFn(pReader1, pIter->pReader, pIter->pBlockLoadInfo);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("failed to load tomb data of stt file, code:%s, %s", tstrerror(code), idStr);
    return code;
  }

  double el = (taosGetTimestampUs() - st) / 1000.0;
  tsdbDebug("load the stt file info completed, elapsed time:%.2fms, %s", el, idStr);
  return code;
}

// Position pIter on the first stt block of file `iStt` that may hold rows of table (suid, uid), scanning in the
// requested direction.
//
// The stt file metadata (block index, statistics, tomb data) is loaded into pBlockLoadInfo on first use and reused
// afterwards. On success:
//   - pIter->pSttBlk == NULL means there is nothing to read: no reader, the statistics rule the table out, no block
//     covers uid, or the first candidate block lies entirely outside the time window;
//   - pIter->ignoreEarlierTs is set when a descending scan finds the candidate block ends before the window.
// A NULL stt file reader is logged and treated as an empty file, not as an error.
//
// Returns TSDB_CODE_SUCCESS, or the error from loading the stt file metadata.
int32_t tLDataIterOpen2(SLDataIter *pIter, SSttFileReader *pSttFileReader, int32_t iStt, int8_t backward,
                        uint64_t suid, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange,
                        SSttBlockLoadInfo *pBlockLoadInfo, const char *idStr, bool strictTimeRange,
                        _load_tomb_fn loadTombFn, void *pReader1) {
  pIter->uid = uid;
  pIter->iStt = iStt;
  pIter->backward = backward;
  pIter->verRange.minVer = pRange->minVer;
  pIter->verRange.maxVer = pRange->maxVer;
  pIter->timeWindow.skey = pTimeWindow->skey;
  pIter->timeWindow.ekey = pTimeWindow->ekey;
  pIter->pReader = pSttFileReader;
  pIter->pBlockLoadInfo = pBlockLoadInfo;

  // open stt file failed, ignore and continue
  if (pIter->pReader == NULL) {
    tsdbError("stt file reader is null, %s", idStr);
    pIter->pSttBlk = NULL;
    pIter->iSttBlk = -1;
    return TSDB_CODE_SUCCESS;
  }

  if (!pBlockLoadInfo->sttBlockLoaded) {
    int32_t code = doLoadSttFilesBlk(pBlockLoadInfo, pIter, suid, loadTombFn, pReader1, idStr);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  // the statistics show the table has no rows in this stt file
  if (!existsFromSttBlkStatis(pBlockLoadInfo, suid, uid, pIter->pReader)) {
    pIter->iSttBlk = -1;
    pIter->pSttBlk = NULL;
    return TSDB_CODE_SUCCESS;
  }

  // Find the start block. The position could be remembered to avoid repeatedly searching for it when skey is updated.
  int32_t numOfBlk = (int32_t)taosArrayGetSize(pBlockLoadInfo->aSttBlk);
  pIter->iSttBlk = binarySearchForStartBlock(pBlockLoadInfo->aSttBlk->pData, numOfBlk, uid, backward);
  if (pIter->iSttBlk == -1) {
    return TSDB_CODE_SUCCESS;
  }

  SSttBlk *pBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, pIter->iSttBlk);
  pIter->pSttBlk = pBlk;
  pIter->iRow = (pIter->backward) ? pBlk->nRow : -1;

  if (!backward) {
    // ascending: the block is useless if it starts after (or, when strict, at) the window end
    bool startsAfterWindow =
        strictTimeRange ? (pBlk->minKey >= pIter->timeWindow.ekey) : (pBlk->minKey > pIter->timeWindow.ekey);
    if (startsAfterWindow) {
      pIter->pSttBlk = NULL;
    }
  } else {
    // descending: the block is useless if it ends before (or, when strict, at) the window start
    bool endsBeforeWindow =
        strictTimeRange ? (pBlk->maxKey <= pIter->timeWindow.skey) : (pBlk->maxKey < pIter->timeWindow.skey);
    if (endsBeforeWindow) {
      pIter->pSttBlk = NULL;
      pIter->ignoreEarlierTs = true;
    }
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
538 539
// Close the stt file reader held by pIter and clear the iterator's handle to it.
void tLDataIterClose2(SLDataIter *pIter) {
  SSttFileReader **ppReader = &pIter->pReader;

  tsdbSttFileReaderClose(ppReader);
  *ppReader = NULL;
}
H
Hongze Cheng 已提交
542

H
Hongze Cheng 已提交
543
// Advance pIter, in scan direction, to the next stt block that may contain rows of pIter->uid inside the iterator's
// time window and version range.
//
// The search stops early as soon as the block ordering shows no later block can qualify. Afterwards pIter->pSttBlk
// points at the qualified block, or is NULL when none is left.
void tLDataIterNextBlock(SLDataIter *pIter, const char *idStr) {
  int32_t step = pIter->backward ? -1 : 1;
  int32_t oldIndex = pIter->iSttBlk;

  pIter->iSttBlk += step;

  int32_t index = -1;
  // signed bound, so the `i >= 0` check below is not mixed with an unsigned comparison
  int32_t size = (int32_t)pIter->pBlockLoadInfo->aSttBlk->size;
  for (int32_t i = pIter->iSttBlk; i >= 0 && i < size; i += step) {
    SSttBlk *p = taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, i);
    if ((!pIter->backward) && p->minUid > pIter->uid) {
      break;
    }

    if (pIter->backward && p->maxUid < pIter->uid) {
      break;
    }

    // check uid firstly
    if (p->minUid <= pIter->uid && p->maxUid >= pIter->uid) {
      if ((!pIter->backward) && p->minKey > pIter->timeWindow.ekey) {
        break;
      }

      if (pIter->backward && p->maxKey < pIter->timeWindow.skey) {
        break;
      }

      // check time range secondly
      if (p->minKey <= pIter->timeWindow.ekey && p->maxKey >= pIter->timeWindow.skey) {
        if ((!pIter->backward) && p->minVer > pIter->verRange.maxVer) {
          break;
        }

        if (pIter->backward && p->maxVer < pIter->verRange.minVer) {
          break;
        }

        if (p->minVer <= pIter->verRange.maxVer && p->maxVer >= pIter->verRange.minVer) {
          index = i;
          break;
        }
      }
    }
  }

  pIter->pSttBlk = NULL;
  if (index != -1) {
    pIter->iSttBlk = index;
    pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, pIter->iSttBlk);
    tsdbDebug("try next last file block:%d from stt fileIdx:%d, trigger by uid:%" PRIu64 ", file index:%d, %s",
              pIter->iSttBlk, oldIndex, pIter->uid, pIter->iStt, idStr);
  } else {
    // report the stt file index here; the previous block index was logged as "file index" by mistake
    tsdbDebug("no more last block qualified, uid:%" PRIu64 ", file index:%d, %s", pIter->uid, pIter->iStt, idStr);
  }
}

H
Hongze Cheng 已提交
600 601
// Starting from pIter->iRow, scan the current stt block in scan direction for the next row that belongs to
// pIter->uid, lies in the time window and has a version inside the version range.
//
// On return pIter->iRow is that row's index, or -1 when the block holds no further qualifying row (or no block could
// be loaded).
static void findNextValidRow(SLDataIter *pIter, const char *idStr) {
  bool    hasVal = false;
  int32_t step = pIter->backward ? -1 : 1;
  int32_t i = pIter->iRow;

  SBlockData *pData = loadLastBlock(pIter, idStr);
  if (pData == NULL) {
    // nothing loaded (no current block, or the read failed and terrno is set)
    pIter->iRow = -1;
    return;
  }

  // mostly we only need to find the start position for a given table
  if (pData->nRow > 0 && pData->aUid != NULL &&
      (((i == 0) && (!pIter->backward)) || (i == pData->nRow - 1 && pIter->backward))) {
    i = binarySearchForStartRowIndex((uint64_t *)pData->aUid, pData->nRow, pIter->uid, pIter->backward);
    if (i == -1) {
      tsdbDebug("failed to find the data in pBlockData, uid:%" PRIu64 " , %s", pIter->uid, idStr);
      pIter->iRow = -1;
      return;
    }
  }

  for (; i < pData->nRow && i >= 0; i += step) {
    if (pData->aUid != NULL) {
      if (!pIter->backward) {
        if (pData->aUid[i] > pIter->uid) {
          break;
        }
      } else {
        if (pData->aUid[i] < pIter->uid) {
          break;
        }
      }
    }

    int64_t ts = pData->aTSKEY[i];
    if (!pIter->backward) {               // asc
      if (ts > pIter->timeWindow.ekey) {  // no more data
        break;
      } else if (ts < pIter->timeWindow.skey) {
        continue;
      }
    } else {
      if (ts < pIter->timeWindow.skey) {
        break;
      } else if (ts > pIter->timeWindow.ekey) {
        continue;
      }
    }

    int64_t ver = pData->aVersion[i];
    if (ver < pIter->verRange.minVer) {
      continue;
    }

    // todo opt handle desc case
    if (ver > pIter->verRange.maxVer) {
      continue;
    }

    hasVal = true;
    break;
  }

  pIter->iRow = (hasVal) ? i : -1;
}

H
Hongze Cheng 已提交
662
bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
H
Hongze Cheng 已提交
663
  int32_t step = pIter->backward ? -1 : 1;
H
Haojun Liao 已提交
664
  terrno = TSDB_CODE_SUCCESS;
665 666

  // no qualified last file block in current file, no need to fetch row
H
Hongze Cheng 已提交
667
  if (pIter->pSttBlk == NULL) {
668 669
    return false;
  }
H
Hongze Cheng 已提交
670

H
Hongze Cheng 已提交
671
  int32_t     iBlockL = pIter->iSttBlk;
H
Haojun Liao 已提交
672
  SBlockData *pBlockData = loadLastBlock(pIter, idStr);
H
Haojun Liao 已提交
673
  if (pBlockData == NULL || terrno != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
674 675 676
    goto _exit;
  }

677 678
  pIter->iRow += step;

H
Hongze Cheng 已提交
679
  while (1) {
680
    bool skipBlock = false;
H
Haojun Liao 已提交
681
    findNextValidRow(pIter, idStr);
682

683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706
    if (pIter->pBlockLoadInfo->checkRemainingRow) {
      skipBlock = true;
      int16_t *aCols = pIter->pBlockLoadInfo->colIds;
      int      nCols = pIter->pBlockLoadInfo->numOfCols;
      bool     isLast = pIter->pBlockLoadInfo->isLast;
      for (int inputColIndex = 0; inputColIndex < nCols; ++inputColIndex) {
        for (int colIndex = 0; colIndex < pBlockData->nColData; ++colIndex) {
          SColData *pColData = &pBlockData->aColData[colIndex];
          int16_t   cid = pColData->cid;

          if (cid == aCols[inputColIndex]) {
            if (isLast && (pColData->flag & HAS_VALUE)) {
              skipBlock = false;
              break;
            } else if (pColData->flag & (HAS_VALUE | HAS_NULL)) {
              skipBlock = false;
              break;
            }
          }
        }
      }
    }

    if (skipBlock || pIter->iRow >= pBlockData->nRow || pIter->iRow < 0) {
707
      tLDataIterNextBlock(pIter, idStr);
H
Hongze Cheng 已提交
708
      if (pIter->pSttBlk == NULL) {  // no more data
709 710 711 712
        goto _exit;
      }
    } else {
      break;
H
Hongze Cheng 已提交
713 714
    }

H
Hongze Cheng 已提交
715
    if (iBlockL != pIter->iSttBlk) {
H
Haojun Liao 已提交
716
      pBlockData = loadLastBlock(pIter, idStr);
717 718 719
      if (pBlockData == NULL) {
        goto _exit;
      }
H
Haojun Liao 已提交
720 721

      // set start row index
722
      pIter->iRow = pIter->backward ? pBlockData->nRow - 1 : 0;
H
Hongze Cheng 已提交
723 724 725
    }
  }

726 727 728
  pIter->rInfo.suid = pBlockData->suid;
  pIter->rInfo.uid = pBlockData->uid;
  pIter->rInfo.row = tsdbRowFromBlockData(pBlockData, pIter->iRow);
H
Hongze Cheng 已提交
729 730

_exit:
731
  return (terrno == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL) && (pBlockData != NULL);
H
Hongze Cheng 已提交
732 733
}

H
Hongze Cheng 已提交
734
// Return the row the iterator is positioned on. It is filled in by tLDataIterNextRow() when that returns true.
SRowInfo *tLDataIterGet(SLDataIter *pIter) {
  SRowInfo *pRowInfo = &pIter->rInfo;
  return pRowInfo;
}
H
Hongze Cheng 已提交
735 736

// SMergeTree =================================================
H
Hongze Cheng 已提交
737 738 739
static FORCE_INLINE int32_t tLDataIterCmprFn(const SRBTreeNode *p1, const SRBTreeNode *p2) {
  // Ascending merge order: compare the iterators' current rows by timestamp, then by version. Returns -1, 0 or 1.
  SLDataIter *pLeft = (SLDataIter *)(((uint8_t *)p1) - offsetof(SLDataIter, node));
  SLDataIter *pRight = (SLDataIter *)(((uint8_t *)p2) - offsetof(SLDataIter, node));

  TSDBKEY kl = TSDBROW_KEY(&pLeft->rInfo.row);
  TSDBKEY kr = TSDBROW_KEY(&pRight->rInfo.row);

  if (kl.ts != kr.ts) {
    return (kl.ts < kr.ts) ? -1 : 1;
  }

  if (kl.version != kr.version) {
    return (kl.version < kr.version) ? -1 : 1;
  }

  return 0;
}

759 760 761 762
static FORCE_INLINE int32_t tLDataIterDescCmprFn(const SRBTreeNode *p1, const SRBTreeNode *p2) {
  // Descending merge order: the exact negation of the ascending comparison.
  int32_t ascOrder = tLDataIterCmprFn(p1, p2);
  return -ascOrder;
}

763
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
764
                       STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
765
                       bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter *pLDataIter) {
766 767
  int32_t code = TSDB_CODE_SUCCESS;

H
Hongze Cheng 已提交
768
  pMTree->backward = backward;
769
  pMTree->pIter = NULL;
H
Haojun Liao 已提交
770
  pMTree->idStr = idStr;
771

dengyihao's avatar
dengyihao 已提交
772
  if (!pMTree->backward) {  // asc
773
    tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
dengyihao's avatar
dengyihao 已提交
774
  } else {  // desc
775 776
    tRBTreeCreate(&pMTree->rbt, tLDataIterDescCmprFn);
  }
777

778 779
  pMTree->pLoadInfo = pBlockLoadInfo;
  pMTree->destroyLoadInfo = destroyLoadInfo;
780
  pMTree->ignoreEarlierTs = false;
781

H
Hongze Cheng 已提交
782
  for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) {  // open all last file
783 784
    memset(&pLDataIter[i], 0, sizeof(SLDataIter));
    code = tLDataIterOpen(&pLDataIter[i], pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange,
785
                          &pMTree->pLoadInfo[i], pMTree->idStr, strictTimeRange);
786 787 788 789
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }

790
    bool hasVal = tLDataIterNextRow(&pLDataIter[i], pMTree->idStr);
791
    if (hasVal) {
792
      tMergeTreeAddIter(pMTree, &pLDataIter[i]);
793
    } else {
794
      if (!pMTree->ignoreEarlierTs) {
795
        pMTree->ignoreEarlierTs = pLDataIter[i].ignoreEarlierTs;
796
      }
797 798
    }
  }
799 800 801

  return code;

H
Hongze Cheng 已提交
802
_end:
803 804
  tMergeTreeClose(pMTree);
  return code;
H
Hongze Cheng 已提交
805
}
H
Hongze Cheng 已提交
806

807
// Build a merge tree over all stt files of pConf->pCurrentFileset.
//
// pConf->pSttFileBlockIterArray caches one SArray of SLDataIter* per stt level across calls. It is grown or shrunk
// here to match the number of files in each level. Readers and load infos already held by cached iterators are
// reused; missing ones are created. Every iterator that yields a first row is inserted into the tree; the others
// only contribute their ignoreEarlierTs flag.
//
// A reader that fails to open is logged and its file treated as empty (see tLDataIterOpen2). Returns
// TSDB_CODE_SUCCESS, or an error after closing the tree.
int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) {
  int32_t code = TSDB_CODE_SUCCESS;

  pMTree->pIter = NULL;
  pMTree->backward = pConf->backward;
  pMTree->idStr = pConf->idstr;

  if (!pMTree->backward) {  // asc
    tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
  } else {  // desc
    tRBTreeCreate(&pMTree->rbt, tLDataIterDescCmprFn);
  }

  pMTree->ignoreEarlierTs = false;

  STFileSet *pFileSet = (STFileSet *)pConf->pCurrentFileset;
  int32_t    size = pFileSet->lvlArr->size;
  if (size == 0) {
    goto _end;
  }

  // add the list/iter placeholder
  while (taosArrayGetSize(pConf->pSttFileBlockIterArray) < size) {
    SArray *pList = taosArrayInit(4, POINTER_BYTES);
    if (pList == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }

    if (taosArrayPush(pConf->pSttFileBlockIterArray, &pList) == NULL) {
      taosArrayDestroy(pList);
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }
  }

  for (int32_t j = 0; j < size; ++j) {
    SSttLvl *pSttLevel = pFileSet->lvlArr->data[j];
    ASSERT(pSttLevel->level == j);

    SArray *pList = taosArrayGetP(pConf->pSttFileBlockIterArray, j);
    int32_t numOfIter = taosArrayGetSize(pList);
    int32_t numOfFile = TARRAY2_SIZE(pSttLevel->fobjArr);

    if (numOfIter < numOfFile) {
      // grow: one (not yet opened) iterator per additional stt file of this level
      for (int32_t k = numOfIter; k < numOfFile; ++k) {
        SLDataIter *pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
        if (pIter == NULL || taosArrayPush(pList, &pIter) == NULL) {
          taosMemoryFree(pIter);
          code = TSDB_CODE_OUT_OF_MEMORY;
          goto _end;
        }
      }
    } else if (numOfIter > numOfFile) {
      // shrink: release iterators left over from a file set with more stt files in this level.
      // taosArrayPop() returns the address of the popped slot; the slot holds the SLDataIter pointer.
      for (int32_t k = numOfFile; k < numOfIter; ++k) {
        SLDataIter **ppIter = taosArrayPop(pList);
        if (ppIter != NULL && *ppIter != NULL) {
          destroyLDataIter(*ppIter);
        }
      }
    }

    for (int32_t i = 0; i < numOfFile; ++i) {  // open all last file
      SLDataIter *pIter = taosArrayGetP(pList, i);

      SSttFileReader    *pSttFileReader = pIter->pReader;
      SSttBlockLoadInfo *pLoadInfo = pIter->pBlockLoadInfo;

      // open stt file reader if not
      if (pSttFileReader == NULL) {
        SSttFileReaderConfig conf = {.tsdb = pConf->pTsdb, .szPage = pConf->pTsdb->pVnode->config.tsdbPageSize};
        conf.file[0] = *pSttLevel->fobjArr->data[i]->f;

        code = tsdbSttFileReaderOpen(pSttLevel->fobjArr->data[i]->fname, &conf, &pSttFileReader);
        if (code != TSDB_CODE_SUCCESS) {
          tsdbError("open stt file reader error. file name %s, code %s, %s", pSttLevel->fobjArr->data[i]->fname,
                    tstrerror(code), pMTree->idStr);
        }
      }

      if (pLoadInfo == NULL) {
        pLoadInfo = tCreateOneLastBlockLoadInfo(pConf->pSchema, pConf->pCols, pConf->numOfCols);
        if (pLoadInfo == NULL) {
          // keep the reader reachable from the iterator so destroySttBlockReader() can still close it
          pIter->pReader = pSttFileReader;
          code = TSDB_CODE_OUT_OF_MEMORY;
          goto _end;
        }
      }

      memset(pIter, 0, sizeof(SLDataIter));
      code = tLDataIterOpen2(pIter, pSttFileReader, i, pMTree->backward, pConf->suid, pConf->uid, &pConf->timewindow,
                             &pConf->verRange, pLoadInfo, pMTree->idStr, pConf->strictTimeRange, pConf->loadTombFn,
                             pConf->pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }

      bool hasVal = tLDataIterNextRow(pIter, pMTree->idStr);
      if (hasVal) {
        tMergeTreeAddIter(pMTree, pIter);
      } else {
        if (!pMTree->ignoreEarlierTs) {
          pMTree->ignoreEarlierTs = pIter->ignoreEarlierTs;
        }
      }
    }
  }

  return code;

_end:
  tMergeTreeClose(pMTree);
  return code;
}

902 903
// Insert pIter into the merge tree, where it is ordered by the key of its current row.
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter) {
  SRBTreeNode *pNode = (SRBTreeNode *)pIter;
  tRBTreePut(&pMTree->rbt, pNode);
}

904 905
// Report whether any stt iterator flagged that data earlier than the query window can be ignored.
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree) {
  bool ignore = pMTree->ignoreEarlierTs;
  return ignore;
}

H
Hongze Cheng 已提交
906
// Advance the merge tree to the next row in merge order.
//
// The iterator that produced the previous row (pMTree->pIter) is advanced. If it still has a row that sorts before
// the tree's minimum, it stays current. Otherwise it is put back into the tree (when not exhausted) and the minimum
// iterator is taken out to become current. Equal keys trip the ASSERT.
//
// Returns true when a current iterator exists; its row is available through tLDataIterGet().
bool tMergeTreeNext(SMergeTree *pMTree) {
  SLDataIter *pCur = pMTree->pIter;

  if (pCur != NULL) {
    if (!tLDataIterNextRow(pCur, pMTree->idStr)) {
      pCur = NULL;  // current iterator exhausted
    }
    pMTree->pIter = pCur;

    // compare with min in RB Tree
    SLDataIter *pMin = (SLDataIter *)tRBTreeMin(&pMTree->rbt);
    if (pCur != NULL && pMin != NULL) {
      int32_t c = pMTree->rbt.cmprFn(&pCur->node, &pMin->node);
      if (c > 0) {
        tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pCur);
        pMTree->pIter = NULL;
      } else {
        ASSERT(c);
      }
    }
  }

  if (pMTree->pIter == NULL) {
    SLDataIter *pNext = (SLDataIter *)tRBTreeMin(&pMTree->rbt);
    pMTree->pIter = pNext;
    if (pNext != NULL) {
      tRBTreeDrop(&pMTree->rbt, (SRBTreeNode *)pNext);
    }
  }

  return pMTree->pIter != NULL;
}

H
Hongze Cheng 已提交
939
// Detach the current iterator and, when the tree owns its load info (destroyLoadInfo), free that load info.
// The iterators themselves are not released here.
void tMergeTreeClose(SMergeTree *pMTree) {
  pMTree->pIter = NULL;

  if (!pMTree->destroyLoadInfo) {
    return;
  }

  pMTree->pLoadInfo = destroyLastBlockLoadInfo(pMTree->pLoadInfo);
  pMTree->destroyLoadInfo = false;
}