提交 b892b1fd 编写于 作者: M Minglei Jin

cache/flag: fix cache flag compiling issue

上级 e95613ec
......@@ -305,10 +305,6 @@ void tsdbUntakeReadSnap(STsdbReader *pReader, STsdbReadSnap *pSnap, bool proa
// tsdbMerge.c ==============================================================================================
int32_t tsdbMerge(STsdb *pTsdb);
#define TSDB_CACHE_NO(c) ((c).cacheLast == 0)
#define TSDB_CACHE_LAST_ROW(c) (((c).cacheLast & 1) > 0)
#define TSDB_CACHE_LAST(c) (((c).cacheLast & 2) > 0)
// tsdbDiskData ==============================================================================================
int32_t tDiskDataBuilderCreate(SDiskDataBuilder **ppBuilder);
void *tDiskDataBuilderDestroy(SDiskDataBuilder *pBuilder);
......
......@@ -197,7 +197,7 @@ void tqClose(STQ*);
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
int tqUnregisterPushHandle(STQ* pTq, void* pHandle);
int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed.
int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed.
int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
......@@ -405,6 +405,10 @@ struct SVnode {
#define VND_IS_RSMA(v) ((v)->config.isRsma == 1)
#define VND_IS_TSMA(v) ((v)->config.isTsma == 1)
#define TSDB_CACHE_NO(c) ((c).cacheLast == 0)
#define TSDB_CACHE_LAST_ROW(c) (((c).cacheLast & 1) > 0)
#define TSDB_CACHE_LAST(c) (((c).cacheLast & 2) > 0)
struct STbUidStore {
tb_uid_t suid;
SArray* tbUids;
......
......@@ -208,10 +208,26 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) {
taosMemoryFree(pTsdb->rCache.pTSchema);
}
static void rocksMayWrite(STsdb *pTsdb, bool force) {
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
if (force || rocksdb_writebatch_count(wb) >= 1024) {
char *err = NULL;
rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err);
if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
rocksdb_free(err);
}
rocksdb_writebatch_clear(wb);
}
}
int32_t tsdbCacheCommit(STsdb *pTsdb) {
int32_t code = 0;
char *err = NULL;
rocksMayWrite(pTsdb, true);
rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err);
if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
......@@ -281,22 +297,6 @@ static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, int8_t
return pLastCol;
}
static void rocksMayWrite(STsdb *pTsdb) {
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
int count = rocksdb_writebatch_count(wb);
if (count >= 1024) {
char *err = NULL;
rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err);
if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
rocksdb_free(err);
}
rocksdb_writebatch_clear(wb);
}
}
int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) {
int32_t code = 0;
......@@ -398,7 +398,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
taosMemoryFree(values_list);
taosMemoryFree(values_list_sizes);
rocksMayWrite(pTsdb);
rocksMayWrite(pTsdb, false);
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
_exit:
......@@ -507,7 +507,7 @@ int32_t tsdbCacheGetSlow(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheR
}
if (wb) {
rocksMayWrite(pTsdb);
rocksMayWrite(pTsdb, false);
}
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
......@@ -575,7 +575,7 @@ static SLastCol *tsdbCacheLoadCol(STsdb *pTsdb, SCacheRowsReader *pr, int16_t sl
}
if (wb) {
rocksMayWrite(pTsdb);
rocksMayWrite(pTsdb, false);
}
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
......@@ -672,7 +672,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
}
if (wb) {
rocksMayWrite(pTsdb);
rocksMayWrite(pTsdb, false);
}
taosArrayDestroy(pTmpColArray);
......@@ -734,10 +734,10 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
code = -1;
}
SLastCol lastCol = *pLastCol;
reallocVarData(&lastCol.colVal);
// SLastCol lastCol = *pLastCol;
// reallocVarData(&lastCol.colVal);
taosArraySet(pLastArray, idxKey->idx, &lastCol);
taosArraySet(pLastArray, idxKey->idx, pLastCol);
taosArrayRemove(remainCols, j);
taosMemoryFree(values_list[i]);
......@@ -941,7 +941,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
taosMemoryFree(values_list);
taosMemoryFree(values_list_sizes);
rocksMayWrite(pTsdb);
rocksMayWrite(pTsdb, false);
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
_exit:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册