提交 756fefaa 编写于 作者: M Minglei Jin

cache/batchread: load columns in one trip

上级 ef80b5b0
...@@ -347,12 +347,16 @@ struct STsdbFS { ...@@ -347,12 +347,16 @@ struct STsdbFS {
typedef struct { typedef struct {
rocksdb_t *db; rocksdb_t *db;
rocksdb_comparator_t *my_comparator;
rocksdb_cache_t *blockcache;
rocksdb_block_based_table_options_t *tableoptions;
rocksdb_options_t *options; rocksdb_options_t *options;
rocksdb_flushoptions_t *flushoptions; rocksdb_flushoptions_t *flushoptions;
rocksdb_writeoptions_t *writeoptions; rocksdb_writeoptions_t *writeoptions;
rocksdb_readoptions_t *readoptions; rocksdb_readoptions_t *readoptions;
rocksdb_writebatch_t *writebatch; rocksdb_writebatch_t *writebatch;
TdThreadMutex rMutex; TdThreadMutex rMutex;
STSchema *pTSchema;
} SRocksCache; } SRocksCache;
struct STsdb { struct STsdb {
...@@ -782,7 +786,7 @@ typedef struct SLDataIter { ...@@ -782,7 +786,7 @@ typedef struct SLDataIter {
#define tMergeTreeGetRow(_t) (&((_t)->pIter->rInfo.row)) #define tMergeTreeGetRow(_t) (&((_t)->pIter->rInfo.row))
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo, STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter* pLDataIter); bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter *pLDataIter);
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
bool tMergeTreeNext(SMergeTree *pMTree); bool tMergeTreeNext(SMergeTree *pMTree);
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree); bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree);
...@@ -822,13 +826,15 @@ typedef struct SCacheRowsReader { ...@@ -822,13 +826,15 @@ typedef struct SCacheRowsReader {
typedef struct { typedef struct {
TSKEY ts; TSKEY ts;
int8_t dirty;
SColVal colVal; SColVal colVal;
} SLastCol; } SLastCol;
int32_t tsdbOpenCache(STsdb *pTsdb); int32_t tsdbOpenCache(STsdb *pTsdb);
void tsdbCloseCache(STsdb *pTsdb); void tsdbCloseCache(STsdb *pTsdb);
int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *row); int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *row);
int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int32_t ltype); int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype);
int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype);
int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb *pTsdb); int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb *pTsdb);
......
...@@ -46,7 +46,13 @@ static void tsdbCloseBICache(STsdb *pTsdb) { ...@@ -46,7 +46,13 @@ static void tsdbCloseBICache(STsdb *pTsdb) {
} }
} }
#define ROCKS_KEY_LEN 64 #define ROCKS_KEY_LEN (sizeof(tb_uid_t) + sizeof(int16_t) + sizeof(int8_t))
typedef struct {
tb_uid_t uid;
int16_t cid;
int8_t ltype;
} SLastKey;
static void tsdbGetRocksPath(STsdb *pTsdb, char *path) { static void tsdbGetRocksPath(STsdb *pTsdb, char *path) {
SVnode *pVnode = pTsdb->pVnode; SVnode *pVnode = pTsdb->pVnode;
...@@ -62,9 +68,56 @@ static void tsdbGetRocksPath(STsdb *pTsdb, char *path) { ...@@ -62,9 +68,56 @@ static void tsdbGetRocksPath(STsdb *pTsdb, char *path) {
} }
} }
static const char *myCmpName(void *state) {
(void)state;
return "myCmp";
}
static void myCmpDestroy(void *state) { (void)state; }
static int myCmp(void *state, const char *a, size_t alen, const char *b, size_t blen) {
(void)state;
(void)alen;
(void)blen;
SLastKey *lhs = (SLastKey *)a;
SLastKey *rhs = (SLastKey *)b;
if (lhs->uid < rhs->uid) {
return -1;
} else if (lhs->uid > rhs->uid) {
return 1;
}
if (lhs->cid < rhs->cid) {
return -1;
} else if (lhs->cid > rhs->cid) {
return 1;
}
if (lhs->ltype < rhs->ltype) {
return -1;
} else if (lhs->ltype > rhs->ltype) {
return 1;
}
return 0;
}
static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
int32_t code = 0; int32_t code = 0;
rocksdb_comparator_t *cmp = rocksdb_comparator_create(NULL, myCmpDestroy, myCmp, myCmpName);
if (NULL == cmp) {
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
rocksdb_cache_t *cache = rocksdb_cache_create_lru(5 * 1024 * 1024);
pTsdb->rCache.blockcache = cache;
rocksdb_block_based_table_options_t *tableoptions = rocksdb_block_based_options_create();
pTsdb->rCache.tableoptions = tableoptions;
rocksdb_options_t *options = rocksdb_options_create(); rocksdb_options_t *options = rocksdb_options_create();
if (NULL == options) { if (NULL == options) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
...@@ -72,6 +125,9 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { ...@@ -72,6 +125,9 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
} }
rocksdb_options_set_create_if_missing(options, 1); rocksdb_options_set_create_if_missing(options, 1);
rocksdb_options_set_comparator(options, cmp);
rocksdb_block_based_options_set_block_cache(tableoptions, cache);
rocksdb_options_set_block_based_table_factory(options, tableoptions);
// rocksdb_options_set_inplace_update_support(options, 1); // rocksdb_options_set_inplace_update_support(options, 1);
// rocksdb_options_set_allow_concurrent_memtable_write(options, 0); // rocksdb_options_set_allow_concurrent_memtable_write(options, 0);
...@@ -80,12 +136,12 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { ...@@ -80,12 +136,12 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err2; goto _err2;
} }
// rocksdb_writeoptions_disable_WAL(writeoptions, 1); rocksdb_writeoptions_disable_WAL(writeoptions, 1);
rocksdb_readoptions_t *readoptions = rocksdb_readoptions_create(); rocksdb_readoptions_t *readoptions = rocksdb_readoptions_create();
if (NULL == readoptions) { if (NULL == readoptions) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err2; goto _err3;
} }
char *err = NULL; char *err = NULL;
...@@ -94,19 +150,23 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { ...@@ -94,19 +150,23 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
rocksdb_t *db = rocksdb_open(options, cachePath, &err); rocksdb_t *db = rocksdb_open(options, cachePath, &err);
if (NULL == db) { if (NULL == db) {
code = -1; tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
goto _err3; rocksdb_free(err);
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err4;
} }
rocksdb_flushoptions_t *flushoptions = rocksdb_flushoptions_create(); rocksdb_flushoptions_t *flushoptions = rocksdb_flushoptions_create();
if (NULL == flushoptions) { if (NULL == flushoptions) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err4; goto _err5;
} }
rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create(); rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create();
pTsdb->rCache.writebatch = writebatch; pTsdb->rCache.writebatch = writebatch;
pTsdb->rCache.my_comparator = cmp;
pTsdb->rCache.options = options; pTsdb->rCache.options = options;
pTsdb->rCache.writeoptions = writeoptions; pTsdb->rCache.writeoptions = writeoptions;
pTsdb->rCache.readoptions = readoptions; pTsdb->rCache.readoptions = readoptions;
...@@ -115,15 +175,22 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { ...@@ -115,15 +175,22 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
taosThreadMutexInit(&pTsdb->rCache.rMutex, NULL); taosThreadMutexInit(&pTsdb->rCache.rMutex, NULL);
pTsdb->rCache.pTSchema = NULL;
return code; return code;
_err5:
rocksdb_close(pTsdb->rCache.db);
_err4: _err4:
rocksdb_readoptions_destroy(readoptions); rocksdb_readoptions_destroy(readoptions);
_err3: _err3:
rocksdb_writeoptions_destroy(writeoptions); rocksdb_writeoptions_destroy(writeoptions);
_err2: _err2:
rocksdb_options_destroy(options); rocksdb_options_destroy(options);
rocksdb_block_based_options_destroy(tableoptions);
rocksdb_cache_destroy(cache);
_err: _err:
rocksdb_comparator_destroy(cmp);
return code; return code;
} }
...@@ -134,7 +201,11 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) { ...@@ -134,7 +201,11 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) {
rocksdb_readoptions_destroy(pTsdb->rCache.readoptions); rocksdb_readoptions_destroy(pTsdb->rCache.readoptions);
rocksdb_writeoptions_destroy(pTsdb->rCache.writeoptions); rocksdb_writeoptions_destroy(pTsdb->rCache.writeoptions);
rocksdb_options_destroy(pTsdb->rCache.options); rocksdb_options_destroy(pTsdb->rCache.options);
rocksdb_block_based_options_destroy(pTsdb->rCache.tableoptions);
rocksdb_cache_destroy(pTsdb->rCache.blockcache);
rocksdb_comparator_destroy(pTsdb->rCache.my_comparator);
taosThreadMutexDestroy(&pTsdb->rCache.rMutex); taosThreadMutexDestroy(&pTsdb->rCache.rMutex);
taosMemoryFree(pTsdb->rCache.pTSchema);
} }
int32_t tsdbCacheCommit(STsdb *pTsdb) { int32_t tsdbCacheCommit(STsdb *pTsdb) {
...@@ -191,15 +262,15 @@ void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) { ...@@ -191,15 +262,15 @@ void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) {
*size = length; *size = length;
} }
static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, char const *lstring) { static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, int8_t ltype) {
SLastCol *pLastCol = NULL; SLastCol *pLastCol = NULL;
char *err = NULL; char *err = NULL;
size_t vlen = 0; size_t vlen = 0;
char key[ROCKS_KEY_LEN]; SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid};
size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":%s", uid, cid, lstring); size_t klen = ROCKS_KEY_LEN;
char *value = NULL; char *value = NULL;
value = rocksdb_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, key, klen, &vlen, &err); value = rocksdb_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, (char *)key, klen, &vlen, &err);
if (NULL != err) { if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
rocksdb_free(err); rocksdb_free(err);
...@@ -210,18 +281,40 @@ static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, char c ...@@ -210,18 +281,40 @@ static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, char c
return pLastCol; return pLastCol;
} }
static void rocksMayWrite(STsdb *pTsdb) {
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
int count = rocksdb_writebatch_count(wb);
if (count >= 1024) {
char *err = NULL;
rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err);
if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
rocksdb_free(err);
}
rocksdb_writebatch_clear(wb);
}
}
int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) { int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) {
int32_t code = 0; int32_t code = 0;
// 1, fetch schema // 1, fetch schema
STSchema *pTSchema = NULL; STSchema *pTSchema = pTsdb->rCache.pTSchema;
int32_t sver = TSDBROW_SVERSION(pRow); int32_t sver = TSDBROW_SVERSION(pRow);
if (!pTSchema || sver != pTSchema->version) {
if (pTSchema) {
taosMemoryFree(pTSchema);
}
code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema); code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code; terrno = code;
return -1; return -1;
} }
pTsdb->rCache.pTSchema = pTSchema;
}
// 2, iterate col values into array // 2, iterate col values into array
SArray *aColVal = taosArrayInit(32, sizeof(SColVal)); SArray *aColVal = taosArrayInit(32, sizeof(SColVal));
...@@ -229,22 +322,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow ...@@ -229,22 +322,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
tsdbRowIterOpen(&iter, pRow, pTSchema); tsdbRowIterOpen(&iter, pRow, pTSchema);
for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) { for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) {
/*
if (IS_VAR_DATA_TYPE(pColVal->type)) {
uint8_t *pVal = pColVal->value.pData;
pColVal->value.pData = NULL;
code = tRealloc(&pColVal->value.pData, pColVal->value.nData);
if (code) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
if (pColVal->value.nData) {
memcpy(pColVal->value.pData, pVal, pColVal->value.nData);
}
}
*/
taosArrayPush(aColVal, pColVal); taosArrayPush(aColVal, pColVal);
} }
...@@ -254,23 +331,18 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow ...@@ -254,23 +331,18 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
int num_keys = TARRAY_SIZE(aColVal); int num_keys = TARRAY_SIZE(aColVal);
char **keys_list = taosMemoryCalloc(num_keys * 2, sizeof(char *)); char **keys_list = taosMemoryCalloc(num_keys * 2, sizeof(char *));
size_t *keys_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); size_t *keys_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t));
char *key_list = taosMemoryMalloc(num_keys * ROCKS_KEY_LEN * 2);
for (int i = 0; i < num_keys; ++i) { for (int i = 0; i < num_keys; ++i) {
SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i); SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i);
int16_t cid = pColVal->cid; int16_t cid = pColVal->cid;
char *keys = taosMemoryCalloc(2, ROCKS_KEY_LEN); memcpy(key_list + i * ROCKS_KEY_LEN, &(SLastKey){.ltype = 1, .uid = uid, .cid = cid}, ROCKS_KEY_LEN);
int last_key_len = snprintf(keys, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last", uid, cid); memcpy(key_list + i * ROCKS_KEY_LEN + num_keys * ROCKS_KEY_LEN, &(SLastKey){.ltype = 0, .uid = uid, .cid = cid},
if (last_key_len >= ROCKS_KEY_LEN) { ROCKS_KEY_LEN);
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); keys_list[i] = key_list + i * ROCKS_KEY_LEN;
} keys_list[num_keys + i] = key_list + i * ROCKS_KEY_LEN + num_keys * ROCKS_KEY_LEN;
int lr_key_len = snprintf(keys + ROCKS_KEY_LEN, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last_row", uid, cid); keys_list_sizes[i] = ROCKS_KEY_LEN;
if (lr_key_len >= ROCKS_KEY_LEN) { keys_list_sizes[num_keys + i] = ROCKS_KEY_LEN;
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
}
keys_list[i] = keys;
keys_list[num_keys + i] = keys + ROCKS_KEY_LEN;
keys_list_sizes[i] = last_key_len;
keys_list_sizes[num_keys + i] = lr_key_len;
} }
char **values_list = taosMemoryCalloc(num_keys * 2, sizeof(char *)); char **values_list = taosMemoryCalloc(num_keys * 2, sizeof(char *));
size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t));
...@@ -278,12 +350,10 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow ...@@ -278,12 +350,10 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
taosThreadMutexLock(&pTsdb->rCache.rMutex); taosThreadMutexLock(&pTsdb->rCache.rMutex);
rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list, rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list,
keys_list_sizes, values_list, values_list_sizes, errs); keys_list_sizes, values_list, values_list_sizes, errs);
for (int i = 0; i < num_keys; ++i) {
taosMemoryFree(keys_list[i]);
}
for (int i = 0; i < num_keys * 2; ++i) { for (int i = 0; i < num_keys * 2; ++i) {
rocksdb_free(errs[i]); rocksdb_free(errs[i]);
} }
taosMemoryFree(key_list);
taosMemoryFree(keys_list); taosMemoryFree(keys_list);
taosMemoryFree(keys_list_sizes); taosMemoryFree(keys_list_sizes);
taosMemoryFree(errs); taosMemoryFree(errs);
...@@ -292,33 +362,35 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow ...@@ -292,33 +362,35 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
for (int i = 0; i < num_keys; ++i) { for (int i = 0; i < num_keys; ++i) {
SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i); SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i);
if (COL_VAL_IS_VALUE(pColVal)) {
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); if (!COL_VAL_IS_NONE(pColVal)) {
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]);
if (NULL == pLastCol || pLastCol->ts <= keyTs) { if (NULL == pLastCol || pLastCol->ts <= keyTs) {
char *value = NULL; char *value = NULL;
size_t vlen = 0; size_t vlen = 0;
tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen); tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen);
char key[ROCKS_KEY_LEN]; SLastKey key = (SLastKey){.ltype = 0, .uid = uid, .cid = pColVal->cid};
size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last", uid, pColVal->cid); size_t klen = ROCKS_KEY_LEN;
rocksdb_writebatch_put(wb, key, klen, value, vlen); rocksdb_writebatch_put(wb, (char *)&key, klen, value, vlen);
taosMemoryFree(value); taosMemoryFree(value);
} }
}
if (!COL_VAL_IS_NONE(pColVal)) { if (COL_VAL_IS_VALUE(pColVal)) {
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]); SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
if (NULL == pLastCol || pLastCol->ts <= keyTs) { if (NULL == pLastCol || pLastCol->ts <= keyTs) {
char *value = NULL; char *value = NULL;
size_t vlen = 0; size_t vlen = 0;
tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen); tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen);
char key[ROCKS_KEY_LEN]; SLastKey key = (SLastKey){.ltype = 1, .uid = uid, .cid = pColVal->cid};
size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last_row", uid, pColVal->cid); size_t klen = ROCKS_KEY_LEN;
rocksdb_writebatch_put(wb, key, klen, value, vlen);
rocksdb_writebatch_put(wb, (char *)&key, klen, value, vlen);
taosMemoryFree(value); taosMemoryFree(value);
} }
} }
}
rocksdb_free(values_list[i]); rocksdb_free(values_list[i]);
rocksdb_free(values_list[i + num_keys]); rocksdb_free(values_list[i + num_keys]);
...@@ -326,18 +398,12 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow ...@@ -326,18 +398,12 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
taosMemoryFree(values_list); taosMemoryFree(values_list);
taosMemoryFree(values_list_sizes); taosMemoryFree(values_list_sizes);
char *err = NULL; rocksMayWrite(pTsdb);
rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err);
if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
rocksdb_free(err);
}
taosThreadMutexUnlock(&pTsdb->rCache.rMutex); taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
rocksdb_writebatch_clear(wb);
_exit: _exit:
taosArrayDestroy(aColVal); taosArrayDestroy(aColVal);
taosMemoryFree(pTSchema); // taosMemoryFree(pTSchema);
return code; return code;
} }
...@@ -356,38 +422,36 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC ...@@ -356,38 +422,36 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols, static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
int nCols, int16_t *slotIds); int nCols, int16_t *slotIds);
#if 1
int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int32_t ltype) { int32_t tsdbCacheGetSlow(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) {
static char const *alstring[2] = {"last_row", "last"};
char const *lstring = alstring[ltype];
rocksdb_writebatch_t *wb = NULL; rocksdb_writebatch_t *wb = NULL;
int32_t code = 0; int32_t code = 0;
SArray *pCidList = pr->pCidList; SArray *pCidList = pr->pCidList;
int num_keys = TARRAY_SIZE(pCidList); int num_keys = TARRAY_SIZE(pCidList);
char **keys_list = taosMemoryCalloc(num_keys, sizeof(char *));
size_t *keys_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t)); char **keys_list = taosMemoryMalloc(num_keys * sizeof(char *));
size_t *keys_list_sizes = taosMemoryMalloc(num_keys * sizeof(size_t));
char *key_list = taosMemoryMalloc(num_keys * ROCKS_KEY_LEN);
for (int i = 0; i < num_keys; ++i) { for (int i = 0; i < num_keys; ++i) {
int16_t cid = *(int16_t *)taosArrayGet(pCidList, i); int16_t cid = *(int16_t *)taosArrayGet(pCidList, i);
char *keys = taosMemoryCalloc(2, ROCKS_KEY_LEN); memcpy(key_list + i * ROCKS_KEY_LEN, &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid}, ROCKS_KEY_LEN);
int last_key_len = snprintf(keys, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":%s", uid, cid, lstring); keys_list[i] = key_list + i * ROCKS_KEY_LEN;
if (last_key_len >= ROCKS_KEY_LEN) { keys_list_sizes[i] = ROCKS_KEY_LEN;
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
} }
keys_list[i] = keys;
keys_list_sizes[i] = last_key_len;
}
char **values_list = taosMemoryCalloc(num_keys, sizeof(char *)); char **values_list = taosMemoryCalloc(num_keys, sizeof(char *));
size_t *values_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t)); size_t *values_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t));
char **errs = taosMemoryCalloc(num_keys, sizeof(char *)); char **errs = taosMemoryMalloc(num_keys * sizeof(char *));
rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys, (const char *const *)keys_list, rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys, (const char *const *)keys_list,
keys_list_sizes, values_list, values_list_sizes, errs); keys_list_sizes, values_list, values_list_sizes, errs);
for (int i = 0; i < num_keys; ++i) { for (int i = 0; i < num_keys; ++i) {
taosMemoryFree(keys_list[i]); if (errs[i]) {
rocksdb_free(errs[i]); rocksdb_free(errs[i]);
} }
}
taosMemoryFree(key_list);
taosMemoryFree(keys_list); taosMemoryFree(keys_list);
taosMemoryFree(keys_list_sizes); taosMemoryFree(keys_list_sizes);
taosMemoryFree(errs); taosMemoryFree(errs);
...@@ -403,7 +467,7 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR ...@@ -403,7 +467,7 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR
} else { } else {
taosThreadMutexLock(&pTsdb->rCache.rMutex); taosThreadMutexLock(&pTsdb->rCache.rMutex);
pLastCol = tsdbCacheLookup(pTsdb, uid, cid, lstring); pLastCol = tsdbCacheLookup(pTsdb, uid, cid, ltype);
if (!pLastCol) { if (!pLastCol) {
// recalc: load from tsdb // recalc: load from tsdb
int16_t aCols[1] = {cid}; int16_t aCols[1] = {cid};
...@@ -432,9 +496,10 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR ...@@ -432,9 +496,10 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR
char *value = NULL; char *value = NULL;
size_t vlen = 0; size_t vlen = 0;
tsdbCacheSerialize(pLastCol, &value, &vlen); tsdbCacheSerialize(pLastCol, &value, &vlen);
char key[ROCKS_KEY_LEN];
size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":%s", uid, pLastCol->colVal.cid, lstring); SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = pLastCol->colVal.cid};
rocksdb_writebatch_put(wb, key, klen, value, vlen); size_t klen = ROCKS_KEY_LEN;
rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen);
taosMemoryFree(value); taosMemoryFree(value);
} else { } else {
...@@ -442,21 +507,13 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR ...@@ -442,21 +507,13 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR
} }
if (wb) { if (wb) {
char *err = NULL; rocksMayWrite(pTsdb);
rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err);
if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
rocksdb_free(err);
}
rocksdb_writebatch_clear(wb);
} }
taosThreadMutexUnlock(&pTsdb->rCache.rMutex); taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
} }
taosArrayPush(pLastArray, pLastCol); taosArrayPush(pLastArray, pLastCol);
taosArrayDestroy(pTmpColArray); taosArrayDestroy(pTmpColArray);
if (freeCol) { if (freeCol) {
taosMemoryFree(pLastCol); taosMemoryFree(pLastCol);
...@@ -467,6 +524,356 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR ...@@ -467,6 +524,356 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR
return code; return code;
} }
#endif
static SLastCol *tsdbCacheLoadCol(STsdb *pTsdb, SCacheRowsReader *pr, int16_t slotid, tb_uid_t uid, int16_t cid,
int8_t ltype) {
SLastCol *pLastCol = tsdbCacheLookup(pTsdb, uid, cid, ltype);
if (!pLastCol) {
rocksdb_writebatch_t *wb = NULL;
taosThreadMutexLock(&pTsdb->rCache.rMutex);
pLastCol = tsdbCacheLookup(pTsdb, uid, cid, ltype);
if (!pLastCol) {
// recalc: load from tsdb
int16_t aCols[1] = {cid};
int16_t slotIds[1] = {slotid};
SArray *pTmpColArray = NULL;
if (ltype) {
mergeLastCid(uid, pTsdb, &pTmpColArray, pr, aCols, 1, slotIds);
} else {
mergeLastRowCid(uid, pTsdb, &pTmpColArray, pr, aCols, 1, slotIds);
}
if (pTmpColArray && TARRAY_SIZE(pTmpColArray) >= 1) {
pLastCol = taosArrayGet(pTmpColArray, 0);
}
// still null, then make up a none col value
SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[slotid].type)};
if (!pLastCol) {
pLastCol = &noneCol;
}
// store result back to rocks cache
wb = pTsdb->rCache.writebatch;
char *value = NULL;
size_t vlen = 0;
tsdbCacheSerialize(pLastCol, &value, &vlen);
SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = pLastCol->colVal.cid};
size_t klen = ROCKS_KEY_LEN;
rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen);
taosMemoryFree(value);
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
*pTmpLastCol = *pLastCol;
pLastCol = pTmpLastCol;
taosArrayDestroy(pTmpColArray);
}
if (wb) {
rocksMayWrite(pTsdb);
}
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
}
return pLastCol;
}
static void tsdbCacheDeleter(const void *key, size_t keyLen, void *value) {
SLastCol *pLastCol = (SLastCol *)value;
// TODO: add dirty flag to SLastCol
if (pLastCol->dirty) {
// TODO: queue into dirty list, free it after save to backstore
} else {
if (IS_VAR_DATA_TYPE(pLastCol->colVal.type) && pLastCol->colVal.value.nData > 0) {
taosMemoryFree(pLastCol->colVal.value.pData);
}
taosMemoryFree(value);
}
}
typedef struct {
int idx;
SLastKey key;
} SIdxKey;
static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols,
SCacheRowsReader *pr, int8_t ltype) {
int32_t code = 0;
rocksdb_writebatch_t *wb = NULL;
SArray *pTmpColArray = NULL;
int num_keys = TARRAY_SIZE(remainCols);
int16_t *aCols = taosMemoryMalloc(num_keys * sizeof(int16_t));
int16_t *slotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
for (int i = 0; i < num_keys; ++i) {
SIdxKey *idxKey = taosArrayGet(remainCols, i);
aCols[i] = idxKey->key.cid;
slotIds[i] = pr->pSlotIds[idxKey->idx];
}
if (ltype) {
mergeLastCid(uid, pTsdb, &pTmpColArray, pr, aCols, num_keys, slotIds);
} else {
mergeLastRowCid(uid, pTsdb, &pTmpColArray, pr, aCols, num_keys, slotIds);
}
SLRUCache *pCache = pTsdb->lruCache;
for (int i = 0; i < num_keys; ++i) {
SIdxKey *idxKey = taosArrayGet(remainCols, i);
SLastCol *pLastCol = NULL;
if (pTmpColArray && TARRAY_SIZE(pTmpColArray) >= i + 1) {
pLastCol = taosArrayGet(pTmpColArray, i);
}
// still null, then make up a none col value
SLastCol noneCol = {.ts = TSKEY_MIN,
.colVal = COL_VAL_NONE(idxKey->key.cid, pr->pSchema->columns[slotIds[i]].type)};
if (!pLastCol) {
pLastCol = &noneCol;
}
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
*pTmpLastCol = *pLastCol;
pLastCol = pTmpLastCol;
size_t charge = sizeof(*pLastCol);
if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) {
charge += pLastCol->colVal.value.nData;
}
LRUStatus status = taosLRUCacheInsert(pCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL,
TAOS_LRU_PRIORITY_LOW);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
// store result back to rocks cache
wb = pTsdb->rCache.writebatch;
char *value = NULL;
size_t vlen = 0;
tsdbCacheSerialize(pLastCol, &value, &vlen);
SLastKey *key = &idxKey->key;
size_t klen = ROCKS_KEY_LEN;
rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen);
taosMemoryFree(value);
taosArraySet(pLastArray, idxKey->idx, pLastCol);
// taosArrayRemove(remainCols, i);
}
if (wb) {
rocksMayWrite(pTsdb);
}
taosArrayDestroy(pTmpColArray);
taosMemoryFree(aCols);
taosMemoryFree(slotIds);
return code;
}
static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols,
SCacheRowsReader *pr, int8_t ltype) {
int32_t code = 0;
int num_keys = TARRAY_SIZE(remainCols);
char **keys_list = taosMemoryMalloc(num_keys * sizeof(char *));
size_t *keys_list_sizes = taosMemoryMalloc(num_keys * sizeof(size_t));
char *key_list = taosMemoryMalloc(num_keys * ROCKS_KEY_LEN);
for (int i = 0; i < num_keys; ++i) {
int16_t cid = *(int16_t *)taosArrayGet(remainCols, i);
memcpy(key_list + i * ROCKS_KEY_LEN, &((SIdxKey *)taosArrayGet(remainCols, i))->key, ROCKS_KEY_LEN);
keys_list[i] = key_list + i * ROCKS_KEY_LEN;
keys_list_sizes[i] = ROCKS_KEY_LEN;
}
char **values_list = taosMemoryCalloc(num_keys, sizeof(char *));
size_t *values_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t));
char **errs = taosMemoryMalloc(num_keys * sizeof(char *));
rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys, (const char *const *)keys_list,
keys_list_sizes, values_list, values_list_sizes, errs);
for (int i = 0; i < num_keys; ++i) {
if (errs[i]) {
rocksdb_free(errs[i]);
}
}
taosMemoryFree(key_list);
taosMemoryFree(keys_list);
taosMemoryFree(keys_list_sizes);
taosMemoryFree(errs);
SLRUCache *pCache = pTsdb->lruCache;
for (int i = 0, j = 0; i < num_keys && j < TARRAY_SIZE(remainCols); ++i) {
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
SIdxKey *idxKey = taosArrayGet(remainCols, j);
int16_t cid = idxKey->key.cid;
SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)};
if (pLastCol) {
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
*pTmpLastCol = *pLastCol;
pLastCol = pTmpLastCol;
reallocVarData(&pLastCol->colVal);
size_t charge = sizeof(*pLastCol);
if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) {
charge += pLastCol->colVal.value.nData;
}
LRUStatus status = taosLRUCacheInsert(pCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter,
NULL, TAOS_LRU_PRIORITY_LOW);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
SLastCol lastCol = *pLastCol;
reallocVarData(&lastCol.colVal);
taosArraySet(pLastArray, idxKey->idx, &lastCol);
taosArrayRemove(remainCols, j);
taosMemoryFree(values_list[i]);
} else {
++j;
}
}
taosMemoryFree(values_list);
taosMemoryFree(values_list_sizes);
if (TARRAY_SIZE(remainCols) > 0) {
code = tsdbCacheLoadFromRaw(pTsdb, uid, pLastArray, remainCols, pr, ltype);
}
return code;
}
int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) {
int32_t code = 0;
SLRUCache *pCache = pTsdb->lruCache;
SArray *pCidList = pr->pCidList;
int num_keys = TARRAY_SIZE(pCidList);
SArray *remainCols = NULL;
for (int i = 0; i < num_keys; ++i) {
int16_t cid = *(int16_t *)taosArrayGet(pCidList, i);
SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid};
LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN);
if (h) {
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
SLastCol lastCol = *pLastCol;
reallocVarData(&lastCol.colVal);
taosArrayPush(pLastArray, &lastCol);
if (h) {
taosLRUCacheRelease(pCache, h, false);
}
} else {
SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)};
taosArrayPush(pLastArray, &noneCol);
if (!remainCols) {
remainCols = taosArrayInit(num_keys, sizeof(SIdxKey));
}
taosArrayPush(remainCols, &(SIdxKey){i, *key});
}
}
if (remainCols && TARRAY_SIZE(remainCols) > 0) {
taosThreadMutexLock(&pTsdb->lruMutex);
for (int i = 0; i < TARRAY_SIZE(remainCols);) {
SIdxKey *idxKey = taosArrayGet(remainCols, i);
LRUHandle *h = taosLRUCacheLookup(pCache, &idxKey->key, ROCKS_KEY_LEN);
if (h) {
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
SLastCol lastCol = *pLastCol;
reallocVarData(&lastCol.colVal);
taosArraySet(pLastArray, idxKey->idx, &lastCol);
if (h) {
taosLRUCacheRelease(pCache, h, false);
}
taosArrayRemove(remainCols, i);
} else {
++i;
}
}
code = tsdbCacheLoadFromRocks(pTsdb, uid, pLastArray, remainCols, pr, ltype);
taosThreadMutexUnlock(&pTsdb->lruMutex);
}
if (remainCols) {
taosArrayDestroy(remainCols);
}
return code;
}
int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) {
int32_t code = 0;
SLRUCache *pCache = pTsdb->lruCache;
SArray *pCidList = pr->pCidList;
int num_keys = TARRAY_SIZE(pCidList);
for (int i = 0; i < num_keys; ++i) {
SLastCol *pLastCol = NULL;
int16_t cid = *(int16_t *)taosArrayGet(pCidList, i);
SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid};
LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN);
if (!h) {
taosThreadMutexLock(&pTsdb->lruMutex);
h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN);
if (!h) {
pLastCol = tsdbCacheLoadCol(pTsdb, pr, pr->pSlotIds[i], uid, cid, ltype);
size_t charge = sizeof(*pLastCol);
if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) {
charge += pLastCol->colVal.value.nData;
}
LRUStatus status = taosLRUCacheInsert(pCache, key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, &h,
TAOS_LRU_PRIORITY_LOW);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
}
taosThreadMutexUnlock(&pTsdb->lruMutex);
}
pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
SLastCol lastCol = *pLastCol;
reallocVarData(&lastCol.colVal);
if (h) {
taosLRUCacheRelease(pCache, h, false);
}
taosArrayPush(pLastArray, &lastCol);
}
return code;
}
int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) { int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) {
int32_t code = 0; int32_t code = 0;
...@@ -486,19 +893,15 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE ...@@ -486,19 +893,15 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
for (int i = 0; i < num_keys; ++i) { for (int i = 0; i < num_keys; ++i) {
int16_t cid = pTSchema->columns[i].colId; int16_t cid = pTSchema->columns[i].colId;
char *keys = taosMemoryCalloc(2, ROCKS_KEY_LEN); size_t klen = ROCKS_KEY_LEN;
int last_key_len = snprintf(keys, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last", uid, cid); char *keys = taosMemoryCalloc(2, klen);
if (last_key_len >= ROCKS_KEY_LEN) { ((SLastKey *)keys)[0] = (SLastKey){.ltype = 1, .uid = uid, .cid = cid};
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); ((SLastKey *)keys)[1] = (SLastKey){.ltype = 0, .uid = uid, .cid = cid};
}
int lr_key_len = snprintf(keys + ROCKS_KEY_LEN, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last_row", uid, cid);
if (lr_key_len >= ROCKS_KEY_LEN) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
}
keys_list[i] = keys; keys_list[i] = keys;
keys_list[num_keys + i] = keys + ROCKS_KEY_LEN; keys_list[num_keys + i] = keys + klen;
keys_list_sizes[i] = last_key_len; keys_list_sizes[i] = klen;
keys_list_sizes[num_keys + i] = lr_key_len; keys_list_sizes[num_keys + i] = klen;
} }
char **values_list = taosMemoryCalloc(num_keys * 2, sizeof(char *)); char **values_list = taosMemoryCalloc(num_keys * 2, sizeof(char *));
size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t));
...@@ -520,16 +923,18 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE ...@@ -520,16 +923,18 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
for (int i = 0; i < num_keys; ++i) { for (int i = 0; i < num_keys; ++i) {
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) { if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) {
char key[ROCKS_KEY_LEN]; SLastKey *key = &(SLastKey){.ltype = 1, .uid = uid, .cid = pLastCol->colVal.cid};
size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last", uid, pLastCol->colVal.cid); size_t klen = sizeof(*key);
rocksdb_writebatch_delete(wb, key, klen);
rocksdb_writebatch_delete(wb, (char *)key, klen);
} }
pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]); pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]);
if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) { if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) {
char key[ROCKS_KEY_LEN]; SLastKey *key = &(SLastKey){.ltype = 0, .uid = uid, .cid = pLastCol->colVal.cid};
size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last_row", uid, pLastCol->colVal.cid); size_t klen = sizeof(*key);
rocksdb_writebatch_delete(wb, key, klen);
rocksdb_writebatch_delete(wb, (char *)key, klen);
} }
rocksdb_free(values_list[i]); rocksdb_free(values_list[i]);
...@@ -538,14 +943,8 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE ...@@ -538,14 +943,8 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
taosMemoryFree(values_list); taosMemoryFree(values_list);
taosMemoryFree(values_list_sizes); taosMemoryFree(values_list_sizes);
char *err = NULL; rocksMayWrite(pTsdb);
rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err);
if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
rocksdb_free(err);
}
taosThreadMutexUnlock(&pTsdb->rCache.rMutex); taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
rocksdb_writebatch_clear(wb);
_exit: _exit:
taosMemoryFree(pTSchema); taosMemoryFree(pTSchema);
...@@ -1111,7 +1510,7 @@ typedef struct { ...@@ -1111,7 +1510,7 @@ typedef struct {
SMergeTree mergeTree; SMergeTree mergeTree;
SMergeTree *pMergeTree; SMergeTree *pMergeTree;
SSttBlockLoadInfo *pLoadInfo; SSttBlockLoadInfo *pLoadInfo;
SLDataIter* pDataIter; SLDataIter *pDataIter;
int64_t lastTs; int64_t lastTs;
} SFSLastNextRowIter; } SFSLastNextRowIter;
...@@ -1159,7 +1558,8 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa ...@@ -1159,7 +1558,8 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa
} }
tMergeTreeOpen(&state->mergeTree, 1, *state->pDataFReader, state->suid, state->uid, tMergeTreeOpen(&state->mergeTree, 1, *state->pDataFReader, state->suid, state->uid,
&(STimeWindow){.skey = state->lastTs, .ekey = TSKEY_MAX}, &(STimeWindow){.skey = state->lastTs, .ekey = TSKEY_MAX},
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, state->pLoadInfo, false, NULL, true, state->pDataIter); &(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, state->pLoadInfo, false, NULL, true,
state->pDataIter);
state->pMergeTree = &state->mergeTree; state->pMergeTree = &state->mergeTree;
state->state = SFSLASTNEXTROW_BLOCKROW; state->state = SFSLASTNEXTROW_BLOCKROW;
} }
...@@ -1394,11 +1794,13 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie ...@@ -1394,11 +1794,13 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
tBlockDataReset(state->pBlockData); tBlockDataReset(state->pBlockData);
TABLEID tid = {.suid = state->suid, .uid = state->uid}; TABLEID tid = {.suid = state->suid, .uid = state->uid};
int nTmpCols = nCols; int nTmpCols = nCols;
if (aCols[0] == PRIMARYKEY_TIMESTAMP_COL_ID && nCols == 1) { bool hasTs = false;
nTmpCols = 0; if (aCols[0] == PRIMARYKEY_TIMESTAMP_COL_ID) {
--nTmpCols;
skipBlock = false; skipBlock = false;
hasTs = true;
} }
code = tBlockDataInit(state->pBlockData, &tid, state->pTSchema, aCols, nTmpCols); code = tBlockDataInit(state->pBlockData, &tid, state->pTSchema, hasTs ? aCols + 1 : aCols, nTmpCols);
if (code) goto _err; if (code) goto _err;
code = tsdbReadDataBlock(*state->pDataFReader, &block, state->pBlockData); code = tsdbReadDataBlock(*state->pDataFReader, &block, state->pBlockData);
...@@ -1730,8 +2132,8 @@ typedef struct { ...@@ -1730,8 +2132,8 @@ typedef struct {
} CacheNextRowIter; } CacheNextRowIter;
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid, static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
SSttBlockLoadInfo *pLoadInfo, SLDataIter* pLDataIter, STsdbReadSnap *pReadSnap, SDataFReader **pDataFReader, SSttBlockLoadInfo *pLoadInfo, SLDataIter *pLDataIter, STsdbReadSnap *pReadSnap,
SDataFReader **pDataFReaderLast, int64_t lastTs) { SDataFReader **pDataFReader, SDataFReader **pDataFReaderLast, int64_t lastTs) {
int code = 0; int code = 0;
STbData *pMem = NULL; STbData *pMem = NULL;
......
...@@ -168,7 +168,6 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, ...@@ -168,7 +168,6 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
} }
SVnodeCfg* pCfg = &((SVnode*)pVnode)->config; SVnodeCfg* pCfg = &((SVnode*)pVnode)->config;
int32_t numOfStt = pCfg->sttTrigger; int32_t numOfStt = pCfg->sttTrigger;
p->pLoadInfo = tCreateLastBlockLoadInfo(p->pSchema, NULL, 0, numOfStt); p->pLoadInfo = tCreateLastBlockLoadInfo(p->pSchema, NULL, 0, numOfStt);
if (p->pLoadInfo == NULL) { if (p->pLoadInfo == NULL) {
...@@ -203,7 +202,7 @@ void* tsdbCacherowsReaderClose(void* pReader) { ...@@ -203,7 +202,7 @@ void* tsdbCacherowsReaderClose(void* pReader) {
taosMemoryFree(p->pSchema); taosMemoryFree(p->pSchema);
} }
taosMemoryFreeClear(p->pDataIter); taosMemoryFree(p->pDataIter);
taosMemoryFree(p->pCurrSchema); taosMemoryFree(p->pCurrSchema);
destroyLastBlockLoadInfo(p->pLoadInfo); destroyLastBlockLoadInfo(p->pLoadInfo);
...@@ -294,21 +293,23 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ...@@ -294,21 +293,23 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
pr->pDataFReader = NULL; pr->pDataFReader = NULL;
pr->pDataFReaderLast = NULL; pr->pDataFReaderLast = NULL;
int32_t ltype = (pr->type & CACHESCAN_RETRIEVE_LAST) >> 3; int8_t ltype = (pr->type & CACHESCAN_RETRIEVE_LAST) >> 3;
// retrieve the only one last row of all tables in the uid list. // retrieve the only one last row of all tables in the uid list.
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) { if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
int64_t totalLastTs = INT64_MAX; int64_t totalLastTs = INT64_MAX;
for (int32_t i = 0; i < pr->numOfTables; ++i) { for (int32_t i = 0; i < pr->numOfTables; ++i) {
STableKeyInfo* pKeyInfo = &pr->pTableList[i]; STableKeyInfo* pKeyInfo = &pr->pTableList[i];
tsdbCacheGet(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype); tsdbCacheGetBatch(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype);
// tsdbCacheGet(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype);
if (TARRAY_SIZE(pRow) <= 0) { if (TARRAY_SIZE(pRow) <= 0) {
taosArrayClearEx(pRow, freeItem); taosArrayClearEx(pRow, freeItem);
continue; continue;
} }
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0); SLastCol* pColVal = taosArrayGet(pRow, 0);
if (COL_VAL_IS_NONE(&pColVal->colVal)) { if (COL_VAL_IS_NONE(&pColVal->colVal)) {
taosArrayClearEx(pRow, freeItem); taosArrayClearEx(pRow, freeItem);
continue; continue;
...@@ -361,7 +362,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ...@@ -361,7 +362,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
} }
} }
if (taosArrayGetSize(pTableUidList) == 0) { if (TARRAY_SIZE(pTableUidList) == 0) {
taosArrayPush(pTableUidList, &pKeyInfo->uid); taosArrayPush(pTableUidList, &pKeyInfo->uid);
} else { } else {
taosArraySet(pTableUidList, 0, &pKeyInfo->uid); taosArraySet(pTableUidList, 0, &pKeyInfo->uid);
...@@ -375,9 +376,9 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ...@@ -375,9 +376,9 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
} }
} else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) { } else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) { for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) {
STableKeyInfo* pKeyInfo = &pr->pTableList[i]; tb_uid_t uid = pr->pTableList[i].uid;
tsdbCacheGet(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype); tsdbCacheGetBatch(pr->pTsdb, uid, pRow, pr, ltype);
if (TARRAY_SIZE(pRow) <= 0) { if (TARRAY_SIZE(pRow) <= 0) {
taosArrayClearEx(pRow, freeItem); taosArrayClearEx(pRow, freeItem);
continue; continue;
...@@ -391,9 +392,9 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ...@@ -391,9 +392,9 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
saveOneRow(pRow, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr); saveOneRow(pRow, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr);
taosArrayClearEx(pRow, freeItem); taosArrayClearEx(pRow, freeItem);
taosArrayPush(pTableUidList, &pKeyInfo->uid); taosArrayPush(pTableUidList, &uid);
pr->tableIndex += 1; ++pr->tableIndex;
if (pResBlock->info.rows >= pResBlock->info.capacity) { if (pResBlock->info.rows >= pResBlock->info.capacity) {
goto _end; goto _end;
} }
...@@ -419,5 +420,6 @@ _end: ...@@ -419,5 +420,6 @@ _end:
taosMemoryFree(pRes); taosMemoryFree(pRes);
taosArrayDestroyEx(pRow, freeItem); taosArrayDestroyEx(pRow, freeItem);
taosArrayDestroyEx(pLastCols, freeItem); taosArrayDestroyEx(pLastCols, freeItem);
return code; return code;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册