tsdbCache.c 40.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"

#include <string.h>

/**
 * Create the LRU cache of a tsdb instance and the mutex that goes with it.
 *
 * The capacity is the vnode's cacheLastSize setting scaled by 1024 * 1024
 * (presumably megabytes to bytes -- confirm against the config definition).
 *
 * @param pTsdb tsdb instance; pTsdb->lruCache is always assigned and is NULL
 *              when the cache could not be created.
 * @return 0 on success, TSDB_CODE_OUT_OF_MEMORY if taosLRUCacheInit() fails.
 */
int32_t tsdbOpenCache(STsdb *pTsdb) {
  int32_t    code = 0;
  SLRUCache *pCache = NULL;
  // Widen before multiplying: otherwise the product is computed in the type of
  // cacheLastSize and can overflow before it is converted to size_t.
  size_t     cfgCapacity = (size_t)pTsdb->pVnode->config.cacheLastSize * 1024 * 1024;

  pCache = taosLRUCacheInit(cfgCapacity, -1, .5);
  if (pCache == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  taosLRUCacheSetStrictCapacity(pCache, true);

  // The mutex is only initialised when the cache exists; tsdbCloseCache()
  // destroys it under the same condition.
  taosThreadMutexInit(&pTsdb->lruMutex, NULL);

_err:
  // Reached on success as well: publishes the cache pointer (NULL on failure).
  pTsdb->lruCache = pCache;
  return code;
}

38 39
void tsdbCloseCache(STsdb *pTsdb) {
  SLRUCache *pCache = pTsdb->lruCache;
40 41 42 43
  if (pCache) {
    taosLRUCacheEraseUnrefEntries(pCache);

    taosLRUCacheCleanup(pCache);
44 45

    taosThreadMutexDestroy(&pTsdb->lruMutex);
46 47 48
  }
}

49 50 51 52 53 54 55 56 57 58
static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) {
  if (cacheType == 0) {  // last_row
    *(uint64_t *)key = (uint64_t)uid;
  } else {  // last
    *(uint64_t *)key = ((uint64_t)uid) | 0x8000000000000000;
  }

  *len = sizeof(uint64_t);
}

59 60 61 62 63 64 65 66 67 68 69 70
static void deleteTableCacheLast(const void *key, size_t keyLen, void *value) {
  SArray *pLastArray = (SArray *)value;
  int16_t nCol = taosArrayGetSize(pLastArray);
  for (int16_t iCol = 0; iCol < nCol; ++iCol) {
    SLastCol *pLastCol = (SLastCol *)taosArrayGet(pLastArray, iCol);
    if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) {
      taosMemoryFree(pLastCol->colVal.value.pData);
    }
  }

  taosArrayDestroy(value);
}
71

72
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
73
  int32_t code = 0;
74 75 76

  char key[32] = {0};
  int  keyLen = 0;
77

78 79
  // getTableCacheKey(uid, "lr", key, &keyLen);
  getTableCacheKey(uid, 0, key, &keyLen);
80 81
  LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
  if (h) {
82 83 84 85 86 87 88 89 90 91 92 93 94
    SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
    bool    invalidate = false;
    int16_t nCol = taosArrayGetSize(pLast);

    for (int16_t iCol = 0; iCol < nCol; ++iCol) {
      SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
      if (eKey >= tTsVal->ts) {
        invalidate = true;
        break;
      }
    }

    if (invalidate) {
95 96 97 98 99 100 101 102 103 104
      taosLRUCacheRelease(pCache, h, true);
    } else {
      taosLRUCacheRelease(pCache, h, false);
    }
    // void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
  }

  return code;
}

105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
  int32_t code = 0;

  char key[32] = {0};
  int  keyLen = 0;

  // getTableCacheKey(uid, "l", key, &keyLen);
  getTableCacheKey(uid, 1, key, &keyLen);
  LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
  if (h) {
    SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
    bool    invalidate = false;
    int16_t nCol = taosArrayGetSize(pLast);

    for (int16_t iCol = 0; iCol < nCol; ++iCol) {
      SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
      if (eKey >= tTsVal->ts) {
        invalidate = true;
        break;
      }
    }

    if (invalidate) {
      taosLRUCacheRelease(pCache, h, true);
    } else {
      taosLRUCacheRelease(pCache, h, false);
    }
    // void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
  }

  return code;
}

/**
 * Apply a delete up to eKey to both cache entries of a table: the last_row
 * entry first, then the last entry.
 *
 * Both entries are always processed, even if the first call reports an error.
 *
 * @return the first non-zero code reported by the two steps, otherwise 0
 */
int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
  // Delegate instead of repeating the lookup/scan/release sequence twice.
  int32_t codeLastrow = tsdbCacheDeleteLastrow(pCache, uid, eKey);
  int32_t codeLast = tsdbCacheDeleteLast(pCache, uid, eKey);

  return codeLastrow ? codeLastrow : codeLast;
}

193
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, STSRow *row, bool dup) {
194 195
  int32_t code = 0;
  STSRow *cacheRow = NULL;
H
Hongze Cheng 已提交
196 197
  char    key[32] = {0};
  int     keyLen = 0;
198

199 200
  // getTableCacheKey(uid, "lr", key, &keyLen);
  getTableCacheKey(uid, 0, key, &keyLen);
201 202
  LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
  if (h) {
203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242
    STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
    TSKEY     keyTs = row->ts;
    bool      invalidate = false;

    SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
    int16_t nCol = taosArrayGetSize(pLast);
    int16_t iCol = 0;

    SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
    if (keyTs > tTsVal->ts) {
      STColumn *pTColumn = &pTSchema->columns[0];
      SColVal   tColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = keyTs});

      taosArraySet(pLast, iCol, &(SLastCol){.ts = keyTs, .colVal = tColVal});
    }

    for (++iCol; iCol < nCol; ++iCol) {
      SLastCol *tTsVal1 = (SLastCol *)taosArrayGet(pLast, iCol);
      if (keyTs >= tTsVal1->ts) {
        SColVal *tColVal = &tTsVal1->colVal;

        SColVal colVal = {0};
        tTSRowGetVal(row, pTSchema, iCol, &colVal);
        if (!COL_VAL_IS_NONE(&colVal)) {
          if (keyTs == tTsVal1->ts && !COL_VAL_IS_NONE(tColVal)) {
            invalidate = true;

            break;
          }
        } else {
          taosArraySet(pLast, iCol, &(SLastCol){.ts = keyTs, .colVal = colVal});
        }
      }
    }

  _invalidate:
    taosMemoryFreeClear(pTSchema);

    taosLRUCacheRelease(pCache, h, invalidate);
    /*
H
Hongze Cheng 已提交
243
    cacheRow = (STSRow *)taosLRUCacheValue(pCache, h);
244
    if (row->ts >= cacheRow->ts) {
245
      if (row->ts == cacheRow->ts) {
M
Minglei Jin 已提交
246
        STSRow    *mergedRow = NULL;
247
        SRowMerger merger = {0};
248
        STSchema  *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
249 250 251 252 253 254 255 256 257 258 259 260 261 262

        tRowMergerInit(&merger, &tsdbRowFromTSRow(0, cacheRow), pTSchema);

        tRowMerge(&merger, &tsdbRowFromTSRow(1, row));

        tRowMergerGetRow(&merger, &mergedRow);
        tRowMergerClear(&merger);

        taosMemoryFreeClear(pTSchema);

        row = mergedRow;
        dup = false;
      }

263
      if (TD_ROW_LEN(row) <= TD_ROW_LEN(cacheRow)) {
H
Hongze Cheng 已提交
264
        tdRowCpy(cacheRow, row);
265 266 267
        if (!dup) {
          taosMemoryFree(row);
        }
268 269

        taosLRUCacheRelease(pCache, h, false);
270
      } else {
271 272
        taosLRUCacheRelease(pCache, h, true);
        // tsdbCacheDeleteLastrow(pCache, uid, TSKEY_MAX);
M
Minglei Jin 已提交
273 274 275 276 277
        if (dup) {
          cacheRow = tdRowDup(row);
        } else {
          cacheRow = row;
        }
278 279 280 281 282 283
        _taos_lru_deleter_t deleter = deleteTableCacheLastrow;
        LRUStatus status = taosLRUCacheInsert(pCache, key, keyLen, cacheRow, TD_ROW_LEN(cacheRow), deleter, NULL,
                                              TAOS_LRU_PRIORITY_LOW);
        if (status != TAOS_LRU_STATUS_OK) {
          code = -1;
        }
284
        // tsdbCacheInsertLastrow(pCache, uid, row, dup);
285
      }
286
    }*/
287
  } /*else {
288 289 290 291 292
    if (dup) {
      cacheRow = tdRowDup(row);
    } else {
      cacheRow = row;
    }
293 294

    _taos_lru_deleter_t deleter = deleteTableCacheLastrow;
H
Hongze Cheng 已提交
295 296
    LRUStatus           status =
        taosLRUCacheInsert(pCache, key, keyLen, cacheRow, TD_ROW_LEN(cacheRow), deleter, NULL, TAOS_LRU_PRIORITY_LOW);
297 298 299
    if (status != TAOS_LRU_STATUS_OK) {
      code = -1;
    }
300
    }*/
301 302 303 304

  return code;
}

305
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb *pTsdb) {
306 307 308 309 310
  int32_t code = 0;
  STSRow *cacheRow = NULL;
  char    key[32] = {0};
  int     keyLen = 0;

311 312
  // getTableCacheKey(uid, "l", key, &keyLen);
  getTableCacheKey(uid, 1, key, &keyLen);
313 314
  LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
  if (h) {
315
    STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
316 317 318 319 320 321 322 323 324 325
    TSKEY     keyTs = row->ts;
    bool      invalidate = false;

    SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
    int16_t nCol = taosArrayGetSize(pLast);
    int16_t iCol = 0;

    SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
    if (keyTs > tTsVal->ts) {
      STColumn *pTColumn = &pTSchema->columns[0];
H
Hongze Cheng 已提交
326
      SColVal   tColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = keyTs});
327 328 329 330 331

      taosArraySet(pLast, iCol, &(SLastCol){.ts = keyTs, .colVal = tColVal});
    }

    for (++iCol; iCol < nCol; ++iCol) {
332 333 334
      SLastCol *tTsVal1 = (SLastCol *)taosArrayGet(pLast, iCol);
      if (keyTs >= tTsVal1->ts) {
        SColVal *tColVal = &tTsVal1->colVal;
335 336 337

        SColVal colVal = {0};
        tTSRowGetVal(row, pTSchema, iCol, &colVal);
H
Hongze Cheng 已提交
338 339
        if (!COL_VAL_IS_VALUE(&colVal)) {
          if (keyTs == tTsVal1->ts && COL_VAL_IS_VALUE(tColVal)) {
340 341 342 343 344 345 346 347 348 349
            invalidate = true;

            break;
          }
        } else {
          taosArraySet(pLast, iCol, &(SLastCol){.ts = keyTs, .colVal = colVal});
        }
      }
    }

350
  _invalidate:
351 352 353 354
    taosMemoryFreeClear(pTSchema);

    taosLRUCacheRelease(pCache, h, invalidate);

355
    // clear last cache anyway, lazy load when get last lookup
356
    // taosLRUCacheRelease(pCache, h, true);
357 358 359 360 361
  }

  return code;
}

362 363 364 365 366 367
static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb *pTsdb) {
  tb_uid_t suid = 0;

  SMetaReader mr = {0};
  metaReaderInit(&mr, pTsdb->pVnode->pMeta, 0);
  if (metaGetTableEntryByUid(&mr, uid) < 0) {
H
Hongze Cheng 已提交
368
    metaReaderClear(&mr);  // table not esist
369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388
    return 0;
  }

  if (mr.me.type == TSDB_CHILD_TABLE) {
    suid = mr.me.ctbEntry.suid;
  } else if (mr.me.type == TSDB_NORMAL_TABLE) {
    suid = 0;
  } else {
    suid = 0;
  }

  metaReaderClear(&mr);

  return suid;
}

/**
 * Read the delete records referenced by pDelIdx into aDelData.
 *
 * @return 0 when pDelIdx is NULL (nothing is read), otherwise the result of
 *         tsdbReadDelData().
 */
static int32_t getTableDelDataFromDelIdx(SDelFReader *pDelReader, SDelIdx *pDelIdx, SArray *aDelData) {
  if (pDelIdx == NULL) {
    return 0;
  }

  return tsdbReadDelData(pDelReader, pDelIdx, aDelData);
}

/**
 * Append every delete record linked from pTbData->pHead to aDelData.
 *
 * @param pTbData  table data; NULL is accepted and appends nothing
 * @param aDelData destination array of SDelData
 * @return 0 on success, TSDB_CODE_OUT_OF_MEMORY when an append fails
 */
