tsdbMergeTree.c 25.6 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"
17 18
#include "tsdbFSet2.h"
#include "tsdbSttFileRW.h"
H
Hongze Cheng 已提交
19

H
Haojun Liao 已提交
20 21
static void tLDataIterClose2(SLDataIter *pIter);

22
// SLDataIter =================================================
23 24
/*
 * Allocate and initialize an array of `numOfSttTrigger` stt block load-info slots.
 *
 * Every slot gets two empty block-data buffers (the two-entry block cache), an empty SSttBlk array and
 * borrowed references to `pSchema` / `colList` (they are stored, not copied).
 * Only the first element records `numOfStt`; the reset/stat/destroy helpers read the slot count from there.
 *
 * Returns the array on success. On failure returns NULL with terrno set, after releasing everything that
 * was allocated so far (previously a partially initialized array was returned).
 */
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols,
                                            int32_t numOfSttTrigger) {
  SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(numOfSttTrigger, sizeof(SSttBlockLoadInfo));
  if (pLoadInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pLoadInfo->numOfStt = numOfSttTrigger;

  int32_t i = 0;
  for (; i < numOfSttTrigger; ++i) {
    SSttBlockLoadInfo *pInfo = &pLoadInfo[i];

    pInfo->blockIndex[0] = -1;
    pInfo->blockIndex[1] = -1;
    pInfo->currentLoadBlockIndex = 1;

    int32_t code = tBlockDataCreate(&pInfo->blockData[0]);
    if (code == TSDB_CODE_SUCCESS) {
      code = tBlockDataCreate(&pInfo->blockData[1]);
    }
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      goto _err;
    }

    pInfo->aSttBlk = taosArrayInit(4, sizeof(SSttBlk));
    if (pInfo->aSttBlk == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }

    pInfo->pSchema = pSchema;
    pInfo->colIds = colList;
    pInfo->numOfCols = numOfCols;
  }

  return pLoadInfo;

_err:
  // Slots after the failing one are still zeroed by calloc, so only [0, i] can own resources.
  // NOTE(review): assumes tBlockDataDestroy/taosArrayDestroy tolerate zero-initialized/NULL members — confirm.
  for (int32_t j = 0; j <= i; ++j) {
    tBlockDataDestroy(&pLoadInfo[j].blockData[0]);
    tBlockDataDestroy(&pLoadInfo[j].blockData[1]);
    taosArrayDestroy(pLoadInfo[j].aSttBlk);
  }
  taosMemoryFree(pLoadInfo);
  return NULL;
}

57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
/*
 * Allocate and initialize a single stt block load-info slot (numOfStt == 1).
 *
 * The slot gets two empty block-data buffers, an empty SSttBlk array, an empty tomb-block pointer array and
 * borrowed references to `pSchema` / `colList` (stored, not copied).
 *
 * Returns the new object, or NULL with terrno set on failure; nothing is leaked on the failure path
 * (previously unchecked creations could leave a half-initialized object behind).
 */
SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols) {
  SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(1, sizeof(SSttBlockLoadInfo));
  if (pLoadInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pLoadInfo->numOfStt = 1;
  pLoadInfo->blockIndex[0] = -1;
  pLoadInfo->blockIndex[1] = -1;
  pLoadInfo->currentLoadBlockIndex = 1;

  int32_t code = tBlockDataCreate(&pLoadInfo->blockData[0]);
  if (code == TSDB_CODE_SUCCESS) {
    code = tBlockDataCreate(&pLoadInfo->blockData[1]);
  }
  if (code != TSDB_CODE_SUCCESS) {
    goto _err;
  }

  pLoadInfo->aSttBlk = taosArrayInit(4, sizeof(SSttBlk));
  pLoadInfo->pTombBlockArray = taosArrayInit(4, POINTER_BYTES);
  if (pLoadInfo->aSttBlk == NULL || pLoadInfo->pTombBlockArray == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  pLoadInfo->pSchema = pSchema;
  pLoadInfo->colIds = colList;
  pLoadInfo->numOfCols = numOfCols;

  return pLoadInfo;

_err:
  terrno = code;
  // NOTE(review): assumes the destroy helpers tolerate zero-initialized/NULL members (calloc'ed above) — confirm.
  tBlockDataDestroy(&pLoadInfo->blockData[0]);
  tBlockDataDestroy(&pLoadInfo->blockData[1]);
  taosArrayDestroy(pLoadInfo->aSttBlk);
  taosArrayDestroy(pLoadInfo->pTombBlockArray);
  taosMemoryFree(pLoadInfo);
  return NULL;
}

H
Hongze Cheng 已提交
88
/*
 * Put every slot of a load-info array back into its "nothing cached" state so it can serve another scan.
 *
 * For each slot: both block-cache tags are invalidated, the collected SSttBlk list is emptied, the
 * statistics counters are zeroed and `sttBlockLoaded` is cleared so that tLDataIterOpen2() re-reads the
 * stt block index. The slot count comes from the first element (`numOfStt`).
 *
 * NOTE(review): pTombBlockArray is not emptied here, while clearing sttBlockLoaded makes the next open
 * append tomb blocks again — confirm this cannot accumulate duplicates.
 */
void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
  const int32_t numOfSlots = pLoadInfo->numOfStt;

  for (int32_t k = 0; k < numOfSlots; ++k) {
    SSttBlockLoadInfo *pSlot = &pLoadInfo[k];

    // invalidate the two-entry block cache
    pSlot->currentLoadBlockIndex = 1;
    pSlot->blockIndex[0] = -1;
    pSlot->blockIndex[1] = -1;

    taosArrayClear(pSlot->aSttBlk);

    // statistics
    pSlot->elapsedTime = 0;
    pSlot->loadBlocks = 0;

    // force the stt block index to be loaded again
    pSlot->sttBlockLoaded = false;
  }
}

H
Hongze Cheng 已提交
102
/*
 * Accumulate the load statistics of every slot into the caller's counters.
 *
 * `*blocks` receives the sum of `loadBlocks` and `*el` the sum of `elapsedTime` (milliseconds, as recorded
 * by loadLastBlock). Both outputs are added to, not overwritten. The slot count is read from the
 * first element (`numOfStt`).
 */
void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el) {
  const int32_t numOfSlots = pLoadInfo->numOfStt;

  for (int32_t k = 0; k < numOfSlots; ++k) {
    const SSttBlockLoadInfo *pSlot = &pLoadInfo[k];
    *blocks += pSlot->loadBlocks;
    *el += pSlot->elapsedTime;
  }
}

