diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index b698909af1268bf5e6c216ab0324b45a7479e9ca..16afa9f596f50a37a2ddfab4c0a3619148763d05 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -348,6 +348,7 @@ typedef struct { rocksdb_options_t *options; rocksdb_writeoptions_t *writeoptions; rocksdb_readoptions_t *readoptions; + rocksdb_writebatch_t *writebatch; TdThreadMutex rMutex; } SRocksCache; @@ -805,7 +806,7 @@ typedef struct { int32_t tsdbOpenCache(STsdb *pTsdb); void tsdbCloseCache(STsdb *pTsdb); -int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t uid, TSDBROW *row); +int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *row); int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb *pTsdb); int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TSDBROW *row, bool dup); int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **h); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 171a63aa33685f7bb98cb77bf5eaabe5faf244fb..393e8ffdf5cf7cbfe33e6dde31ed246f33f09c4b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -43,6 +43,8 @@ static void tsdbCloseBICache(STsdb *pTsdb) { } } +#define ROCKS_KEY_LEN 64 + static void tsdbGetRocksPath(STsdb *pTsdb, char *path) { SVnode *pVnode = pTsdb->pVnode; if (pVnode->pTfs) { @@ -92,8 +94,11 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { goto _err3; } + rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create(); + taosThreadMutexInit(&pTsdb->rCache.rMutex, NULL); + pTsdb->rCache.writebatch = writebatch; pTsdb->rCache.options = options; pTsdb->rCache.writeoptions = writeoptions; pTsdb->rCache.readoptions = readoptions; @@ -113,37 +118,167 @@ _err: static void tsdbCloseRocksCache(STsdb *pTsdb) { rocksdb_close(pTsdb->rCache.db); + rocksdb_writebatch_destroy(pTsdb->rCache.writebatch); rocksdb_readoptions_destroy(pTsdb->rCache.readoptions); rocksdb_writeoptions_destroy(pTsdb->rCache.writeoptions); rocksdb_options_destroy(pTsdb->rCache.options); } -int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t uid, TSDBROW *pRow) { +SLastCol *tsdbCacheDeserialize(char const *value) { + SLastCol *pLastCol = (SLastCol *)value; + SColVal *pColVal = &pLastCol->colVal; + if (IS_VAR_DATA_TYPE(pColVal->type)) { + pColVal->value.pData = (char *)value + sizeof(*pColVal); + } + + return pLastCol; +} + +void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) { + SColVal *pColVal = &pLastCol->colVal; + size_t length = sizeof(*pLastCol); + if (IS_VAR_DATA_TYPE(pColVal->type)) { + length += pColVal->value.nData; + } + *value = taosMemoryMalloc(length); + + *(SLastCol *)(*value) = *pLastCol; + if (IS_VAR_DATA_TYPE(pColVal->type)) { + uint8_t *pVal = pColVal->value.pData; + pColVal->value.pData = *value + sizeof(*pLastCol); + if (pColVal->value.nData) { + memcpy(pColVal->value.pData, pVal, pColVal->value.nData); + } + } + *size = length; +} + +int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) { int32_t code = 0; + // 1, fetch schema + STSchema *pTSchema = NULL; + int32_t sver = TSDBROW_SVERSION(pRow); + code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return -1; + } + + // 2, iterate col values into array + SArray *aColVal = taosArrayInit(32, sizeof(SColVal)); + STSDBRowIter iter = {0}; tsdbRowIterOpen(&iter, pRow, pTSchema); for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) { - SColVal *pRColVal = tsdbCacheGetRColVal(pTsdb); - if (pRColVal) { - // merge pColVal with pRColVal + if (IS_VAR_DATA_TYPE(pColVal->type)) { + uint8_t *pVal = pColVal->value.pData; + + pColVal->value.pData = NULL; + code = tRealloc(&pColVal->value.pData, pColVal->value.nData); + if (code) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + if (pColVal->value.nData) { + memcpy(pColVal->value.pData, pVal, pColVal->value.nData); + } } - tsdbCachePutRColVal(pColVal); + taosArrayPush(aColVal, pColVal); } tsdbRowClose(&iter); - char *err = NULL; - char buf[256] = {0}; - size_t vallen = 0; - char *val = rocksdb_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, "key", 3, &vallen, &err); - if (val) { - } else { + // 3, build keys & multi get from rocks + int max_key_len = 0; + int num_keys = TARRAY_SIZE(aColVal); + char **keys_list = taosMemoryCalloc(num_keys * 2, sizeof(char *)); + size_t *keys_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); + for (int i = 0; i < num_keys; ++i) { + SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i); + + char *keys = taosMemoryCalloc(2, ROCKS_KEY_LEN); + int last_key_len = snprintf(keys, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last", uid, pColVal->cid); + if (last_key_len >= ROCKS_KEY_LEN) { + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); + } + int lr_key_len = + snprintf(keys + ROCKS_KEY_LEN, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last_row", uid, pColVal->cid); + if (lr_key_len >= ROCKS_KEY_LEN) { + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); + } + keys_list[i] = keys; + keys_list[num_keys + i] = keys + ROCKS_KEY_LEN; + keys_list_sizes[i] = last_key_len; + keys_list_sizes[num_keys + i] = lr_key_len; + } + char **values_list = taosMemoryCalloc(num_keys * 2, sizeof(char *)); + size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); + char **errs = taosMemoryCalloc(num_keys * 2, sizeof(char *)); + rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list, + keys_list_sizes, values_list, values_list_sizes, errs); + for (int i = 0; i < num_keys; ++i) { + taosMemoryFree(keys_list[i]); + } + taosMemoryFree(keys_list); + taosMemoryFree(keys_list_sizes); + taosMemoryFree(errs); + + TSKEY keyTs = TSDBROW_TS(pRow); + rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; + for (int i = 0; i < num_keys; ++i) { + SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i); + if (COL_VAL_IS_VALUE(pColVal)) { + SLastCol *pLastCol = NULL; + if (NULL != values_list[i]) { + pLastCol = tsdbCacheDeserialize(values_list[i]); + } + + if (NULL == pLastCol || pLastCol->ts <= keyTs) { + char *value = NULL; + size_t vlen = 0; + tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen); + char key[ROCKS_KEY_LEN]; + size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last", uid, pColVal->cid); + rocksdb_writebatch_put(wb, key, klen, value, vlen); + taosMemoryFree(value); + } + } + + if (!COL_VAL_IS_NONE(pColVal)) { + SLastCol *pLastCol = NULL; + if (NULL != values_list[i + num_keys]) { + pLastCol = tsdbCacheDeserialize(values_list[i]); + } + + if (NULL == pLastCol || pLastCol->ts <= keyTs) { + char *value = NULL; + size_t vlen = 0; + tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen); + char key[ROCKS_KEY_LEN]; + size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last_row", uid, pColVal->cid); + rocksdb_writebatch_put(wb, key, klen, value, vlen); + taosMemoryFree(value); + } + } + } + taosMemoryFree(values_list); + taosMemoryFree(values_list_sizes); + + char *err = NULL; + rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err); + if (NULL != err) { + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); + rocksdb_free(err); } - rocksdb_put(pTsdb->rCache.db, pTsdb->rCache.writeoptions, "key", 3, "value", 5, &err); + rocksdb_writebatch_clear(wb); +_exit: + taosArrayDestroy(aColVal); + taosMemoryFree(pTSchema); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index f480fb6081d78305c3215f330fbcce03f87bf5bd..6b4ab4d5cf7cf5ae430f0eeb6e94d113bb6de141 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -678,7 +678,7 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, tsdbCacheInsertLast(pMemTable->pTsdb->lruCache, pTbData->uid, &lRow, pMemTable->pTsdb); } */ - tsdbCacheUpdate(pMemTable->pTsdb, pTbData->uid, &lRow); + tsdbCacheUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, &lRow); // SMemTable pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey); @@ -748,7 +748,7 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, tsdbCacheInsertLast(pMemTable->pTsdb->lruCache, pTbData->uid, &lRow, pMemTable->pTsdb); } */ - tsdbCacheUpdate(pMemTable->pTsdb, pTbData->uid, &lRow); + tsdbCacheUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, &lRow); // SMemTable pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey);