tsdbIter.c 22.6 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdbIter.h"
H
Hongze Cheng 已提交
17 18 19

// STsdbIter ================
struct STsdbIter {
H
Hongze Cheng 已提交
20
  EIterType type;
H
Hongze Cheng 已提交
21 22 23
  bool      noMoreData;
  bool      filterByVersion;
  int64_t   range[2];
H
Hongze Cheng 已提交
24 25 26 27
  union {
    SRowInfo    row[1];
    STombRecord record[1];
  };
H
Hongze Cheng 已提交
28
  SRBTreeNode node[1];
H
Hongze Cheng 已提交
29 30 31 32 33
  union {
    struct {
      SSttSegReader      *reader;
      const TSttBlkArray *sttBlkArray;
      int32_t             sttBlkArrayIdx;
H
Hongze Cheng 已提交
34 35 36
      SBlockData          blockData[1];
      int32_t             blockDataIdx;
    } sttData[1];
H
Hongze Cheng 已提交
37
    struct {
H
Hongze Cheng 已提交
38 39 40 41 42
      SDataFileReader     *reader;
      const TBrinBlkArray *brinBlkArray;
      int32_t              brinBlkArrayIdx;
      SBrinBlock           brinBlock[1];
      int32_t              brinBlockIdx;
H
Hongze Cheng 已提交
43 44 45
      SBlockData           blockData[1];
      int32_t              blockDataIdx;
    } dataData[1];
H
Hongze Cheng 已提交
46
    struct {
H
Hongze Cheng 已提交
47 48 49 50 51
      SMemTable  *memt;
      TSDBKEY     from[1];
      SRBTreeIter iter[1];
      STbData    *tbData;
      STbDataIter tbIter[1];
H
Hongze Cheng 已提交
52
    } memtData[1];
H
Hongze Cheng 已提交
53 54 55 56
    struct {
      SSttSegReader       *reader;
      const TTombBlkArray *tombBlkArray;
      int32_t              tombBlkArrayIdx;
H
Hongze Cheng 已提交
57 58
      STombBlock           tombBlock[1];
      int32_t              tombBlockIdx;
H
Hongze Cheng 已提交
59 60 61 62 63
    } sttTomb[1];
    struct {
      SDataFileReader     *reader;
      const TTombBlkArray *tombBlkArray;
      int32_t              tombBlkArrayIdx;
H
Hongze Cheng 已提交
64 65
      STombBlock           tombBlock[1];
      int32_t              tombBlockIdx;
H
Hongze Cheng 已提交
66
    } dataTomb[1];
H
Hongze Cheng 已提交
67 68 69 70 71 72
    struct {
      SMemTable  *memt;
      SRBTreeIter rbtIter[1];
      STbData    *tbData;
      SDelData   *delData;
    } memtTomb[1];
H
Hongze Cheng 已提交
73
  };
H
Hongze Cheng 已提交
74 75
};

H
Hongze Cheng 已提交
76
/*
 * Move an stt row iterator to its next visible row.
 *
 * Rows are taken from the currently loaded block first; when that block is
 * used up the next acceptable block of the index is loaded. Rows/blocks whose
 * version falls outside iter->range (when filterByVersion is set) are skipped,
 * and so is everything belonging to table `tbid` when `tbid` is not NULL.
 *
 * On success iter->row describes the row found, or iter->noMoreData is set
 * when the source is exhausted. Returns 0, or the error code reported by
 * tsdbSttFileReadBlockData().
 */
static int32_t tsdbSttIterNext(STsdbIter *iter, const TABLEID *tbid) {
  SBlockData         *bData = iter->sttData->blockData;
  const TSttBlkArray *blkArray = iter->sttData->sttBlkArray;

  while (!iter->noMoreData) {
    // Step 1: consume the rows still pending in the loaded block. Every row
    // that is looked at is also stepped over, whether it is taken or not.
    while (iter->sttData->blockDataIdx < bData->nRow) {
      const int32_t iRow = iter->sttData->blockDataIdx++;
      const int64_t version = bData->aVersion[iRow];

      if (iter->filterByVersion && (version < iter->range[0] || version > iter->range[1])) {
        continue;
      }

      // A block-level uid of 0 means the uid is stored per row in aUid[].
      iter->row->suid = bData->suid;
      iter->row->uid = bData->uid ? bData->uid : bData->aUid[iRow];

      if (tbid && iter->row->suid == tbid->suid && iter->row->uid == tbid->uid) {
        continue;
      }

      iter->row->row = tsdbRowFromBlockData(bData, iRow);
      return 0;
    }

    // Step 2: nothing left in the index either -> the iterator is done.
    if (iter->sttData->sttBlkArrayIdx >= TARRAY2_SIZE(blkArray)) {
      iter->noMoreData = true;
      break;
    }

    // Step 3: load the next block that can contain something of interest.
    for (; iter->sttData->sttBlkArrayIdx < TARRAY2_SIZE(blkArray); iter->sttData->sttBlkArrayIdx++) {
      const SSttBlk *sttBlk = TARRAY2_GET_PTR(blkArray, iter->sttData->sttBlkArrayIdx);

      bool versionMiss = iter->filterByVersion && (sttBlk->maxVer < iter->range[0] || sttBlk->minVer > iter->range[1]);
      // The block holds only the table being skipped.
      bool onlySkippedTable =
          tbid && tbid->suid == sttBlk->suid && tbid->uid == sttBlk->minUid && tbid->uid == sttBlk->maxUid;
      if (versionMiss || onlySkippedTable) {
        continue;
      }

      int32_t code = tsdbSttFileReadBlockData(iter->sttData->reader, sttBlk, bData);
      if (code) return code;

      iter->sttData->blockDataIdx = 0;
      iter->sttData->sttBlkArrayIdx++;
      break;
    }
  }

  return 0;
}

/*
 * Move a data-file row iterator to its next visible row.
 *
 * The source is walked on three levels: rows of the loaded data block,
 * records of the loaded brin block (each record names one data block), and
 * entries of the brin block index. Each level is refilled from the next one
 * when it runs dry. Version filtering (filterByVersion / range) is applied on
 * every level; when `tbid` is not NULL everything belonging to that table is
 * skipped.
 *
 * On success iter->row describes the row found, or iter->noMoreData is set.
 * Returns 0, or the error code of the failing file read.
 */
static int32_t tsdbDataIterNext(STsdbIter *iter, const TABLEID *tbid) {
  SBlockData          *bData = iter->dataData->blockData;
  SBrinBlock          *brinBlock = iter->dataData->brinBlock;
  const TBrinBlkArray *blkArray = iter->dataData->brinBlkArray;
  int32_t              code;

  while (!iter->noMoreData) {
    for (;;) {
      // Level 3: rows of the loaded data block.
      while (iter->dataData->blockDataIdx < bData->nRow) {
        const int32_t iRow = iter->dataData->blockDataIdx;
        const int64_t version = bData->aVersion[iRow];

        if (iter->filterByVersion && (version < iter->range[0] || version > iter->range[1])) {
          iter->dataData->blockDataIdx = iRow + 1;
          continue;
        }

        if (tbid && tbid->suid == bData->suid && tbid->uid == bData->uid) {
          // The whole block belongs to the skipped table: drop the rest of it.
          iter->dataData->blockDataIdx = bData->nRow;
          break;
        }

        iter->row->row = tsdbRowFromBlockData(bData, iRow);
        iter->dataData->blockDataIdx = iRow + 1;
        return 0;
      }

      // Level 2: records of the loaded brin block.
      if (iter->dataData->brinBlockIdx >= BRIN_BLOCK_SIZE(brinBlock)) {
        break;
      }

      for (; iter->dataData->brinBlockIdx < BRIN_BLOCK_SIZE(brinBlock); iter->dataData->brinBlockIdx++) {
        SBrinRecord record[1];
        tBrinBlockGet(brinBlock, iter->dataData->brinBlockIdx, record);

        bool versionMiss = iter->filterByVersion && (record->maxVer < iter->range[0] || record->minVer > iter->range[1]);
        bool skippedTable = tbid && tbid->suid == record->suid && tbid->uid == record->uid;
        if (versionMiss || skippedTable) {
          continue;
        }

        // All rows of the block named by this record share these ids.
        iter->row->suid = record->suid;
        iter->row->uid = record->uid;

        code = tsdbDataFileReadBlockData(iter->dataData->reader, record, bData);
        if (code) return code;

        iter->dataData->blockDataIdx = 0;
        iter->dataData->brinBlockIdx++;
        break;
      }
    }

    // Level 1: entries of the brin block index.
    if (iter->dataData->brinBlkArrayIdx >= TARRAY2_SIZE(blkArray)) {
      iter->noMoreData = true;
      break;
    }

    for (; iter->dataData->brinBlkArrayIdx < TARRAY2_SIZE(blkArray); iter->dataData->brinBlkArrayIdx++) {
      const SBrinBlk *brinBlk = TARRAY2_GET_PTR(blkArray, iter->dataData->brinBlkArrayIdx);

      bool versionMiss = iter->filterByVersion && (brinBlk->maxVer < iter->range[0] || brinBlk->minVer > iter->range[1]);
      // Only the uid is compared on this level.
      bool onlySkippedTable = tbid && tbid->uid == brinBlk->minTbid.uid && tbid->uid == brinBlk->maxTbid.uid;
      if (versionMiss || onlySkippedTable) {
        continue;
      }

      code = tsdbDataFileReadBrinBlock(iter->dataData->reader, brinBlk, brinBlock);
      if (code) return code;

      iter->dataData->brinBlockIdx = 0;
      iter->dataData->brinBlkArrayIdx++;
      break;
    }
  }

  return 0;
}

/*
 * Move a memtable row iterator to its next visible row.
 *
 * Rows of the current table (memtData->tbData) are read through
 * memtData->tbIter; when the table is exhausted the next table of the
 * memtable tree is opened. Rows whose version is outside iter->range are
 * skipped when filterByVersion is set, and the table identified by `tbid`
 * (when not NULL) is skipped entirely.
 *
 * On return either iter->row describes the row found or iter->noMoreData is
 * set. Always returns 0.
 */