H
Haojun Liao 已提交
109 110 111 112 113 114
static void freeTombBlock(void* param) {
  STombBlock** pTombBlock = (STombBlock**) param;
  tTombBlockDestroy(*pTombBlock);
  taosMemoryFree(*pTombBlock);
}

H
Hongze Cheng 已提交
115
/*
 * Release a load-info array created by tCreateLastBlockLoadInfo() / tCreateOneLastBlockLoadInfo().
 *
 * For every slot (count taken from the first element's `numOfStt`) the two cached block buffers, the SSttBlk
 * array and the tomb-block array (including the tomb blocks it owns) are released, then the array itself is
 * freed. Previously only element 0's tomb-block array was released, leaking those of the other slots.
 *
 * Accepts NULL. Always returns NULL so callers can write `p = destroyLastBlockLoadInfo(p);`.
 */
void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
  if (pLoadInfo == NULL) {
    return NULL;
  }

  for (int32_t i = 0; i < pLoadInfo->numOfStt; ++i) {
    tBlockDataDestroy(&pLoadInfo[i].blockData[0]);
    tBlockDataDestroy(&pLoadInfo[i].blockData[1]);

    taosArrayDestroy(pLoadInfo[i].aSttBlk);

    // each slot may have lazily created its own tomb-block array (see loadSttTombBlockData)
    taosArrayDestroyEx(pLoadInfo[i].pTombBlockArray, freeTombBlock);
    pLoadInfo[i].pTombBlockArray = NULL;
  }

  taosMemoryFree(pLoadInfo);
  return NULL;
}

137 138 139 140 141 142 143 144 145 146
static void destroyLDataIterFn(void* param) {
  SLDataIter** pIter = (SLDataIter**) param;
  tLDataIterClose2(*pIter);
  destroyLastBlockLoadInfo((*pIter)->pBlockLoadInfo);
  taosMemoryFree(*pIter);
}

/*
 * Tear down the per-level iterator lists used by tMergeTreeOpen2().
 *
 * `pLDataIterArray` holds one `SArray *` per stt level, and each of those holds `SLDataIter *` elements.
 * Every iterator is destroyed with destroyLDataIterFn, then every level list and the outer array are freed.
 * Accepts NULL. Always returns NULL.
 */
void* destroySttBlockReader(SArray* pLDataIterArray) {
  if (pLDataIterArray != NULL) {
    const int32_t numOfLevel = taosArrayGetSize(pLDataIterArray);

    for (int32_t lvl = 0; lvl < numOfLevel; ++lvl) {
      SArray *pLevelIters = taosArrayGetP(pLDataIterArray, lvl);
      taosArrayDestroyEx(pLevelIters, destroyLDataIterFn);
    }

    taosArrayDestroy(pLDataIterArray);
  }

  return NULL;
}

H
Hongze Cheng 已提交
159
/*
 * Return the decoded data of the stt block the iterator currently points at (`pIter->iSttBlk`).
 *
 * The load info keeps a two-slot cache: `blockData[0]` / `blockData[1]`, tagged with the block index they hold
 * in `blockIndex[0]` / `blockIndex[1]`. On a hit, that slot becomes current and is returned without I/O.
 * On a miss, the slot that is NOT current is overwritten with the block read from the stt file, so the most
 * recently used block stays cached. Load time and count are added to the slot statistics.
 *
 * Returns NULL without touching terrno when there is no block or schema to load. Returns NULL with terrno set
 * when the read fails. In that case the overwritten slot is untagged, because it may now hold partially
 * decoded data and must not be served as a cache hit for the block it used to hold.
 */
