tsdbCache.c 56.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"

18 19
/*
 * Create the block-index LRU cache of `pTsdb` (10 MiB, non-strict capacity) and the mutex that
 * accompanies it.
 *
 * pTsdb->biCache is always written: it receives the new cache, or NULL when creation fails.
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the cache cannot be created (in which case
 * biMutex is left untouched).
 */
static int32_t tsdbOpenBICache(STsdb *pTsdb) {
  SLRUCache *pBICache = taosLRUCacheInit(10 * 1024 * 1024, 0, .5);

  pTsdb->biCache = pBICache;
  if (pBICache == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  taosLRUCacheSetStrictCapacity(pBICache, false);
  taosThreadMutexInit(&pTsdb->biMutex, NULL);

  return 0;
}

/*
 * Tear down the block-index cache created by tsdbOpenBICache().
 *
 * Unreferenced entries are erased first (element counts before/after are traced), then the cache
 * is destroyed and biMutex is released. pTsdb->biCache is reset to NULL so that calling this
 * function again is a no-op instead of a double cleanup of freed memory.
 */
static void tsdbCloseBICache(STsdb *pTsdb) {
  SLRUCache *pCache = pTsdb->biCache;
  if (pCache == NULL) {
    return;
  }

  int32_t elems = taosLRUCacheGetElems(pCache);
  tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);

  taosLRUCacheEraseUnrefEntries(pCache);

  elems = taosLRUCacheGetElems(pCache);
  tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);

  taosLRUCacheCleanup(pCache);
  pTsdb->biCache = NULL;  // do not leave a dangling pointer behind

  taosThreadMutexDestroy(&pTsdb->biMutex);
}

50
/*
 * Create the caches of `pTsdb`: the last/last_row LRU cache (sized from the vnode's
 * cacheLastSize, in MB) plus the block-index cache (tsdbOpenBICache).
 *
 * pTsdb->lruCache is always written: the new cache on success, NULL on any failure. lruMutex is
 * initialized only on success, so a failed open never publishes a cache whose mutex was not set
 * up (tsdbCloseCache destroys lruMutex whenever lruCache is non-NULL).
 *
 * Returns 0 on success, or the error code of the failing step (TSDB_CODE_OUT_OF_MEMORY).
 */
int32_t tsdbOpenCache(STsdb *pTsdb) {
  int32_t code = 0;
  // Widen before multiplying so a large MB setting cannot overflow the (presumably int) config
  // field's arithmetic.
  size_t     cfgCapacity = (size_t)pTsdb->pVnode->config.cacheLastSize * 1024 * 1024;
  SLRUCache *pCache = taosLRUCacheInit(cfgCapacity, 1, .5);
  if (pCache == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  code = tsdbOpenBICache(pTsdb);
  if (code != TSDB_CODE_SUCCESS) {
    // Roll back the last cache: lruMutex has not been initialized yet.
    taosLRUCacheCleanup(pCache);
    pCache = NULL;
    goto _err;
  }

  taosLRUCacheSetStrictCapacity(pCache, false);
  taosThreadMutexInit(&pTsdb->lruMutex, NULL);

_err:
  pTsdb->lruCache = pCache;
  return code;
}

76 77
/*
 * Destroy the caches created by tsdbOpenCache().
 *
 * The last/last_row cache is purged of unreferenced entries, cleaned up and its mutex destroyed;
 * pTsdb->lruCache is then reset to NULL so a repeated close does not touch freed memory. The
 * block-index cache is closed afterwards by tsdbCloseBICache().
 */
void tsdbCloseCache(STsdb *pTsdb) {
  SLRUCache *pCache = pTsdb->lruCache;
  if (pCache) {
    taosLRUCacheEraseUnrefEntries(pCache);
    taosLRUCacheCleanup(pCache);
    pTsdb->lruCache = NULL;  // do not leave a dangling pointer behind

    taosThreadMutexDestroy(&pTsdb->lruMutex);
  }

  tsdbCloseBICache(pTsdb);
}

89 90 91 92 93 94 95 96 97 98
/*
 * Build the LRU cache key of table `uid`.
 *
 * cacheType 0 selects the last_row entry (key = uid); any other value selects the last entry
 * (key = uid with the top bit set). The 8-byte key is written to `key` in native byte order and
 * *len is set to its size.
 *
 * The bytes are copied with memcpy rather than stored through a `uint64_t *` cast: `key` is a
 * char buffer, so the cast store violated strict aliasing and could be misaligned.
 */
static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) {
  uint64_t k = (uint64_t)uid;
  if (cacheType != 0) {  // last
    k |= 0x8000000000000000ULL;
  }

  memcpy(key, &k, sizeof(k));
  *len = (int)sizeof(k);
}

99 100 101 102 103
static void deleteTableCacheLast(const void *key, size_t keyLen, void *value) {
  SArray *pLastArray = (SArray *)value;
  int16_t nCol = taosArrayGetSize(pLastArray);
  for (int16_t iCol = 0; iCol < nCol; ++iCol) {
    SLastCol *pLastCol = (SLastCol *)taosArrayGet(pLastArray, iCol);
104
    if (IS_VAR_DATA_TYPE(pLastCol->colVal.type) && pLastCol->colVal.value.nData > 0) {
105 106 107 108 109 110
      taosMemoryFree(pLastCol->colVal.value.pData);
    }
  }

  taosArrayDestroy(value);
}
111

112
/*
 * Drop the cached last_row entry of table `uid` if any of its columns has a timestamp <= eKey
 * (presumably the end of a deleted time range — confirm with callers).
 *
 * Does nothing when no last_row entry is cached. Always returns 0.
 */
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
  char key[32] = {0};
  int  keyLen = 0;
  getTableCacheKey(uid, 0, key, &keyLen);  // 0 selects the last_row key

  LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
  if (h == NULL) {
    return 0;
  }

  SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
  int16_t nCol = taosArrayGetSize(pLast);
  int16_t iCol = 0;

  // Skip columns that are strictly newer than eKey; stopping early means one was hit.
  while (iCol < nCol && eKey < ((SLastCol *)taosArrayGet(pLast, iCol))->ts) {
    ++iCol;
  }
  bool invalidate = (iCol < nCol);

  taosLRUCacheRelease(pCache, h, invalidate);
  if (invalidate) {
    taosLRUCacheErase(pCache, key, keyLen);
  }

  return 0;
}

143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
/*
 * Drop the cached "last" entry of table `uid` if any of its columns has a timestamp <= eKey.
 *
 * Does nothing when no "last" entry is cached. Always returns 0.
 */
int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
  char key[32] = {0};
  int  keyLen = 0;
  getTableCacheKey(uid, 1, key, &keyLen);  // 1 selects the "last" key

  LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
  if (!h) {
    return 0;
  }

  SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
  bool    stale = false;
  for (int16_t i = 0, n = taosArrayGetSize(pLast); !stale && i < n; ++i) {
    stale = ((SLastCol *)taosArrayGet(pLast, i))->ts <= eKey;
  }

  taosLRUCacheRelease(pCache, h, stale);
  if (stale) {
    taosLRUCacheErase(pCache, key, keyLen);
  }

  return 0;
}

/*
 * Invalidate both cached entries (last_row and last) of table `uid` that may be affected by a
 * delete ending at eKey: an entry is dropped as soon as one of its columns has ts <= eKey.
 *
 * Entries that are not cached are skipped. Always returns 0.
 */
int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
  int32_t code = 0;

  // cacheType 0 selects the last_row entry, 1 the last entry (see getTableCacheKey).
  for (int cacheType = 0; cacheType <= 1; ++cacheType) {
    char key[32] = {0};
    int  keyLen = 0;
    getTableCacheKey(uid, cacheType, key, &keyLen);

    LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
    if (h == NULL) {
      continue;
    }

    SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
    bool    invalidate = false;
    int16_t nCol = taosArrayGetSize(pLast);
    for (int16_t iCol = 0; iCol < nCol; ++iCol) {
      SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
      if (eKey >= tTsVal->ts) {
        invalidate = true;
        break;
      }
    }

    taosLRUCacheRelease(pCache, h, invalidate);
    if (invalidate) {
      // The release flag alone presumably only erases on the last reference (verify in the LRU
      // implementation); erase explicitly, as tsdbCacheDeleteLastrow/tsdbCacheDeleteLast do.
      taosLRUCacheErase(pCache, key, keyLen);
    }
  }

  return code;
}