static int32_t tsdbMemTableIterNext(STsdbIter *iter, const TABLEID *tbid) {
  SRBTreeNode *node;

  while (!iter->noMoreData) {
    // Rows of the table currently being read.
    for (TSDBROW *row; iter->memtData->tbData && (row = tsdbTbDataIterGet(iter->memtData->tbIter));) {
      if (tbid && tbid->suid == iter->memtData->tbData->suid && tbid->uid == iter->memtData->tbData->uid) {
        // Caller asked to skip this table: abandon it and move to the next one.
        iter->memtData->tbData = NULL;
        break;
      }

      if (iter->filterByVersion) {
        int64_t version = TSDBROW_VERSION(row);
        if (version < iter->range[0] || version > iter->range[1]) {
          // The cursor is moved only by tsdbTbDataIterNext() (the accept path
          // below calls it explicitly), so it must be stepped here as well;
          // otherwise the loop would fetch the same filtered row forever.
          tsdbTbDataIterNext(iter->memtData->tbIter);
          continue;
        }
      }

      iter->row->row = row[0];

      // Step past the row just handed out so the next call starts after it.
      tsdbTbDataIterNext(iter->memtData->tbIter);
      goto _exit;
    }

    // Current table exhausted (or none yet): open the next table of the tree.
    for (;;) {
      node = tRBTreeIterNext(iter->memtData->iter);
      if (!node) {
        iter->noMoreData = true;
        goto _exit;
      }

      iter->memtData->tbData = TCONTAINER_OF(node, STbData, rbtn);
      if (tbid && tbid->suid == iter->memtData->tbData->suid && tbid->uid == iter->memtData->tbData->uid) {
        continue;
      } else {
        iter->row->suid = iter->memtData->tbData->suid;
        iter->row->uid = iter->memtData->tbData->uid;
        tsdbTbDataIterOpen(iter->memtData->tbData, iter->memtData->from, 0, iter->memtData->tbIter);
        break;
      }
    }
  }

_exit:
  return 0;
}

H
Hongze Cheng 已提交
255
/*
 * Move a data-file tomb iterator to its next visible tomb record.
 *
 * Records are taken from the loaded tomb block first; when it is used up the
 * next acceptable block of the tomb block index is loaded. Records whose
 * version is outside iter->range are skipped when filterByVersion is set, and
 * records of table `tbid` are skipped when `tbid` is not NULL.
 *
 * On success iter->record holds the record found, or iter->noMoreData is set.
 * Returns 0, or the error code of tsdbDataFileReadTombBlock().
 */
static int32_t tsdbDataTombIterNext(STsdbIter *iter, const TABLEID *tbid) {
  STombBlock          *tombBlock = iter->dataTomb->tombBlock;
  const TTombBlkArray *blkArray = iter->dataTomb->tombBlkArray;
  STombRecord         *rec = iter->record;

  while (!iter->noMoreData) {
    // Records pending in the loaded block; each examined record is stepped over.
    while (iter->dataTomb->tombBlockIdx < TOMB_BLOCK_SIZE(tombBlock)) {
      const int32_t idx = iter->dataTomb->tombBlockIdx++;

      rec->suid = TARRAY2_GET(tombBlock->suid, idx);
      rec->uid = TARRAY2_GET(tombBlock->uid, idx);
      rec->version = TARRAY2_GET(tombBlock->version, idx);

      if (iter->filterByVersion && (rec->version < iter->range[0] || rec->version > iter->range[1])) {
        continue;
      }

      if (tbid && rec->suid == tbid->suid && rec->uid == tbid->uid) {
        continue;
      }

      rec->skey = TARRAY2_GET(tombBlock->skey, idx);
      rec->ekey = TARRAY2_GET(tombBlock->ekey, idx);
      return 0;
    }

    if (iter->dataTomb->tombBlkArrayIdx >= TARRAY2_SIZE(blkArray)) {
      iter->noMoreData = true;
      return 0;
    }

    // Load the next block that is not made up solely of the skipped table.
    for (; iter->dataTomb->tombBlkArrayIdx < TARRAY2_SIZE(blkArray); iter->dataTomb->tombBlkArrayIdx++) {
      const STombBlk *tombBlk = TARRAY2_GET_PTR(blkArray, iter->dataTomb->tombBlkArrayIdx);

      bool onlySkippedTable = tbid && tbid->suid == tombBlk->minTbid.suid && tbid->uid == tombBlk->minTbid.uid &&
                              tbid->suid == tombBlk->maxTbid.suid && tbid->uid == tombBlk->maxTbid.uid;
      if (onlySkippedTable) {
        continue;
      }

      int32_t code = tsdbDataFileReadTombBlock(iter->dataTomb->reader, tombBlk, tombBlock);
      if (code) return code;

      iter->dataTomb->tombBlockIdx = 0;
      iter->dataTomb->tombBlkArrayIdx++;
      break;
    }
  }

  return 0;
}