static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
  SSttBlockLoadInfo *pInfo = pIter->pBlockLoadInfo;

  // cache hit in slot 0
  if (pInfo->blockIndex[0] == pIter->iSttBlk) {
    if (pInfo->currentLoadBlockIndex != 0) {
      tsdbDebug("current load index is set to 0, block index:%d, file index:%d, due to uid:%" PRIu64 ", load data, %s",
                pIter->iSttBlk, pIter->iStt, pIter->uid, idStr);
      pInfo->currentLoadBlockIndex = 0;
    }
    return &pInfo->blockData[0];
  }

  // cache hit in slot 1
  if (pInfo->blockIndex[1] == pIter->iSttBlk) {
    if (pInfo->currentLoadBlockIndex != 1) {
      tsdbDebug("current load index is set to 1, block index:%d, file index:%d, due to uid:%" PRIu64 ", load data, %s",
                pIter->iSttBlk, pIter->iStt, pIter->uid, idStr);
      pInfo->currentLoadBlockIndex = 1;
    }
    return &pInfo->blockData[1];
  }

  if (pIter->pSttBlk == NULL || pInfo->pSchema == NULL) {
    return NULL;
  }

  // current block not loaded yet: evict the slot that is not current
  pInfo->currentLoadBlockIndex ^= 1;
  int64_t st = taosGetTimestampUs();

  SBlockData *pBlock = &pInfo->blockData[pInfo->currentLoadBlockIndex];
  // the first requested column id is skipped (presumably the primary timestamp column — confirm)
  int32_t code = tsdbSttFileReadBlockDataByColumn(pIter->pReader, pIter->pSttBlk, pBlock, pInfo->pSchema,
                                                   &pInfo->colIds[1], pInfo->numOfCols - 1);
  if (code != TSDB_CODE_SUCCESS) {
    // the slot content is no longer trustworthy; drop its tag so it is never returned as a stale cache hit
    pInfo->blockIndex[pInfo->currentLoadBlockIndex] = -1;
    terrno = code;
    return NULL;
  }

  double el = (taosGetTimestampUs() - st) / 1000.0;
  pInfo->elapsedTime += el;
  pInfo->loadBlocks += 1;

  tsdbDebug("read last block, total load:%d, trigger by uid:%" PRIu64
            ", last file index:%d, last block index:%d, entry:%d, rows:%d, %p, elapsed time:%.2f ms, %s",
            pInfo->loadBlocks, pIter->uid, pIter->iStt, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock->nRow,
            pBlock, el, idStr);

  pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk;
  pIter->iRow = (pIter->backward) ? pInfo->blockData[pInfo->currentLoadBlockIndex].nRow : -1;

  tsdbDebug("last block index list:%d, %d, rowIndex:%d %s", pInfo->blockIndex[0], pInfo->blockIndex[1], pIter->iRow,
            idStr);
  return pBlock;
}

219
// find the earliest block that contains the required records
H
Hongze Cheng 已提交
220 221
// Starting from a block `index` known to cover `uid`, walk against the scan direction (towards lower indices for
// an ascending scan, higher for a descending one) while the neighbouring blocks still cover `uid`, and return the
// last covering index reached. Bounds are checked against [0, num).
static FORCE_INLINE int32_t findEarliestIndex(int32_t index, uint64_t uid, const SSttBlk *pBlockList, int32_t num,
                                              int32_t backward) {
  const int32_t dir = backward ? 1 : -1;
  int32_t       pos = index;

  for (; pos >= 0 && pos < num; pos += dir) {
    const SSttBlk *pBlk = &pBlockList[pos];
    if (uid < pBlk->minUid || uid > pBlk->maxUid) {
      break;  // this block no longer covers uid
    }
  }

  return pos - dir;
}

H
Hongze Cheng 已提交
230
/*
 * Binary-search `pBlockList` for a block whose [minUid, maxUid] range covers `uid`, then widen the hit to the
 * earliest covering block for the scan direction via findEarliestIndex().
 * Assumes the blocks are ordered by uid range (presumably sorted by the writer — confirm).
 *
 * Returns the block index, or -1 when `num <= 0` or no block covers `uid`.
 */
static int32_t binarySearchForStartBlock(SSttBlk *pBlockList, int32_t num, uint64_t uid, int32_t backward) {
  if (num <= 0) {
    return -1;
  }

  int32_t lo = 0;
  int32_t hi = num - 1;

  // uid lies outside the overall [first.minUid, last.maxUid] span: nothing to find
  if (uid < pBlockList[lo].minUid || uid > pBlockList[hi].maxUid) {
    return -1;
  }

  for (;;) {
    if (pBlockList[lo].minUid <= uid && uid <= pBlockList[lo].maxUid) {
      return findEarliestIndex(lo, uid, pBlockList, num, backward);
    }

    if (uid < pBlockList[lo].minUid || uid > pBlockList[hi].maxUid) {
      return -1;
    }

    const int32_t  mid = lo + ((hi - lo + 1) >> 1u);
    const SSttBlk *pMid = &pBlockList[mid];

    if (uid < pMid->minUid) {
      hi = mid - 1;
    } else if (uid > pMid->maxUid) {
      lo = mid + 1;
    } else {
      return findEarliestIndex(mid, uid, pBlockList, num, backward);
    }
  }
}

H
Hongze Cheng 已提交
266 267
// Starting from a row `index` whose uid equals `uid`, walk against the scan direction (towards lower indices for an
// ascending scan, higher for a descending one) while the neighbouring rows carry the same uid, and return the last
// matching row reached. Bounds are checked against [0, num).
static FORCE_INLINE int32_t findEarliestRow(int32_t index, uint64_t uid, const uint64_t *uidList, int32_t num,
                                            int32_t backward) {
  const int32_t dir = backward ? 1 : -1;
  int32_t       pos = index;

  for (; pos >= 0 && pos < num; pos += dir) {
    if (uidList[pos] != uid) {
      break;  // left the run of rows belonging to uid
    }
  }

  return pos - dir;
}

H
Hongze Cheng 已提交
276
/*
 * Binary-search the (uid-sorted) `uidList` of a block for a row of `uid`, then widen the hit to the earliest
 * such row for the scan direction via findEarliestRow().
 *
 * Returns the row index, or -1 when the list is empty/NULL or does not contain `uid`.
 * The empty-list guard matters: findNextValidRow() can call this with num == 0 in backward mode,
 * which previously read uidList[-1].
 */
static int32_t binarySearchForStartRowIndex(uint64_t *uidList, int32_t num, uint64_t uid, int32_t backward) {
  if (num <= 0 || uidList == NULL) {
    return -1;
  }

  int32_t firstPos = 0;
  int32_t lastPos = num - 1;

  // uid outside [first, last]: not present in this block
  if ((uid > uidList[lastPos]) || (uid < uidList[firstPos])) {
    return -1;
  }

  while (1) {
    if (uid == uidList[firstPos]) {
      return findEarliestRow(firstPos, uid, uidList, num, backward);
    }

    if (uid > uidList[lastPos] || uid < uidList[firstPos]) {
      return -1;
    }

    int32_t numOfRows = lastPos - firstPos + 1;
    int32_t midPos = (numOfRows >> 1u) + firstPos;

    if (uid < uidList[midPos]) {
      lastPos = midPos - 1;
    } else if (uid > uidList[midPos]) {
      firstPos = midPos + 1;
    } else {
      return findEarliestRow(midPos, uid, uidList, num, backward);
    }
  }
}

