提交 2ae64cdf 编写于 作者: K kailixu

Merge branch 'fix/TD-23623' of github.com:taosdata/TDengine into fix/TD-23623

......@@ -346,6 +346,7 @@ struct STsdbFS {
typedef struct {
rocksdb_t *db;
rocksdb_options_t *options;
rocksdb_flushoptions_t *flushoptions;
rocksdb_writeoptions_t *writeoptions;
rocksdb_readoptions_t *readoptions;
rocksdb_writebatch_t *writebatch;
......
......@@ -178,6 +178,7 @@ int tsdbClose(STsdb** pTsdb);
int32_t tsdbBegin(STsdb* pTsdb);
int32_t tsdbPrepareCommit(STsdb* pTsdb);
int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo);
int32_t tsdbCacheCommit(STsdb* pTsdb);
int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo);
int32_t tsdbFinishCommit(STsdb* pTsdb);
int32_t tsdbRollbackCommit(STsdb* pTsdb);
......@@ -194,9 +195,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode);
void tqClose(STQ*);
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
int tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp,
int32_t type);
int32_t type);
int tqUnregisterPushHandle(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer);
int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed.
int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed.
int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
......
......@@ -98,12 +98,19 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
goto _err3;
}
rocksdb_flushoptions_t *flushoptions = rocksdb_flushoptions_create();
if (NULL == flushoptions) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err4;
}
rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create();
pTsdb->rCache.writebatch = writebatch;
pTsdb->rCache.options = options;
pTsdb->rCache.writeoptions = writeoptions;
pTsdb->rCache.readoptions = readoptions;
pTsdb->rCache.flushoptions = flushoptions;
pTsdb->rCache.db = db;
taosThreadMutexInit(&pTsdb->rCache.rMutex, NULL);
......@@ -122,6 +129,7 @@ _err:
static void tsdbCloseRocksCache(STsdb *pTsdb) {
rocksdb_close(pTsdb->rCache.db);
rocksdb_flushoptions_destroy(pTsdb->rCache.flushoptions);
rocksdb_writebatch_destroy(pTsdb->rCache.writebatch);
rocksdb_readoptions_destroy(pTsdb->rCache.readoptions);
rocksdb_writeoptions_destroy(pTsdb->rCache.writeoptions);
......@@ -129,6 +137,20 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) {
taosThreadMutexDestroy(&pTsdb->rCache.rMutex);
}
int32_t tsdbCacheCommit(STsdb *pTsdb) {
int32_t code = 0;
char *err = NULL;
rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err);
if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
rocksdb_free(err);
code = -1;
}
return code;
}
SLastCol *tsdbCacheDeserialize(char const *value) {
if (!value) {
return NULL;
......@@ -365,6 +387,21 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR
}
// maybe store it back to rocks cache
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
char *value = NULL;
size_t vlen = 0;
tsdbCacheSerialize(pLastCol, &value, &vlen);
char key[ROCKS_KEY_LEN];
size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last", uid, pLastCol->colVal.cid);
rocksdb_writebatch_put(wb, key, klen, value, vlen);
char *err = NULL;
rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err);
if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
rocksdb_free(err);
}
taosMemoryFree(value);
}
taosArrayPush(pLastArray, pLastCol);
......@@ -2256,7 +2293,7 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
if (lastRowTs == TSKEY_MAX) {
lastRowTs = rowTs;
for (int16_t iCol = 0; iCol < nCols; ++iCol) {
for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
if (iCol >= nLastCol) {
break;
}
......@@ -2264,6 +2301,13 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
continue;
}
if (slotIds[iCol] == 0) {
STColumn *pTColumn = &pTSchema->columns[0];
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = lastRowTs});
taosArraySet(pColArray, 0, &(SLastCol){.ts = lastRowTs, .colVal = *pColVal});
continue;
}
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
*pCol = (SLastCol){.ts = lastRowTs, .colVal = *pColVal};
......@@ -2336,10 +2380,6 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
}
} while (setNoneCol);
// if (taosArrayGetSize(pColArray) <= 0) {
//*ppLastArray = NULL;
// taosArrayDestroy(pColArray);
//} else {
if (!hasRow) {
if (ignoreEarlierTs) {
taosArrayDestroy(pColArray);
......@@ -2349,11 +2389,10 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
}
}
*ppLastArray = pColArray;
//}
nextRowIterClose(&iter);
taosArrayDestroy(aColArray);
// taosMemoryFreeClear(pTSchema);
return code;
_err:
......
......@@ -144,8 +144,8 @@ _exit:
}
int vnodeShouldCommit(SVnode *pVnode, bool atExit) {
bool diskAvail = osDataSpaceAvailable();
bool needCommit = false;
bool diskAvail = osDataSpaceAvailable();
bool needCommit = false;
taosThreadMutexLock(&pVnode->mutex);
if (pVnode->inUse && diskAvail) {
......@@ -439,6 +439,9 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
code = tsdbCommit(pVnode->pTsdb, pInfo);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbCacheCommit(pVnode->pTsdb);
TSDB_CHECK_CODE(code, lino, _exit);
if (VND_IS_RSMA(pVnode)) {
code = smaCommit(pVnode->pSma, pInfo);
TSDB_CHECK_CODE(code, lino, _exit);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册