提交 5eb57f2e 编写于 作者: M Minglei Jin

enh(tsdb/cache): writebatch with rcache locked

上级 f9b97f7c
......@@ -215,7 +215,7 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) {
}
static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) {
rocksdb_writebatch_t *wb = NULL;
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
if (read) {
if (lock) {
taosThreadMutexLock(&pTsdb->lruMutex);
......@@ -225,11 +225,12 @@ static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) {
if (lock) {
taosThreadMutexLock(&pTsdb->rCache.rMutex);
}
wb = pTsdb->rCache.writebatch;
}
int count = rocksdb_writebatch_count(wb);
if ((force && count > 0) || count >= ROCKS_BATCH_SIZE) {
char *err = NULL;
rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err);
if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, count,
......@@ -240,10 +241,13 @@ static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) {
rocksdb_writebatch_clear(wb);
}
if (read) {
if (lock) taosThreadMutexUnlock(&pTsdb->lruMutex);
} else {
if (lock) taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
if (lock) {
if (read) {
taosThreadMutexUnlock(&pTsdb->lruMutex);
} else {
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
}
}
}
......@@ -287,36 +291,45 @@ static void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) {
*size = length;
}
int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) {
SLastCol *pLastCol = (SLastCol *)value;
static void tsdbCachePutBatch(SLastCol *pLastCol, const void *key, size_t klen, SCacheFlushState *state) {
STsdb *pTsdb = state->pTsdb;
SRocksCache *rCache = &pTsdb->rCache;
rocksdb_writebatch_t *wb = rCache->writebatch;
char *rocks_value = NULL;
size_t vlen = 0;
if (pLastCol->dirty) {
SCacheFlushState *state = (SCacheFlushState *)ud;
STsdb *pTsdb = state->pTsdb;
SRocksCache *rCache = &pTsdb->rCache;
rocksdb_writebatch_t *wb = rCache->writebatch;
char *rocks_value = NULL;
size_t vlen = 0;
tsdbCacheSerialize(pLastCol, &rocks_value, &vlen);
tsdbCacheSerialize(pLastCol, &rocks_value, &vlen);
rocksdb_writebatch_put(wb, (char *)key, klen, rocks_value, vlen);
taosThreadMutexLock(&rCache->rMutex);
taosMemoryFree(rocks_value);
rocksdb_writebatch_put(wb, (char *)key, klen, rocks_value, vlen);
if (++state->flush_count >= ROCKS_BATCH_SIZE) {
char *err = NULL;
rocksdb_write(rCache->db, rCache->writeoptions, wb, &err);
if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
state->flush_count, err);
rocksdb_free(err);
}
taosMemoryFree(rocks_value);
rocksdb_writebatch_clear(wb);
if (++state->flush_count >= ROCKS_BATCH_SIZE) {
char *err = NULL;
state->flush_count = 0;
rocksdb_write(rCache->db, rCache->writeoptions, wb, &err);
if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
state->flush_count, err);
rocksdb_free(err);
}
rocksdb_writebatch_clear(wb);
state->flush_count = 0;
}
taosThreadMutexUnlock(&rCache->rMutex);
}
int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) {
SLastCol *pLastCol = (SLastCol *)value;
if (pLastCol->dirty) {
tsdbCachePutBatch(pLastCol, key, klen, (SCacheFlushState *)ud);
pLastCol->dirty = 0;
}
......@@ -379,36 +392,10 @@ static void reallocVarData(SColVal *pColVal) {
}
static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud) {
(void)key;
(void)klen;
SLastCol *pLastCol = (SLastCol *)value;
if (pLastCol->dirty) {
SCacheFlushState *state = (SCacheFlushState *)ud;
STsdb *pTsdb = state->pTsdb;
SRocksCache *rCache = &pTsdb->rCache;
rocksdb_writebatch_t *wb = rCache->writebatch;
char *rocks_value = NULL;
size_t vlen = 0;
tsdbCacheSerialize(pLastCol, &rocks_value, &vlen);
rocksdb_writebatch_put(wb, (char *)key, klen, rocks_value, vlen);
taosMemoryFree(rocks_value);
if (++state->flush_count >= ROCKS_BATCH_SIZE) {
char *err = NULL;
rocksdb_write(rCache->db, rCache->writeoptions, wb, &err);
if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
state->flush_count, err);
rocksdb_free(err);
}
rocksdb_writebatch_clear(wb);
state->flush_count = 0;
}
tsdbCachePutBatch(pLastCol, key, klen, (SCacheFlushState *)ud);
}
if (IS_VAR_DATA_TYPE(pLastCol->colVal.type) /* && pLastCol->colVal.value.nData > 0*/) {
......@@ -449,11 +436,10 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
tsdbRowClose(&iter);
// 3, build keys & multi get from rocks
int num_keys = TARRAY_SIZE(aColVal);
TSKEY keyTs = TSDBROW_TS(pRow);
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
SArray *remainCols = NULL;
SLRUCache *pCache = pTsdb->lruCache;
int num_keys = TARRAY_SIZE(aColVal);
TSKEY keyTs = TSDBROW_TS(pRow);
SArray *remainCols = NULL;
SLRUCache *pCache = pTsdb->lruCache;
taosThreadMutexLock(&pTsdb->lruMutex);
for (int i = 0; i < num_keys; ++i) {
......@@ -489,14 +475,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
if (!pLastCol->dirty) {
pLastCol->dirty = 1;
}
/*
char *value = NULL;
size_t vlen = 0;
tsdbCacheSerialize(pLastCol, &value, &vlen);
// tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen);
rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen);
taosMemoryFree(value);
*/
}
taosLRUCacheRelease(pCache, h, false);
......@@ -536,13 +514,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
if (!pLastCol->dirty) {
pLastCol->dirty = 1;
}
/*
char *value = NULL;
size_t vlen = 0;
tsdbCacheSerialize(pLastCol, &value, &vlen);
rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen);
taosMemoryFree(value);
*/
}
taosLRUCacheRelease(pCache, h, false);
......@@ -580,6 +551,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
taosMemoryFree(keys_list_sizes);
taosMemoryFree(values_list_sizes);
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
for (int i = 0; i < num_keys; ++i) {
SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
SColVal *pColVal = (SColVal *)TARRAY_DATA(aColVal) + idxKey->idx;
......@@ -593,8 +565,12 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
size_t vlen = 0;
tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen);
// SLastKey key = (SLastKey){.ltype = 0, .uid = uid, .cid = pColVal->cid};
taosThreadMutexLock(&pTsdb->rCache.rMutex);
rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen);
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
pLastCol = (SLastCol *)value;
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
*pTmpLastCol = *pLastCol;
......@@ -621,8 +597,12 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
size_t vlen = 0;
tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen);
// SLastKey key = (SLastKey){.ltype = 1, .uid = uid, .cid = pColVal->cid};
taosThreadMutexLock(&pTsdb->rCache.rMutex);
rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen);
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
pLastCol = (SLastCol *)value;
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
*pTmpLastCol = *pLastCol;
......@@ -647,12 +627,14 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
rocksdb_free(values_list[i]);
}
rocksMayWrite(pTsdb, true, false, true);
taosMemoryFree(values_list);
taosArrayDestroy(remainCols);
}
rocksMayWrite(pTsdb, true, false, false);
taosThreadMutexUnlock(&pTsdb->lruMutex);
_exit:
......@@ -1005,9 +987,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
reallocVarData(&lastCol.colVal);
taosArrayPush(pLastArray, &lastCol);
if (h) {
taosLRUCacheRelease(pCache, h, false);
}
taosLRUCacheRelease(pCache, h, false);
} else {
SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)};
......@@ -1032,9 +1012,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
reallocVarData(&lastCol.colVal);
taosArraySet(pLastArray, idxKey->idx, &lastCol);
if (h) {
taosLRUCacheRelease(pCache, h, false);
}
taosLRUCacheRelease(pCache, h, false);
taosArrayRemove(remainCols, i);
} else {
......@@ -1183,7 +1161,7 @@ int32_t tsdbOpenCache(STsdb *pTsdb) {
SLRUCache *pCache = NULL;
size_t cfgCapacity = pTsdb->pVnode->config.cacheLastSize * 1024 * 1024;
pCache = taosLRUCacheInit(cfgCapacity, 1, .5);
pCache = taosLRUCacheInit(cfgCapacity, 0, .5);
if (pCache == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册