307
/*
 * Open routine for SDataFReader-based stt iterators, used by tMergeTreeOpen().
 *
 * NOTE(review): this is a stub. It ignores every argument, leaves `pIter` untouched and always reports success.
 * tMergeTreeOpen() zeroes the iterator first, so tLDataIterNextRow() then sees pSttBlk == NULL and yields no rows.
 */
int32_t tLDataIterOpen(struct SLDataIter *pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
                        uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo,
                        const char *idStr, bool strictTimeRange) {
  (void)pIter;
  (void)pReader;
  (void)iStt;
  (void)backward;
  (void)suid;
  (void)uid;
  (void)pTimeWindow;
  (void)pRange;
  (void)pBlockLoadInfo;
  (void)idStr;
  (void)strictTimeRange;

  return 0;
}

313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389
/*
 * Rebuild `pBlockLoadInfo->aSttBlk` so it holds exactly the stt blocks of `pBlockLoadInfo->pBlockArray` whose
 * suid equals `suid` (the scan relies on the blocks being ordered by suid: it stops at the first larger suid).
 *
 * When no block qualifies, aSttBlk is emptied and `pIter->iSttBlk` is set to -1.
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the new list cannot be built. In that case
 * aSttBlk is left as a valid array instead of being replaced by NULL.
 */