H
Hongze Cheng 已提交
303
/*
 * Move a memtable tomb iterator to its next visible delete record.
 *
 * Delete records of the current table are read by following the pNext links
 * starting at tbData->pHead; when the list ends the next table of the
 * memtable tree is taken. Records whose version is outside iter->range are
 * skipped when filterByVersion is set; the table with uid `tbid->uid` is
 * skipped entirely when `tbid` is not NULL.
 *
 * On return either iter->record holds the record found or iter->noMoreData
 * is set. Always returns 0.
 */
static int32_t tsdbMemTombIterNext(STsdbIter *iter, const TABLEID *tbid) {
  while (!iter->noMoreData) {
    // Delete records of the table currently being read.
    for (; iter->memtTomb->delData;) {
      if (tbid && tbid->uid == iter->memtTomb->tbData->uid) {
        iter->memtTomb->delData = NULL;
        break;
      }

      if (iter->filterByVersion &&
          (iter->memtTomb->delData->version < iter->range[0] || iter->memtTomb->delData->version > iter->range[1])) {
        // Step to the next list element before retrying; without this the
        // loop would test the same filtered record forever.
        iter->memtTomb->delData = iter->memtTomb->delData->pNext;
        continue;
      }

      iter->record->suid = iter->memtTomb->tbData->suid;
      iter->record->uid = iter->memtTomb->tbData->uid;
      iter->record->version = iter->memtTomb->delData->version;
      iter->record->skey = iter->memtTomb->delData->sKey;
      iter->record->ekey = iter->memtTomb->delData->eKey;

      // Leave the cursor on the element after the one just handed out.
      iter->memtTomb->delData = iter->memtTomb->delData->pNext;
      goto _exit;
    }

    // Current list exhausted (or none yet): move on to the next table.
    for (;;) {
      SRBTreeNode *node = tRBTreeIterNext(iter->memtTomb->rbtIter);
      if (node == NULL) {
        iter->noMoreData = true;
        goto _exit;
      }

      iter->memtTomb->tbData = TCONTAINER_OF(node, STbData, rbtn);
      if (tbid && tbid->uid == iter->memtTomb->tbData->uid) {
        continue;
      } else {
        iter->memtTomb->delData = iter->memtTomb->tbData->pHead;
        break;
      }
    }
  }

_exit:
  return 0;
}

H
Hongze Cheng 已提交
347 348 349
/*
 * Prepare an stt row iterator: load the block index through the reader and
 * position the iterator on its first row. An empty index marks the iterator
 * as exhausted right away. Returns 0 or the error code of the failing call.
 */
static int32_t tsdbSttIterOpen(STsdbIter *iter) {
  int32_t rc = tsdbSttFileReadSttBlk(iter->sttData->reader, &iter->sttData->sttBlkArray);
  if (rc != 0) {
    return rc;
  }

  if (TARRAY2_SIZE(iter->sttData->sttBlkArray) == 0) {
    iter->noMoreData = true;
    return 0;
  }

  // Start in front of the first index entry with an empty block loaded.
  iter->sttData->sttBlkArrayIdx = 0;
  tBlockDataCreate(iter->sttData->blockData);
  iter->sttData->blockDataIdx = 0;

  // The first "next" loads the first usable block and picks its first row.
  return tsdbSttIterNext(iter, NULL);
}

/*
 * Prepare a data-file row iterator: load the brin block index through the
 * reader, initialise the brin block and data block cursors, and position the
 * iterator on its first row. An empty index marks the iterator as exhausted
 * right away. Returns 0 or the error code of the failing call.
 */
static int32_t tsdbDataIterOpen(STsdbIter *iter) {
  // Top level: brin block index.
  int32_t rc = tsdbDataFileReadBrinBlk(iter->dataData->reader, &iter->dataData->brinBlkArray);
  if (rc != 0) {
    return rc;
  }

  if (TARRAY2_SIZE(iter->dataData->brinBlkArray) == 0) {
    iter->noMoreData = true;
    return 0;
  }
  iter->dataData->brinBlkArrayIdx = 0;

  // Middle level: brin block (starts empty).
  tBrinBlockInit(iter->dataData->brinBlock);
  iter->dataData->brinBlockIdx = 0;

  // Bottom level: data block (starts empty).
  tBlockDataCreate(iter->dataData->blockData);
  iter->dataData->blockDataIdx = 0;

  // The first "next" fills the levels and picks the first row.
  return tsdbDataIterNext(iter, NULL);
}

/*
 * Prepare a memtable row iterator. A memtable with nRow == 0 marks the
 * iterator as exhausted; otherwise a tree iterator over memt->tbDataTree is
 * created and the iterator is positioned on its first row.
 */
static int32_t tsdbMemTableIterOpen(STsdbIter *iter) {
  SMemTable *memt = iter->memtData->memt;

  if (memt->nRow != 0) {
    iter->memtData->iter[0] = tRBTreeIterCreate(memt->tbDataTree, 1);
    return tsdbMemTableIterNext(iter, NULL);
  }

  iter->noMoreData = true;
  return 0;
}

/* Release the block buffer owned by an stt row iterator. Always returns 0. */
static int32_t tsdbSttIterClose(STsdbIter *iter) {
  SBlockData *bData = iter->sttData->blockData;

  tBlockDataDestroy(bData);
  return 0;
}