H
Hongze Cheng 已提交
229
/*
 * Merge a freshly written row into the cached last_row entry of table `uid`.
 *
 * Nothing happens when no last_row entry is cached for the table. Otherwise:
 *  - an empty cached array is populated from `row` (timestamp column, then every schema column);
 *  - a non-empty array is updated column by column wherever `row` is at least as new as the
 *    cached value and carries something other than NONE for that column.
 * The entry is invalidated (released and erased) when:
 *  - the schema cannot be loaded;
 *  - the column count or a column id disagrees with the schema;
 *  - `row` repeats the cached timestamp of an already-populated column;
 *  - an allocation fails part-way, so that no half-built or half-updated entry survives.
 *
 * Ownership rule shared with deleteTableCacheLast(): a cached var-length value with nData > 0
 * owns a private heap copy of its bytes. Old copies are freed whenever a value is replaced, and
 * only after the replacement has been built, so a failed allocation never leaves a freed pointer
 * in the array.
 *
 * @param pCache  cache holding the last_row entries
 * @param pTsdb   tsdb whose meta provides the table schema
 * @param uid     table uid
 * @param row     the row that was written
 * @param dup     currently unused
 * @return 0, or TSDB_CODE_OUT_OF_MEMORY (also stored in terrno) when an allocation fails
 */
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TSDBROW *row, bool dup) {
  int32_t code = 0;
  char    key[32] = {0};
  int     keyLen = 0;

  (void)dup;

  getTableCacheKey(uid, 0, key, &keyLen);
  LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
  if (h == NULL) {
    return code;
  }

  STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
  TSKEY     keyTs = TSDBROW_TS(row);
  bool      invalidate = false;
  SArray   *pLast = (SArray *)taosLRUCacheValue(pCache, h);
  int16_t   nCol = taosArrayGetSize(pLast);
  int16_t   iCol = 0;

  if (pTSchema == NULL) {
    // Without a schema the cached columns cannot be matched against `row`.
    invalidate = true;
    goto _invalidate;
  }

  if (nCol <= 0) {
    // Empty entry: populate it entirely from `row`.
    nCol = pTSchema->numOfCols;

    STColumn *pTColumn = &pTSchema->columns[0];
    SColVal   tColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = keyTs});
    if (taosArrayPush(pLast, &(SLastCol){.ts = keyTs, .colVal = tColVal}) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _invalidate;
    }

    for (iCol = 1; iCol < nCol; ++iCol) {
      SColVal colVal = {0};
      tsdbRowGetColVal(row, pTSchema, iCol, &colVal);

      SLastCol lastCol = {.ts = keyTs, .colVal = colVal};
      bool     ownsData = IS_VAR_DATA_TYPE(colVal.type) && colVal.value.nData > 0;
      if (ownsData) {
        lastCol.colVal.value.pData = taosMemoryMalloc(colVal.value.nData);
        if (lastCol.colVal.value.pData == NULL) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          goto _invalidate;
        }
        memcpy(lastCol.colVal.value.pData, colVal.value.pData, colVal.value.nData);
      }

      if (taosArrayPush(pLast, &lastCol) == NULL) {
        // The copy never reached the array, so the entry destructor cannot free it.
        if (ownsData) {
          taosMemoryFree(lastCol.colVal.value.pData);
        }
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _invalidate;
      }
    }

    goto _invalidate;
  }

  if (nCol != pTSchema->numOfCols) {
    invalidate = true;
    goto _invalidate;
  }

  // Column 0 holds the row timestamp: advance it when `row` is strictly newer.
  SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
  if (keyTs > tTsVal->ts) {
    STColumn *pTColumn = &pTSchema->columns[0];
    SColVal   tColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = keyTs});

    taosArraySet(pLast, iCol, &(SLastCol){.ts = keyTs, .colVal = tColVal});
  }

  for (++iCol; iCol < nCol; ++iCol) {
    SLastCol *tTsVal1 = (SLastCol *)taosArrayGet(pLast, iCol);
    if (keyTs < tTsVal1->ts) {
      continue;  // cached value is newer than `row`
    }

    SColVal *tColVal = &tTsVal1->colVal;
    SColVal  colVal = {0};
    tsdbRowGetColVal(row, pTSchema, iCol, &colVal);

    if (colVal.cid != tColVal->cid) {
      invalidate = true;
      goto _invalidate;
    }

    if (COL_VAL_IS_NONE(&colVal)) {
      continue;  // `row` carries nothing for this column
    }

    if (keyTs == tTsVal1->ts && !COL_VAL_IS_NONE(tColVal)) {
      // Same timestamp and the cached column is already populated: invalidate instead of merging.
      invalidate = true;
      break;
    }

    // `row` is newer: build the replacement (with its own copy of var data) first...
    SLastCol lastCol = {.ts = keyTs, .colVal = colVal};
    if (IS_VAR_DATA_TYPE(colVal.type) && colVal.value.nData > 0) {
      lastCol.colVal.value.pData = taosMemoryMalloc(colVal.value.nData);
      if (lastCol.colVal.value.pData == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _invalidate;
      }
      memcpy(lastCol.colVal.value.pData, colVal.value.pData, colVal.value.nData);
    }

    // ...then release the copy owned by the value being replaced, whatever the new value is.
    if (IS_VAR_DATA_TYPE(tColVal->type) && tColVal->value.nData > 0) {
      taosMemoryFree(tColVal->value.pData);
    }

    taosArraySet(pLast, iCol, &lastCol);
  }

_invalidate:
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    // A partially built or partially updated entry must not stay cached.
    invalidate = true;
  }
  taosMemoryFreeClear(pTSchema);

  taosLRUCacheRelease(pCache, h, invalidate);
  if (invalidate) {
    taosLRUCacheErase(pCache, key, keyLen);
  }

  return code;
}

H
Hongze Cheng 已提交
347
/*
 * Merge a freshly written row into the cached "last" entry of table `uid`.
 *
 * Nothing happens when no "last" entry is cached for the table. Otherwise:
 *  - an empty cached array is populated from `row` (timestamp column, then every schema column);
 *  - a non-empty array is updated column by column wherever `row` is at least as new as the
 *    cached value and holds an actual value (COL_VAL_IS_VALUE) for that column.
 * The entry is invalidated (released and erased) when:
 *  - the schema cannot be loaded;
 *  - the column count or a column id disagrees with the schema;
 *  - `row` repeats the cached timestamp of a column that already holds a value;
 *  - an allocation fails part-way, so that no half-built or half-updated entry survives.
 *
 * Ownership rule shared with deleteTableCacheLast(): a cached var-length value with nData > 0
 * owns a private heap copy of its bytes. Old copies are freed whenever a value is replaced, and
 * only after the replacement has been built, so a failed allocation never leaves a freed pointer
 * in the array.
 *
 * @param pCache  cache holding the "last" entries
 * @param uid     table uid
 * @param row     the row that was written
 * @param pTsdb   tsdb whose meta provides the table schema
 * @return 0, or TSDB_CODE_OUT_OF_MEMORY (also stored in terrno) when an allocation fails
 */
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb *pTsdb) {
  int32_t code = 0;
  char    key[32] = {0};
  int     keyLen = 0;

  getTableCacheKey(uid, 1, key, &keyLen);
  LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
  if (h == NULL) {
    return code;
  }

  STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
  TSKEY     keyTs = TSDBROW_TS(row);
  bool      invalidate = false;
  SArray   *pLast = (SArray *)taosLRUCacheValue(pCache, h);
  int16_t   nCol = taosArrayGetSize(pLast);
  int16_t   iCol = 0;

  if (pTSchema == NULL) {
    // Without a schema the cached columns cannot be matched against `row`.
    invalidate = true;
    goto _invalidate;
  }

  if (nCol <= 0) {
    // Empty entry: populate it entirely from `row`.
    nCol = pTSchema->numOfCols;

    STColumn *pTColumn = &pTSchema->columns[0];
    SColVal   tColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = keyTs});
    if (taosArrayPush(pLast, &(SLastCol){.ts = keyTs, .colVal = tColVal}) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _invalidate;
    }

    for (iCol = 1; iCol < nCol; ++iCol) {
      SColVal colVal = {0};
      tsdbRowGetColVal(row, pTSchema, iCol, &colVal);

      SLastCol lastCol = {.ts = keyTs, .colVal = colVal};
      bool     ownsData = IS_VAR_DATA_TYPE(colVal.type) && colVal.value.nData > 0;
      if (ownsData) {
        lastCol.colVal.value.pData = taosMemoryMalloc(colVal.value.nData);
        if (lastCol.colVal.value.pData == NULL) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          goto _invalidate;
        }
        memcpy(lastCol.colVal.value.pData, colVal.value.pData, colVal.value.nData);
      }

      if (taosArrayPush(pLast, &lastCol) == NULL) {
        // The copy never reached the array, so the entry destructor cannot free it.
        if (ownsData) {
          taosMemoryFree(lastCol.colVal.value.pData);
        }
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _invalidate;
      }
    }

    goto _invalidate;
  }

  if (nCol != pTSchema->numOfCols) {
    invalidate = true;
    goto _invalidate;
  }

  // Column 0 holds the row timestamp: advance it when `row` is strictly newer.
  SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
  if (keyTs > tTsVal->ts) {
    STColumn *pTColumn = &pTSchema->columns[0];
    SColVal   tColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = keyTs});

    taosArraySet(pLast, iCol, &(SLastCol){.ts = keyTs, .colVal = tColVal});
  }

  for (++iCol; iCol < nCol; ++iCol) {
    SLastCol *tTsVal1 = (SLastCol *)taosArrayGet(pLast, iCol);
    if (keyTs < tTsVal1->ts) {
      continue;  // cached value is newer than `row`
    }

    SColVal *tColVal = &tTsVal1->colVal;
    SColVal  colVal = {0};
    tsdbRowGetColVal(row, pTSchema, iCol, &colVal);

    if (colVal.cid != tColVal->cid) {
      invalidate = true;
      goto _invalidate;
    }

    if (!COL_VAL_IS_VALUE(&colVal)) {
      continue;  // "last" only tracks actual values
    }

    if (keyTs == tTsVal1->ts && COL_VAL_IS_VALUE(tColVal)) {
      // Same timestamp and the cached column already holds a value: invalidate instead of merging.
      invalidate = true;
      break;
    }

    // `row` is newer: build the replacement (with its own copy of var data) first...
    SLastCol lastCol = {.ts = keyTs, .colVal = colVal};
    if (IS_VAR_DATA_TYPE(colVal.type) && colVal.value.nData > 0) {
      lastCol.colVal.value.pData = taosMemoryMalloc(colVal.value.nData);
      if (lastCol.colVal.value.pData == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _invalidate;
      }
      memcpy(lastCol.colVal.value.pData, colVal.value.pData, colVal.value.nData);
    }

    // ...then release the copy owned by the value being replaced, whatever the new value is.
    if (IS_VAR_DATA_TYPE(tColVal->type) && tColVal->value.nData > 0) {
      taosMemoryFree(tColVal->value.pData);
    }

    taosArraySet(pLast, iCol, &lastCol);
  }

_invalidate:
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    // A partially built or partially updated entry must not stay cached.
    invalidate = true;
  }
  taosMemoryFreeClear(pTSchema);

  taosLRUCacheRelease(pCache, h, invalidate);
  if (invalidate) {
    taosLRUCacheErase(pCache, key, keyLen);
  }

  return code;
}

465 466 467 468 469
/*
 * Return the super-table uid of table `uid`.
 *
 * Only child tables have one. Every other case yields 0: normal tables, any other table type, and
 * tables the meta lookup cannot find (e.g. a table that does not exist).
 */
static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb *pTsdb) {
  SMetaReader mr = {0};
  metaReaderInit(&mr, pTsdb->pVnode->pMeta, 0);

  tb_uid_t suid = 0;
  if (metaGetTableEntryByUidCache(&mr, uid) >= 0 && mr.me.type == TSDB_CHILD_TABLE) {
    suid = mr.me.ctbEntry.suid;
  }

  metaReaderClear(&mr);
  return suid;
}

/*
 * Append the delete records referenced by `pDelIdx` (read from the del file) to `aDelData`.
 * A NULL `pDelIdx` means there is nothing to read; 0 is returned. Otherwise the result of
 * tsdbReadDelData() is returned.
 */
static int32_t getTableDelDataFromDelIdx(SDelFReader *pDelReader, SDelIdx *pDelIdx, SArray *aDelData) {
  return (pDelIdx != NULL) ? tsdbReadDelData(pDelReader, pDelIdx, aDelData) : 0;
}

/*
 * Append every delete record of the in-memory table data `pTbData` (its pHead/pNext list) to
 * `aDelData`, copying each SDelData by value.
 *
 * A NULL `pTbData` contributes nothing. Returns 0, or TSDB_CODE_OUT_OF_MEMORY (also stored in
 * terrno) when the array cannot grow. Previously the push failure was silently ignored, which
 * dropped delete records and could let stale data be treated as live.
 */