static int32_t loadSttBlockInfo(SLDataIter *pIter, SSttBlockLoadInfo* pBlockLoadInfo, uint64_t suid) {
  TSttBlkArray* pArray = pBlockLoadInfo->pBlockArray;
  if (TARRAY2_SIZE(pArray) <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  SSttBlk *pStart = &pArray->data[0];
  SSttBlk *pEnd = &pArray->data[TARRAY2_SIZE(pArray) - 1];

  // first and last block share a suid: every block in the file has that suid
  if (pStart->suid == pEnd->suid) {
    taosArrayClear(pBlockLoadInfo->aSttBlk);

    if (pStart->suid != suid) {  // no qualified stt block existed
      pIter->iSttBlk = -1;
      return TSDB_CODE_SUCCESS;
    }

    // all blocks are qualified
    if (taosArrayAddBatch(pBlockLoadInfo->aSttBlk, pArray->data, pArray->size) == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    return TSDB_CODE_SUCCESS;
  }

  SArray *pTmp = taosArrayInit(TARRAY2_SIZE(pArray), sizeof(SSttBlk));
  if (pTmp == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t i = 0; i < TARRAY2_SIZE(pArray); ++i) {
    SSttBlk* p = &pArray->data[i];
    if (p->suid < suid) {
      continue;
    }

    if (p->suid > suid) {
      break;
    }

    if (taosArrayPush(pTmp, p) == NULL) {
      taosArrayDestroy(pTmp);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  taosArrayDestroy(pBlockLoadInfo->aSttBlk);
  pBlockLoadInfo->aSttBlk = pTmp;
  return TSDB_CODE_SUCCESS;
}

/*
 * Read the tomb (deletion) blocks of `pSttFileReader` whose table-id range may contain `suid`, and append a pointer
 * to each decoded STombBlock to `pLoadInfo->pTombBlockArray` (created on demand).
 * The scan relies on the tomb block index being ordered by suid: it skips blocks entirely below `suid` and
 * stops at the first block entirely above it.
 *
 * Returns TSDB_CODE_SUCCESS, the reader's error code, or TSDB_CODE_OUT_OF_MEMORY. A tomb block that could not
 * be read or stored is released immediately. Previously read errors were ignored and a failed push
 * leaked the block. Blocks appended before a failure stay owned by pTombBlockArray.
 */
static int32_t loadSttTombBlockData(SSttFileReader* pSttFileReader, uint64_t suid, SSttBlockLoadInfo* pLoadInfo) {
  if (pLoadInfo->pTombBlockArray == NULL) {
    pLoadInfo->pTombBlockArray = taosArrayInit(4, POINTER_BYTES);
    if (pLoadInfo->pTombBlockArray == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  const TTombBlkArray* pBlkArray = NULL;
  int32_t code = tsdbSttFileReadTombBlk(pSttFileReader, &pBlkArray);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  for (int32_t j = 0; j < pBlkArray->size; ++j) {
    STombBlk* pTombBlk = &pBlkArray->data[j];
    if (pTombBlk->maxTbid.suid < suid) {
      continue;  // todo use binary search instead
    }

    if (pTombBlk->minTbid.suid > suid) {
      break;
    }

    STombBlock* pTombBlock = taosMemoryCalloc(1, sizeof(STombBlock));
    if (pTombBlock == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    code = tsdbSttFileReadTombBlock(pSttFileReader, pTombBlk, pTombBlock);
    if (code != TSDB_CODE_SUCCESS) {
      tTombBlockDestroy(pTombBlock);
      taosMemoryFree(pTombBlock);
      return code;
    }

    if (taosArrayPush(pLoadInfo->pTombBlockArray, &pTombBlock) == NULL) {
      tTombBlockDestroy(pTombBlock);
      taosMemoryFree(pTombBlock);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
}

390
/*
 * Initialize `pIter` to scan the stt file behind `pReader` for table `uid` (super table `suid`) within
 * `pTimeWindow` / `pRange`, in ascending order or, when `backward` is set, descending order.
 *
 * The first open that uses a given `pBlockLoadInfo` also loads the stt block index, filters it down to `suid` and
 * loads the matching tomb blocks. The load info is marked as loaded only after all three steps succeed, so a
 * failed load is retried rather than silently reused. Previously the flag was set up-front and the block filter
 * error was ignored.
 *
 * Afterwards the starting block for `uid` is located. `pIter->pSttBlk` is NULL when no block can contain
 * qualifying rows. `strictTimeRange` makes the window boundary exclusive for that check.
 * Returns TSDB_CODE_SUCCESS or the first error encountered while loading.
 */
int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
                       uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo,
                       const char *idStr, bool strictTimeRange) {
  int32_t code = TSDB_CODE_SUCCESS;

  pIter->uid = uid;
  pIter->iStt = iStt;
  pIter->backward = backward;
  pIter->verRange.minVer = pRange->minVer;
  pIter->verRange.maxVer = pRange->maxVer;
  pIter->timeWindow.skey = pTimeWindow->skey;
  pIter->timeWindow.ekey = pTimeWindow->ekey;
  pIter->pReader = pReader;
  pIter->pBlockLoadInfo = pBlockLoadInfo;

  if (!pBlockLoadInfo->sttBlockLoaded) {
    int64_t st = taosGetTimestampUs();

    code = tsdbSttFileReadSttBlk(pIter->pReader, (const TSttBlkArray **)&pBlockLoadInfo->pBlockArray);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    code = loadSttBlockInfo(pIter, pBlockLoadInfo, suid);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    code = loadSttTombBlockData(pReader, suid, pBlockLoadInfo);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    pBlockLoadInfo->sttBlockLoaded = true;

    double el = (taosGetTimestampUs() - st) / 1000.0;
    tsdbDebug("load the last file info completed, elapsed time:%.2fms, %s", el, idStr);
  }

  // find the start block
  size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
  pIter->iSttBlk = binarySearchForStartBlock(pBlockLoadInfo->aSttBlk->pData, size, uid, backward);
  if (pIter->iSttBlk != -1) {
    pIter->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, pIter->iSttBlk);
    pIter->iRow = (pIter->backward) ? pIter->pSttBlk->nRow : -1;

    if (!backward) {
      // ascending: the block starts after the end of the window
      if ((strictTimeRange && pIter->pSttBlk->minKey >= pIter->timeWindow.ekey) ||
          (!strictTimeRange && pIter->pSttBlk->minKey > pIter->timeWindow.ekey)) {
        pIter->pSttBlk = NULL;
      }
    } else if ((strictTimeRange && pIter->pSttBlk->maxKey <= pIter->timeWindow.skey) ||
               (!strictTimeRange && pIter->pSttBlk->maxKey < pIter->timeWindow.skey)) {
      // descending: the block ends before the start of the window
      pIter->pSttBlk = NULL;
      pIter->ignoreEarlierTs = true;
    }
  }

  return code;
}

H
Haojun Liao 已提交
447 448
/*
 * Close the stt file reader owned by `pIter` and leave the iterator's reader handle cleared.
 * The iterator itself and its block load info are not released here.
 */
void tLDataIterClose2(SLDataIter *pIter) {
  SSttFileReader **ppReader = &pIter->pReader;

  tsdbSttFileReaderClose(ppReader);
  *ppReader = NULL;
}
H
Hongze Cheng 已提交
451

H
Hongze Cheng 已提交
452
/*
 * Move `pIter` to the next stt block (in scan direction) that may contain rows of `pIter->uid` inside the
 * iterator's time window and version range.
 *
 * The candidate list is `pBlockLoadInfo->aSttBlk`. The scan stops early as soon as a block lies entirely
 * past `uid`, past the time window, or past the version range in scan direction.
 * On success `pIter->iSttBlk` / `pIter->pSttBlk` point at the found block. Otherwise `pIter->pSttBlk` is NULL.
 * Fixed: the "no more block" log printed the old block index as the file index.
 */
void tLDataIterNextBlock(SLDataIter *pIter, const char *idStr) {
  int32_t step = pIter->backward ? -1 : 1;
  int32_t oldIndex = pIter->iSttBlk;

  pIter->iSttBlk += step;

  int32_t index = -1;
  int32_t size = (int32_t)taosArrayGetSize(pIter->pBlockLoadInfo->aSttBlk);
  for (int32_t i = pIter->iSttBlk; i < size && i >= 0; i += step) {
    SSttBlk *p = taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, i);
    if ((!pIter->backward) && p->minUid > pIter->uid) {
      break;
    }

    if (pIter->backward && p->maxUid < pIter->uid) {
      break;
    }

    // check uid firstly
    if (p->minUid <= pIter->uid && p->maxUid >= pIter->uid) {
      if ((!pIter->backward) && p->minKey > pIter->timeWindow.ekey) {
        break;
      }

      if (pIter->backward && p->maxKey < pIter->timeWindow.skey) {
        break;
      }

      // check time range secondly
      if (p->minKey <= pIter->timeWindow.ekey && p->maxKey >= pIter->timeWindow.skey) {
        if ((!pIter->backward) && p->minVer > pIter->verRange.maxVer) {
          break;
        }

        if (pIter->backward && p->maxVer < pIter->verRange.minVer) {
          break;
        }

        // finally the version range
        if (p->minVer <= pIter->verRange.maxVer && p->maxVer >= pIter->verRange.minVer) {
          index = i;
          break;
        }
      }
    }
  }

  pIter->pSttBlk = NULL;
  if (index != -1) {
    pIter->iSttBlk = index;
    pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, pIter->iSttBlk);
    tsdbDebug("try next last file block:%d from %d, trigger by uid:%" PRIu64 ", file index:%d, %s", pIter->iSttBlk,
              oldIndex, pIter->uid, pIter->iStt, idStr);
  } else {
    tsdbDebug("no more last block qualified, uid:%" PRIu64 ", file index:%d, %s", pIter->uid, pIter->iStt, idStr);
  }
}

H
Hongze Cheng 已提交
509 510
/*
 * Starting at `pIter->iRow` of the current block, find the next row (in scan direction) that belongs to
 * `pIter->uid` and lies inside the iterator's time window and version range.
 *
 * When starting at the first row in scan direction and the block carries per-row uids, the start of `uid`'s rows
 * is located by binary search. On return `pIter->iRow` is the found row, or -1 when none qualifies.
 * That includes the case where the block could not be loaded. Previously this dereferenced a NULL block.
 */
static void findNextValidRow(SLDataIter *pIter, const char *idStr) {
  bool    hasVal = false;
  int32_t step = pIter->backward ? -1 : 1;
  int32_t i = pIter->iRow;

  SBlockData *pData = loadLastBlock(pIter, idStr);
  if (pData == NULL) {
    // nothing to scan; loadLastBlock sets terrno when this was a read error
    pIter->iRow = -1;
    return;
  }

  // mostly we only need to find the start position for a given table
  if ((((i == 0) && (!pIter->backward)) || (i == pData->nRow - 1 && pIter->backward)) && pData->aUid != NULL) {
    i = binarySearchForStartRowIndex((uint64_t *)pData->aUid, pData->nRow, pIter->uid, pIter->backward);
    if (i == -1) {
      tsdbDebug("failed to find the data in pBlockData, uid:%" PRIu64 " , %s", pIter->uid, idStr);
      pIter->iRow = -1;
      return;
    }
  }

  for (; i < pData->nRow && i >= 0; i += step) {
    // rows are grouped by uid: once past our uid in scan direction, there is nothing more
    if (pData->aUid != NULL) {
      if (!pIter->backward) {
        if (pData->aUid[i] > pIter->uid) {
          break;
        }
      } else {
        if (pData->aUid[i] < pIter->uid) {
          break;
        }
      }
    }

    int64_t ts = pData->aTSKEY[i];
    if (!pIter->backward) {               // asc
      if (ts > pIter->timeWindow.ekey) {  // no more data
        break;
      } else if (ts < pIter->timeWindow.skey) {
        continue;
      }
    } else {
      if (ts < pIter->timeWindow.skey) {
        break;
      } else if (ts > pIter->timeWindow.ekey) {
        continue;
      }
    }

    int64_t ver = pData->aVersion[i];
    if (ver < pIter->verRange.minVer) {
      continue;
    }

    // todo opt handle desc case
    if (ver > pIter->verRange.maxVer) {
      continue;
    }

    hasVal = true;
    break;
  }

  pIter->iRow = (hasVal) ? i : -1;
}

H
Hongze Cheng 已提交
571
bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
H
Hongze Cheng 已提交
572
  int32_t step = pIter->backward ? -1 : 1;
H
Haojun Liao 已提交
573
  terrno = TSDB_CODE_SUCCESS;
574 575

  // no qualified last file block in current file, no need to fetch row
H
Hongze Cheng 已提交
576
  if (pIter->pSttBlk == NULL) {
577 578
    return false;
  }
H
Hongze Cheng 已提交
579

H
Hongze Cheng 已提交
580
  int32_t     iBlockL = pIter->iSttBlk;
H
Haojun Liao 已提交
581
  SBlockData *pBlockData = loadLastBlock(pIter, idStr);
H
Haojun Liao 已提交
582
  if (pBlockData == NULL || terrno != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
583 584 585
    goto _exit;
  }

586 587
  pIter->iRow += step;

H
Hongze Cheng 已提交
588
  while (1) {
589
    bool skipBlock = false;
H
Haojun Liao 已提交
590
    findNextValidRow(pIter, idStr);
591

592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615
    if (pIter->pBlockLoadInfo->checkRemainingRow) {
      skipBlock = true;
      int16_t *aCols = pIter->pBlockLoadInfo->colIds;
      int      nCols = pIter->pBlockLoadInfo->numOfCols;
      bool     isLast = pIter->pBlockLoadInfo->isLast;
      for (int inputColIndex = 0; inputColIndex < nCols; ++inputColIndex) {
        for (int colIndex = 0; colIndex < pBlockData->nColData; ++colIndex) {
          SColData *pColData = &pBlockData->aColData[colIndex];
          int16_t   cid = pColData->cid;

          if (cid == aCols[inputColIndex]) {
            if (isLast && (pColData->flag & HAS_VALUE)) {
              skipBlock = false;
              break;
            } else if (pColData->flag & (HAS_VALUE | HAS_NULL)) {
              skipBlock = false;
              break;
            }
          }
        }
      }
    }

    if (skipBlock || pIter->iRow >= pBlockData->nRow || pIter->iRow < 0) {
616
      tLDataIterNextBlock(pIter, idStr);
H
Hongze Cheng 已提交
617
      if (pIter->pSttBlk == NULL) {  // no more data
618 619 620 621
        goto _exit;
      }
    } else {
      break;
H
Hongze Cheng 已提交
622 623
    }

H
Hongze Cheng 已提交
624
    if (iBlockL != pIter->iSttBlk) {
H
Haojun Liao 已提交
625
      pBlockData = loadLastBlock(pIter, idStr);
626 627 628
      if (pBlockData == NULL) {
        goto _exit;
      }
H
Haojun Liao 已提交
629 630

      // set start row index
631
      pIter->iRow = pIter->backward ? pBlockData->nRow - 1 : 0;
H
Hongze Cheng 已提交
632 633 634
    }
  }

635 636 637
  pIter->rInfo.suid = pBlockData->suid;
  pIter->rInfo.uid = pBlockData->uid;
  pIter->rInfo.row = tsdbRowFromBlockData(pBlockData, pIter->iRow);
H
Hongze Cheng 已提交
638 639

_exit:
640
  return (terrno == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL) && (pBlockData != NULL);
H
Hongze Cheng 已提交
641 642
}

H
Hongze Cheng 已提交
643
/*
 * Return the row info of the iterator's current row. The storage belongs to the iterator and is
 * overwritten by every successful tLDataIterNextRow().
 */
SRowInfo *tLDataIterGet(SLDataIter *pIter) {
  SRowInfo *pRowInfo = &pIter->rInfo;
  return pRowInfo;
}
H
Hongze Cheng 已提交
644 645

// SMergeTree =================================================
H
Hongze Cheng 已提交
646 647 648
/*
 * Red-black tree comparator for ascending merges: orders the iterators by the key of their current row,
 * first by timestamp and then by version. Returns -1, 0 or 1.
 * Each tree node is embedded in an SLDataIter (member `node`), so the iterator is recovered via offsetof.
 */
static FORCE_INLINE int32_t tLDataIterCmprFn(const SRBTreeNode *p1, const SRBTreeNode *p2) {
  SLDataIter *pLeft = (SLDataIter *)(((uint8_t *)p1) - offsetof(SLDataIter, node));
  SLDataIter *pRight = (SLDataIter *)(((uint8_t *)p2) - offsetof(SLDataIter, node));

  TSDBKEY lhs = TSDBROW_KEY(&pLeft->rInfo.row);
  TSDBKEY rhs = TSDBROW_KEY(&pRight->rInfo.row);

  if (lhs.ts != rhs.ts) {
    return (lhs.ts < rhs.ts) ? -1 : 1;
  }

  if (lhs.version != rhs.version) {
    return (lhs.version < rhs.version) ? -1 : 1;
  }

  return 0;
}

668 669 670 671
// Red-black tree comparator for descending merges: the ascending order with the operands swapped.
// tLDataIterCmprFn is antisymmetric (cmp(a, b) == -cmp(b, a)), so this equals negating its result.
static FORCE_INLINE int32_t tLDataIterDescCmprFn(const SRBTreeNode *p1, const SRBTreeNode *p2) {
  return tLDataIterCmprFn(p2, p1);
}

672
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
673
                       STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
674 675 676
                       bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter* pLDataIter) {
  int32_t code = TSDB_CODE_SUCCESS;

H
Hongze Cheng 已提交
677
  pMTree->backward = backward;
678
  pMTree->pIter = NULL;
H
Haojun Liao 已提交
679
  pMTree->idStr = idStr;
680

dengyihao's avatar
dengyihao 已提交
681
  if (!pMTree->backward) {  // asc
682
    tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
dengyihao's avatar
dengyihao 已提交
683
  } else {  // desc
684 685
    tRBTreeCreate(&pMTree->rbt, tLDataIterDescCmprFn);
  }
686

687 688
  pMTree->pLoadInfo = pBlockLoadInfo;
  pMTree->destroyLoadInfo = destroyLoadInfo;
689
  pMTree->ignoreEarlierTs = false;
690

H
Hongze Cheng 已提交
691
  for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) {  // open all last file
692 693
    memset(&pLDataIter[i], 0, sizeof(SLDataIter));
    code = tLDataIterOpen(&pLDataIter[i], pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange,
694
                          &pMTree->pLoadInfo[i], pMTree->idStr, strictTimeRange);
695 696 697 698
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }

699
    bool hasVal = tLDataIterNextRow(&pLDataIter[i], pMTree->idStr);
700
    if (hasVal) {
701
      tMergeTreeAddIter(pMTree, &pLDataIter[i]);
702
    } else {
703
      if (!pMTree->ignoreEarlierTs) {
704
        pMTree->ignoreEarlierTs = pLDataIter[i].ignoreEarlierTs;
705
      }
706 707
    }
  }
708 709 710

  return code;

H
Hongze Cheng 已提交
711
_end:
712 713
  tMergeTreeClose(pMTree);
  return code;
H
Hongze Cheng 已提交
714
}
H
Hongze Cheng 已提交
715

H
Haojun Liao 已提交
716
int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb *pTsdb, uint64_t suid, uint64_t uid,
717 718 719
                        STimeWindow *pTimeWindow, SVersionRange *pVerRange, const char *idStr,
                        bool strictTimeRange, SArray *pSttFileBlockIterArray, void *pCurrentFileSet, STSchema* pSchema,
                        int16_t* pCols, int32_t numOfCols) {
H
Haojun Liao 已提交
720
  int32_t code = TSDB_CODE_SUCCESS;
721

H
Haojun Liao 已提交
722 723 724
  pMTree->backward = backward;
  pMTree->pIter = NULL;
  pMTree->idStr = idStr;
725

H
Haojun Liao 已提交
726 727 728 729 730
  if (!pMTree->backward) {  // asc
    tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
  } else {  // desc
    tRBTreeCreate(&pMTree->rbt, tLDataIterDescCmprFn);
  }
731

H
Haojun Liao 已提交
732
  pMTree->ignoreEarlierTs = false;
733

H
Haojun Liao 已提交
734
  // todo handle other level of stt files, here only deal with the first level stt
735
  int32_t size = ((STFileSet *)pCurrentFileSet)->lvlArr->size;
H
Haojun Liao 已提交
736 737 738
  if (size == 0) {
    goto _end;
  }
739

740 741 742 743
  while (taosArrayGetSize(pSttFileBlockIterArray) < size) {
    SArray* pList = taosArrayInit(4, POINTER_BYTES);
    taosArrayPush(pSttFileBlockIterArray, &pList);
  }
744

745 746 747
  for(int32_t j = 0; j < size; ++j) {
    SSttLvl *pSttLevel = ((STFileSet *)pCurrentFileSet)->lvlArr->data[j];
    ASSERT(pSttLevel->level == j);
748

749 750
    SArray* pList = taosArrayGetP(pSttFileBlockIterArray, j);
    int32_t numOfIter = taosArrayGetSize(pList);
H
Haojun Liao 已提交
751

752 753 754 755 756
    if (numOfIter < TARRAY2_SIZE(pSttLevel->fobjArr)) {
      int32_t inc = TARRAY2_SIZE(pSttLevel->fobjArr) - numOfIter;
      for(int32_t k = 0; k < inc; ++k) {
        SLDataIter *pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
        taosArrayPush(pList, &pIter);
H
Haojun Liao 已提交
757
      }
H
Haojun Liao 已提交
758
    }
H
Haojun Liao 已提交
759

760 761
    for (int32_t i = 0; i < TARRAY2_SIZE(pSttLevel->fobjArr); ++i) {  // open all last file
      SLDataIter* pIter = taosArrayGetP(pList, i);
H
Haojun Liao 已提交
762

763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794
      SSttFileReader *pSttFileReader = pIter->pReader;
      SSttBlockLoadInfo* pLoadInfo = pIter->pBlockLoadInfo;

      // open stt file reader if not
      if (pSttFileReader == NULL) {
        SSttFileReaderConfig conf = {.tsdb = pTsdb, .szPage = pTsdb->pVnode->config.szPage};
        conf.file[0] = *pSttLevel->fobjArr->data[i]->f;

        code = tsdbSttFileReaderOpen(pSttLevel->fobjArr->data[i]->fname, &conf, &pSttFileReader);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }

      if (pLoadInfo == NULL) {
        pLoadInfo = tCreateOneLastBlockLoadInfo(pSchema, pCols, numOfCols);
      }

      memset(pIter, 0, sizeof(SLDataIter));
      code = tLDataIterOpen2(pIter, pSttFileReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange,
                             pLoadInfo, pMTree->idStr, strictTimeRange);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }

      bool hasVal = tLDataIterNextRow(pIter, pMTree->idStr);
      if (hasVal) {
        tMergeTreeAddIter(pMTree, pIter);
      } else {
        if (!pMTree->ignoreEarlierTs) {
          pMTree->ignoreEarlierTs = pIter->ignoreEarlierTs;
        }
H
Haojun Liao 已提交
795
      }
796
    }
H
Haojun Liao 已提交
797
  }
