From 756fefaa292c22bd294c50017acc7e38807cc7cd Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Tue, 16 May 2023 18:56:05 +0800 Subject: [PATCH] cache/batchread: load columns in one trip --- source/dnode/vnode/src/inc/tsdb.h | 24 +- source/dnode/vnode/src/tsdb/tsdbCache.c | 676 ++++++++++++++++---- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 24 +- 3 files changed, 567 insertions(+), 157 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index b2bc9abf33..486becdf96 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -346,13 +346,17 @@ struct STsdbFS { }; typedef struct { - rocksdb_t *db; - rocksdb_options_t *options; - rocksdb_flushoptions_t *flushoptions; - rocksdb_writeoptions_t *writeoptions; - rocksdb_readoptions_t *readoptions; - rocksdb_writebatch_t *writebatch; - TdThreadMutex rMutex; + rocksdb_t *db; + rocksdb_comparator_t *my_comparator; + rocksdb_cache_t *blockcache; + rocksdb_block_based_table_options_t *tableoptions; + rocksdb_options_t *options; + rocksdb_flushoptions_t *flushoptions; + rocksdb_writeoptions_t *writeoptions; + rocksdb_readoptions_t *readoptions; + rocksdb_writebatch_t *writebatch; + TdThreadMutex rMutex; + STSchema *pTSchema; } SRocksCache; struct STsdb { @@ -782,7 +786,7 @@ typedef struct SLDataIter { #define tMergeTreeGetRow(_t) (&((_t)->pIter->rInfo.row)) int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo, - bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter* pLDataIter); + bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter *pLDataIter); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); bool tMergeTreeNext(SMergeTree *pMTree); bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree); @@ -822,13 +826,15 @@ typedef struct SCacheRowsReader { typedef struct { TSKEY ts; + int8_t dirty; SColVal colVal; } SLastCol; int32_t tsdbOpenCache(STsdb *pTsdb); void tsdbCloseCache(STsdb *pTsdb); int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *row); -int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int32_t ltype); +int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype); +int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype); int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb *pTsdb); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index c0a8de5743..a637ba4968 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -46,7 +46,13 @@ static void tsdbCloseBICache(STsdb *pTsdb) { } } -#define ROCKS_KEY_LEN 64 +#define ROCKS_KEY_LEN (sizeof(tb_uid_t) + sizeof(int16_t) + sizeof(int8_t)) + +typedef struct { + tb_uid_t uid; + int16_t cid; + int8_t ltype; +} SLastKey; static void tsdbGetRocksPath(STsdb *pTsdb, char *path) { SVnode *pVnode = pTsdb->pVnode; @@ -62,9 +68,56 @@ static void tsdbGetRocksPath(STsdb *pTsdb, char *path) { } } +static const char *myCmpName(void *state) { + (void)state; + return "myCmp"; +} + +static void myCmpDestroy(void *state) { (void)state; } + +static int myCmp(void *state, const char *a, size_t alen, const char *b, size_t blen) { + (void)state; + (void)alen; + (void)blen; + SLastKey *lhs = (SLastKey *)a; + SLastKey *rhs = (SLastKey *)b; + + if (lhs->uid < rhs->uid) { + return -1; + } else if (lhs->uid > rhs->uid) { + return 1; + } + + if (lhs->cid < rhs->cid) { + return -1; + } else if (lhs->cid > rhs->cid) { + return 1; + } + + if (lhs->ltype < rhs->ltype) { + return -1; + } else if (lhs->ltype > rhs->ltype) { + return 1; + } + + return 0; +} + static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { int32_t code = 0; + rocksdb_comparator_t *cmp = rocksdb_comparator_create(NULL, myCmpDestroy, myCmp, myCmpName); + if (NULL == cmp) { + code = TSDB_CODE_OUT_OF_MEMORY; + return code; + } + + rocksdb_cache_t *cache = rocksdb_cache_create_lru(5 * 1024 * 1024); + pTsdb->rCache.blockcache = cache; + + rocksdb_block_based_table_options_t *tableoptions = rocksdb_block_based_options_create(); + pTsdb->rCache.tableoptions = tableoptions; + rocksdb_options_t *options = rocksdb_options_create(); if (NULL == options) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -72,6 +125,9 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { } rocksdb_options_set_create_if_missing(options, 1); + rocksdb_options_set_comparator(options, cmp); + rocksdb_block_based_options_set_block_cache(tableoptions, cache); + rocksdb_options_set_block_based_table_factory(options, tableoptions); // rocksdb_options_set_inplace_update_support(options, 1); // rocksdb_options_set_allow_concurrent_memtable_write(options, 0); @@ -80,12 +136,12 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err2; } - // rocksdb_writeoptions_disable_WAL(writeoptions, 1); + rocksdb_writeoptions_disable_WAL(writeoptions, 1); rocksdb_readoptions_t *readoptions = rocksdb_readoptions_create(); if (NULL == readoptions) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _err2; + goto _err3; } char *err = NULL; @@ -94,19 +150,23 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { rocksdb_t *db = rocksdb_open(options, cachePath, &err); if (NULL == db) { - code = -1; - goto _err3; + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); + rocksdb_free(err); + + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err4; } rocksdb_flushoptions_t *flushoptions = rocksdb_flushoptions_create(); if (NULL == flushoptions) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _err4; + goto _err5; } rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create(); pTsdb->rCache.writebatch = writebatch; + pTsdb->rCache.my_comparator = cmp; pTsdb->rCache.options = options; pTsdb->rCache.writeoptions = writeoptions; pTsdb->rCache.readoptions = readoptions; @@ -115,15 +175,22 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { taosThreadMutexInit(&pTsdb->rCache.rMutex, NULL); + pTsdb->rCache.pTSchema = NULL; + return code; +_err5: + rocksdb_close(pTsdb->rCache.db); _err4: rocksdb_readoptions_destroy(readoptions); _err3: rocksdb_writeoptions_destroy(writeoptions); _err2: rocksdb_options_destroy(options); + rocksdb_block_based_options_destroy(tableoptions); + rocksdb_cache_destroy(cache); _err: + rocksdb_comparator_destroy(cmp); return code; } @@ -134,7 +201,11 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) { rocksdb_readoptions_destroy(pTsdb->rCache.readoptions); rocksdb_writeoptions_destroy(pTsdb->rCache.writeoptions); rocksdb_options_destroy(pTsdb->rCache.options); + rocksdb_block_based_options_destroy(pTsdb->rCache.tableoptions); + rocksdb_cache_destroy(pTsdb->rCache.blockcache); + rocksdb_comparator_destroy(pTsdb->rCache.my_comparator); taosThreadMutexDestroy(&pTsdb->rCache.rMutex); + taosMemoryFree(pTsdb->rCache.pTSchema); } int32_t tsdbCacheCommit(STsdb *pTsdb) { @@ -191,15 +262,15 @@ void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) { *size = length; } -static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, char const *lstring) { +static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, int8_t ltype) { SLastCol *pLastCol = NULL; - char *err = NULL; - size_t vlen = 0; - char key[ROCKS_KEY_LEN]; - size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":%s", uid, cid, lstring); - char *value = NULL; - value = rocksdb_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, key, klen, &vlen, &err); + char *err = NULL; + size_t vlen = 0; + SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid}; + size_t klen = ROCKS_KEY_LEN; + char *value = NULL; + value = rocksdb_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, (char *)key, klen, &vlen, &err); if (NULL != err) { tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); rocksdb_free(err); @@ -210,18 +281,40 @@ static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, char c return pLastCol; } +static void rocksMayWrite(STsdb *pTsdb) { + rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; + + int count = rocksdb_writebatch_count(wb); + if (count >= 1024) { + char *err = NULL; + rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err); + if (NULL != err) { + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); + rocksdb_free(err); + } + + rocksdb_writebatch_clear(wb); + } +} + int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) { int32_t code = 0; // 1, fetch schema - STSchema *pTSchema = NULL; + STSchema *pTSchema = pTsdb->rCache.pTSchema; int32_t sver = TSDBROW_SVERSION(pRow); - code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - return -1; - } + if (!pTSchema || sver != pTSchema->version) { + if (pTSchema) { + taosMemoryFree(pTSchema); + } + code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return -1; + } + pTsdb->rCache.pTSchema = pTSchema; + } // 2, iterate col values into array SArray *aColVal = taosArrayInit(32, sizeof(SColVal)); @@ -229,22 +322,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow tsdbRowIterOpen(&iter, pRow, pTSchema); for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) { - /* - if (IS_VAR_DATA_TYPE(pColVal->type)) { - uint8_t *pVal = pColVal->value.pData; - - pColVal->value.pData = NULL; - code = tRealloc(&pColVal->value.pData, pColVal->value.nData); - if (code) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - - if (pColVal->value.nData) { - memcpy(pColVal->value.pData, pVal, pColVal->value.nData); - } - } - */ taosArrayPush(aColVal, pColVal); } @@ -254,23 +331,18 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow int num_keys = TARRAY_SIZE(aColVal); char **keys_list = taosMemoryCalloc(num_keys * 2, sizeof(char *)); size_t *keys_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); + char *key_list = taosMemoryMalloc(num_keys * ROCKS_KEY_LEN * 2); for (int i = 0; i < num_keys; ++i) { SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i); int16_t cid = pColVal->cid; - char *keys = taosMemoryCalloc(2, ROCKS_KEY_LEN); - int last_key_len = snprintf(keys, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last", uid, cid); - if (last_key_len >= ROCKS_KEY_LEN) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); - } - int lr_key_len = snprintf(keys + ROCKS_KEY_LEN, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last_row", uid, cid); - if (lr_key_len >= ROCKS_KEY_LEN) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); - } - keys_list[i] = keys; - keys_list[num_keys + i] = keys + ROCKS_KEY_LEN; - keys_list_sizes[i] = last_key_len; - keys_list_sizes[num_keys + i] = lr_key_len; + memcpy(key_list + i * ROCKS_KEY_LEN, &(SLastKey){.ltype = 1, .uid = uid, .cid = cid}, ROCKS_KEY_LEN); + memcpy(key_list + i * ROCKS_KEY_LEN + num_keys * ROCKS_KEY_LEN, &(SLastKey){.ltype = 0, .uid = uid, .cid = cid}, + ROCKS_KEY_LEN); + keys_list[i] = key_list + i * ROCKS_KEY_LEN; + keys_list[num_keys + i] = key_list + i * ROCKS_KEY_LEN + num_keys * ROCKS_KEY_LEN; + keys_list_sizes[i] = ROCKS_KEY_LEN; + keys_list_sizes[num_keys + i] = ROCKS_KEY_LEN; } char **values_list = taosMemoryCalloc(num_keys * 2, sizeof(char *)); size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); @@ -278,12 +350,10 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow taosThreadMutexLock(&pTsdb->rCache.rMutex); rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list, keys_list_sizes, values_list, values_list_sizes, errs); - for (int i = 0; i < num_keys; ++i) { - taosMemoryFree(keys_list[i]); - } for (int i = 0; i < num_keys * 2; ++i) { rocksdb_free(errs[i]); } + taosMemoryFree(key_list); taosMemoryFree(keys_list); taosMemoryFree(keys_list_sizes); taosMemoryFree(errs); @@ -292,19 +362,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; for (int i = 0; i < num_keys; ++i) { SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i); - if (COL_VAL_IS_VALUE(pColVal)) { - SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); - - if (NULL == pLastCol || pLastCol->ts <= keyTs) { - char *value = NULL; - size_t vlen = 0; - tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen); - char key[ROCKS_KEY_LEN]; - size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last", uid, pColVal->cid); - rocksdb_writebatch_put(wb, key, klen, value, vlen); - taosMemoryFree(value); - } - } if (!COL_VAL_IS_NONE(pColVal)) { SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]); @@ -313,11 +370,26 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow char *value = NULL; size_t vlen = 0; tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen); - char key[ROCKS_KEY_LEN]; - size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last_row", uid, pColVal->cid); - rocksdb_writebatch_put(wb, key, klen, value, vlen); + SLastKey key = (SLastKey){.ltype = 0, .uid = uid, .cid = pColVal->cid}; + size_t klen = ROCKS_KEY_LEN; + rocksdb_writebatch_put(wb, (char *)&key, klen, value, vlen); taosMemoryFree(value); } + + if (COL_VAL_IS_VALUE(pColVal)) { + SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); + + if (NULL == pLastCol || pLastCol->ts <= keyTs) { + char *value = NULL; + size_t vlen = 0; + tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen); + SLastKey key = (SLastKey){.ltype = 1, .uid = uid, .cid = pColVal->cid}; + size_t klen = ROCKS_KEY_LEN; + + rocksdb_writebatch_put(wb, (char *)&key, klen, value, vlen); + taosMemoryFree(value); + } + } } rocksdb_free(values_list[i]); @@ -326,18 +398,12 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow taosMemoryFree(values_list); taosMemoryFree(values_list_sizes); - char *err = NULL; - rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err); - if (NULL != err) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); - rocksdb_free(err); - } + rocksMayWrite(pTsdb); taosThreadMutexUnlock(&pTsdb->rCache.rMutex); - rocksdb_writebatch_clear(wb); _exit: taosArrayDestroy(aColVal); - taosMemoryFree(pTSchema); + // taosMemoryFree(pTSchema); return code; } @@ -356,38 +422,36 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols, int nCols, int16_t *slotIds); - -int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int32_t ltype) { - static char const *alstring[2] = {"last_row", "last"}; - char const *lstring = alstring[ltype]; +#if 1 +int32_t tsdbCacheGetSlow(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) { rocksdb_writebatch_t *wb = NULL; int32_t code = 0; SArray *pCidList = pr->pCidList; int num_keys = TARRAY_SIZE(pCidList); - char **keys_list = taosMemoryCalloc(num_keys, sizeof(char *)); - size_t *keys_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t)); + + char **keys_list = taosMemoryMalloc(num_keys * sizeof(char *)); + size_t *keys_list_sizes = taosMemoryMalloc(num_keys * sizeof(size_t)); + char *key_list = taosMemoryMalloc(num_keys * ROCKS_KEY_LEN); for (int i = 0; i < num_keys; ++i) { int16_t cid = *(int16_t *)taosArrayGet(pCidList, i); - char *keys = taosMemoryCalloc(2, ROCKS_KEY_LEN); - int last_key_len = snprintf(keys, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":%s", uid, cid, lstring); - if (last_key_len >= ROCKS_KEY_LEN) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); - } - - keys_list[i] = keys; - keys_list_sizes[i] = last_key_len; + memcpy(key_list + i * ROCKS_KEY_LEN, &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid}, ROCKS_KEY_LEN); + keys_list[i] = key_list + i * ROCKS_KEY_LEN; + keys_list_sizes[i] = ROCKS_KEY_LEN; } + char **values_list = taosMemoryCalloc(num_keys, sizeof(char *)); size_t *values_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t)); - char **errs = taosMemoryCalloc(num_keys, sizeof(char *)); + char **errs = taosMemoryMalloc(num_keys * sizeof(char *)); rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys, (const char *const *)keys_list, keys_list_sizes, values_list, values_list_sizes, errs); for (int i = 0; i < num_keys; ++i) { - taosMemoryFree(keys_list[i]); - rocksdb_free(errs[i]); + if (errs[i]) { + rocksdb_free(errs[i]); + } } + taosMemoryFree(key_list); taosMemoryFree(keys_list); taosMemoryFree(keys_list_sizes); taosMemoryFree(errs); @@ -403,7 +467,7 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR } else { taosThreadMutexLock(&pTsdb->rCache.rMutex); - pLastCol = tsdbCacheLookup(pTsdb, uid, cid, lstring); + pLastCol = tsdbCacheLookup(pTsdb, uid, cid, ltype); if (!pLastCol) { // recalc: load from tsdb int16_t aCols[1] = {cid}; @@ -432,9 +496,10 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR char *value = NULL; size_t vlen = 0; tsdbCacheSerialize(pLastCol, &value, &vlen); - char key[ROCKS_KEY_LEN]; - size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":%s", uid, pLastCol->colVal.cid, lstring); - rocksdb_writebatch_put(wb, key, klen, value, vlen); + + SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = pLastCol->colVal.cid}; + size_t klen = ROCKS_KEY_LEN; + rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen); taosMemoryFree(value); } else { @@ -442,21 +507,13 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR } if (wb) { - char *err = NULL; - rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err); - if (NULL != err) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); - rocksdb_free(err); - } - - rocksdb_writebatch_clear(wb); + rocksMayWrite(pTsdb); } taosThreadMutexUnlock(&pTsdb->rCache.rMutex); } taosArrayPush(pLastArray, pLastCol); - taosArrayDestroy(pTmpColArray); if (freeCol) { taosMemoryFree(pLastCol); @@ -467,6 +524,356 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR return code; } +#endif + +static SLastCol *tsdbCacheLoadCol(STsdb *pTsdb, SCacheRowsReader *pr, int16_t slotid, tb_uid_t uid, int16_t cid, + int8_t ltype) { + SLastCol *pLastCol = tsdbCacheLookup(pTsdb, uid, cid, ltype); + if (!pLastCol) { + rocksdb_writebatch_t *wb = NULL; + + taosThreadMutexLock(&pTsdb->rCache.rMutex); + pLastCol = tsdbCacheLookup(pTsdb, uid, cid, ltype); + if (!pLastCol) { + // recalc: load from tsdb + int16_t aCols[1] = {cid}; + int16_t slotIds[1] = {slotid}; + SArray *pTmpColArray = NULL; + + if (ltype) { + mergeLastCid(uid, pTsdb, &pTmpColArray, pr, aCols, 1, slotIds); + } else { + mergeLastRowCid(uid, pTsdb, &pTmpColArray, pr, aCols, 1, slotIds); + } + + if (pTmpColArray && TARRAY_SIZE(pTmpColArray) >= 1) { + pLastCol = taosArrayGet(pTmpColArray, 0); + } + + // still null, then make up a none col value + SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[slotid].type)}; + if (!pLastCol) { + pLastCol = &noneCol; + } + + // store result back to rocks cache + wb = pTsdb->rCache.writebatch; + char *value = NULL; + size_t vlen = 0; + tsdbCacheSerialize(pLastCol, &value, &vlen); + + SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = pLastCol->colVal.cid}; + size_t klen = ROCKS_KEY_LEN; + rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen); + taosMemoryFree(value); + + SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); + *pTmpLastCol = *pLastCol; + pLastCol = pTmpLastCol; + + taosArrayDestroy(pTmpColArray); + } + + if (wb) { + rocksMayWrite(pTsdb); + } + + taosThreadMutexUnlock(&pTsdb->rCache.rMutex); + } + + return pLastCol; +} + +static void tsdbCacheDeleter(const void *key, size_t keyLen, void *value) { + SLastCol *pLastCol = (SLastCol *)value; + + // TODO: add dirty flag to SLastCol + if (pLastCol->dirty) { + // TODO: queue into dirty list, free it after save to backstore + } else { + if (IS_VAR_DATA_TYPE(pLastCol->colVal.type) && pLastCol->colVal.value.nData > 0) { + taosMemoryFree(pLastCol->colVal.value.pData); + } + + taosMemoryFree(value); + } +} + +typedef struct { + int idx; + SLastKey key; +} SIdxKey; + +static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols, + SCacheRowsReader *pr, int8_t ltype) { + int32_t code = 0; + rocksdb_writebatch_t *wb = NULL; + SArray *pTmpColArray = NULL; + int num_keys = TARRAY_SIZE(remainCols); + int16_t *aCols = taosMemoryMalloc(num_keys * sizeof(int16_t)); + int16_t *slotIds = taosMemoryMalloc(num_keys * sizeof(int16_t)); + + for (int i = 0; i < num_keys; ++i) { + SIdxKey *idxKey = taosArrayGet(remainCols, i); + aCols[i] = idxKey->key.cid; + slotIds[i] = pr->pSlotIds[idxKey->idx]; + } + + if (ltype) { + mergeLastCid(uid, pTsdb, &pTmpColArray, pr, aCols, num_keys, slotIds); + } else { + mergeLastRowCid(uid, pTsdb, &pTmpColArray, pr, aCols, num_keys, slotIds); + } + + SLRUCache *pCache = pTsdb->lruCache; + for (int i = 0; i < num_keys; ++i) { + SIdxKey *idxKey = taosArrayGet(remainCols, i); + SLastCol *pLastCol = NULL; + + if (pTmpColArray && TARRAY_SIZE(pTmpColArray) >= i + 1) { + pLastCol = taosArrayGet(pTmpColArray, i); + } + + // still null, then make up a none col value + SLastCol noneCol = {.ts = TSKEY_MIN, + .colVal = COL_VAL_NONE(idxKey->key.cid, pr->pSchema->columns[slotIds[i]].type)}; + if (!pLastCol) { + pLastCol = &noneCol; + } + + SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); + *pTmpLastCol = *pLastCol; + pLastCol = pTmpLastCol; + + size_t charge = sizeof(*pLastCol); + if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) { + charge += pLastCol->colVal.value.nData; + } + + LRUStatus status = taosLRUCacheInsert(pCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL, + TAOS_LRU_PRIORITY_LOW); + if (status != TAOS_LRU_STATUS_OK) { + code = -1; + } + + // store result back to rocks cache + wb = pTsdb->rCache.writebatch; + char *value = NULL; + size_t vlen = 0; + tsdbCacheSerialize(pLastCol, &value, &vlen); + + SLastKey *key = &idxKey->key; + size_t klen = ROCKS_KEY_LEN; + rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen); + taosMemoryFree(value); + + taosArraySet(pLastArray, idxKey->idx, pLastCol); + // taosArrayRemove(remainCols, i); + } + + if (wb) { + rocksMayWrite(pTsdb); + } + + taosArrayDestroy(pTmpColArray); + + taosMemoryFree(aCols); + taosMemoryFree(slotIds); + + return code; +} + +static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols, + SCacheRowsReader *pr, int8_t ltype) { + int32_t code = 0; + int num_keys = TARRAY_SIZE(remainCols); + char **keys_list = taosMemoryMalloc(num_keys * sizeof(char *)); + size_t *keys_list_sizes = taosMemoryMalloc(num_keys * sizeof(size_t)); + char *key_list = taosMemoryMalloc(num_keys * ROCKS_KEY_LEN); + for (int i = 0; i < num_keys; ++i) { + int16_t cid = *(int16_t *)taosArrayGet(remainCols, i); + + memcpy(key_list + i * ROCKS_KEY_LEN, &((SIdxKey *)taosArrayGet(remainCols, i))->key, ROCKS_KEY_LEN); + keys_list[i] = key_list + i * ROCKS_KEY_LEN; + keys_list_sizes[i] = ROCKS_KEY_LEN; + } + + char **values_list = taosMemoryCalloc(num_keys, sizeof(char *)); + size_t *values_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t)); + char **errs = taosMemoryMalloc(num_keys * sizeof(char *)); + rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys, (const char *const *)keys_list, + keys_list_sizes, values_list, values_list_sizes, errs); + for (int i = 0; i < num_keys; ++i) { + if (errs[i]) { + rocksdb_free(errs[i]); + } + } + taosMemoryFree(key_list); + taosMemoryFree(keys_list); + taosMemoryFree(keys_list_sizes); + taosMemoryFree(errs); + + SLRUCache *pCache = pTsdb->lruCache; + for (int i = 0, j = 0; i < num_keys && j < TARRAY_SIZE(remainCols); ++i) { + SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); + SIdxKey *idxKey = taosArrayGet(remainCols, j); + int16_t cid = idxKey->key.cid; + SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)}; + if (pLastCol) { + SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); + *pTmpLastCol = *pLastCol; + pLastCol = pTmpLastCol; + + reallocVarData(&pLastCol->colVal); + size_t charge = sizeof(*pLastCol); + if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) { + charge += pLastCol->colVal.value.nData; + } + + LRUStatus status = taosLRUCacheInsert(pCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, + NULL, TAOS_LRU_PRIORITY_LOW); + if (status != TAOS_LRU_STATUS_OK) { + code = -1; + } + + SLastCol lastCol = *pLastCol; + reallocVarData(&lastCol.colVal); + + taosArraySet(pLastArray, idxKey->idx, &lastCol); + taosArrayRemove(remainCols, j); + + taosMemoryFree(values_list[i]); + } else { + ++j; + } + } + taosMemoryFree(values_list); + taosMemoryFree(values_list_sizes); + + if (TARRAY_SIZE(remainCols) > 0) { + code = tsdbCacheLoadFromRaw(pTsdb, uid, pLastArray, remainCols, pr, ltype); + } + + return code; +} + +int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) { + int32_t code = 0; + SLRUCache *pCache = pTsdb->lruCache; + SArray *pCidList = pr->pCidList; + int num_keys = TARRAY_SIZE(pCidList); + + SArray *remainCols = NULL; + + for (int i = 0; i < num_keys; ++i) { + int16_t cid = *(int16_t *)taosArrayGet(pCidList, i); + + SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid}; + + LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN); + if (h) { + SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); + + SLastCol lastCol = *pLastCol; + reallocVarData(&lastCol.colVal); + taosArrayPush(pLastArray, &lastCol); + + if (h) { + taosLRUCacheRelease(pCache, h, false); + } + } else { + SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)}; + + taosArrayPush(pLastArray, &noneCol); + + if (!remainCols) { + remainCols = taosArrayInit(num_keys, sizeof(SIdxKey)); + } + taosArrayPush(remainCols, &(SIdxKey){i, *key}); + } + } + + if (remainCols && TARRAY_SIZE(remainCols) > 0) { + taosThreadMutexLock(&pTsdb->lruMutex); + for (int i = 0; i < TARRAY_SIZE(remainCols);) { + SIdxKey *idxKey = taosArrayGet(remainCols, i); + LRUHandle *h = taosLRUCacheLookup(pCache, &idxKey->key, ROCKS_KEY_LEN); + if (h) { + SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); + + SLastCol lastCol = *pLastCol; + reallocVarData(&lastCol.colVal); + + taosArraySet(pLastArray, idxKey->idx, &lastCol); + if (h) { + taosLRUCacheRelease(pCache, h, false); + } + + taosArrayRemove(remainCols, i); + } else { + ++i; + } + } + + code = tsdbCacheLoadFromRocks(pTsdb, uid, pLastArray, remainCols, pr, ltype); + + taosThreadMutexUnlock(&pTsdb->lruMutex); + } + + if (remainCols) { + taosArrayDestroy(remainCols); + } + + return code; +} + +int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) { + int32_t code = 0; + SLRUCache *pCache = pTsdb->lruCache; + SArray *pCidList = pr->pCidList; + int num_keys = TARRAY_SIZE(pCidList); + + for (int i = 0; i < num_keys; ++i) { + SLastCol *pLastCol = NULL; + int16_t cid = *(int16_t *)taosArrayGet(pCidList, i); + + SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid}; + LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN); + if (!h) { + taosThreadMutexLock(&pTsdb->lruMutex); + h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN); + if (!h) { + pLastCol = tsdbCacheLoadCol(pTsdb, pr, pr->pSlotIds[i], uid, cid, ltype); + + size_t charge = sizeof(*pLastCol); + if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) { + charge += pLastCol->colVal.value.nData; + } + + LRUStatus status = taosLRUCacheInsert(pCache, key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, &h, + TAOS_LRU_PRIORITY_LOW); + if (status != TAOS_LRU_STATUS_OK) { + code = -1; + } + } + + taosThreadMutexUnlock(&pTsdb->lruMutex); + } + + pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); + + SLastCol lastCol = *pLastCol; + reallocVarData(&lastCol.colVal); + + if (h) { + taosLRUCacheRelease(pCache, h, false); + } + + taosArrayPush(pLastArray, &lastCol); + } + + return code; +} int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) { int32_t code = 0; @@ -486,19 +893,15 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE for (int i = 0; i < num_keys; ++i) { int16_t cid = pTSchema->columns[i].colId; - char *keys = taosMemoryCalloc(2, ROCKS_KEY_LEN); - int last_key_len = snprintf(keys, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last", uid, cid); - if (last_key_len >= ROCKS_KEY_LEN) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); - } - int lr_key_len = snprintf(keys + ROCKS_KEY_LEN, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last_row", uid, cid); - if (lr_key_len >= ROCKS_KEY_LEN) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); - } + size_t klen = ROCKS_KEY_LEN; + char *keys = taosMemoryCalloc(2, klen); + ((SLastKey *)keys)[0] = (SLastKey){.ltype = 1, .uid = uid, .cid = cid}; + ((SLastKey *)keys)[1] = (SLastKey){.ltype = 0, .uid = uid, .cid = cid}; + keys_list[i] = keys; - keys_list[num_keys + i] = keys + ROCKS_KEY_LEN; - keys_list_sizes[i] = last_key_len; - keys_list_sizes[num_keys + i] = lr_key_len; + keys_list[num_keys + i] = keys + klen; + keys_list_sizes[i] = klen; + keys_list_sizes[num_keys + i] = klen; } char **values_list = taosMemoryCalloc(num_keys * 2, sizeof(char *)); size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); @@ -520,16 +923,18 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE for (int i = 0; i < num_keys; ++i) { SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) { - char key[ROCKS_KEY_LEN]; - size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last", uid, pLastCol->colVal.cid); - rocksdb_writebatch_delete(wb, key, klen); + SLastKey *key = &(SLastKey){.ltype = 1, .uid = uid, .cid = pLastCol->colVal.cid}; + size_t klen = sizeof(*key); + + rocksdb_writebatch_delete(wb, (char *)key, klen); } pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]); if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) { - char key[ROCKS_KEY_LEN]; - size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last_row", uid, pLastCol->colVal.cid); - rocksdb_writebatch_delete(wb, key, klen); + SLastKey *key = &(SLastKey){.ltype = 0, .uid = uid, .cid = pLastCol->colVal.cid}; + size_t klen = sizeof(*key); + + rocksdb_writebatch_delete(wb, (char *)key, klen); } rocksdb_free(values_list[i]); @@ -538,14 +943,8 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE taosMemoryFree(values_list); taosMemoryFree(values_list_sizes); - char *err = NULL; - rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err); - if (NULL != err) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); - rocksdb_free(err); - } + rocksMayWrite(pTsdb); taosThreadMutexUnlock(&pTsdb->rCache.rMutex); - rocksdb_writebatch_clear(wb); _exit: taosMemoryFree(pTSchema); @@ -1111,7 +1510,7 @@ typedef struct { SMergeTree mergeTree; SMergeTree *pMergeTree; SSttBlockLoadInfo *pLoadInfo; - SLDataIter* pDataIter; + SLDataIter *pDataIter; int64_t lastTs; } SFSLastNextRowIter; @@ -1159,7 +1558,8 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa } tMergeTreeOpen(&state->mergeTree, 1, *state->pDataFReader, state->suid, state->uid, &(STimeWindow){.skey = state->lastTs, .ekey = TSKEY_MAX}, - &(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, state->pLoadInfo, false, NULL, true, state->pDataIter); + &(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, state->pLoadInfo, false, NULL, true, + state->pDataIter); state->pMergeTree = &state->mergeTree; state->state = SFSLASTNEXTROW_BLOCKROW; } @@ -1394,11 +1794,13 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie tBlockDataReset(state->pBlockData); TABLEID tid = {.suid = state->suid, .uid = state->uid}; int nTmpCols = nCols; - if (aCols[0] == PRIMARYKEY_TIMESTAMP_COL_ID && nCols == 1) { - nTmpCols = 0; + bool hasTs = false; + if (aCols[0] == PRIMARYKEY_TIMESTAMP_COL_ID) { + --nTmpCols; skipBlock = false; + hasTs = true; } - code = tBlockDataInit(state->pBlockData, &tid, state->pTSchema, aCols, nTmpCols); + code = tBlockDataInit(state->pBlockData, &tid, state->pTSchema, hasTs ? aCols + 1 : aCols, nTmpCols); if (code) goto _err; code = tsdbReadDataBlock(*state->pDataFReader, &block, state->pBlockData); @@ -1730,8 +2132,8 @@ typedef struct { } CacheNextRowIter; static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid, - SSttBlockLoadInfo *pLoadInfo, SLDataIter* pLDataIter, STsdbReadSnap *pReadSnap, SDataFReader **pDataFReader, - SDataFReader **pDataFReaderLast, int64_t lastTs) { + SSttBlockLoadInfo *pLoadInfo, SLDataIter *pLDataIter, STsdbReadSnap *pReadSnap, + SDataFReader **pDataFReader, SDataFReader **pDataFReaderLast, int64_t lastTs) { int code = 0; STbData *pMem = NULL; diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 53103e9fbb..f6e37d9427 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -168,8 +168,7 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, } SVnodeCfg* pCfg = &((SVnode*)pVnode)->config; - - int32_t numOfStt = pCfg->sttTrigger; + int32_t numOfStt = pCfg->sttTrigger; p->pLoadInfo = tCreateLastBlockLoadInfo(p->pSchema, NULL, 0, numOfStt); if (p->pLoadInfo == NULL) { tsdbCacherowsReaderClose(p); @@ -203,7 +202,7 @@ void* tsdbCacherowsReaderClose(void* pReader) { taosMemoryFree(p->pSchema); } - taosMemoryFreeClear(p->pDataIter); + taosMemoryFree(p->pDataIter); taosMemoryFree(p->pCurrSchema); destroyLastBlockLoadInfo(p->pLoadInfo); @@ -294,21 +293,23 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 pr->pDataFReader = NULL; pr->pDataFReaderLast = NULL; - int32_t ltype = (pr->type & CACHESCAN_RETRIEVE_LAST) >> 3; + int8_t ltype = (pr->type & CACHESCAN_RETRIEVE_LAST) >> 3; // retrieve the only one last row of all tables in the uid list. if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) { int64_t st = taosGetTimestampUs(); int64_t totalLastTs = INT64_MAX; + for (int32_t i = 0; i < pr->numOfTables; ++i) { STableKeyInfo* pKeyInfo = &pr->pTableList[i]; - tsdbCacheGet(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype); + tsdbCacheGetBatch(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype); + // tsdbCacheGet(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype); if (TARRAY_SIZE(pRow) <= 0) { taosArrayClearEx(pRow, freeItem); continue; } - SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0); + SLastCol* pColVal = taosArrayGet(pRow, 0); if (COL_VAL_IS_NONE(&pColVal->colVal)) { taosArrayClearEx(pRow, freeItem); continue; @@ -361,7 +362,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } } - if (taosArrayGetSize(pTableUidList) == 0) { + if (TARRAY_SIZE(pTableUidList) == 0) { taosArrayPush(pTableUidList, &pKeyInfo->uid); } else { taosArraySet(pTableUidList, 0, &pKeyInfo->uid); @@ -375,9 +376,9 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } } else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) { for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) { - STableKeyInfo* pKeyInfo = &pr->pTableList[i]; + tb_uid_t uid = pr->pTableList[i].uid; - tsdbCacheGet(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype); + tsdbCacheGetBatch(pr->pTsdb, uid, pRow, pr, ltype); if (TARRAY_SIZE(pRow) <= 0) { taosArrayClearEx(pRow, freeItem); continue; @@ -391,9 +392,9 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 saveOneRow(pRow, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr); taosArrayClearEx(pRow, freeItem); - taosArrayPush(pTableUidList, &pKeyInfo->uid); + taosArrayPush(pTableUidList, &uid); - pr->tableIndex += 1; + ++pr->tableIndex; if (pResBlock->info.rows >= pResBlock->info.capacity) { goto _end; } @@ -419,5 +420,6 @@ _end: taosMemoryFree(pRes); taosArrayDestroyEx(pRow, freeItem); taosArrayDestroyEx(pLastCols, freeItem); + return code; } -- GitLab