static int32_t getTableDelDataFromTbData(STbData *pTbData, SArray *aDelData) {
  if (pTbData == NULL) {
    return 0;
  }

  for (SDelData *pDelData = pTbData->pHead; pDelData != NULL; pDelData = pDelData->pNext) {
    if (taosArrayPush(aDelData, pDelData) == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  return 0;
}

H
Hongze Cheng 已提交
509 510
/*
 * Collect the delete records of one table into `aDelData`.
 *
 * Sources are visited in order, skipping any that is NULL: the del file (via pDelIdx), then the
 * mem table, then the imem table. Collection stops at the first non-zero code, which is returned.
 */
static int32_t getTableDelData(STbData *pMem, STbData *pIMem, SDelFReader *pDelReader, SDelIdx *pDelIdx,
                               SArray *aDelData) {
  int32_t code = 0;

  if (pDelIdx != NULL) {
    code = getTableDelDataFromDelIdx(pDelReader, pDelIdx, aDelData);
  }
  if (code == 0 && pMem != NULL) {
    code = getTableDelDataFromTbData(pMem, aDelData);
  }
  if (code == 0 && pIMem != NULL) {
    code = getTableDelDataFromTbData(pIMem, aDelData);
  }

  return code;
}

H
Hongze Cheng 已提交
532 533
/*
 * Build the delete skyline of one table into `aSkyline`.
 *
 * All delete records are gathered (getTableDelData) into a temporary array. When there are any,
 * tsdbBuildDeleteSkyline() turns them into the skyline. The temporary array is always released.
 *
 * Returns 0 on success. Returns TSDB_CODE_OUT_OF_MEMORY (also stored in terrno) when the temporary
 * array cannot be allocated; previously that NULL was passed on unchecked. Otherwise returns the
 * first failing step's code.
 */
static int32_t getTableDelSkyline(STbData *pMem, STbData *pIMem, SDelFReader *pDelReader, SDelIdx *pDelIdx,
                                  SArray *aSkyline) {
  int32_t code = 0;
  SArray *aDelData = taosArrayInit(32, sizeof(SDelData));
  if (aDelData == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  code = getTableDelData(pMem, pIMem, pDelReader, pDelIdx, aDelData);
  if (code == 0) {
    size_t nDelData = taosArrayGetSize(aDelData);
    if (nDelData > 0) {
      code = tsdbBuildDeleteSkyline(aDelData, 0, (int32_t)(nDelData - 1), aSkyline);
    }
  }

  taosArrayDestroy(aDelData);
  return code;
}
S
stephenkgu 已提交
553
/*
554 555
static int32_t getTableDelIdx(SDelFReader *pDelFReader, tb_uid_t suid, tb_uid_t uid, SDelIdx *pDelIdx) {
  int32_t code = 0;
M
Minglei Jin 已提交
556
  SArray *pDelIdxArray = NULL;
557

558
  // SMapData delIdxMap;
M
Minglei Jin 已提交
559
  pDelIdxArray = taosArrayInit(32, sizeof(SDelIdx));
560
  SDelIdx idx = {.suid = suid, .uid = uid};
561

562
  // tMapDataReset(&delIdxMap);
H
Hongze Cheng 已提交
563
  code = tsdbReadDelIdx(pDelFReader, pDelIdxArray);
564 565
  if (code) goto _err;

566
  // code = tMapDataSearch(&delIdxMap, &idx, tGetDelIdx, tCmprDelIdx, pDelIdx);
567
  SDelIdx *pIdx = taosArraySearch(pDelIdxArray, &idx, tCmprDelIdx, TD_EQ);
568

569 570
  *pDelIdx = *pIdx;

571
_err:
M
Minglei Jin 已提交
572 573 574
  if (pDelIdxArray) {
    taosArrayDestroy(pDelIdxArray);
  }
575 576
  return code;
}
S
stephenkgu 已提交
577
*/
578 579 580 581 582 583 584 585
/* Resume points of the iterator implemented by getNextRowFromFSLast(). */
typedef enum {
  SFSLASTNEXTROW_FS = 0,     // start: count the file sets
  SFSLASTNEXTROW_FILESET,    // open the next (earlier) file set and its merge tree
  SFSLASTNEXTROW_BLOCKDATA,  // no case in getNextRowFromFSLast (reaches its default branch)
  SFSLASTNEXTROW_BLOCKROW,   // yield rows from the open merge tree
} SFSLASTNEXTROWSTATES;

typedef struct {
586 587 588
  SFSLASTNEXTROWSTATES state;     // [input]
  STsdb               *pTsdb;     // [input]
  STSchema            *pTSchema;  // [input]
589
  tb_uid_t             suid;
590
  tb_uid_t             uid;
591 592 593
  int32_t              nFileSet;
  int32_t              iFileSet;
  SArray              *aDFileSet;
594
  SDataFReader       **pDataFReader;
595
  TSDBROW              row;
596

597
  bool               checkRemainingRow;
598 599 600
  SMergeTree         mergeTree;
  SMergeTree        *pMergeTree;
  SSttBlockLoadInfo *pLoadInfo;
601
  int64_t            lastTs;
602 603
} SFSLastNextRowIter;

604 605
/*
 * Advance the merge-tree iterator `iter` (an SFSLastNextRowIter) and return its next row.
 *
 * File sets in state->aDFileSet are visited from the last entry to the first. For each one:
 *  - the data-file reader in *state->pDataFReader is (re)opened if it targets another file set;
 *  - every load info is configured with aCols/nCols/isLast;
 *  - a merge tree is opened over suid/uid for the window [state->lastTs, TSKEY_MAX].
 *
 * Results:
 *  - *ppRow points at state->row on success.
 *  - *ppRow is NULL when the file sets are exhausted or on error.
 *  - *ppRow is also NULL when the merge tree reports (tMergeTreeIgnoreEarlierTs) or the next row
 *    shows that only rows at/before lastTs remain; *pIgnoreEarlierTs is then set to true.
 *
 * Returns 0, or the error of tsdbDataFReaderOpen(). On error the open merge tree is closed.
 */
static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
                                    int nCols) {
  SFSLastNextRowIter *state = (SFSLastNextRowIter *)iter;
  int32_t             code = 0;

  switch (state->state) {
    case SFSLASTNEXTROW_FS:
      state->nFileSet = taosArrayGetSize(state->aDFileSet);
      state->iFileSet = state->nFileSet;
      /* fallthrough */

    case SFSLASTNEXTROW_FILESET: {
      SDFileSet *pFileSet = NULL;
    _next_fileset:
      if (state->pMergeTree != NULL) {
        tMergeTreeClose(state->pMergeTree);
        state->pMergeTree = NULL;
      }

      if (--state->iFileSet >= 0) {
        pFileSet = (SDFileSet *)taosArrayGet(state->aDFileSet, state->iFileSet);
      } else {
        *ppRow = NULL;
        return code;
      }

      // Reuse the open reader when it already targets this file set.
      if (*state->pDataFReader == NULL || (*state->pDataFReader)->pSet->fid != pFileSet->fid) {
        if (*state->pDataFReader != NULL) {
          tsdbDataFReaderClose(state->pDataFReader);

          resetLastBlockLoadInfo(state->pLoadInfo);
        }

        code = tsdbDataFReaderOpen(state->pDataFReader, state->pTsdb, pFileSet);
        if (code) goto _err;
      }

      for (int i = 0; i < state->pLoadInfo->numOfStt; ++i) {
        state->pLoadInfo[i].colIds = aCols;
        state->pLoadInfo[i].numOfCols = nCols;
        state->pLoadInfo[i].isLast = isLast;
      }
      tMergeTreeOpen(&state->mergeTree, 1, *state->pDataFReader, state->suid, state->uid,
                     &(STimeWindow){.skey = state->lastTs, .ekey = TSKEY_MAX},
                     &(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, state->pLoadInfo, false, NULL, true);
      state->pMergeTree = &state->mergeTree;
      state->state = SFSLASTNEXTROW_BLOCKROW;
    }
      /* fallthrough */

    case SFSLASTNEXTROW_BLOCKROW: {
      // The requested column count changed: propagate it to every load info.
      if (nCols != state->pLoadInfo->numOfCols) {
        for (int i = 0; i < state->pLoadInfo->numOfStt; ++i) {
          state->pLoadInfo[i].numOfCols = nCols;
          state->pLoadInfo[i].checkRemainingRow = state->checkRemainingRow;
        }
      }

      bool hasVal = tMergeTreeNext(&state->mergeTree);
      if (!hasVal) {
        if (tMergeTreeIgnoreEarlierTs(&state->mergeTree)) {
          *pIgnoreEarlierTs = true;
          *ppRow = NULL;
          return code;
        }
        // Current file set exhausted: move on to the previous one.
        state->state = SFSLASTNEXTROW_FILESET;
        goto _next_fileset;
      }

      state->row = tMergeTreeGetRow(&state->mergeTree);
      *ppRow = &state->row;

      if (TSDBROW_TS(&state->row) <= state->lastTs) {
        *pIgnoreEarlierTs = true;
        *ppRow = NULL;
        return code;
      }

      *pIgnoreEarlierTs = false;
      state->checkRemainingRow = true;
      return code;
    }

    default:
      ASSERT(0);
      break;
  }

_err:
  // The data-file reader is owned by the caller-provided slot and is deliberately not closed here.
  if (state->pMergeTree != NULL) {
    tMergeTreeClose(state->pMergeTree);
    state->pMergeTree = NULL;
  }

  *ppRow = NULL;

  return code;
}

int32_t clearNextRowFromFSLast(void *iter) {
  SFSLastNextRowIter *state = (SFSLastNextRowIter *)iter;
  int32_t             code = 0;

  if (!state) {
    return code;
  }
715
  /*
716 717 718 719
  if (state->pDataFReader) {
    tsdbDataFReaderClose(&state->pDataFReader);
    state->pDataFReader = NULL;
  }
720
  */
721 722 723 724 725
  if (state->pMergeTree != NULL) {
    tMergeTreeClose(state->pMergeTree);
    state->pMergeTree = NULL;
  }

726 727 728
  return code;
}

729 730 731 732 733 734 735 736
/* Resume points of the data-file block iterator implemented by getNextRowFromFS(). */
typedef enum SFSNEXTROWSTATES {
  SFSNEXTROW_FS = 0,     // start: count the file sets
  SFSNEXTROW_FILESET,    // find the table's block index in the next (earlier) file set
  SFSNEXTROW_BLOCKDATA,  // load the next data block of the table
  SFSNEXTROW_BLOCKROW,   // yield rows of the loaded block
} SFSNEXTROWSTATES;

typedef struct SFSNextRowIter {
737 738 739 740 741 742 743 744 745 746 747
  SFSNEXTROWSTATES   state;         // [input]
  STsdb             *pTsdb;         // [input]
  SBlockIdx         *pBlockIdxExp;  // [input]
  STSchema          *pTSchema;      // [input]
  tb_uid_t           suid;
  tb_uid_t           uid;
  int32_t            nFileSet;
  int32_t            iFileSet;
  SArray            *aDFileSet;
  SDataFReader     **pDataFReader;
  SArray            *aBlockIdx;
748
  LRUHandle         *aBlockIdxHandle;
749 750 751 752 753 754 755 756 757 758 759
  SBlockIdx         *pBlockIdx;
  SMapData           blockMap;
  int32_t            nBlock;
  int32_t            iBlock;
  SDataBlk           block;
  SBlockData         blockData;
  SBlockData        *pBlockData;
  int32_t            nRow;
  int32_t            iRow;
  TSDBROW            row;
  SSttBlockLoadInfo *pLoadInfo;
760
  int64_t            lastTs;
761 762
} SFSNextRowIter;

763 764
static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
                                int nCols) {
765 766
  SFSNextRowIter *state = (SFSNextRowIter *)iter;
  int32_t         code = 0;
767
  bool            checkRemainingRow = true;
768 769 770

  switch (state->state) {
    case SFSNEXTROW_FS:
H
Hongze Cheng 已提交
771
      // state->aDFileSet = state->pTsdb->pFS->cState->aDFileSet;
772
      state->nFileSet = taosArrayGetSize(state->aDFileSet);
M
Minglei Jin 已提交
773
      state->iFileSet = state->nFileSet;
774 775

      state->pBlockData = NULL;
776 777 778

    case SFSNEXTROW_FILESET: {
      SDFileSet *pFileSet = NULL;
779
    _next_fileset:
780 781 782
      if (--state->iFileSet >= 0) {
        pFileSet = (SDFileSet *)taosArrayGet(state->aDFileSet, state->iFileSet);
      } else {
H
Hongze Cheng 已提交
783
        // tBlockDataDestroy(&state->blockData, 1);
784
        if (state->pBlockData) {
H
Hongze Cheng 已提交
785
          tBlockDataDestroy(state->pBlockData);
786 787
          state->pBlockData = NULL;
        }
788

789 790 791 792
        *ppRow = NULL;
        return code;
      }

793 794 795 796
      if (*state->pDataFReader == NULL || (*state->pDataFReader)->pSet->fid != pFileSet->fid) {
        if (*state->pDataFReader != NULL) {
          tsdbDataFReaderClose(state->pDataFReader);

797
          // resetLastBlockLoadInfo(state->pLoadInfo);
798 799 800 801 802
        }

        code = tsdbDataFReaderOpen(state->pDataFReader, state->pTsdb, pFileSet);
        if (code) goto _err;
      }
803 804

      // tMapDataReset(&state->blockIdxMap);
805
      /*
806 807 808 809 810
      if (!state->aBlockIdx) {
        state->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
      } else {
        taosArrayClear(state->aBlockIdx);
      }
811
      code = tsdbReadBlockIdx(*state->pDataFReader, state->aBlockIdx);
812
      if (code) goto _err;
813
      */
814
      int32_t code = tsdbCacheGetBlockIdx(state->pTsdb->biCache, *state->pDataFReader, &state->aBlockIdxHandle);
815 816 817
      if (code != TSDB_CODE_SUCCESS || state->aBlockIdxHandle == NULL) {
        goto _err;
      }
818
      state->aBlockIdx = (SArray *)taosLRUCacheValue(state->pTsdb->biCache, state->aBlockIdxHandle);
819

820 821
      /* if (state->pBlockIdx) { */
      /* } */
822 823
      /* code = tMapDataSearch(&state->blockIdxMap, state->pBlockIdxExp, tGetBlockIdx, tCmprBlockIdx,
       * &state->blockIdx);
824 825
       */
      state->pBlockIdx = taosArraySearch(state->aBlockIdx, state->pBlockIdxExp, tCmprBlockIdx, TD_EQ);
826 827 828 829 830 831
      if (!state->pBlockIdx) {
        tsdbBICacheRelease(state->pTsdb->biCache, state->aBlockIdxHandle);

        state->aBlockIdxHandle = NULL;
        state->aBlockIdx = NULL;
        /*
832 833 834
         tsdbDataFReaderClose(state->pDataFReader);
         *state->pDataFReader = NULL;
         resetLastBlockLoadInfo(state->pLoadInfo);*/
835 836 837
        goto _next_fileset;
      }

838 839
      tMapDataReset(&state->blockMap);
      /*
840 841 842
      if (state->blockMap.pData != NULL) {
        tMapDataClear(&state->blockMap);
      }
843 844
      */
      code = tsdbReadDataBlk(*state->pDataFReader, state->pBlockIdx, &state->blockMap);
845 846 847 848
      if (code) goto _err;

      state->nBlock = state->blockMap.nItem;
      state->iBlock = state->nBlock - 1;
849

850 851 852
      if (!state->pBlockData) {
        state->pBlockData = &state->blockData;

853 854
        code = tBlockDataCreate(&state->blockData);
        if (code) goto _err;
855
      }
856 857
    }
    case SFSNEXTROW_BLOCKDATA:
858
    _next_datablock:
859
      if (state->iBlock >= 0) {
H
Hongze Cheng 已提交
860
        SDataBlk block = {0};
861 862
        bool     skipBlock = true;
        int      inputColIndex = 0;
863

H
Hongze Cheng 已提交
864
        tDataBlkReset(&block);
865
        tBlockDataReset(state->pBlockData);
866

H
Hongze Cheng 已提交
867
        tMapDataGetItemByIdx(&state->blockMap, state->iBlock, &block, tGetDataBlk);
868
        if (block.maxKey.ts <= state->lastTs) {
869
          *pIgnoreEarlierTs = true;
X
Xiaoyu Wang 已提交
870 871 872 873 874 875 876
          if (state->pBlockData) {
            tBlockDataDestroy(state->pBlockData);
            state->pBlockData = NULL;
          }

          *ppRow = NULL;
          return code;
877
        }
878
        *pIgnoreEarlierTs = false;
879
        tBlockDataReset(state->pBlockData);
H
Hongze Cheng 已提交
880
        TABLEID tid = {.suid = state->suid, .uid = state->uid};
881
        code = tBlockDataInit(state->pBlockData, &tid, state->pTSchema, aCols, nCols);
882 883
        if (code) goto _err;

884
        code = tsdbReadDataBlock(*state->pDataFReader, &block, state->pBlockData);
885 886
        if (code) goto _err;

887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929
        for (int colIndex = 0; colIndex < state->pBlockData->nColData; ++colIndex) {
          SColData *pColData = &state->pBlockData->aColData[colIndex];

          if (isLast && (pColData->flag & HAS_VALUE)) {
            skipBlock = false;
            break;
          } else if (pColData->flag & (HAS_VALUE | HAS_NULL)) {
            skipBlock = false;
            break;
          }
        }

        if (skipBlock) {
          if (--state->iBlock < 0) {
            tsdbDataFReaderClose(state->pDataFReader);
            *state->pDataFReader = NULL;
            // resetLastBlockLoadInfo(state->pLoadInfo);

            if (state->aBlockIdx) {
              // taosArrayDestroy(state->aBlockIdx);
              tsdbBICacheRelease(state->pTsdb->biCache, state->aBlockIdxHandle);

              state->aBlockIdxHandle = NULL;
              state->aBlockIdx = NULL;
            }

            state->state = SFSNEXTROW_FILESET;
            goto _next_fileset;
          } else {
            goto _next_datablock;
          }
        }

        state->nRow = state->blockData.nRow;
        state->iRow = state->nRow - 1;

        state->state = SFSNEXTROW_BLOCKROW;
        checkRemainingRow = false;
      }
    case SFSNEXTROW_BLOCKROW: {
      if (checkRemainingRow) {
        bool skipBlock = true;
        int  inputColIndex = 0;
930 931 932 933
        for (int colIndex = 0; colIndex < state->pBlockData->nColData; ++colIndex) {
          SColData *pColData = &state->pBlockData->aColData[colIndex];
          int16_t   cid = pColData->cid;

934
          if (inputColIndex < nCols && cid == aCols[inputColIndex]) {
935
            if (isLast && (pColData->flag & HAS_VALUE)) {
936 937
              skipBlock = false;
              break;
938
            } else if (pColData->flag & (HAS_VALUE | HAS_NULL)) {
939 940 941
              skipBlock = false;
              break;
            }
942 943

            ++inputColIndex;
944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966
          }
        }

        if (skipBlock) {
          if (--state->iBlock < 0) {
            tsdbDataFReaderClose(state->pDataFReader);
            *state->pDataFReader = NULL;
            // resetLastBlockLoadInfo(state->pLoadInfo);

            if (state->aBlockIdx) {
              // taosArrayDestroy(state->aBlockIdx);
              tsdbBICacheRelease(state->pTsdb->biCache, state->aBlockIdxHandle);

              state->aBlockIdxHandle = NULL;
              state->aBlockIdx = NULL;
            }

            state->state = SFSNEXTROW_FILESET;
            goto _next_fileset;
          } else {
            goto _next_datablock;
          }
        }
967
      }
968

969
      if (state->iRow >= 0) {
970
        state->row = tsdbRowFromBlockData(state->pBlockData, state->iRow);
971 972 973 974 975
        *ppRow = &state->row;

        if (--state->iRow < 0) {
          state->state = SFSNEXTROW_BLOCKDATA;
          if (--state->iBlock < 0) {
976 977
            tsdbDataFReaderClose(state->pDataFReader);
            *state->pDataFReader = NULL;
978
            // resetLastBlockLoadInfo(state->pLoadInfo);
979

980
            if (state->aBlockIdx) {
981
              // taosArrayDestroy(state->aBlockIdx);
982
              tsdbBICacheRelease(state->pTsdb->biCache, state->aBlockIdxHandle);
983 984

              state->aBlockIdxHandle = NULL;
985
              state->aBlockIdx = NULL;
986
            }
987

988 989 990 991 992 993
            state->state = SFSNEXTROW_FILESET;
          }
        }
      }

      return code;
994
    }
995 996 997 998 999 1000
    default:
      ASSERT(0);
      break;
  }

_err:
1001 1002 1003 1004 1005 1006
  /*
  if (*state->pDataFReader) {
    tsdbDataFReaderClose(state->pDataFReader);
    *state->pDataFReader = NULL;
    resetLastBlockLoadInfo(state->pLoadInfo);
    }*/
1007
  if (state->aBlockIdx) {
1008
    // taosArrayDestroy(state->aBlockIdx);
1009
    tsdbBICacheRelease(state->pTsdb->biCache, state->aBlockIdxHandle);
1010 1011

    state->aBlockIdxHandle = NULL;
1012 1013 1014
    state->aBlockIdx = NULL;
  }
  if (state->pBlockData) {
H
Hongze Cheng 已提交
1015
    tBlockDataDestroy(state->pBlockData);
1016
    state->pBlockData = NULL;
1017 1018
  }

1019
  *ppRow = NULL;
1020

1021 1022 1023
  return code;
}

1024 1025 1026 1027 1028 1029 1030
int32_t clearNextRowFromFS(void *iter) {
  int32_t code = 0;

  SFSNextRowIter *state = (SFSNextRowIter *)iter;
  if (!state) {
    return code;
  }
1031
  /*
1032 1033 1034
  if (state->pDataFReader) {
    tsdbDataFReaderClose(&state->pDataFReader);
    state->pDataFReader = NULL;
1035
    }*/
1036
  if (state->aBlockIdx) {
1037
    // taosArrayDestroy(state->aBlockIdx);
1038
    tsdbBICacheRelease(state->pTsdb->biCache, state->aBlockIdxHandle);
1039 1040

    state->aBlockIdxHandle = NULL;
1041 1042 1043
    state->aBlockIdx = NULL;
  }
  if (state->pBlockData) {
H
Hongze Cheng 已提交
1044
    // tBlockDataDestroy(&state->blockData, 1);
H
Hongze Cheng 已提交
1045
    tBlockDataDestroy(state->pBlockData);
1046 1047 1048
    state->pBlockData = NULL;
  }

1049 1050 1051 1052
  if (state->blockMap.pData != NULL) {
    tMapDataClear(&state->blockMap);
  }

1053 1054 1055
  return code;
}

1056 1057 1058 1059 1060 1061 1062 1063 1064
/* States of the in-memory row iterator driven by getNextRowFromMem(). */
typedef enum SMEMNEXTROWSTATES {
  SMEMNEXTROW_ENTER = 0,  // skip-list iterator not opened yet
  SMEMNEXTROW_NEXT = 1,   // iterator opened; each call advances it by one row
} SMEMNEXTROWSTATES;

/*
 * Row-iterator state over one table's in-memory data (an STbData taken from the mem or imem table).
 * Driven by getNextRowFromMem().
 */
typedef struct SMemNextRowIter {
  SMEMNEXTROWSTATES state;   // SMEMNEXTROW_ENTER before the skip-list iterator is opened, SMEMNEXTROW_NEXT after
  STbData          *pMem;    // [input] table data to iterate; NULL means there is no in-memory data
  STbDataIter       iter;    // mem buffer skip list iterator
  int64_t           lastTs;  // [input] if pMem->maxKey <= lastTs, no row is returned and ignoreEarlierTs is set
} SMemNextRowIter;

1070 1071
/*
 * _next_row_fn_t implementation for one table's in-memory data (SMemNextRowIter).
 *
 * First call (SMEMNEXTROW_ENTER):
 * - If pMem->maxKey <= lastTs, nothing is read and *pIgnoreEarlierTs is set to true.
 * - Otherwise the skip-list iterator is opened (backward flag argument 1 — presumably newest-first, TODO confirm)
 *   and its current row is returned.
 *
 * Later calls (SMEMNEXTROW_NEXT) advance the iterator by one row.
 *
 * *ppRow is NULL when there is no (more) data. isLast/aCols/nCols are not used by this source.
 * Always returns 0.
 */
static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
                                 int nCols) {
  SMemNextRowIter *pState = (SMemNextRowIter *)iter;

  *pIgnoreEarlierTs = false;

  if (pState->state == SMEMNEXTROW_ENTER) {
    if (pState->pMem == NULL) {
      *ppRow = NULL;
      return 0;
    }

    if (pState->pMem->maxKey <= pState->lastTs) {
      // everything in this table is not newer than lastTs: skip it entirely
      *ppRow = NULL;
      *pIgnoreEarlierTs = true;
      return 0;
    }

    tsdbTbDataIterOpen(pState->pMem, NULL, 1, &pState->iter);
    TSDBROW *pFirst = tsdbTbDataIterGet(&pState->iter);
    if (pFirst != NULL) {
      pState->state = SMEMNEXTROW_NEXT;
    }
    *ppRow = pFirst;
    return 0;
  }

  if (pState->state == SMEMNEXTROW_NEXT) {
    *ppRow = tsdbTbDataIterNext(&pState->iter) ? tsdbTbDataIterGet(&pState->iter) : NULL;
    return 0;
  }

  ASSERT(0);
  *ppRow = NULL;
  return 0;
}

1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155
/* static int32_t tsRowFromTsdbRow(STSchema *pTSchema, TSDBROW *pRow, STSRow **ppRow) { */
/*   int32_t code = 0; */

/*   SColVal *pColVal = &(SColVal){0}; */

/*   if (pRow->type == 0) { */
/*     *ppRow = tdRowDup(pRow->pTSRow); */
/*   } else { */
/*     SArray *pArray = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)); */
/*     if (pArray == NULL) { */
/*       code = TSDB_CODE_OUT_OF_MEMORY; */
/*       goto _exit; */
/*     } */

/*     TSDBKEY   key = TSDBROW_KEY(pRow); */
/*     STColumn *pTColumn = &pTSchema->columns[0]; */
/*     *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = key.ts}); */

/*     if (taosArrayPush(pArray, pColVal) == NULL) { */
/*       code = TSDB_CODE_OUT_OF_MEMORY; */
/*       goto _exit; */
/*     } */

/*     for (int16_t iCol = 1; iCol < pTSchema->numOfCols; iCol++) { */
/*       tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal); */
/*       if (taosArrayPush(pArray, pColVal) == NULL) { */
/*         code = TSDB_CODE_OUT_OF_MEMORY; */
/*         goto _exit; */
/*       } */
/*     } */

/*     code = tdSTSRowNew(pArray, pTSchema, ppRow); */
/*     if (code) goto _exit; */
/*   } */

/* _exit: */
/*   return code; */
/* } */
1156

1157
/*
 * Check whether `key` is covered by the delete skyline `pSkyline`.
 *
 * The skyline is examined as adjacent pairs (entry *iSkyline - 1, entry *iSkyline), starting at *iSkyline and
 * moving downward. *iSkyline is decremented (never below 1) whenever the key lies before the current pair and
 * is kept between calls. Callers therefore presumably query keys in non-increasing ts order — TODO confirm.
 *
 * Returns true when key->ts lies within [front.ts, back.ts] of the current pair AND either:
 * - the key's version is <= the front entry's version, or
 * - key->ts == back.ts and the key's version is <= the back entry's version.
 */
static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) {
  while (*iSkyline > 0) {
    TSDBKEY *pHi = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline);
    TSDBKEY *pLo = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline - 1);

    if (key->ts > pHi->ts) {
      return false;
    }

    if (key->ts >= pLo->ts) {
      // pLo->ts <= key->ts <= pHi->ts
      return key->version <= pLo->version || (key->ts == pHi->ts && key->version <= pHi->version);
    }

    // the key is older than this pair: step back to the previous one, if any
    if (*iSkyline <= 1) {
      return false;
    }
    --*iSkyline;
  }

  return false;
}