static int32_t getTableDelDataFromTbData(STbData *pTbData, SArray *aDelData) {
  if (pTbData == NULL) {
    return 0;
  }

  for (SDelData *pDelData = pTbData->pHead; pDelData != NULL; pDelData = pDelData->pNext) {
    // A failed push would silently drop a delete record and let deleted rows
    // show up again, so report it instead of ignoring it.
    if (taosArrayPush(aDelData, pDelData) == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  return 0;
}

H
Hongze Cheng 已提交
406 407
static int32_t getTableDelData(STbData *pMem, STbData *pIMem, SDelFReader *pDelReader, SDelIdx *pDelIdx,
                               SArray *aDelData) {
408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428
  int32_t code = 0;

  if (pMem) {
    code = getTableDelDataFromTbData(pMem, aDelData);
    if (code) goto _err;
  }

  if (pIMem) {
    code = getTableDelDataFromTbData(pIMem, aDelData);
    if (code) goto _err;
  }

  if (pDelIdx) {
    code = getTableDelDataFromDelIdx(pDelReader, pDelIdx, aDelData);
    if (code) goto _err;
  }

_err:
  return code;
}

H
Hongze Cheng 已提交
429 430
static int32_t getTableDelSkyline(STbData *pMem, STbData *pIMem, SDelFReader *pDelReader, SDelIdx *pDelIdx,
                                  SArray *aSkyline) {
431
  int32_t code = 0;
M
Minglei Jin 已提交
432
  SArray *aDelData = NULL;
433

M
Minglei Jin 已提交
434
  aDelData = taosArrayInit(32, sizeof(SDelData));
435 436 437 438
  code = getTableDelData(pMem, pIMem, pDelReader, pDelIdx, aDelData);
  if (code) goto _err;

  size_t nDelData = taosArrayGetSize(aDelData);
439 440 441 442
  if (nDelData > 0) {
    code = tsdbBuildDeleteSkyline(aDelData, 0, (int32_t)(nDelData - 1), aSkyline);
    if (code) goto _err;
  }
443

444
_err:
M
Minglei Jin 已提交
445 446 447
  if (aDelData) {
    taosArrayDestroy(aDelData);
  }
448 449 450 451 452
  return code;
}

/**
 * Find the delete-index entry of table (suid, uid) in a delete file.
 *
 * @param pDelFReader open delete-file reader
 * @param suid        super-table uid of the table
 * @param uid         table uid
 * @param pDelIdx     receives a copy of the entry; zero-filled when the file
 *                    holds no entry for the table
 * @return 0 on success (including "no entry"), TSDB_CODE_OUT_OF_MEMORY if the
 *         temporary array cannot be allocated, otherwise the code returned by
 *         tsdbReadDelIdx()
 *
 * NOTE(review): a zero-filled result replaces what used to be a NULL
 * dereference. Callers should treat it (uid == 0) as "not found" and not pass
 * it on to tsdbReadDelData() -- confirm how that function handles a zero
 * offset/size.
 */
static int32_t getTableDelIdx(SDelFReader *pDelFReader, tb_uid_t suid, tb_uid_t uid, SDelIdx *pDelIdx) {
  int32_t code = 0;
  SDelIdx idx = {.suid = suid, .uid = uid};

  SArray *pDelIdxArray = taosArrayInit(32, sizeof(SDelIdx));
  if (pDelIdxArray == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  code = tsdbReadDelIdx(pDelFReader, pDelIdxArray);
  if (code) goto _err;

  SDelIdx *pIdx = taosArraySearch(pDelIdxArray, &idx, tCmprDelIdx, TD_EQ);
  if (pIdx != NULL) {
    *pDelIdx = *pIdx;
  } else {
    // the search found nothing: the table has no deletes recorded in this file
    *pDelIdx = (SDelIdx){0};
  }

_err:
  taosArrayDestroy(pDelIdxArray);
  return code;
}

475 476 477 478 479 480 481 482
// States of the file-set iterator driven by getNextRowFromFSLast().
typedef enum {
  SFSLASTNEXTROW_FS,         // initial state: the file-set cursor has not been set up yet
  SFSLASTNEXTROW_FILESET,    // the next (older) file set has to be opened
  SFSLASTNEXTROW_BLOCKDATA,  // not handled by getNextRowFromFSLast()
  SFSLASTNEXTROW_BLOCKROW    // rows are being returned from the open merge tree
} SFSLASTNEXTROWSTATES;

typedef struct {
483 484 485
  SFSLASTNEXTROWSTATES state;     // [input]
  STsdb               *pTsdb;     // [input]
  STSchema            *pTSchema;  // [input]
486
  tb_uid_t             suid;
487
  tb_uid_t             uid;
488 489 490 491 492
  int32_t              nFileSet;
  int32_t              iFileSet;
  SArray              *aDFileSet;
  SDataFReader        *pDataFReader;
  TSDBROW              row;
493

494 495
  SMergeTree  mergeTree;
  SMergeTree *pMergeTree;
496 497 498 499 500 501 502 503 504 505 506 507 508 509
} SFSLastNextRowIter;

/**
 * Return the next row of one table from the merge tree of each file set,
 * visiting file sets from the last array element to the first.
 *
 * The function is a resumable state machine: state->state records where the
 * next call continues, and the cases deliberately fall through.
 *
 * @param iter  SFSLastNextRowIter
 * @param ppRow receives a pointer to state->row, or NULL when all file sets
 *              are exhausted or an error occurred
 * @return 0, or the code returned by tsdbDataFReaderOpen()
 *
 * NOTE(review): pLoadInfo is created per file set and not released in this
 * function -- confirm that tMergeTreeClose() takes ownership of it.
 */
static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
  SFSLastNextRowIter *state = (SFSLastNextRowIter *)iter;
  int32_t             code = 0;

  switch (state->state) {
    case SFSLASTNEXTROW_FS:
      state->nFileSet = taosArrayGetSize(state->aDFileSet);
      state->iFileSet = state->nFileSet;
      /* fallthrough */

    case SFSLASTNEXTROW_FILESET: {
      SDFileSet *pFileSet = NULL;
    _next_fileset:
      // close whatever was left open by the previous file set
      if (state->pMergeTree != NULL) {
        tMergeTreeClose(state->pMergeTree);
        state->pMergeTree = NULL;
      }

      if (--state->iFileSet >= 0) {
        pFileSet = (SDFileSet *)taosArrayGet(state->aDFileSet, state->iFileSet);
      } else {
        *ppRow = NULL;
        return code;
      }

      if (state->pDataFReader != NULL) {
        tsdbDataFReaderClose(&state->pDataFReader);
        state->pDataFReader = NULL;
      }

      code = tsdbDataFReaderOpen(&state->pDataFReader, state->pTsdb, pFileSet);
      if (code) goto _err;

      SSttBlockLoadInfo *pLoadInfo = tCreateLastBlockLoadInfo(state->pTSchema, NULL, 0);
      tMergeTreeOpen(&state->mergeTree, 1, state->pDataFReader, state->suid, state->uid,
                     &(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX},
                     &(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, pLoadInfo, true, NULL);
      state->pMergeTree = &state->mergeTree;
      bool hasVal = tMergeTreeNext(&state->mergeTree);
      if (!hasVal) {
        // nothing for this table in the file set: try the previous one
        state->state = SFSLASTNEXTROW_FILESET;
        goto _next_fileset;
      }
      state->state = SFSLASTNEXTROW_BLOCKROW;
    }
      /* fallthrough */
    case SFSLASTNEXTROW_BLOCKROW:
      state->row = tMergeTreeGetRow(&state->mergeTree);
      *ppRow = &state->row;
      // advance now so the next call knows whether this file set is exhausted
      bool hasVal = tMergeTreeNext(&state->mergeTree);
      if (!hasVal) {
        state->state = SFSLASTNEXTROW_FILESET;
      }

      return code;
    default:
      ASSERT(0);
      break;
  }

_err:
  if (state->pDataFReader) {
    tsdbDataFReaderClose(&state->pDataFReader);
    state->pDataFReader = NULL;
  }
  if (state->pMergeTree != NULL) {
    tMergeTreeClose(state->pMergeTree);
    state->pMergeTree = NULL;
  }

  *ppRow = NULL;

  return code;
}

int32_t clearNextRowFromFSLast(void *iter) {
  SFSLastNextRowIter *state = (SFSLastNextRowIter *)iter;
  int32_t             code = 0;

  if (!state) {
    return code;
  }

  if (state->pDataFReader) {
    tsdbDataFReaderClose(&state->pDataFReader);
    state->pDataFReader = NULL;
  }

584 585 586 587 588
  if (state->pMergeTree != NULL) {
    tMergeTreeClose(state->pMergeTree);
    state->pMergeTree = NULL;
  }

589 590 591
  return code;
}

592 593 594 595 596 597 598 599 600 601 602
// States of the file-set iterator driven by getNextRowFromFS().
typedef enum SFSNEXTROWSTATES {
  SFSNEXTROW_FS,         // initial state: the file-set cursor has not been set up yet
  SFSNEXTROW_FILESET,    // the next (older) file set has to be opened
  SFSNEXTROW_BLOCKDATA,  // the data block at iBlock has to be loaded
  SFSNEXTROW_BLOCKROW    // rows of the loaded block are being returned
} SFSNEXTROWSTATES;

typedef struct SFSNextRowIter {
  SFSNEXTROWSTATES state;         // [input]
  STsdb           *pTsdb;         // [input]
  SBlockIdx       *pBlockIdxExp;  // [input]
603 604 605
  STSchema        *pTSchema;      // [input]
  tb_uid_t         suid;
  tb_uid_t         uid;
606 607 608 609
  int32_t          nFileSet;
  int32_t          iFileSet;
  SArray          *aDFileSet;
  SDataFReader    *pDataFReader;
610
  SArray          *aBlockIdx;
611 612 613 614
  SBlockIdx       *pBlockIdx;
  SMapData         blockMap;
  int32_t          nBlock;
  int32_t          iBlock;
H
Hongze Cheng 已提交
615
  SDataBlk         block;
616 617 618 619 620
  SBlockData       blockData;
  SBlockData      *pBlockData;
  int32_t          nRow;
  int32_t          iRow;
  TSDBROW          row;
621 622 623 624 625 626 627 628
} SFSNextRowIter;

/**
 * Return the next row of one table from the data blocks of each file set.
 * File sets, blocks within a file set and rows within a block are all visited
 * from the highest index down to 0.
 *
 * The function is a resumable state machine: state->state records where the
 * next call continues, and the cases deliberately fall through.
 *
 * @param iter  SFSNextRowIter
 * @param ppRow receives a pointer to state->row, or NULL when all file sets
 *              are exhausted or an error occurred
 * @return 0 on success, TSDB_CODE_OUT_OF_MEMORY if the block-index array
 *         cannot be allocated, otherwise the code of the failing read
 */
static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
  SFSNextRowIter *state = (SFSNextRowIter *)iter;
  int32_t         code = 0;

  switch (state->state) {
    case SFSNEXTROW_FS:
      state->nFileSet = taosArrayGetSize(state->aDFileSet);
      state->iFileSet = state->nFileSet;

      state->pBlockData = NULL;
      /* fallthrough */

    case SFSNEXTROW_FILESET: {
      SDFileSet *pFileSet = NULL;
    _next_fileset:
      if (--state->iFileSet >= 0) {
        pFileSet = (SDFileSet *)taosArrayGet(state->aDFileSet, state->iFileSet);
      } else {
        // no file set left: drop the block buffer and report the end
        if (state->pBlockData) {
          tBlockDataDestroy(state->pBlockData, 1);
          state->pBlockData = NULL;
        }

        *ppRow = NULL;
        return code;
      }

      code = tsdbDataFReaderOpen(&state->pDataFReader, state->pTsdb, pFileSet);
      if (code) goto _err;

      // the block-index array is reused across file sets
      if (!state->aBlockIdx) {
        state->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
        if (state->aBlockIdx == NULL) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          goto _err;
        }
      } else {
        taosArrayClear(state->aBlockIdx);
      }
      code = tsdbReadBlockIdx(state->pDataFReader, state->aBlockIdx);
      if (code) goto _err;

      state->pBlockIdx = taosArraySearch(state->aBlockIdx, state->pBlockIdxExp, tCmprBlockIdx, TD_EQ);
      if (!state->pBlockIdx) {
        // the table has no blocks in this file set: try the previous one
        tsdbDataFReaderClose(&state->pDataFReader);
        state->pDataFReader = NULL;
        goto _next_fileset;
      }

      if (state->blockMap.pData != NULL) {
        tMapDataClear(&state->blockMap);
      }

      code = tsdbReadDataBlk(state->pDataFReader, state->pBlockIdx, &state->blockMap);
      if (code) goto _err;

      state->nBlock = state->blockMap.nItem;
      state->iBlock = state->nBlock - 1;

      if (!state->pBlockData) {
        state->pBlockData = &state->blockData;

        tBlockDataCreate(&state->blockData);
      }
    }
      /* fallthrough */
    case SFSNEXTROW_BLOCKDATA:
      if (state->iBlock >= 0) {
        SDataBlk block = {0};

        tDataBlkReset(&block);
        tBlockDataReset(state->pBlockData);

        tMapDataGetItemByIdx(&state->blockMap, state->iBlock, &block, tGetDataBlk);
        tBlockDataReset(state->pBlockData);
        TABLEID tid = {.suid = state->suid, .uid = state->uid};
        code = tBlockDataInit(state->pBlockData, &tid, state->pTSchema, NULL, 0);
        if (code) goto _err;

        code = tsdbReadDataBlock(state->pDataFReader, &block, state->pBlockData);
        if (code) goto _err;

        state->nRow = state->blockData.nRow;
        state->iRow = state->nRow - 1;

        state->state = SFSNEXTROW_BLOCKROW;
      }
      /* fallthrough */
    case SFSNEXTROW_BLOCKROW:
      if (state->iRow >= 0) {
        state->row = tsdbRowFromBlockData(state->pBlockData, state->iRow);
        *ppRow = &state->row;

        if (--state->iRow < 0) {
          // block exhausted: the next call loads the previous block ...
          state->state = SFSNEXTROW_BLOCKDATA;
          if (--state->iBlock < 0) {
            // ... or, when no block is left, moves on to the previous file set
            tsdbDataFReaderClose(&state->pDataFReader);
            state->pDataFReader = NULL;

            if (state->aBlockIdx) {
              taosArrayDestroy(state->aBlockIdx);
              state->aBlockIdx = NULL;
            }

            state->state = SFSNEXTROW_FILESET;
          }
        }
      }

      return code;
    default:
      ASSERT(0);
      break;
  }

_err:
  if (state->pDataFReader) {
    tsdbDataFReaderClose(&state->pDataFReader);
    state->pDataFReader = NULL;
  }
  if (state->aBlockIdx) {
    taosArrayDestroy(state->aBlockIdx);
    state->aBlockIdx = NULL;
  }
  if (state->pBlockData) {
    tBlockDataDestroy(state->pBlockData, 1);
    state->pBlockData = NULL;
  }

  *ppRow = NULL;

  return code;
}

761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777
int32_t clearNextRowFromFS(void *iter) {
  int32_t code = 0;

  SFSNextRowIter *state = (SFSNextRowIter *)iter;
  if (!state) {
    return code;
  }

  if (state->pDataFReader) {
    tsdbDataFReaderClose(&state->pDataFReader);
    state->pDataFReader = NULL;
  }
  if (state->aBlockIdx) {
    taosArrayDestroy(state->aBlockIdx);
    state->aBlockIdx = NULL;
  }
  if (state->pBlockData) {
H
Hongze Cheng 已提交
778 779
    // tBlockDataDestroy(&state->blockData, 1);
    tBlockDataDestroy(state->pBlockData, 1);
780 781 782
    state->pBlockData = NULL;
  }

783 784 785 786
  if (state->blockMap.pData != NULL) {
    tMapDataClear(&state->blockMap);
  }

787 788 789
  return code;
}

790 791 792 793 794 795 796 797 798
// States of the mem-table iterator driven by getNextRowFromMem().
typedef enum SMEMNEXTROWSTATES {
  SMEMNEXTROW_ENTER,  // first call: the table-data iterator still has to be opened
  SMEMNEXTROW_NEXT,   // iterator is open: advance it before reading a row
} SMEMNEXTROWSTATES;

// Iterator state of getNextRowFromMem().
typedef struct SMemNextRowIter {
  SMEMNEXTROWSTATES state;
  STbData          *pMem;  // [input]
  STbDataIter       iter;  // mem buffer skip list iterator
} SMemNextRowIter;

/**
 * Return the next row of a table's in-memory data.
 *
 * The first call opens the table-data iterator (when pMem is set) and returns
 * its current row; later calls advance the iterator first.
 *
 * @param iter  SMemNextRowIter
 * @param ppRow receives the row, or NULL when there is none
 * @return always 0
 */
static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow) {
  SMemNextRowIter *pState = (SMemNextRowIter *)iter;
  TSDBROW         *pNext = NULL;

  switch (pState->state) {
    case SMEMNEXTROW_ENTER:
      if (pState->pMem != NULL) {
        tsdbTbDataIterOpen(pState->pMem, NULL, 1, &pState->iter);

        pNext = tsdbTbDataIterGet(&pState->iter);
        if (pNext != NULL) {
          // only switch to NEXT once a first row has actually been produced
          pState->state = SMEMNEXTROW_NEXT;
        }
      }
      break;
    case SMEMNEXTROW_NEXT:
      if (tsdbTbDataIterNext(&pState->iter)) {
        pNext = tsdbTbDataIterGet(&pState->iter);
      }
      break;
    default:
      ASSERT(0);
      break;
  }

  *ppRow = pNext;
  return 0;
}

844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881
/* static int32_t tsRowFromTsdbRow(STSchema *pTSchema, TSDBROW *pRow, STSRow **ppRow) { */
/*   int32_t code = 0; */

/*   SColVal *pColVal = &(SColVal){0}; */

/*   if (pRow->type == 0) { */
/*     *ppRow = tdRowDup(pRow->pTSRow); */
/*   } else { */
/*     SArray *pArray = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)); */
/*     if (pArray == NULL) { */
/*       code = TSDB_CODE_OUT_OF_MEMORY; */
/*       goto _exit; */
/*     } */

/*     TSDBKEY   key = TSDBROW_KEY(pRow); */
/*     STColumn *pTColumn = &pTSchema->columns[0]; */
/*     *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = key.ts}); */

/*     if (taosArrayPush(pArray, pColVal) == NULL) { */
/*       code = TSDB_CODE_OUT_OF_MEMORY; */
/*       goto _exit; */
/*     } */

/*     for (int16_t iCol = 1; iCol < pTSchema->numOfCols; iCol++) { */
/*       tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal); */
/*       if (taosArrayPush(pArray, pColVal) == NULL) { */
/*         code = TSDB_CODE_OUT_OF_MEMORY; */
/*         goto _exit; */
/*       } */
/*     } */

/*     code = tdSTSRowNew(pArray, pTSchema, ppRow); */
/*     if (code) goto _exit; */
/*   } */

/* _exit: */
/*   return code; */
/* } */
882

883
static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) {
884 885 886 887 888 889 890 891
  bool deleted = false;
  while (*iSkyline > 0) {
    TSDBKEY *pItemBack = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline);
    TSDBKEY *pItemFront = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline - 1);

    if (key->ts > pItemBack->ts) {
      return false;
    } else if (key->ts >= pItemFront->ts && key->ts <= pItemBack->ts) {
892
      if (key->version <= pItemFront->version || (key->ts == pItemBack->ts && key->version <= pItemBack->version)) {
893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908
        return true;
      } else {
        return false;
      }
    } else {
      if (*iSkyline > 1) {
        --*iSkyline;
      } else {
        return false;
      }
    }
  }

  return deleted;
}

909
typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow);
910
typedef int32_t (*_next_row_clear_fn_t)(void *iter);
911

912
typedef struct {
913 914 915 916 917 918
  TSDBROW             *pRow;
  bool                 stop;
  bool                 next;
  void                *iter;
  _next_row_fn_t       nextRowFn;
  _next_row_clear_fn_t nextRowClearFn;
919
} TsdbNextRowState;
920

921 922 923 924
typedef struct {
  SArray *pSkyline;
  int64_t iSkyline;

925 926 927 928 929 930
  SBlockIdx          idx;
  SMemNextRowIter    memState;
  SMemNextRowIter    imemState;
  SFSLastNextRowIter fsLastState;
  SFSNextRowIter     fsState;
  TSDBROW            memRow, imemRow, fsLastRow, fsRow;
931

932
  TsdbNextRowState input[4];
933 934
  STsdbReadSnap   *pReadSnap;
  STsdb           *pTsdb;
935 936
} CacheNextRowIter;

937
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema) {
938 939 940 941
  int code = 0;

  tb_uid_t suid = getTableSuidByUid(uid, pTsdb);

H
Haojun Liao 已提交
942
  tsdbTakeReadSnap(pTsdb, &pIter->pReadSnap, NULL);
H
Hongze Cheng 已提交
943

944
  STbData *pMem = NULL;
H
Hongze Cheng 已提交
945
  if (pIter->pReadSnap->pMem) {
H
Hongze Cheng 已提交
946
    pMem = tsdbGetTbDataFromMemTable(pIter->pReadSnap->pMem, suid, uid);
947 948 949
  }

  STbData *pIMem = NULL;
H
Hongze Cheng 已提交
950
  if (pIter->pReadSnap->pIMem) {
H
Hongze Cheng 已提交
951
    pIMem = tsdbGetTbDataFromMemTable(pIter->pReadSnap->pIMem, suid, uid);
952 953
  }

H
Hongze Cheng 已提交
954 955
  pIter->pTsdb = pTsdb;

956 957 958 959
  pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));

  SDelIdx delIdx;

H
Hongze Cheng 已提交
960
  SDelFile *pDelFile = pIter->pReadSnap->fs.pDelFile;
961 962 963
  if (pDelFile) {
    SDelFReader *pDelFReader;

H
Hongze Cheng 已提交
964
    code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb);
965 966 967
    if (code) goto _err;

    code = getTableDelIdx(pDelFReader, suid, uid, &delIdx);
968 969 970 971
    if (code) {
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }
972 973

    code = getTableDelSkyline(pMem, pIMem, pDelFReader, &delIdx, pIter->pSkyline);
974 975 976 977
    if (code) {
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }
978 979 980 981 982 983 984 985 986 987 988

    tsdbDelFReaderClose(&pDelFReader);
  } else {
    code = getTableDelSkyline(pMem, pIMem, NULL, NULL, pIter->pSkyline);
    if (code) goto _err;
  }

  pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;

  pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};

989
  pIter->fsLastState.state = (SFSLASTNEXTROWSTATES)SFSNEXTROW_FS;
990 991
  pIter->fsLastState.pTsdb = pTsdb;
  pIter->fsLastState.aDFileSet = pIter->pReadSnap->fs.aDFileSet;
992
  pIter->fsLastState.pTSchema = pTSchema;
993
  pIter->fsLastState.suid = suid;
994
  pIter->fsLastState.uid = uid;
995

996 997
  pIter->fsState.state = SFSNEXTROW_FS;
  pIter->fsState.pTsdb = pTsdb;
H
Hongze Cheng 已提交
998
  pIter->fsState.aDFileSet = pIter->pReadSnap->fs.aDFileSet;
999
  pIter->fsState.pBlockIdxExp = &pIter->idx;
1000 1001 1002
  pIter->fsState.pTSchema = pTSchema;
  pIter->fsState.suid = suid;
  pIter->fsState.uid = uid;
1003 1004 1005

  pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, &pIter->memState, getNextRowFromMem, NULL};
  pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, &pIter->imemState, getNextRowFromMem, NULL};
1006 1007 1008
  pIter->input[2] = (TsdbNextRowState){&pIter->fsLastRow,     false, true, &pIter->fsLastState, getNextRowFromFSLast,
                                       clearNextRowFromFSLast};
  pIter->input[3] =
1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032
      (TsdbNextRowState){&pIter->fsRow, false, true, &pIter->fsState, getNextRowFromFS, clearNextRowFromFS};

  if (pMem) {
    pIter->memState.pMem = pMem;
    pIter->memState.state = SMEMNEXTROW_ENTER;
    pIter->input[0].stop = false;
    pIter->input[0].next = true;
  }

  if (pIMem) {
    pIter->imemState.pMem = pIMem;
    pIter->imemState.state = SMEMNEXTROW_ENTER;
    pIter->input[1].stop = false;
    pIter->input[1].next = true;
  }

  return code;
