diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index fb1c9514ce29b7ba5402e39103540e2ed6d0fc55..4f9b676e53367f280e0a63a4723b5687bd251e9c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -340,6 +340,16 @@ _exit: return code; } +static void reallocVarData(SColVal *pColVal) { + if (IS_VAR_DATA_TYPE(pColVal->type)) { + uint8_t *pVal = pColVal->value.pData; + pColVal->value.pData = taosMemoryMalloc(pColVal->value.nData); + if (pColVal->value.nData) { + memcpy(pColVal->value.pData, pVal, pColVal->value.nData); + } + } +} + static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols, int nCols, int16_t *slotIds); @@ -347,9 +357,10 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, int nCols, int16_t *slotIds); int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int32_t ltype) { - static char const *alstring[2] = {"last_row", "last"}; - char const *lstring = alstring[ltype]; - int32_t code = 0; + static char const *alstring[2] = {"last_row", "last"}; + char const *lstring = alstring[ltype]; + rocksdb_writebatch_t *wb = NULL; + int32_t code = 0; SArray *pCidList = pr->pCidList; int num_keys = TARRAY_SIZE(pCidList); @@ -387,14 +398,7 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR int16_t cid = *(int16_t *)taosArrayGet(pCidList, i); SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)}; if (pLastCol) { - SColVal *pColVal = &pLastCol->colVal; - if (IS_VAR_DATA_TYPE(pColVal->type)) { - uint8_t *pVal = pColVal->value.pData; - pColVal->value.pData = taosMemoryMalloc(pColVal->value.nData); - if (pColVal->value.nData) { - memcpy(pColVal->value.pData, pVal, pColVal->value.nData); - } - } + reallocVarData(&pLastCol->colVal); } else { taosThreadMutexLock(&pTsdb->rCache.rMutex); @@ -423,31 +427,17 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR } // store result back to rocks cache - rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; - char *value = NULL; - size_t vlen = 0; + wb = pTsdb->rCache.writebatch; + char *value = NULL; + size_t vlen = 0; tsdbCacheSerialize(pLastCol, &value, &vlen); char key[ROCKS_KEY_LEN]; size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":%s", uid, pLastCol->colVal.cid, lstring); rocksdb_writebatch_put(wb, key, klen, value, vlen); taosMemoryFree(value); - - char *err = NULL; - rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err); - if (NULL != err) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); - rocksdb_free(err); - } } else { - SColVal *pColVal = &pLastCol->colVal; - if (IS_VAR_DATA_TYPE(pColVal->type)) { - uint8_t *pVal = pColVal->value.pData; - pColVal->value.pData = taosMemoryMalloc(pColVal->value.nData); - if (pColVal->value.nData) { - memcpy(pColVal->value.pData, pVal, pColVal->value.nData); - } - } + reallocVarData(&pLastCol->colVal); } taosThreadMutexUnlock(&pTsdb->rCache.rMutex); @@ -463,6 +453,14 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR taosMemoryFree(values_list); taosMemoryFree(values_list_sizes); + if (wb) { + char *err = NULL; + rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err); + if (NULL != err) { + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); + rocksdb_free(err); + } + } return code; }