1183 1184
/*
 * Fetch the next row from one row source into *ppRow (NULL once the source is exhausted).
 * A source may set *pIgnoreEarlierTs. isLast/aCols/nCols let a source skip data that cannot contribute to a
 * "last" query on the listed column ids.
 */
typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
                                  int nCols);
/* Release resources held by a row source iterator; optional (NULL) for sources that hold none. */
typedef int32_t (*_next_row_clear_fn_t)(void *iter);
1186

1187
typedef struct {
1188 1189 1190
  TSDBROW             *pRow;
  bool                 stop;
  bool                 next;
1191
  bool                 ignoreEarlierTs;
1192 1193 1194
  void                *iter;
  _next_row_fn_t       nextRowFn;
  _next_row_clear_fn_t nextRowClearFn;
1195
} TsdbNextRowState;
1196

1197 1198 1199 1200
typedef struct {
  SArray *pSkyline;
  int64_t iSkyline;

1201 1202 1203 1204 1205 1206
  SBlockIdx          idx;
  SMemNextRowIter    memState;
  SMemNextRowIter    imemState;
  SFSLastNextRowIter fsLastState;
  SFSNextRowIter     fsState;
  TSDBROW            memRow, imemRow, fsLastRow, fsRow;
1207

1208
  TsdbNextRowState input[4];
1209
  STsdb           *pTsdb;
1210 1211
} CacheNextRowIter;