798

H
Haojun Liao 已提交
799
  return code;
800

H
Haojun Liao 已提交
801 802 803
_end:
  tMergeTreeClose(pMTree);
  return code;
804 805
}

806 807
/*
 * Insert an iterator (positioned on a valid row) into the merge tree.
 * NOTE(review): the cast assumes `node` is the first member of SLDataIter (the comparators recover the iterator
 * with offsetof(SLDataIter, node)) — confirm.
 */
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter) {
  SRBTreeNode *pNode = (SRBTreeNode *)pIter;
  tRBTreePut(&pMTree->rbt, pNode);
}

808 809
/*
 * Report whether, while opening the tree, an iterator that yielded no rows had its `ignoreEarlierTs` flag set
 * (tLDataIterOpen2 sets it when a descending scan's start block ends before the time window).
 */
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree) {
  const bool ignore = pMTree->ignoreEarlierTs;
  return ignore;
}

H
Hongze Cheng 已提交
810
/*
 * Advance the merge to the next row across all stt iterators.
 *
 * The iterator that produced the previous row (`pMTree->pIter`) is advanced first. If it is exhausted, or its new
 * row sorts after the tree minimum, it goes back into the tree (when not exhausted) and the tree minimum is
 * taken out as the new current iterator. Equal keys across iterators are treated as an invariant violation
 * (ASSERT).
 * Returns true while a current row exists. (Removed an unused local `code`.)
 */