_err:
  return code;
}

/*
 * Tear down a CacheNextRowIter.
 *
 * For each of the four inputs the registered clear callback (if any) is
 * invoked on that input's iterator state, the delete skyline array is
 * destroyed when present, and tsdbUntakeReadSnap() is called on the
 * iterator's read snapshot.
 *
 * Always returns 0.
 */
static int32_t nextRowIterClose(CacheNextRowIter *pIter) {
  for (int iInput = 0; iInput < 4; ++iInput) {
    TsdbNextRowState *pInput = &pIter->input[iInput];

    // inputs without a clear callback have nothing to release here
    if (pInput->nextRowClearFn != NULL) {
      pInput->nextRowClearFn(pInput->iter);
    }
  }

  if (pIter->pSkyline != NULL) {
    taosArrayDestroy(pIter->pSkyline);
  }

  tsdbUntakeReadSnap(pIter->pTsdb, pIter->pReadSnap, NULL);

  return 0;
}

// Return the next non-deleted row, iterating backwards (from high ts to low)
// over the four inputs of the iterator (mem, imem, fs last and fs).
//
// Steps:
//   1. advance every input whose `next` flag is set and that has not stopped;
//      an input that yields no row is marked as stopped;
//   2. if all four inputs have stopped, *ppRow is set to NULL;
//   3. collect the rows carrying the largest timestamp among the live inputs;
//   4. test each of them against the delete skyline; inputs whose row is
//      deleted are flagged to advance on the next call, and the first
//      surviving row (in input order) is returned and its input is flagged to
//      advance as well.
//
// *ppRow is NULL when every candidate at the largest timestamp is deleted.
// Returns 0 on success, or the first non-zero code reported by an input's
// nextRowFn (in which case *ppRow is left untouched).
static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow) {
  int32_t code = 0;

  // step 1: pull a fresh row from every input that asked for one
  for (int i = 0; i < 4; ++i) {
    TsdbNextRowState *pInput = &pIter->input[i];
    if (!pInput->next || pInput->stop) {
      continue;
    }

    code = pInput->nextRowFn(pInput->iter, &pInput->pRow);
    if (code) {
      return code;
    }

    if (pInput->pRow == NULL) {
      pInput->stop = true;
      pInput->next = false;
    }
  }

  // step 2: nothing left anywhere
  bool allStopped = true;
  for (int i = 0; i < 4; ++i) {
    if (!pIter->input[i].stop) {
      allStopped = false;
      break;
    }
  }
  if (allStopped) {
    *ppRow = NULL;
    return code;
  }

  // step 3: candidates are the rows sharing the largest timestamp
  TSDBROW *aCand[4] = {0};
  int      aCandInput[4] = {-1, -1, -1, -1};
  int      nCand = 0;
  TSKEY    tsMax = TSKEY_MIN;

  for (int i = 0; i < 4; ++i) {
    TsdbNextRowState *pInput = &pIter->input[i];
    if (pInput->stop || pInput->pRow == NULL) {
      continue;
    }

    TSDBKEY key = TSDBROW_KEY(pInput->pRow);
    if (key.ts < tsMax) {
      continue;
    }
    if (key.ts > tsMax) {
      // strictly newer timestamp: restart the candidate list
      nCand = 0;
      tsMax = key.ts;
    }

    aCandInput[nCand] = i;
    aCand[nCand] = pInput->pRow;
    ++nCand;
  }

  // step 4: delete detection against the skyline
  TSDBROW *pResult = NULL;
  int      iResult = -1;

  for (int k = 0; k < nCand; ++k) {
    TSDBKEY candKey = TSDBROW_KEY(aCand[k]);

    bool deleted = tsdbKeyDeleted(&candKey, pIter->pSkyline, &pIter->iSkyline);
    if (!deleted && iResult < 0) {
      iResult = aCandInput[k];
      pResult = aCand[k];
    }

    // deleted rows are skipped by advancing their input on the next call
    pIter->input[aCandInput[k]].next = deleted;
  }

  if (iResult >= 0) {
    // the returned row is consumed, so its input must advance next time
    pIter->input[iResult].next = true;
  }
  *ppRow = pResult;

  return code;
}

/*
 * Build the "last_row" column array of table `uid`.
 *
 * The newest non-deleted row is fetched through a CacheNextRowIter and turned
 * into an array of SLastCol, one element per schema column (element 0 is the
 * timestamp column). While some column is still NONE, further rows carrying
 * the SAME timestamp are merged in to fill the NONE columns; iteration stops
 * as soon as an older timestamp shows up.
 *
 * Var-length column values are deep-copied, so the returned array owns their
 * buffers.
 *
 * @param uid        table uid
 * @param pTsdb      tsdb instance the table lives in
 * @param dup        out: always set to false
 * @param ppColArray out: the resulting array (ownership passes to the caller),
 *                   or NULL when the table yields no row or an error occurred
 * @return 0 on success (including the "no row" case), -1 when the table schema
 *         cannot be loaded, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or
 *         the code reported by the row iterator.
 */
static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppColArray) {
  int32_t          code = 0;
  int16_t          nCol = 0;
  int16_t          iCol = 0;
  int16_t          noneCol = 0;
  bool             setNoneCol = false;
  bool             iterOpened = false;
  SArray          *pColArray = NULL;
  SColVal         *pColVal = &(SColVal){0};
  TSKEY            lastRowTs = TSKEY_MAX;
  CacheNextRowIter iter = {0};

  *dup = false;
  *ppColArray = NULL;

  STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
  if (pTSchema == NULL) {
    // NOTE(review): no specific error code is visible here for a missing
    // schema; -1 matches the generic failure code used elsewhere in this file.
    return -1;
  }
  nCol = pTSchema->numOfCols;

  pColArray = taosArrayInit(nCol, sizeof(SLastCol));
  if (pColArray == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // The iterator is closed whenever open was attempted, as the code has
  // always done, since open may have acquired resources before failing.
  iterOpened = true;
  code = nextRowIterOpen(&iter, uid, pTsdb, pTSchema);
  if (code) goto _exit;

  do {
    TSDBROW *pRow = NULL;
    code = nextRowIterGet(&iter, &pRow);
    if (code) goto _exit;

    if (!pRow) {
      break;
    }

    TSKEY rowTs = TSDBROW_TS(pRow);

    if (lastRowTs == TSKEY_MAX) {
      // first (newest) row: seed the array with every column of this row
      lastRowTs = rowTs;
      STColumn *pTColumn = &pTSchema->columns[0];

      *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = lastRowTs});
      if (taosArrayPush(pColArray, &(SLastCol){.ts = lastRowTs, .colVal = *pColVal}) == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _exit;
      }

      for (iCol = 1; iCol < nCol; ++iCol) {
        tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);

        SLastCol lastCol = {.ts = lastRowTs, .colVal = *pColVal};
        if (IS_VAR_DATA_TYPE(pColVal->type)) {
          // deep copy: the row buffer is owned by the iterator
          lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData);
          if (lastCol.colVal.value.pData == NULL) {
            terrno = TSDB_CODE_OUT_OF_MEMORY;
            code = TSDB_CODE_OUT_OF_MEMORY;
            goto _exit;
          }
          memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
        }

        if (taosArrayPush(pColArray, &lastCol) == NULL) {
          // the copy never made it into the array, release it here
          if (IS_VAR_DATA_TYPE(lastCol.colVal.type)) {
            taosMemoryFree(lastCol.colVal.value.pData);
          }
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          code = TSDB_CODE_OUT_OF_MEMORY;
          goto _exit;
        }

        if (COL_VAL_IS_NONE(pColVal) && !setNoneCol) {
          noneCol = iCol;
          setNoneCol = true;
        }
      }
      if (!setNoneCol) {
        // every column has a value, done
        break;
      } else {
        continue;
      }
    }

    if (rowTs < lastRowTs) {
      // older timestamp: not part of the last row any more
      break;
    }

    // same timestamp: fill the columns that are still NONE
    setNoneCol = false;
    for (iCol = noneCol; iCol < nCol; ++iCol) {
      // value collected so far for this column (elements are SLastCol)
      SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol);
      SColVal  *tColVal = &pLastCol->colVal;

      tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
      if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) {
        SLastCol lastCol = {.ts = rowTs, .colVal = *pColVal};
        if (IS_VAR_DATA_TYPE(pColVal->type)) {
          // allocate the new buffer first so a failure leaves the array intact
          void *pNewData = taosMemoryMalloc(pColVal->value.nData);
          if (pNewData == NULL) {
            terrno = TSDB_CODE_OUT_OF_MEMORY;
            code = TSDB_CODE_OUT_OF_MEMORY;
            goto _exit;
          }
          memcpy(pNewData, pColVal->value.pData, pColVal->value.nData);

          taosMemoryFree(pLastCol->colVal.value.pData);
          lastCol.colVal.value.pData = pNewData;
        }

        taosArraySet(pColArray, iCol, &lastCol);
      } else if (COL_VAL_IS_NONE(tColVal) && COL_VAL_IS_NONE(pColVal) && !setNoneCol) {
        noneCol = iCol;
        setNoneCol = true;
      }
    }
  } while (setNoneCol);

  // hand the array over only when a complete row was assembled
  if (taosArrayGetSize(pColArray) == nCol) {
    *ppColArray = pColArray;
    pColArray = NULL;
  }