1212
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
1213
                               SSttBlockLoadInfo *pLoadInfo, STsdbReadSnap *pReadSnap, SDataFReader **pDataFReader,
1214
                               SDataFReader **pDataFReaderLast, int64_t lastTs) {
1215 1216 1217
  int code = 0;

  STbData *pMem = NULL;
1218 1219
  if (pReadSnap->pMem) {
    pMem = tsdbGetTbDataFromMemTable(pReadSnap->pMem, suid, uid);
1220 1221 1222
  }

  STbData *pIMem = NULL;
1223 1224
  if (pReadSnap->pIMem) {
    pIMem = tsdbGetTbDataFromMemTable(pReadSnap->pIMem, suid, uid);
1225 1226
  }

H
Hongze Cheng 已提交
1227 1228
  pIter->pTsdb = pTsdb;

1229 1230
  pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));

1231
  SDelFile *pDelFile = pReadSnap->fs.pDelFile;
1232 1233 1234
  if (pDelFile) {
    SDelFReader *pDelFReader;

H
Hongze Cheng 已提交
1235
    code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb);
1236 1237
    if (code) goto _err;

S
stephenkgu 已提交
1238 1239 1240
    SArray *pDelIdxArray = taosArrayInit(32, sizeof(SDelIdx));

    code = tsdbReadDelIdx(pDelFReader, pDelIdxArray);
M
Minglei Jin 已提交
1241
    if (code) {
1242
      taosArrayDestroy(pDelIdxArray);
M
Minglei Jin 已提交
1243 1244 1245
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }
S
stephenkgu 已提交
1246 1247

    SDelIdx *delIdx = taosArraySearch(pDelIdxArray, &(SDelIdx){.suid = suid, .uid = uid}, tCmprDelIdx, TD_EQ);
1248

S
stephenkgu 已提交
1249
    code = getTableDelSkyline(pMem, pIMem, pDelFReader, delIdx, pIter->pSkyline);
1250
    if (code) {
1251
      taosArrayDestroy(pDelIdxArray);
1252 1253 1254
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }
1255

S
stephenkgu 已提交
1256
    taosArrayDestroy(pDelIdxArray);
1257 1258 1259 1260 1261 1262 1263 1264 1265 1266
    tsdbDelFReaderClose(&pDelFReader);
  } else {
    code = getTableDelSkyline(pMem, pIMem, NULL, NULL, pIter->pSkyline);
    if (code) goto _err;
  }

  pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;

  pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};