H
Hongze Cheng 已提交
405 406 407 408 409 410 411
/*
 * Prepare a data-file tomb iterator: load the tomb block index through the
 * reader and position the iterator on its first record. An empty index marks
 * the iterator as exhausted right away. Returns 0 or the error code of the
 * failing call.
 */
static int32_t tsdbDataTombIterOpen(STsdbIter *iter) {
  int32_t rc = tsdbDataFileReadTombBlk(iter->dataTomb->reader, &iter->dataTomb->tombBlkArray);
  if (rc != 0) {
    return rc;
  }

  if (TARRAY2_SIZE(iter->dataTomb->tombBlkArray) == 0) {
    iter->noMoreData = true;
    return 0;
  }

  // Start in front of the first index entry with an empty tomb block.
  iter->dataTomb->tombBlkArrayIdx = 0;
  tTombBlockInit(iter->dataTomb->tombBlock);
  iter->dataTomb->tombBlockIdx = 0;

  return tsdbDataTombIterNext(iter, NULL);
}

H
Hongze Cheng 已提交
423 424 425
/*
 * Prepare a memtable tomb iterator. A memtable with nDel == 0 marks the
 * iterator as exhausted; otherwise a tree iterator over memt->tbDataTree is
 * created and the iterator is positioned on its first delete record.
 *
 * Returns whatever tsdbMemTombIterNext() returns (0 in the early-exit case).
 */
static int32_t tsdbMemTombIterOpen(STsdbIter *iter) {
  if (iter->memtTomb->memt->nDel == 0) {
    iter->noMoreData = true;
    return 0;
  }

  iter->memtTomb->rbtIter[0] = tRBTreeIterCreate(iter->memtTomb->memt->tbDataTree, 1);
  return tsdbMemTombIterNext(iter, NULL);
}

H
Hongze Cheng 已提交
435
/* Release the brin block and data block buffers owned by a data-file row iterator. Always returns 0. */
static int32_t tsdbDataIterClose(STsdbIter *iter) {
  SBrinBlock *brinBlock = iter->dataData->brinBlock;
  SBlockData *bData = iter->dataData->blockData;

  tBrinBlockDestroy(brinBlock);
  tBlockDataDestroy(bData);
  return 0;
}

H
Hongze Cheng 已提交
441
/* A memtable row iterator owns nothing that has to be released here. Always returns 0. */
static int32_t tsdbMemTableIterClose(STsdbIter *iter) {
  (void)iter;
  return 0;
}
H
Hongze Cheng 已提交
442

H
Hongze Cheng 已提交
443
/*
 * Move an stt tomb iterator to its next visible tomb record.
 *
 * Records are taken from the loaded tomb block first; when it is used up the
 * next acceptable block of the tomb block index is loaded. Version filtering
 * (filterByVersion / range) is applied to both blocks and records; records of
 * table `tbid` are skipped when `tbid` is not NULL.
 *
 * On success iter->record holds the record found, or iter->noMoreData is set.
 * Returns 0, or the error code of tsdbSttFileReadTombBlock().
 */
static int32_t tsdbSttTombIterNext(STsdbIter *iter, const TABLEID *tbid) {
  STombBlock          *tombBlock = iter->sttTomb->tombBlock;
  const TTombBlkArray *blkArray = iter->sttTomb->tombBlkArray;
  STombRecord         *rec = iter->record;

  while (!iter->noMoreData) {
    // Records pending in the loaded block; each examined record is stepped over.
    while (iter->sttTomb->tombBlockIdx < TOMB_BLOCK_SIZE(tombBlock)) {
      const int32_t idx = iter->sttTomb->tombBlockIdx++;

      rec->suid = TARRAY2_GET(tombBlock->suid, idx);
      rec->uid = TARRAY2_GET(tombBlock->uid, idx);
      rec->version = TARRAY2_GET(tombBlock->version, idx);

      if (iter->filterByVersion && (rec->version < iter->range[0] || rec->version > iter->range[1])) {
        continue;
      }

      if (tbid && rec->suid == tbid->suid && rec->uid == tbid->uid) {
        continue;
      }

      rec->skey = TARRAY2_GET(tombBlock->skey, idx);
      rec->ekey = TARRAY2_GET(tombBlock->ekey, idx);
      return 0;
    }

    if (iter->sttTomb->tombBlkArrayIdx >= TARRAY2_SIZE(blkArray)) {
      iter->noMoreData = true;
      return 0;
    }

    // Load the next block that can contain something of interest.
    for (; iter->sttTomb->tombBlkArrayIdx < TARRAY2_SIZE(blkArray); iter->sttTomb->tombBlkArrayIdx++) {
      const STombBlk *tombBlk = TARRAY2_GET_PTR(blkArray, iter->sttTomb->tombBlkArrayIdx);

      bool versionMiss = iter->filterByVersion && (tombBlk->maxVer < iter->range[0] || tombBlk->minVer > iter->range[1]);
      bool onlySkippedTable = tbid && tbid->suid == tombBlk->minTbid.suid && tbid->uid == tombBlk->minTbid.uid &&
                              tbid->suid == tombBlk->maxTbid.suid && tbid->uid == tombBlk->maxTbid.uid;
      if (versionMiss || onlySkippedTable) {
        continue;
      }

      int32_t code = tsdbSttFileReadTombBlock(iter->sttTomb->reader, tombBlk, tombBlock);
      if (code) return code;

      iter->sttTomb->tombBlockIdx = 0;
      iter->sttTomb->tombBlkArrayIdx++;
      break;
    }
  }

  return 0;
}