_exit:
  if (pColArray != NULL) {
    // still owned here: release the deep-copied var-length buffers as well
    int32_t nElem = (int32_t)taosArrayGetSize(pColArray);
    for (int32_t i = 0; i < nElem; ++i) {
      SLastCol *pElem = (SLastCol *)taosArrayGet(pColArray, i);
      if (IS_VAR_DATA_TYPE(pElem->colVal.type)) {
        taosMemoryFree(pElem->colVal.value.pData);
      }
    }
    taosArrayDestroy(pColArray);
  }
  if (iterOpened) {
    nextRowIterClose(&iter);
  }
  taosMemoryFreeClear(pTSchema);
  return code;
}

/*
 * Build the "last" column array of table `uid`: for every schema column, the
 * most recent value that is an actual value (COL_VAL_IS_VALUE).
 *
 * The newest non-deleted row seeds an array of SLastCol (element 0 is the
 * timestamp column). While some column still has no value, older rows are
 * pulled from the iterator and used to fill those columns; each filled column
 * records the timestamp of the row it came from.
 *
 * Var-length column values are deep-copied, so the returned array owns their
 * buffers.
 *
 * @param uid         table uid
 * @param pTsdb       tsdb instance the table lives in
 * @param ppLastArray out: the resulting array (ownership passes to the
 *                    caller), or NULL when the table yields no row or an
 *                    error occurred
 * @return 0 on success (including the "no row" case), -1 when the table schema
 *         cannot be loaded, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or
 *         the code reported by the row iterator.
 */
static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
  int32_t          code = 0;
  int16_t          nCol = 0;
  int16_t          iCol = 0;
  int16_t          noneCol = 0;
  bool             setNoneCol = false;
  bool             iterOpened = false;
  SArray          *pColArray = NULL;
  SColVal         *pColVal = &(SColVal){0};
  TSKEY            lastRowTs = TSKEY_MAX;
  CacheNextRowIter iter = {0};

  *ppLastArray = NULL;

  STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
  if (pTSchema == NULL) {
    // NOTE(review): no specific error code is visible here for a missing
    // schema; -1 matches the generic failure code used elsewhere in this file.
    return -1;
  }
  nCol = pTSchema->numOfCols;

  pColArray = taosArrayInit(nCol, sizeof(SLastCol));
  if (pColArray == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // The iterator is closed whenever open was attempted, as the code has
  // always done, since open may have acquired resources before failing.
  iterOpened = true;
  code = nextRowIterOpen(&iter, uid, pTsdb, pTSchema);
  if (code) goto _exit;

  do {
    TSDBROW *pRow = NULL;
    code = nextRowIterGet(&iter, &pRow);
    if (code) goto _exit;

    if (!pRow) {
      break;
    }

    TSKEY rowTs = TSDBROW_TS(pRow);

    if (lastRowTs == TSKEY_MAX) {
      // first (newest) row: seed the array with every column of this row
      lastRowTs = rowTs;
      STColumn *pTColumn = &pTSchema->columns[0];

      *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = lastRowTs});
      if (taosArrayPush(pColArray, &(SLastCol){.ts = lastRowTs, .colVal = *pColVal}) == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _exit;
      }

      for (iCol = 1; iCol < nCol; ++iCol) {
        tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);

        SLastCol lastCol = {.ts = lastRowTs, .colVal = *pColVal};
        if (IS_VAR_DATA_TYPE(pColVal->type)) {
          // deep copy: the row buffer is owned by the iterator
          lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData);
          if (lastCol.colVal.value.pData == NULL) {
            terrno = TSDB_CODE_OUT_OF_MEMORY;
            code = TSDB_CODE_OUT_OF_MEMORY;
            goto _exit;
          }
          memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
        }

        if (taosArrayPush(pColArray, &lastCol) == NULL) {
          // the copy never made it into the array, release it here
          if (IS_VAR_DATA_TYPE(lastCol.colVal.type)) {
            taosMemoryFree(lastCol.colVal.value.pData);
          }
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          code = TSDB_CODE_OUT_OF_MEMORY;
          goto _exit;
        }

        if (!COL_VAL_IS_VALUE(pColVal) && !setNoneCol) {
          noneCol = iCol;
          setNoneCol = true;
        }
      }
      if (!setNoneCol) {
        // every column has a value, done
        break;
      } else {
        continue;
      }
    }

    // older row: fill the columns that still have no value
    setNoneCol = false;
    for (iCol = noneCol; iCol < nCol; ++iCol) {
      // value collected so far for this column
      SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol);
      SColVal  *tColVal = &pLastCol->colVal;

      tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
      if (!COL_VAL_IS_VALUE(tColVal) && COL_VAL_IS_VALUE(pColVal)) {
        SLastCol lastCol = {.ts = rowTs, .colVal = *pColVal};
        if (IS_VAR_DATA_TYPE(pColVal->type)) {
          // allocate the new buffer first so a failure leaves the array intact
          void *pNewData = taosMemoryMalloc(pColVal->value.nData);
          if (pNewData == NULL) {
            terrno = TSDB_CODE_OUT_OF_MEMORY;
            code = TSDB_CODE_OUT_OF_MEMORY;
            goto _exit;
          }
          memcpy(pNewData, pColVal->value.pData, pColVal->value.nData);

          taosMemoryFree(pLastCol->colVal.value.pData);
          lastCol.colVal.value.pData = pNewData;
        }

        taosArraySet(pColArray, iCol, &lastCol);
      } else if (!COL_VAL_IS_VALUE(tColVal) && !COL_VAL_IS_VALUE(pColVal) && !setNoneCol) {
        noneCol = iCol;
        setNoneCol = true;
      }
    }
  } while (setNoneCol);

  // hand the array over unless no row was found at all
  if (taosArrayGetSize(pColArray) > 0) {
    *ppLastArray = pColArray;
    pColArray = NULL;
  }