1267
  pIter->fsLastState.state = (SFSLASTNEXTROWSTATES)SFSNEXTROW_FS;
1268
  pIter->fsLastState.pTsdb = pTsdb;
1269
  pIter->fsLastState.aDFileSet = pReadSnap->fs.aDFileSet;
1270
  pIter->fsLastState.pTSchema = pTSchema;
1271
  pIter->fsLastState.suid = suid;
1272
  pIter->fsLastState.uid = uid;
1273
  pIter->fsLastState.pLoadInfo = pLoadInfo;
1274
  pIter->fsLastState.pDataFReader = pDataFReaderLast;
1275
  pIter->fsLastState.lastTs = lastTs;
1276

1277 1278
  pIter->fsState.state = SFSNEXTROW_FS;
  pIter->fsState.pTsdb = pTsdb;
1279
  pIter->fsState.aDFileSet = pReadSnap->fs.aDFileSet;
1280
  pIter->fsState.pBlockIdxExp = &pIter->idx;
1281 1282 1283
  pIter->fsState.pTSchema = pTSchema;
  pIter->fsState.suid = suid;
  pIter->fsState.uid = uid;
1284 1285
  pIter->fsState.pLoadInfo = pLoadInfo;
  pIter->fsState.pDataFReader = pDataFReader;
1286
  pIter->fsState.lastTs = lastTs;
1287

1288 1289 1290 1291
  pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL};
  pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL};
  pIter->input[2] = (TsdbNextRowState){
      &pIter->fsLastRow, false, true, false, &pIter->fsLastState, getNextRowFromFSLast, clearNextRowFromFSLast};
1292
  pIter->input[3] =
1293
      (TsdbNextRowState){&pIter->fsRow, false, true, false, &pIter->fsState, getNextRowFromFS, clearNextRowFromFS};
1294 1295 1296 1297

  if (pMem) {
    pIter->memState.pMem = pMem;
    pIter->memState.state = SMEMNEXTROW_ENTER;
1298
    pIter->memState.lastTs = lastTs;
1299 1300 1301 1302 1303 1304 1305
    pIter->input[0].stop = false;
    pIter->input[0].next = true;
  }

  if (pIMem) {
    pIter->imemState.pMem = pIMem;
    pIter->imemState.state = SMEMNEXTROW_ENTER;
1306
    pIter->imemState.lastTs = lastTs;
1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318
    pIter->input[1].stop = false;
    pIter->input[1].next = true;
  }

  return code;
_err:
  return code;
}

/*
 * Release everything held by a CacheNextRowIter.
 * Calls each input's optional clear function, then destroys the delete skyline. Always returns 0.
 */
static int32_t nextRowIterClose(CacheNextRowIter *pIter) {
  const size_t nInputs = sizeof(pIter->input) / sizeof(pIter->input[0]);

  for (size_t idx = 0; idx < nInputs; idx++) {
    _next_row_clear_fn_t clearFn = pIter->input[idx].nextRowClearFn;
    if (clearFn != NULL) {
      clearFn(pIter->input[idx].iter);
    }
  }

  if (pIter->pSkyline != NULL) {
    taosArrayDestroy(pIter->pSkyline);
  }

  return 0;
}

// iterate next row non deleted backward ts, version (from high to low)
1334 1335
static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast,
                              int16_t *aCols, int nCols) {
1336
  int code = 0;
1337 1338 1339
  for (;;) {
    for (int i = 0; i < 4; ++i) {
      if (pIter->input[i].next && !pIter->input[i].stop) {
1340 1341
        code = pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow, &pIter->input[i].ignoreEarlierTs,
                                         isLast, aCols, nCols);
1342
        if (code) goto _err;
1343

1344 1345 1346 1347
        if (pIter->input[i].pRow == NULL) {
          pIter->input[i].stop = true;
          pIter->input[i].next = false;
        }
1348 1349 1350
      }
    }

1351 1352
    if (pIter->input[0].stop && pIter->input[1].stop && pIter->input[2].stop && pIter->input[3].stop) {
      *ppRow = NULL;
1353 1354
      *pIgnoreEarlierTs = (pIter->input[0].ignoreEarlierTs || pIter->input[1].ignoreEarlierTs ||
                           pIter->input[2].ignoreEarlierTs || pIter->input[3].ignoreEarlierTs);
1355 1356
      return code;
    }
1357

1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373
    // select maxpoint(s) from mem, imem, fs and last
    TSDBROW *max[4] = {0};
    int      iMax[4] = {-1, -1, -1, -1};
    int      nMax = 0;
    TSKEY    maxKey = TSKEY_MIN;

    for (int i = 0; i < 4; ++i) {
      if (!pIter->input[i].stop && pIter->input[i].pRow != NULL) {
        TSDBKEY key = TSDBROW_KEY(pIter->input[i].pRow);

        // merging & deduplicating on client side
        if (maxKey <= key.ts) {
          if (maxKey < key.ts) {
            nMax = 0;
            maxKey = key.ts;
          }
1374

1375 1376
          iMax[nMax] = i;
          max[nMax++] = pIter->input[i].pRow;
1377 1378
        } else {
          pIter->input[i].next = false;
1379 1380 1381 1382
        }
      }
    }

1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394
    // delete detection
    TSDBROW *merge[4] = {0};
    int      iMerge[4] = {-1, -1, -1, -1};
    int      nMerge = 0;
    for (int i = 0; i < nMax; ++i) {
      TSDBKEY maxKey1 = TSDBROW_KEY(max[i]);

      bool deleted = tsdbKeyDeleted(&maxKey1, pIter->pSkyline, &pIter->iSkyline);
      if (!deleted) {
        iMerge[nMerge] = iMax[i];
        merge[nMerge++] = max[i];
      }
1395

1396 1397
      pIter->input[iMax[i]].next = deleted;
    }
1398

1399 1400
    if (nMerge > 0) {
      pIter->input[iMerge[0]].next = true;
1401

1402 1403 1404
      *ppRow = merge[0];
      return code;
    }
1405 1406 1407 1408 1409 1410
  }

_err:
  return code;
}

1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446
/*
 * Allocate an SArray of SLastCol with one entry per column of pTSchema.
 * Each entry has ts = 0 and a NULL value carrying the column's id and type.
 * On success stores it in *ppColArray and returns TSDB_CODE_SUCCESS; returns TSDB_CODE_OUT_OF_MEMORY if the
 * array cannot be allocated.
 */
static int32_t initLastColArray(STSchema *pTSchema, SArray **ppColArray) {
  int32_t numOfCols = pTSchema->numOfCols;

  SArray *pCols = taosArrayInit(numOfCols, sizeof(SLastCol));
  if (pCols == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t i = 0; i < numOfCols; i++) {
    STColumn *pColumn = &pTSchema->columns[i];
    SLastCol  lastCol = {.ts = 0, .colVal = COL_VAL_NULL(pColumn->colId, pColumn->type)};
    taosArrayPush(pCols, &lastCol);
  }

  *ppColArray = pCols;
  return TSDB_CODE_SUCCESS;
}

/*
 * Duplicate a schema into a newly allocated buffer stored in *ppDst.
 *
 * The copy is one flat memcpy of the header plus numOfCols STColumn entries. It therefore assumes the columns
 * are laid out inline right after the header (TODO confirm no pointers end up shared).
 * Returns TSDB_CODE_OUT_OF_MEMORY if the allocation fails.
 */
static int32_t cloneTSchema(STSchema *pSrc, STSchema **ppDst) {
  size_t    nBytes = sizeof(STSchema) + sizeof(STColumn) * (size_t)pSrc->numOfCols;
  STSchema *pCopy = taosMemoryMalloc(nBytes);

  *ppDst = pCopy;
  if (pCopy == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  memcpy(pCopy, pSrc, nBytes);
  return TSDB_CODE_SUCCESS;
}

/*
 * Make pReader->pCurrSchema describe schema version `sversion` of table `uid`:
 * - no current schema yet and sversion equals the reader's base schema version: clone the base schema;
 * - current schema already has that version: keep it;
 * - otherwise: free the current schema and load the requested version through metaGetTbTSchemaEx().
 */
static int32_t updateTSchema(int32_t sversion, SCacheRowsReader *pReader, uint64_t uid) {
  STSchema *pCurr = pReader->pCurrSchema;

  if (pCurr == NULL) {
    if (sversion == pReader->pSchema->version) {
      return cloneTSchema(pReader->pSchema, &pReader->pCurrSchema);
    }
  } else if (sversion == pCurr->version) {
    return TSDB_CODE_SUCCESS;
  }

  taosMemoryFreeClear(pReader->pCurrSchema);
  return metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pCurrSchema);
}
1447

1448
/*
 * Compute the "last row" of table `uid`: the column values of its newest non-deleted row.
 *
 * Rows are read newest-first through CacheNextRowIter and processed as follows:
 * - The columns of the newest row are copied into the result.
 * - If some of them are NONE (absent in that row), later rows with the same timestamp are merged in to fill
 *   them.
 * - Reading stops at the first row with an older timestamp.
 *
 * On success *ppColArray receives an SArray of SLastCol (var-length values with nData > 0 are heap copies) and
 * *dup is set to false.
 * - If no row exists, the array is empty.
 * - If no row exists and a source reported ignoreEarlierTs, *ppColArray is NULL instead.
 * On failure the error code is returned and *ppColArray is left untouched.
 */