/*
 * Prepare an stt tomb iterator: load the tomb block index through the reader
 * and position the iterator on its first record. An empty index marks the
 * iterator as exhausted right away. Returns 0 or the error code of the
 * failing call.
 */
static int32_t tsdbSttTombIterOpen(STsdbIter *iter) {
  int32_t rc = tsdbSttFileReadTombBlk(iter->sttTomb->reader, &iter->sttTomb->tombBlkArray);
  if (rc != 0) {
    return rc;
  }

  if (TARRAY2_SIZE(iter->sttTomb->tombBlkArray) == 0) {
    iter->noMoreData = true;
    return 0;
  }

  // Start in front of the first index entry with an empty tomb block.
  iter->sttTomb->tombBlkArrayIdx = 0;
  tTombBlockInit(iter->sttTomb->tombBlock);
  iter->sttTomb->tombBlockIdx = 0;

  return tsdbSttTombIterNext(iter, NULL);
}

H
Hongze Cheng 已提交
513 514 515 516
// Defined further down in this file; redeclared here so the error path below can use it.
int32_t tsdbIterClose(STsdbIter **iter);

/*
 * Create an iterator over the source described by `config` and position it
 * on its first element.
 *
 * config  type selects the source; sttReader / dataReader / memt (and `from`
 *         for TSDB_ITER_TYPE_MEMT) supply it; filterByVersion / verRange
 *         set the optional inclusive version filter.
 * iter    out: iter[0] receives the new iterator, or NULL on failure.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the iterator cannot be
 * allocated, TSDB_CODE_INVALID_PARA for an unknown type, or the error code of
 * the per-type open routine.
 */
