未验证 提交 25e6b123 编写于 作者: W wade zhang 提交者: GitHub

Merge pull request #21834 from taosdata/fix/TD-24938

enh(tsdb/cache): writebatch with rcache locked
...@@ -215,7 +215,7 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) { ...@@ -215,7 +215,7 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) {
} }
static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) { static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) {
rocksdb_writebatch_t *wb = NULL; rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
if (read) { if (read) {
if (lock) { if (lock) {
taosThreadMutexLock(&pTsdb->lruMutex); taosThreadMutexLock(&pTsdb->lruMutex);
...@@ -225,11 +225,12 @@ static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) { ...@@ -225,11 +225,12 @@ static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) {
if (lock) { if (lock) {
taosThreadMutexLock(&pTsdb->rCache.rMutex); taosThreadMutexLock(&pTsdb->rCache.rMutex);
} }
wb = pTsdb->rCache.writebatch;
} }
int count = rocksdb_writebatch_count(wb); int count = rocksdb_writebatch_count(wb);
if ((force && count > 0) || count >= ROCKS_BATCH_SIZE) { if ((force && count > 0) || count >= ROCKS_BATCH_SIZE) {
char *err = NULL; char *err = NULL;
rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err); rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err);
if (NULL != err) { if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, count, tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, count,
...@@ -240,10 +241,13 @@ static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) { ...@@ -240,10 +241,13 @@ static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) {
rocksdb_writebatch_clear(wb); rocksdb_writebatch_clear(wb);
} }
if (read) {
if (lock) taosThreadMutexUnlock(&pTsdb->lruMutex); if (lock) {
} else { if (read) {
if (lock) taosThreadMutexUnlock(&pTsdb->rCache.rMutex); taosThreadMutexUnlock(&pTsdb->lruMutex);
} else {
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
}
} }
} }
...@@ -287,36 +291,45 @@ static void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) { ...@@ -287,36 +291,45 @@ static void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) {
*size = length; *size = length;
} }
int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) { static void tsdbCachePutBatch(SLastCol *pLastCol, const void *key, size_t klen, SCacheFlushState *state) {
SLastCol *pLastCol = (SLastCol *)value; STsdb *pTsdb = state->pTsdb;
SRocksCache *rCache = &pTsdb->rCache;
rocksdb_writebatch_t *wb = rCache->writebatch;
char *rocks_value = NULL;
size_t vlen = 0;
if (pLastCol->dirty) { tsdbCacheSerialize(pLastCol, &rocks_value, &vlen);
SCacheFlushState *state = (SCacheFlushState *)ud;
STsdb *pTsdb = state->pTsdb;
SRocksCache *rCache = &pTsdb->rCache;
rocksdb_writebatch_t *wb = rCache->writebatch;
char *rocks_value = NULL;
size_t vlen = 0;
tsdbCacheSerialize(pLastCol, &rocks_value, &vlen); taosThreadMutexLock(&rCache->rMutex);
rocksdb_writebatch_put(wb, (char *)key, klen, rocks_value, vlen);
taosMemoryFree(rocks_value); rocksdb_writebatch_put(wb, (char *)key, klen, rocks_value, vlen);
if (++state->flush_count >= ROCKS_BATCH_SIZE) { taosMemoryFree(rocks_value);
char *err = NULL;
rocksdb_write(rCache->db, rCache->writeoptions, wb, &err);
if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
state->flush_count, err);
rocksdb_free(err);
}
rocksdb_writebatch_clear(wb); if (++state->flush_count >= ROCKS_BATCH_SIZE) {
char *err = NULL;
state->flush_count = 0; rocksdb_write(rCache->db, rCache->writeoptions, wb, &err);
if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
state->flush_count, err);
rocksdb_free(err);
} }
rocksdb_writebatch_clear(wb);
state->flush_count = 0;
}
taosThreadMutexUnlock(&rCache->rMutex);
}
int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) {
SLastCol *pLastCol = (SLastCol *)value;
if (pLastCol->dirty) {
tsdbCachePutBatch(pLastCol, key, klen, (SCacheFlushState *)ud);
pLastCol->dirty = 0; pLastCol->dirty = 0;
} }
...@@ -379,36 +392,10 @@ static void reallocVarData(SColVal *pColVal) { ...@@ -379,36 +392,10 @@ static void reallocVarData(SColVal *pColVal) {
} }
static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud) { static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud) {
(void)key;
(void)klen;
SLastCol *pLastCol = (SLastCol *)value; SLastCol *pLastCol = (SLastCol *)value;
if (pLastCol->dirty) { if (pLastCol->dirty) {
SCacheFlushState *state = (SCacheFlushState *)ud; tsdbCachePutBatch(pLastCol, key, klen, (SCacheFlushState *)ud);
STsdb *pTsdb = state->pTsdb;
SRocksCache *rCache = &pTsdb->rCache;
rocksdb_writebatch_t *wb = rCache->writebatch;
char *rocks_value = NULL;
size_t vlen = 0;
tsdbCacheSerialize(pLastCol, &rocks_value, &vlen);
rocksdb_writebatch_put(wb, (char *)key, klen, rocks_value, vlen);
taosMemoryFree(rocks_value);
if (++state->flush_count >= ROCKS_BATCH_SIZE) {
char *err = NULL;
rocksdb_write(rCache->db, rCache->writeoptions, wb, &err);
if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
state->flush_count, err);
rocksdb_free(err);
}
rocksdb_writebatch_clear(wb);
state->flush_count = 0;
}
} }
if (IS_VAR_DATA_TYPE(pLastCol->colVal.type) /* && pLastCol->colVal.value.nData > 0*/) { if (IS_VAR_DATA_TYPE(pLastCol->colVal.type) /* && pLastCol->colVal.value.nData > 0*/) {
...@@ -449,11 +436,10 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow ...@@ -449,11 +436,10 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
tsdbRowClose(&iter); tsdbRowClose(&iter);
// 3, build keys & multi get from rocks // 3, build keys & multi get from rocks
int num_keys = TARRAY_SIZE(aColVal); int num_keys = TARRAY_SIZE(aColVal);
TSKEY keyTs = TSDBROW_TS(pRow); TSKEY keyTs = TSDBROW_TS(pRow);
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; SArray *remainCols = NULL;
SArray *remainCols = NULL; SLRUCache *pCache = pTsdb->lruCache;
SLRUCache *pCache = pTsdb->lruCache;
taosThreadMutexLock(&pTsdb->lruMutex); taosThreadMutexLock(&pTsdb->lruMutex);
for (int i = 0; i < num_keys; ++i) { for (int i = 0; i < num_keys; ++i) {
...@@ -489,14 +475,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow ...@@ -489,14 +475,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
if (!pLastCol->dirty) { if (!pLastCol->dirty) {
pLastCol->dirty = 1; pLastCol->dirty = 1;
} }
/*
char *value = NULL;
size_t vlen = 0;
tsdbCacheSerialize(pLastCol, &value, &vlen);
// tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen);
rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen);
taosMemoryFree(value);
*/
} }
taosLRUCacheRelease(pCache, h, false); taosLRUCacheRelease(pCache, h, false);
...@@ -536,13 +514,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow ...@@ -536,13 +514,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
if (!pLastCol->dirty) { if (!pLastCol->dirty) {
pLastCol->dirty = 1; pLastCol->dirty = 1;
} }
/*
char *value = NULL;
size_t vlen = 0;
tsdbCacheSerialize(pLastCol, &value, &vlen);
rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen);
taosMemoryFree(value);
*/
} }
taosLRUCacheRelease(pCache, h, false); taosLRUCacheRelease(pCache, h, false);
...@@ -580,6 +551,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow ...@@ -580,6 +551,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
taosMemoryFree(keys_list_sizes); taosMemoryFree(keys_list_sizes);
taosMemoryFree(values_list_sizes); taosMemoryFree(values_list_sizes);
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
for (int i = 0; i < num_keys; ++i) { for (int i = 0; i < num_keys; ++i) {
SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i]; SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
SColVal *pColVal = (SColVal *)TARRAY_DATA(aColVal) + idxKey->idx; SColVal *pColVal = (SColVal *)TARRAY_DATA(aColVal) + idxKey->idx;
...@@ -593,8 +565,12 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow ...@@ -593,8 +565,12 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
size_t vlen = 0; size_t vlen = 0;
tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen); tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen);
// SLastKey key = (SLastKey){.ltype = 0, .uid = uid, .cid = pColVal->cid}; // SLastKey key = (SLastKey){.ltype = 0, .uid = uid, .cid = pColVal->cid};
taosThreadMutexLock(&pTsdb->rCache.rMutex);
rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen); rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen);
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
pLastCol = (SLastCol *)value; pLastCol = (SLastCol *)value;
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
*pTmpLastCol = *pLastCol; *pTmpLastCol = *pLastCol;
...@@ -621,8 +597,12 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow ...@@ -621,8 +597,12 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
size_t vlen = 0; size_t vlen = 0;
tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen); tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen);
// SLastKey key = (SLastKey){.ltype = 1, .uid = uid, .cid = pColVal->cid}; // SLastKey key = (SLastKey){.ltype = 1, .uid = uid, .cid = pColVal->cid};
taosThreadMutexLock(&pTsdb->rCache.rMutex);
rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen); rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen);
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
pLastCol = (SLastCol *)value; pLastCol = (SLastCol *)value;
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
*pTmpLastCol = *pLastCol; *pTmpLastCol = *pLastCol;
...@@ -647,12 +627,14 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow ...@@ -647,12 +627,14 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
rocksdb_free(values_list[i]); rocksdb_free(values_list[i]);
} }
rocksMayWrite(pTsdb, true, false, true);
taosMemoryFree(values_list); taosMemoryFree(values_list);
taosArrayDestroy(remainCols); taosArrayDestroy(remainCols);
} }
rocksMayWrite(pTsdb, true, false, false);
taosThreadMutexUnlock(&pTsdb->lruMutex); taosThreadMutexUnlock(&pTsdb->lruMutex);
_exit: _exit:
...@@ -1005,9 +987,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache ...@@ -1005,9 +987,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
reallocVarData(&lastCol.colVal); reallocVarData(&lastCol.colVal);
taosArrayPush(pLastArray, &lastCol); taosArrayPush(pLastArray, &lastCol);
if (h) { taosLRUCacheRelease(pCache, h, false);
taosLRUCacheRelease(pCache, h, false);
}
} else { } else {
SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)}; SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)};
...@@ -1032,9 +1012,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache ...@@ -1032,9 +1012,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
reallocVarData(&lastCol.colVal); reallocVarData(&lastCol.colVal);
taosArraySet(pLastArray, idxKey->idx, &lastCol); taosArraySet(pLastArray, idxKey->idx, &lastCol);
if (h) { taosLRUCacheRelease(pCache, h, false);
taosLRUCacheRelease(pCache, h, false);
}
taosArrayRemove(remainCols, i); taosArrayRemove(remainCols, i);
} else { } else {
...@@ -1137,6 +1115,8 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE ...@@ -1137,6 +1115,8 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
rocksMayWrite(pTsdb, true, false, false); rocksMayWrite(pTsdb, true, false, false);
rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list, rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list,
keys_list_sizes, values_list, values_list_sizes, errs); keys_list_sizes, values_list, values_list_sizes, errs);
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
for (int i = 0; i < num_keys * 2; ++i) { for (int i = 0; i < num_keys * 2; ++i) {
if (errs[i]) { if (errs[i]) {
rocksdb_free(errs[i]); rocksdb_free(errs[i]);
...@@ -1147,19 +1127,42 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE ...@@ -1147,19 +1127,42 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
for (int i = 0; i < num_keys; ++i) { for (int i = 0; i < num_keys; ++i) {
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
taosThreadMutexLock(&pTsdb->rCache.rMutex);
if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) { if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) {
rocksdb_writebatch_delete(wb, keys_list[i], klen); rocksdb_writebatch_delete(wb, keys_list[i], klen);
} }
taosLRUCacheErase(pTsdb->lruCache, keys_list[i], klen);
pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]); pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]);
if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) { if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) {
rocksdb_writebatch_delete(wb, keys_list[num_keys + i], klen); rocksdb_writebatch_delete(wb, keys_list[num_keys + i], klen);
} }
taosLRUCacheErase(pTsdb->lruCache, keys_list[num_keys + i], klen); taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
rocksdb_free(values_list[i]); rocksdb_free(values_list[i]);
rocksdb_free(values_list[i + num_keys]); rocksdb_free(values_list[i + num_keys]);
taosThreadMutexLock(&pTsdb->lruMutex);
LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[i], klen);
if (h) {
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h);
if (pLastCol->dirty) {
pLastCol->dirty = 0;
}
taosLRUCacheRelease(pTsdb->lruCache, h, true);
}
taosLRUCacheErase(pTsdb->lruCache, keys_list[i], klen);
h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[num_keys + i], klen);
if (h) {
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h);
if (pLastCol->dirty) {
pLastCol->dirty = 0;
}
taosLRUCacheRelease(pTsdb->lruCache, h, true);
}
taosLRUCacheErase(pTsdb->lruCache, keys_list[num_keys + i], klen);
taosThreadMutexUnlock(&pTsdb->lruMutex);
} }
for (int i = 0; i < num_keys; ++i) { for (int i = 0; i < num_keys; ++i) {
taosMemoryFree(keys_list[i]); taosMemoryFree(keys_list[i]);
...@@ -1169,8 +1172,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE ...@@ -1169,8 +1172,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
taosMemoryFree(values_list); taosMemoryFree(values_list);
taosMemoryFree(values_list_sizes); taosMemoryFree(values_list_sizes);
rocksMayWrite(pTsdb, true, false, false); rocksMayWrite(pTsdb, true, false, true);
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
_exit: _exit:
taosMemoryFree(pTSchema); taosMemoryFree(pTSchema);
...@@ -1183,7 +1185,7 @@ int32_t tsdbOpenCache(STsdb *pTsdb) { ...@@ -1183,7 +1185,7 @@ int32_t tsdbOpenCache(STsdb *pTsdb) {
SLRUCache *pCache = NULL; SLRUCache *pCache = NULL;
size_t cfgCapacity = pTsdb->pVnode->config.cacheLastSize * 1024 * 1024; size_t cfgCapacity = pTsdb->pVnode->config.cacheLastSize * 1024 * 1024;
pCache = taosLRUCacheInit(cfgCapacity, 1, .5); pCache = taosLRUCacheInit(cfgCapacity, 0, .5);
if (pCache == NULL) { if (pCache == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册