static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppColArray, SCacheRowsReader *pr) {
  STSchema *pTSchema = pr->pSchema;
  int16_t   nLastCol = pTSchema->numOfCols;
  int16_t   iCol = 0;
  int16_t   noneCol = 0;
  bool      setNoneCol = false;
  bool      hasRow = false;
  bool      ignoreEarlierTs = false;
  SArray   *pColArray = NULL;
  SColVal  *pColVal = &(SColVal){0};

  int32_t code = initLastColArray(pTSchema, &pColArray);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  TSKEY lastRowTs = TSKEY_MAX;

  CacheNextRowIter iter = {0};
  code = nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pReadSnap, &pr->pDataFReader,
                         &pr->pDataFReaderLast, pr->lastTs);
  if (TSDB_CODE_SUCCESS != code) {
    // never poll a half-opened iterator
    goto _err;
  }

  do {
    TSDBROW *pRow = NULL;
    code = nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, false, NULL, 0);
    if (TSDB_CODE_SUCCESS != code) {
      goto _err;
    }

    if (!pRow) {
      break;
    }

    hasRow = true;

    int32_t sversion = TSDBROW_SVERSION(pRow);
    if (sversion != -1) {
      code = updateTSchema(sversion, pr, uid);
      if (TSDB_CODE_SUCCESS != code) {
        goto _err;
      }
      pTSchema = pr->pCurrSchema;
    }
    int16_t nCol = pTSchema->numOfCols;

    TSKEY rowTs = TSDBROW_TS(pRow);

    if (lastRowTs == TSKEY_MAX) {
      // newest row: take every column known to the result array
      lastRowTs = rowTs;
      STColumn *pTColumn = &pTSchema->columns[0];

      *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = lastRowTs});
      taosArraySet(pColArray, 0, &(SLastCol){.ts = lastRowTs, .colVal = *pColVal});

      for (iCol = 1; iCol < nCol; ++iCol) {
        if (iCol >= nLastCol) {
          break;
        }
        SLastCol *pCol = taosArrayGet(pColArray, iCol);
        if (pCol->colVal.cid != pTSchema->columns[iCol].colId) {
          continue;
        }

        tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);

        *pCol = (SLastCol){.ts = lastRowTs, .colVal = *pColVal};
        if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) {
          pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);
          if (pCol->colVal.value.pData == NULL) {
            terrno = TSDB_CODE_OUT_OF_MEMORY;
            code = TSDB_CODE_OUT_OF_MEMORY;
            goto _err;
          }
          memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
        }

        if (COL_VAL_IS_NONE(pColVal) && !setNoneCol) {
          noneCol = iCol;
          setNoneCol = true;
        }
      }
      if (!setNoneCol) {
        // done, goto return pColArray
        break;
      } else {
        continue;
      }
    }

    if ((rowTs < lastRowTs)) {
      // done, goto return pColArray
      break;
    }

    // merge into pColArray
    setNoneCol = false;
    for (iCol = noneCol; iCol < nCol; ++iCol) {
      // Same guards as the first-row pass: this row's schema version may have more columns than the result
      // array (taosArrayGet would return NULL) or a different column at this index.
      if (iCol >= nLastCol) {
        break;
      }
      SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol);
      if (pLastCol->colVal.cid != pTSchema->columns[iCol].colId) {
        continue;
      }
      // high version's column value: the SColVal member of the SLastCol entry (not the entry itself)
      SColVal *tColVal = &pLastCol->colVal;

      tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
      if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) {
        SLastCol lastCol = {.ts = rowTs, .colVal = *pColVal};
        if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) {
          taosMemoryFree(pLastCol->colVal.value.pData);
          // do not leave a dangling pointer behind if the allocation below fails
          pLastCol->colVal.value.pData = NULL;

          lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData);
          if (lastCol.colVal.value.pData == NULL) {
            terrno = TSDB_CODE_OUT_OF_MEMORY;
            code = TSDB_CODE_OUT_OF_MEMORY;
            goto _err;
          }
          memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
        }

        taosArraySet(pColArray, iCol, &lastCol);
      } else if (COL_VAL_IS_NONE(tColVal) && COL_VAL_IS_NONE(pColVal) && !setNoneCol) {
        noneCol = iCol;
        setNoneCol = true;
      }
    }
  } while (setNoneCol);

  // build the result ts row here
  *dup = false;
  if (!hasRow) {
    if (ignoreEarlierTs) {
      taosArrayDestroy(pColArray);
      pColArray = NULL;
    } else {
      taosArrayClear(pColArray);
    }
  }
  *ppColArray = pColArray;

  nextRowIterClose(&iter);
  return code;

_err:
  nextRowIterClose(&iter);
  // taosArrayDestroy() only frees the array storage: release the var-length value copies made above first.
  for (size_t i = 0; i < taosArrayGetSize(pColArray); ++i) {
    SLastCol *pCol = (SLastCol *)taosArrayGet(pColArray, i);
    if (IS_VAR_DATA_TYPE(pCol->colVal.type) && pCol->colVal.value.nData > 0) {
      taosMemoryFree(pCol->colVal.value.pData);
    }
  }
  taosArrayDestroy(pColArray);
  return code;
}

1596
/*
 * Compute the "last" values of table `uid`: for each column, the newest non-deleted VALUE together with the
 * timestamp of the row it was taken from. Rows whose value for a column is NULL or NONE do not supply it.
 *
 * Rows are read newest-first through CacheNextRowIter.
 * - aColArray holds the ids of the columns that still lack a value.
 * - It is handed to the row iterator (isLast = true) so that file blocks with no values for those columns can
 *   be skipped (see getNextRowFromFS).
 * - The loop ends when every column has a value or the rows are exhausted.
 *
 * On success *ppLastArray receives an SArray of SLastCol (var-length values with nData > 0 are heap copies).
 * - If no row was found, the array is empty.
 * - If no row was found and a source reported ignoreEarlierTs, *ppLastArray is NULL instead.
 * On failure the error code is returned and *ppLastArray is set to NULL.
 */
static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr) {
  STSchema *pTSchema = pr->pSchema;
  int16_t   nLastCol = pTSchema->numOfCols;
  int16_t   noneCol = 0;
  bool      setNoneCol = false;
  bool      hasRow = false;
  bool      ignoreEarlierTs = false;
  SArray   *pColArray = NULL;
  SColVal  *pColVal = &(SColVal){0};
  int16_t   nCols = nLastCol;

  int32_t code = initLastColArray(pTSchema, &pColArray);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }
  SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t));
  if (NULL == aColArray) {
    taosArrayDestroy(pColArray);

    return TSDB_CODE_OUT_OF_MEMORY;
  }
  // ids of the columns (all but the primary timestamp) that still need a value
  for (int i = 1; i < pTSchema->numOfCols; ++i) {
    taosArrayPush(aColArray, &pTSchema->columns[i].colId);
  }

  TSKEY lastRowTs = TSKEY_MAX;

  CacheNextRowIter iter = {0};
  code = nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pReadSnap, &pr->pDataFReader,
                         &pr->pDataFReaderLast, pr->lastTs);
  if (TSDB_CODE_SUCCESS != code) {
    // never poll a half-opened iterator
    goto _err;
  }

  do {
    TSDBROW *pRow = NULL;
    code = nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, true, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
    if (TSDB_CODE_SUCCESS != code) {
      goto _err;
    }

    if (!pRow) {
      break;
    }

    hasRow = true;

    int32_t sversion = TSDBROW_SVERSION(pRow);
    if (sversion != -1) {
      code = updateTSchema(sversion, pr, uid);
      if (TSDB_CODE_SUCCESS != code) {
        goto _err;
      }
      pTSchema = pr->pCurrSchema;
    }
    int16_t nCol = pTSchema->numOfCols;

    TSKEY rowTs = TSDBROW_TS(pRow);

    if (lastRowTs == TSKEY_MAX) {
      // newest row: take every column known to the result array
      lastRowTs = rowTs;
      STColumn *pTColumn = &pTSchema->columns[0];

      *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = lastRowTs});
      taosArraySet(pColArray, 0, &(SLastCol){.ts = lastRowTs, .colVal = *pColVal});

      for (int16_t iCol = 1; iCol < nCol; ++iCol) {
        if (iCol >= nLastCol) {
          break;
        }
        SLastCol *pCol = taosArrayGet(pColArray, iCol);
        if (pCol->colVal.cid != pTSchema->columns[iCol].colId) {
          continue;
        }

        tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);

        *pCol = (SLastCol){.ts = lastRowTs, .colVal = *pColVal};
        if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) {
          pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);
          if (pCol->colVal.value.pData == NULL) {
            terrno = TSDB_CODE_OUT_OF_MEMORY;
            code = TSDB_CODE_OUT_OF_MEMORY;
            goto _err;
          }
          memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
        }

        if (!COL_VAL_IS_VALUE(pColVal)) {
          if (!setNoneCol) {
            noneCol = iCol;
            setNoneCol = true;
          }
        } else {
          // this column is satisfied: stop asking the file readers for it
          int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
          if (aColIndex >= 0) {
            taosArrayRemove(aColArray, aColIndex);
          }
        }
      }
      if (!setNoneCol) {
        // done, goto return pColArray
        break;
      } else {
        continue;
      }
    }

    // merge into pColArray
    setNoneCol = false;
    for (int16_t iCol = noneCol; iCol < nCol; ++iCol) {
      if (iCol >= nLastCol) {
        break;
      }
      // high version's column value
      SLastCol *lastColVal = (SLastCol *)taosArrayGet(pColArray, iCol);
      if (lastColVal->colVal.cid != pTSchema->columns[iCol].colId) {
        continue;
      }
      SColVal *tColVal = &lastColVal->colVal;

      tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
      if (!COL_VAL_IS_VALUE(tColVal) && COL_VAL_IS_VALUE(pColVal)) {
        SLastCol lastCol = {.ts = rowTs, .colVal = *pColVal};
        if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) {
          SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol);
          taosMemoryFree(pLastCol->colVal.value.pData);
          // do not leave a dangling pointer behind if the allocation below fails
          pLastCol->colVal.value.pData = NULL;

          lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData);
          if (lastCol.colVal.value.pData == NULL) {
            terrno = TSDB_CODE_OUT_OF_MEMORY;
            code = TSDB_CODE_OUT_OF_MEMORY;
            goto _err;
          }
          memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
        }

        taosArraySet(pColArray, iCol, &lastCol);
        int32_t aColIndex = taosArraySearchIdx(aColArray, &lastCol.colVal.cid, compareInt16Val, TD_EQ);
        if (aColIndex >= 0) {
          taosArrayRemove(aColArray, aColIndex);
        }
      } else if (!COL_VAL_IS_VALUE(tColVal) && !COL_VAL_IS_VALUE(pColVal) && !setNoneCol) {
        noneCol = iCol;
        setNoneCol = true;
      }
    }
  } while (setNoneCol);

  if (!hasRow) {
    if (ignoreEarlierTs) {
      taosArrayDestroy(pColArray);
      pColArray = NULL;
    } else {
      taosArrayClear(pColArray);
    }
  }
  *ppLastArray = pColArray;

  nextRowIterClose(&iter);
  taosArrayDestroy(aColArray);
  return code;

_err:
  nextRowIterClose(&iter);
  *ppLastArray = NULL;
  // taosArrayDestroy() only frees the array storage: release the var-length value copies made above first.
  for (size_t i = 0; i < taosArrayGetSize(pColArray); ++i) {
    SLastCol *pCol = (SLastCol *)taosArrayGet(pColArray, i);
    if (IS_VAR_DATA_TYPE(pCol->colVal.type) && pCol->colVal.value.nData > 0) {
      taosMemoryFree(pCol->colVal.value.pData);
    }
  }
  taosArrayDestroy(pColArray);
  taosArrayDestroy(aColArray);
  return code;
}