int32_t tsdbIterOpen(const STsdbIterConfig *config, STsdbIter **iter) {
  int32_t code;
  bool    knownType = true;

  iter[0] = taosMemoryCalloc(1, sizeof(*iter[0]));
  if (iter[0] == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  iter[0]->type = config->type;
  iter[0]->noMoreData = false;
  iter[0]->filterByVersion = config->filterByVersion;
  if (iter[0]->filterByVersion) {
    iter[0]->range[0] = config->verRange[0];
    iter[0]->range[1] = config->verRange[1];
  }

  switch (config->type) {
    case TSDB_ITER_TYPE_STT:
      iter[0]->sttData->reader = config->sttReader;
      code = tsdbSttIterOpen(iter[0]);
      break;
    case TSDB_ITER_TYPE_DATA:
      iter[0]->dataData->reader = config->dataReader;
      code = tsdbDataIterOpen(iter[0]);
      break;
    case TSDB_ITER_TYPE_MEMT:
      iter[0]->memtData->memt = config->memt;
      iter[0]->memtData->from[0] = config->from[0];
      code = tsdbMemTableIterOpen(iter[0]);
      break;
    case TSDB_ITER_TYPE_STT_TOMB:
      iter[0]->sttTomb->reader = config->sttReader;
      code = tsdbSttTombIterOpen(iter[0]);
      break;
    case TSDB_ITER_TYPE_DATA_TOMB:
      iter[0]->dataTomb->reader = config->dataReader;
      code = tsdbDataTombIterOpen(iter[0]);
      break;
    case TSDB_ITER_TYPE_MEMT_TOMB:
      iter[0]->memtTomb->memt = config->memt;
      code = tsdbMemTombIterOpen(iter[0]);
      break;
    default:
      knownType = false;
      code = TSDB_CODE_INVALID_PARA;
      ASSERTS(false, "Not implemented");
  }

  if (code) {
    if (knownType) {
      // The per-type open may have failed after block buffers were created or
      // filled (e.g. a read error on a later block). tsdbIterClose() releases
      // them, frees the iterator and resets iter[0] to NULL. The iterator was
      // zero-allocated, so this is the same state the close routines already
      // see when an open returns early on an empty index.
      tsdbIterClose(iter);
    } else {
      // tsdbIterClose() asserts on unknown types; nothing type-specific exists to release.
      taosMemoryFree(iter[0]);
      iter[0] = NULL;
    }
  }
  return code;
}

H
Hongze Cheng 已提交
567
/* Release the tomb block buffer owned by an stt tomb iterator. Always returns 0. */
static int32_t tsdbSttTombIterClose(STsdbIter *iter) {
  STombBlock *tombBlock = iter->sttTomb->tombBlock;

  tTombBlockDestroy(tombBlock);
  return 0;
}

/* Release the tomb block buffer owned by a data-file tomb iterator. Always returns 0. */
static int32_t tsdbDataTombIterClose(STsdbIter *iter) {
  STombBlock *tombBlock = iter->dataTomb->tombBlock;

  tTombBlockDestroy(tombBlock);
  return 0;
}

H
Hongze Cheng 已提交
577 578 579 580 581 582 583 584 585 586 587
/*
 * Destroy an iterator created by tsdbIterOpen(): run the per-type cleanup,
 * free the iterator and reset iter[0] to NULL. iter[0] must not be NULL.
 * Always returns 0.
 */
int32_t tsdbIterClose(STsdbIter **iter) {
  STsdbIter      *it = iter[0];
  const EIterType type = it->type;

  if (type == TSDB_ITER_TYPE_STT) {
    tsdbSttIterClose(it);
  } else if (type == TSDB_ITER_TYPE_DATA) {
    tsdbDataIterClose(it);
  } else if (type == TSDB_ITER_TYPE_MEMT) {
    tsdbMemTableIterClose(it);
  } else if (type == TSDB_ITER_TYPE_STT_TOMB) {
    tsdbSttTombIterClose(it);
  } else if (type == TSDB_ITER_TYPE_DATA_TOMB) {
    tsdbDataTombIterClose(it);
  } else if (type == TSDB_ITER_TYPE_MEMT_TOMB) {
    // nothing type-specific to release
  } else {
    ASSERT(false);
  }

  taosMemoryFree(it);
  iter[0] = NULL;
  return 0;
}

/*
 * Advance an iterator of any type to its next element, without skipping any
 * table. Dispatches on iter->type to the per-type "next" routine and returns
 * its result; on return either the current element (iter->row or
 * iter->record) has been updated or iter->noMoreData is set.
 */
int32_t tsdbIterNext(STsdbIter *iter) {
  switch (iter->type) {
    case TSDB_ITER_TYPE_STT:
      return tsdbSttIterNext(iter, NULL);
    case TSDB_ITER_TYPE_DATA:
      return tsdbDataIterNext(iter, NULL);
    case TSDB_ITER_TYPE_MEMT:
      return tsdbMemTableIterNext(iter, NULL);
    case TSDB_ITER_TYPE_STT_TOMB:
      return tsdbSttTombIterNext(iter, NULL);
    case TSDB_ITER_TYPE_DATA_TOMB:
      return tsdbDataTombIterNext(iter, NULL);
    case TSDB_ITER_TYPE_MEMT_TOMB:
      // tsdbIterOpen() can create this type, so it must be advanceable too;
      // previously it fell into the default branch and never moved.
      return tsdbMemTombIterNext(iter, NULL);
    default:
      ASSERT(false);
  }
  return 0;
}

/*
 * Advance a row iterator to its next row that does not belong to table
 * `tbid`. Only the row iterator types (STT / DATA / MEMT) are handled; any
 * other type trips the assertion. Returns the per-type routine's result.
 */
static int32_t tsdbIterSkipTableData(STsdbIter *iter, const TABLEID *tbid) {
  int32_t code = 0;

  switch (iter->type) {
    case TSDB_ITER_TYPE_STT:
      code = tsdbSttIterNext(iter, tbid);
      break;
    case TSDB_ITER_TYPE_DATA:
      code = tsdbDataIterNext(iter, tbid);
      break;
    case TSDB_ITER_TYPE_MEMT:
      code = tsdbMemTableIterNext(iter, tbid);
      break;
    default:
      ASSERT(false);
      break;
  }

  return code;
}

static int32_t tsdbIterCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) {
  STsdbIter *iter1 = TCONTAINER_OF(n1, STsdbIter, node);
  STsdbIter *iter2 = TCONTAINER_OF(n2, STsdbIter, node);
  return tRowInfoCmprFn(&iter1->row, &iter2->row);
}

H
Hongze Cheng 已提交
642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666
static int32_t tsdbTombIterCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) {
  STsdbIter *iter1 = TCONTAINER_OF(n1, STsdbIter, node);
  STsdbIter *iter2 = TCONTAINER_OF(n2, STsdbIter, node);

  if (iter1->record->suid < iter2->record->suid) {
    return -1;
  } else if (iter1->record->suid > iter2->record->suid) {
    return 1;
  }

  if (iter1->record->uid < iter2->record->uid) {
    return -1;
  } else if (iter1->record->uid > iter2->record->uid) {
    return 1;
  }

  if (iter1->record->version < iter2->record->version) {
    return -1;
  } else if (iter1->record->version > iter2->record->version) {
    return 1;
  }

  return 0;
}

H
Hongze Cheng 已提交
667 668
// SIterMerger ================
struct SIterMerger {
H
Hongze Cheng 已提交
669
  bool       isTomb;
H
Hongze Cheng 已提交
670 671 672 673
  STsdbIter *iter;
  SRBTree    iterTree[1];
};

H
Hongze Cheng 已提交
674
/*
 * Create a merger over the iterators in `iterArray`.
 *
 * iterArray  iterators to merge; ones already exhausted are left out.
 * merger     out: merger[0] receives the new merger (NULL if allocation fails).
 * isTomb     true when the iterators yield tomb records, false for rows.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY when the merger cannot be allocated,
 * otherwise the result of the initial tsdbIterMergerNext() call that selects
 * the first element.
 */