_exit:
  if (pColArray != NULL) {
    // still owned here: release the deep-copied var-length buffers as well
    int32_t nElem = (int32_t)taosArrayGetSize(pColArray);
    for (int32_t i = 0; i < nElem; ++i) {
      SLastCol *pElem = (SLastCol *)taosArrayGet(pColArray, i);
      if (IS_VAR_DATA_TYPE(pElem->colVal.type)) {
        taosMemoryFree(pElem->colVal.value.pData);
      }
    }
    taosArrayDestroy(pColArray);
  }
  if (iterOpened) {
    nextRowIterClose(&iter);
  }
  taosMemoryFreeClear(pTSchema);
  return code;
}

/*
 * Look up (and, on a miss, build and insert) the cached last_row entry of
 * table `uid`, returning an LRU handle to it through `handle`.
 *
 * The cache is probed once without the lock; on a miss pTsdb->lruMutex is
 * taken and the cache is probed again before the entry is built with
 * mergeLastRow() and inserted with low priority.
 *
 * @return 0 with *handle == NULL when mergeLastRow() fails or yields no
 *         array; -1 when the insert does not report TAOS_LRU_STATUS_OK;
 *         0 otherwise. *handle is the result of the final cache lookup and
 *         may be NULL.
 */
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **handle) {
  int32_t code = 0;
  char    key[32] = {0};
  int     keyLen = 0;

  getTableCacheKey(uid, 0, key, &keyLen);

  // fast path: entry already cached
  LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
  if (h != NULL) {
    *handle = h;
    return code;
  }

  taosThreadMutexLock(&pTsdb->lruMutex);

  // probe again now that the mutex is held
  h = taosLRUCacheLookup(pCache, key, keyLen);
  if (h != NULL) {
    taosThreadMutexUnlock(&pTsdb->lruMutex);
    *handle = h;
    return code;
  }

  SArray *pArray = NULL;
  bool    dup = false;  // which is always false for now
  code = mergeLastRow(uid, pTsdb, &dup, &pArray);
  if (code < 0 || pArray == NULL) {
    // empty table or error: report "no handle" with a zero return code
    if (!dup && pArray) {
      taosArrayDestroy(pArray);
    }

    taosThreadMutexUnlock(&pTsdb->lruMutex);

    *handle = NULL;
    return 0;
  }

  size_t    charge = pArray->capacity * pArray->elemSize + sizeof(*pArray);
  LRUStatus status =
      taosLRUCacheInsert(pCache, key, keyLen, pArray, charge, deleteTableCacheLast, NULL, TAOS_LRU_PRIORITY_LOW);
  if (status != TAOS_LRU_STATUS_OK) {
    code = -1;
  }

  taosThreadMutexUnlock(&pTsdb->lruMutex);

  // fetch a handle to whatever is in the cache now
  *handle = taosLRUCacheLookup(pCache, key, keyLen);

  return code;
}

/*
 * Convert a cached array of SLastCol into an STSRow.
 *
 * The SColVal of every element is copied into a temporary array which is then
 * passed to tdSTSRowNew() together with the schema; the temporary array is
 * destroyed before returning.
 *
 * @param pLastArray array of SLastCol
 * @param ppRow      out: row created by tdSTSRowNew()
 * @param pTSchema   schema handed to tdSTSRowNew()
 * @return the code returned by tdSTSRowNew(), or TSDB_CODE_OUT_OF_MEMORY when
 *         the temporary array cannot be allocated or grown.
 */
int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pTSchema) {
  int32_t code = 0;
  int16_t nCol = taosArrayGetSize(pLastArray);

  SArray *pColArray = taosArrayInit(nCol, sizeof(SColVal));
  if (pColArray == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int16_t iCol = 0; iCol < nCol; ++iCol) {
    SLastCol *pLastCol = (SLastCol *)taosArrayGet(pLastArray, iCol);
    if (taosArrayPush(pColArray, &pLastCol->colVal) == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _exit;
    }
  }

  code = tdSTSRowNew(pColArray, pTSchema, ppRow);

_exit:
  // the temporary array is released on every path
  taosArrayDestroy(pColArray);
  return code;
}

/*
 * Look up (and, on a miss, build and insert) the cached "last" entry of table
 * `uid`, returning an LRU handle to it through `handle`.
 *
 * The cache is probed once without the lock; on a miss pTsdb->lruMutex is
 * taken and the cache is probed again before the entry is built with
 * mergeLast() and inserted with low priority.
 *
 * @return 0 with *handle == NULL when mergeLast() fails or yields no array;
 *         -1 when the insert does not report TAOS_LRU_STATUS_OK; 0 otherwise.
 *         *handle is the result of the final cache lookup and may be NULL.
 */
int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **handle) {
  int32_t code = 0;
  char    key[32] = {0};
  int     keyLen = 0;

  getTableCacheKey(uid, 1, key, &keyLen);

  // fast path: entry already cached
  LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
  if (h != NULL) {
    *handle = h;
    return code;
  }

  taosThreadMutexLock(&pTsdb->lruMutex);

  // probe again now that the mutex is held
  h = taosLRUCacheLookup(pCache, key, keyLen);
  if (h != NULL) {
    taosThreadMutexUnlock(&pTsdb->lruMutex);
    *handle = h;
    return code;
  }

  SArray *pLastArray = NULL;
  code = mergeLast(uid, pTsdb, &pLastArray);
  if (code < 0 || pLastArray == NULL) {
    // empty table or error: report "no handle" with a zero return code
    taosThreadMutexUnlock(&pTsdb->lruMutex);

    *handle = NULL;
    return 0;
  }

  size_t    charge = pLastArray->capacity * pLastArray->elemSize + sizeof(*pLastArray);
  LRUStatus status =
      taosLRUCacheInsert(pCache, key, keyLen, pLastArray, charge, deleteTableCacheLast, NULL, TAOS_LRU_PRIORITY_LOW);
  if (status != TAOS_LRU_STATUS_OK) {
    code = -1;
  }

  taosThreadMutexUnlock(&pTsdb->lruMutex);

  // fetch a handle to whatever is in the cache now
  *handle = taosLRUCacheLookup(pCache, key, keyLen);

  return code;
}

/*
 * Release handle `h` back to `pCache` via taosLRUCacheRelease(), passing
 * `false` as its third argument.
 *
 * Always returns 0.
 */
int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h) {
  taosLRUCacheRelease(pCache, h, false);
  return 0;
}

/* Set the capacity of the vnode's tsdb LRU cache to `capacity`. */
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity) {
  SLRUCache *pLru = pVnode->pTsdb->lruCache;

  taosLRUCacheSetCapacity(pLru, capacity);
}

/* Return the capacity reported by the vnode's tsdb LRU cache. */
size_t tsdbCacheGetCapacity(SVnode *pVnode) {
  SLRUCache *pLru = pVnode->pTsdb->lruCache;

  return taosLRUCacheGetCapacity(pLru);
}

/* Return the usage reported by the vnode's tsdb LRU cache. */
size_t tsdbCacheGetUsage(SVnode *pVnode) {
  SLRUCache *pLru = pVnode->pTsdb->lruCache;

  return taosLRUCacheGetUsage(pLru);
}