bool tMergeTreeNext(SMergeTree *pMTree) {
  if (pMTree->pIter) {
    SLDataIter *pIter = pMTree->pIter;

    bool hasVal = tLDataIterNextRow(pIter, pMTree->idStr);
    if (!hasVal) {
      pMTree->pIter = NULL;
    }

    // compare with min in RB Tree
    pIter = (SLDataIter *)tRBTreeMin(&pMTree->rbt);
    if (pMTree->pIter && pIter) {
      int32_t c = pMTree->rbt.cmprFn(&pMTree->pIter->node, &pIter->node);
      if (c > 0) {
        // the tree minimum comes first: park the current iterator in the tree
        tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter);
        pMTree->pIter = NULL;
      } else {
        ASSERT(c);
      }
    }
  }

  if (pMTree->pIter == NULL) {
    pMTree->pIter = (SLDataIter *)tRBTreeMin(&pMTree->rbt);
    if (pMTree->pIter) {
      tRBTreeDrop(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter);
    }
  }

  return pMTree->pIter != NULL;
}

H
Hongze Cheng 已提交
843
/*
 * Detach the current iterator and, if the tree owns its block load info (`destroyLoadInfo`), release it.
 * The iterators themselves are not freed here.
 */
void tMergeTreeClose(SMergeTree *pMTree) {
  pMTree->pIter = NULL;

  if (!pMTree->destroyLoadInfo) {
    return;
  }

  pMTree->pLoadInfo = destroyLastBlockLoadInfo(pMTree->pLoadInfo);
  pMTree->destroyLoadInfo = false;
}