int32_t tsdbIterMergerOpen(const TTsdbIterArray *iterArray, SIterMerger **merger, bool isTomb) {
  STsdbIter   *iter;
  SRBTreeNode *node;
  SIterMerger *m = taosMemoryCalloc(1, sizeof(*m));

  merger[0] = m;
  if (m == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  m->isTomb = isTomb;
  tRBTreeCreate(m->iterTree, isTomb ? tsdbTombIterCmprFn : tsdbIterCmprFn);

  // Every iterator that still has data goes into the tree, keyed by its current element.
  TARRAY2_FOREACH(iterArray, iter) {
    if (!iter->noMoreData) {
      node = tRBTreePut(m->iterTree, iter->node);
      ASSERT(node);
    }
  }

  return tsdbIterMergerNext(m);
}

H
Hongze Cheng 已提交
698
/*
 * Free a merger and reset merger[0] to NULL; a NULL merger[0] is accepted.
 * Only the merger itself is freed here, not the iterators it was merging.
 * Always returns 0.
 */
int32_t tsdbIterMergerClose(SIterMerger **merger) {
  if (merger[0] == NULL) {
    return 0;
  }

  taosMemoryFree(merger[0]);
  merger[0] = NULL;
  return 0;
}

/*
 * Advance the merger to the next element in merged order.
 *
 * The front iterator (merger->iter) is advanced first. It stays in front as
 * long as it still has data and its new element sorts before the smallest
 * element in the tree; otherwise it is put back into the tree (or dropped if
 * exhausted) and the tree's minimum becomes the new front. After the call
 * merger->iter is NULL when every iterator is exhausted.
 *
 * Returns 0, or the error code of tsdbIterNext().
 */
int32_t tsdbIterMergerNext(SIterMerger *merger) {
  SRBTreeNode *node;

  if (merger->iter != NULL) {
    int32_t code = tsdbIterNext(merger->iter);
    if (code) return code;

    if (merger->iter->noMoreData) {
      merger->iter = NULL;
    } else {
      node = tRBTreeMin(merger->iterTree);
      if (node != NULL) {
        int32_t c = merger->iterTree->cmprFn(merger->iter->node, node);
        ASSERT(c);  // two iterators are not expected to sit on equal elements
        if (c > 0) {
          // The front iterator fell behind the tree minimum: return it to the tree.
          node = tRBTreePut(merger->iterTree, merger->iter->node);
          ASSERT(node);
          merger->iter = NULL;
        }
      }
    }
  }

  if (merger->iter == NULL) {
    node = tRBTreeDropMin(merger->iterTree);
    if (node != NULL) {
      merger->iter = TCONTAINER_OF(node, STsdbIter, node);
    }
  }

  return 0;
}

H
Hongze Cheng 已提交
735 736 737 738 739 740 741 742 743
/*
 * Current row of a row merger (asserts that the merger is not a tomb merger).
 * Returns NULL when the merger has no front iterator, i.e. nothing is left.
 */
SRowInfo *tsdbIterMergerGetData(SIterMerger *merger) {
  ASSERT(!merger->isTomb);

  if (merger->iter == NULL) {
    return NULL;
  }
  return merger->iter->row;
}

/*
 * Current tomb record of a tomb merger (asserts that the merger is a tomb
 * merger). Returns NULL when the merger has no front iterator, i.e. nothing
 * is left.
 */
STombRecord *tsdbIterMergerGetTombRecord(SIterMerger *merger) {
  ASSERT(merger->isTomb);

  if (merger->iter == NULL) {
    return NULL;
  }
  return merger->iter->record;
}
H
Hongze Cheng 已提交
744 745 746 747 748 749 750

/*
 * Skip every remaining row of table `tbid` in a row merger.
 *
 * While the front iterator sits on a row of that table, it is advanced past
 * the table and the front of the merge is re-selected the same way
 * tsdbIterMergerNext() does it. On return the merger is positioned on the
 * first row of another table, or has no front iterator when nothing is left.
 *
 * Returns 0, or the error code of tsdbIterSkipTableData().
 */
int32_t tsdbIterMergerSkipTableData(SIterMerger *merger, const TABLEID *tbid) {
  int32_t      c;
  SRBTreeNode *node;

  while (merger->iter && tbid->suid == merger->iter->row->suid && tbid->uid == merger->iter->row->uid) {
    int32_t code = tsdbIterSkipTableData(merger->iter, tbid);
    if (code) return code;

    if (merger->iter->noMoreData) {
      merger->iter = NULL;
    } else if ((node = tRBTreeMin(merger->iterTree))) {
      c = merger->iterTree->cmprFn(merger->iter->node, node);
      ASSERT(c);
      if (c > 0) {
        // The front iterator fell behind the tree minimum: return it to the tree.
        node = tRBTreePut(merger->iterTree, merger->iter->node);
        ASSERT(node);
        merger->iter = NULL;
      }
    }

    // No front iterator any more: promote the smallest one from the tree, if any.
    if (!merger->iter && (node = tRBTreeDropMin(merger->iterTree))) {
      merger->iter = TCONTAINER_OF(node, STsdbIter, node);
    }
  }

  return 0;
}