1763
/*
 * Look up (or build and insert) the cached last-row entry of table `uid` and return its handle in *handle.
 *
 * A cache miss is re-checked under pTsdb->lruMutex before mergeLastRow() computes the entry.
 * - An empty table, an error, or data ignored because of the reader's lastTs yields *handle = NULL and
 *   return value 0.
 * - A failed cache insert yields -1.
 */
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **handle) {
  char key[32] = {0};
  int  keyLen = 0;
  getTableCacheKey(uid, 0, key, &keyLen);

  LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
  if (h != NULL) {
    *handle = h;
    return 0;
  }

  int32_t code = 0;
  STsdb  *pTsdb = pr->pVnode->pTsdb;

  taosThreadMutexLock(&pTsdb->lruMutex);

  // another thread may have filled the entry while we waited for the lock
  h = taosLRUCacheLookup(pCache, key, keyLen);
  if (h == NULL) {
    SArray *pArray = NULL;
    bool    dup = false;  // which is always false for now

    code = mergeLastRow(uid, pTsdb, &dup, &pArray, pr);
    if (code < 0 || pArray == NULL) {
      if (pArray != NULL && !dup) {
        taosArrayDestroy(pArray);
      }
      taosThreadMutexUnlock(&pTsdb->lruMutex);

      *handle = NULL;
      return 0;
    }

    size_t    charge = pArray->capacity * pArray->elemSize + sizeof(*pArray);
    LRUStatus status =
        taosLRUCacheInsert(pCache, key, keyLen, pArray, charge, deleteTableCacheLast, &h, TAOS_LRU_PRIORITY_LOW);
    if (status != TAOS_LRU_STATUS_OK) {
      code = -1;
    }
  }

  taosThreadMutexUnlock(&pTsdb->lruMutex);

  *handle = h;
  return code;
}
1807

1808
/*
 * Look up (or build and insert) the cached "last" entry of table `uid` and return its handle in *handle.
 *
 * A cache miss is re-checked under pTsdb->lruMutex before mergeLast() computes the entry.
 * - An empty table, an error, or data ignored because of the reader's lastTs yields *handle = NULL and
 *   return value 0.
 * - A failed cache insert yields -1.
 */
int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **handle) {
  char key[32] = {0};
  int  keyLen = 0;
  getTableCacheKey(uid, 1, key, &keyLen);

  LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
  if (h != NULL) {
    *handle = h;
    return 0;
  }

  int32_t code = 0;
  STsdb  *pTsdb = pr->pVnode->pTsdb;

  taosThreadMutexLock(&pTsdb->lruMutex);

  // another thread may have filled the entry while we waited for the lock
  h = taosLRUCacheLookup(pCache, key, keyLen);
  if (h == NULL) {
    SArray *pLastArray = NULL;

    code = mergeLast(uid, pTsdb, &pLastArray, pr);
    if (code < 0 || pLastArray == NULL) {
      taosThreadMutexUnlock(&pTsdb->lruMutex);

      *handle = NULL;
      return 0;
    }

    size_t    charge = pLastArray->capacity * pLastArray->elemSize + sizeof(*pLastArray);
    LRUStatus status =
        taosLRUCacheInsert(pCache, key, keyLen, pLastArray, charge, deleteTableCacheLast, &h, TAOS_LRU_PRIORITY_LOW);
    if (status != TAOS_LRU_STATUS_OK) {
      code = -1;
    }
  }

  taosThreadMutexUnlock(&pTsdb->lruMutex);

  *handle = h;
  return code;
}

M
Minglei Jin 已提交
1848 1849 1850 1851 1852 1853 1854
/* Release a handle obtained from tsdbCacheGetLastrowH()/tsdbCacheGetLastH(). Always returns 0. */
int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h) {
  taosLRUCacheRelease(pCache, h, false);
  return 0;
}
1855 1856 1857 1858 1859 1860

/*
 * Set the capacity of the vnode's tsdb LRU cache (pVnode->pTsdb->lruCache).
 *
 * If the vnode has no tsdb attached, this is a no-op. This matches the NULL-tsdb handling of
 * tsdbCacheGetUsage() and tsdbCacheGetElems(); previously it dereferenced a NULL pTsdb.
 */
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity) {
  if (pVnode->pTsdb == NULL) {
    return;
  }

  taosLRUCacheSetCapacity(pVnode->pTsdb->lruCache, capacity);
}

/*
 * Return the capacity of the vnode's tsdb LRU cache (pVnode->pTsdb->lruCache), or 0 when
 * the vnode has no tsdb attached (same convention as tsdbCacheGetUsage()).
 */
size_t tsdbCacheGetCapacity(SVnode *pVnode) {
  if (pVnode->pTsdb == NULL) {
    return 0;
  }

  return taosLRUCacheGetCapacity(pVnode->pTsdb->lruCache);
}
1861

1862 1863 1864 1865 1866 1867 1868 1869
/*
 * Return the usage reported by the vnode's tsdb LRU cache (pVnode->pTsdb->lruCache),
 * or 0 when the vnode has no tsdb attached.
 */
size_t tsdbCacheGetUsage(SVnode *pVnode) {
  STsdb *pTsdb = pVnode->pTsdb;
  if (pTsdb == NULL) {
    return 0;
  }

  return taosLRUCacheGetUsage(pTsdb->lruCache);
}
1870

1871 1872 1873 1874 1875 1876 1877 1878 1879
/*
 * Return the element count of the vnode's tsdb LRU cache (pVnode->pTsdb->lruCache),
 * or 0 when the vnode has no tsdb attached.
 */
int32_t tsdbCacheGetElems(SVnode *pVnode) {
  STsdb *pTsdb = pVnode->pTsdb;
  if (pTsdb == NULL) {
    return 0;
  }

  return taosLRUCacheGetElems(pTsdb->lruCache);
}

1880 1881 1882 1883 1884 1885 1886 1887 1888 1889 1890 1891 1892 1893 1894 1895 1896 1897 1898 1899 1900 1901 1902 1903 1904 1905 1906 1907 1908 1909 1910 1911 1912 1913 1914 1915 1916 1917 1918 1919 1920 1921 1922 1923 1924 1925 1926 1927 1928 1929 1930 1931 1932 1933 1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947
/*
 * Build the block-index cache key for (fid, commitID) into `key` and store its length in *len.
 *
 * The fields are copied back to back with memcpy instead of copying a struct.
 * A struct { int32_t; int64_t; } typically has padding bytes between its members.
 * C leaves padding bytes unspecified after member stores, so the raw struct bytes
 * could differ for equal (fid, commitID) pairs and cause spurious cache misses.
 *
 * `key` must have room for sizeof(int32_t) + sizeof(int64_t) bytes.
 */
static void getBICacheKey(int32_t fid, int64_t commitID, char *key, int *len) {
  size_t off = 0;

  memcpy(key + off, &fid, sizeof(fid));
  off += sizeof(fid);

  memcpy(key + off, &commitID, sizeof(commitID));
  off += sizeof(commitID);

  *len = (int)off;
}

/*
 * Read the block index of pFileReader's file set into a newly allocated SArray of SBlockIdx.
 *
 * On success, *aBlockIdx receives the array (the caller owns it) and TSDB_CODE_SUCCESS is
 * returned. On failure, *aBlockIdx is left untouched and an error is returned:
 *   - TSDB_CODE_OUT_OF_MEMORY if the array cannot be allocated;
 *   - otherwise, the code returned by tsdbReadBlockIdx().
 */
static int32_t tsdbCacheLoadBlockIdx(SDataFReader *pFileReader, SArray **aBlockIdx) {
  SArray *pArray = taosArrayInit(8, sizeof(SBlockIdx));
  if (pArray == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = tsdbReadBlockIdx(pFileReader, pArray);
  if (code != TSDB_CODE_SUCCESS) {
    // Keep the reader's own error code rather than reporting every failure as OOM.
    taosArrayDestroy(pArray);
    return code;
  }

  *aBlockIdx = pArray;
  return TSDB_CODE_SUCCESS;
}

static void deleteBICache(const void *key, size_t keyLen, void *value) {
  SArray *pArray = (SArray *)value;

  taosArrayDestroy(pArray);
}

/*
 * Return, via *handle, a cache handle for the block-index array of pFileReader's file set.
 *
 * The key is (file-set fid, head-file commitID). On a miss, pTsdb->biMutex is taken and the
 * cache is probed again before loading, so an entry inserted concurrently is reused.
 *
 * Return values:
 *    0  with *handle set, on a hit or a successful load + insert;
 *    0  with *handle == NULL, when the block index could not be loaded. The load error code
 *       is NOT returned, so callers must check *handle as well as the return value;
 *   -1  when inserting the loaded array into the cache fails (status != TAOS_LRU_STATUS_OK).
 */
int32_t tsdbCacheGetBlockIdx(SLRUCache *pCache, SDataFReader *pFileReader, LRUHandle **handle) {
  int32_t code = 0;
  char    key[128] = {0};
  int     keyLen = 0;

  getBICacheKey(pFileReader->pSet->fid, pFileReader->pSet->pHeadF->commitID, key, &keyLen);
  LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
  if (!h) {
    STsdb *pTsdb = pFileReader->pTsdb;
    taosThreadMutexLock(&pTsdb->biMutex);

    // Re-check under the mutex: another thread may have inserted the entry meanwhile.
    h = taosLRUCacheLookup(pCache, key, keyLen);
    if (!h) {
      SArray *pArray = NULL;
      code = tsdbCacheLoadBlockIdx(pFileReader, &pArray);
      // On load failure (or no array), hand back a NULL handle and return 0; the load error
      // code itself is not reported to the caller.
      if (code != TSDB_CODE_SUCCESS || pArray == NULL) {
        taosThreadMutexUnlock(&pTsdb->biMutex);

        *handle = NULL;
        return 0;
      }

      // Charge the cache for the array's element storage plus the SArray header.
      size_t              charge = pArray->capacity * pArray->elemSize + sizeof(*pArray);
      _taos_lru_deleter_t deleter = deleteBICache;
      LRUStatus status = taosLRUCacheInsert(pCache, key, keyLen, pArray, charge, deleter, &h, TAOS_LRU_PRIORITY_LOW);
      if (status != TAOS_LRU_STATUS_OK) {
        // NOTE(review): it is not visible here whether the cache invokes `deleter` on pArray
        // when the insert fails; if it does not, pArray leaks on this path. Confirm against
        // the LRU cache implementation before freeing it here (to avoid a double free).
        code = -1;
      }
    }

    taosThreadMutexUnlock(&pTsdb->biMutex);
  }

  tsdbTrace("bi cache:%p, ref", pCache);
  *handle = h;

  return code;
}

/*
 * Hand a block-index cache handle back to `pCache` via
 * taosLRUCacheRelease(pCache, h, false) and emit a trace line. Always returns 0.
 */
int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h) {
  taosLRUCacheRelease(pCache, h, false);
  tsdbTrace("bi cache:%p, release", pCache);

  return 0;
}