diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h
index d59a0a64b340c3ca8a39f31cf11c538fb2e88452..d8dea8a1bed03493ba57365c30cb3a84c071b515 100644
--- a/include/libs/transport/trpc.h
+++ b/include/libs/transport/trpc.h
@@ -27,7 +27,7 @@ extern "C" {
#define TAOS_CONN_SERVER 0
#define TAOS_CONN_CLIENT 1
-#define IsReq(pMsg) (pMsg->msgType & 1U)
+#define IsReq(pMsg) (pMsg->msgType & 1U)
extern int32_t tsRpcHeadSize;
diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt
index d20d79ee47c2f9e1e40345527eee0c68f63cba8f..e9e20912c52666d71af546b53e6a946cb53b6833 100644
--- a/source/dnode/vnode/CMakeLists.txt
+++ b/source/dnode/vnode/CMakeLists.txt
@@ -3,6 +3,7 @@ add_library(vnode STATIC "")
target_sources(
vnode
PRIVATE
+
# vnode
"src/vnd/vnodeOpen.c"
"src/vnd/vnodeBufPool.c"
@@ -13,7 +14,6 @@ target_sources(
"src/vnd/vnodeSvr.c"
"src/vnd/vnodeSync.c"
"src/vnd/vnodeSnapshot.c"
- "src/vnd/vnodeUtil.c"
# meta
"src/meta/metaOpen.c"
@@ -46,6 +46,7 @@ target_sources(
"src/tsdb/tsdbUtil.c"
"src/tsdb/tsdbSnapshot.c"
"src/tsdb/tsdbCacheRead.c"
+ "src/tsdb/tsdbRetention.c"
# tq
"src/tq/tq.c"
@@ -62,7 +63,6 @@ target_include_directories(
PUBLIC "inc"
PRIVATE "src/inc"
PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar"
-
)
target_link_libraries(
vnode
@@ -76,18 +76,19 @@ target_link_libraries(
PUBLIC executor
PUBLIC scheduler
PUBLIC tdb
- #PUBLIC bdb
- #PUBLIC scalar
+
+ # PUBLIC bdb
+ # PUBLIC scalar
PUBLIC transport
PUBLIC stream
PUBLIC index
)
target_compile_definitions(vnode PUBLIC -DMETA_REFACT)
-if (${BUILD_WITH_INVERTEDINDEX})
- add_definitions(-DUSE_INVERTED_INDEX)
+
+if(${BUILD_WITH_INVERTEDINDEX})
+ add_definitions(-DUSE_INVERTED_INDEX)
endif(${BUILD_WITH_INVERTEDINDEX})
+
if(${BUILD_TEST})
add_subdirectory(test)
endif(${BUILD_TEST})
-
-
diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h
index ff29305b745dd4a772413d91ca15ef78c22d9156..62a39711ec7af9bf33b85b5565ecea3de1b27eb7 100644
--- a/source/dnode/vnode/inc/vnode.h
+++ b/source/dnode/vnode/inc/vnode.h
@@ -38,10 +38,11 @@ extern "C" {
#endif
// vnode
-typedef struct SVnode SVnode;
-typedef struct STsdbCfg STsdbCfg; // todo: remove
-typedef struct SVnodeCfg SVnodeCfg;
-typedef struct SVSnapshotReader SVSnapshotReader;
+typedef struct SVnode SVnode;
+typedef struct STsdbCfg STsdbCfg; // todo: remove
+typedef struct SVnodeCfg SVnodeCfg;
+typedef struct SVSnapReader SVSnapReader;
+typedef struct SVSnapWriter SVSnapWriter;
extern const SVnodeCfg vnodeCfgDefault;
@@ -57,10 +58,6 @@ void vnodeStop(SVnode *pVnode);
int64_t vnodeGetSyncHandle(SVnode *pVnode);
void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot);
void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId);
-int32_t vnodeSnapshotReaderOpen(SVnode *pVnode, SVSnapshotReader **ppReader, int64_t sver, int64_t ever);
-int32_t vnodeSnapshotReaderClose(SVSnapshotReader *pReader);
-int32_t vnodeSnapshotRead(SVSnapshotReader *pReader, const void **ppData, uint32_t *nData);
-
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen);
int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list);
int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list);
@@ -185,7 +182,14 @@ int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader);
// sma
int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
-// need to reposition
+// SVSnapReader
+int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapReader **ppReader);
+int32_t vnodeSnapReaderClose(SVSnapReader *pReader);
+int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData);
+// SVSnapWriter
+int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWriter **ppWriter);
+int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback);
+int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData);
// structs
struct STsdbCfg {
diff --git a/source/dnode/vnode/src/inc/meta.h b/source/dnode/vnode/src/inc/meta.h
index 66d1689d57c794cba300cca65c27886de2750370..e08925acc398f740c1bc4bcd205e14768cdfcc98 100644
--- a/source/dnode/vnode/src/inc/meta.h
+++ b/source/dnode/vnode/src/inc/meta.h
@@ -57,6 +57,9 @@ int metaRemoveTableFromIdx(SMeta* pMeta, tb_uid_t uid);
// metaCommit ==================
static FORCE_INLINE tb_uid_t metaGenerateUid(SMeta* pMeta) { return tGenIdPI64(); }
+// metaTable ==================
+int metaHandleEntry(SMeta* pMeta, const SMetaEntry* pME);
+
struct SMeta {
TdThreadRwlock lock;
diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h
index cd2dfd335106a0fd05466b87ee5eef8b4f3c9ba7..cce3da60cb1ed6844a844d21e95b47f94aced425 100644
--- a/source/dnode/vnode/src/inc/tsdb.h
+++ b/source/dnode/vnode/src/inc/tsdb.h
@@ -64,6 +64,7 @@ typedef struct SRowIter SRowIter;
typedef struct STsdbFS STsdbFS;
typedef struct SRowMerger SRowMerger;
typedef struct STsdbFSState STsdbFSState;
+typedef struct STsdbSnapHdr STsdbSnapHdr;
#define TSDB_MAX_SUBBLOCKS 8
#define TSDB_FHDR_SIZE 512
diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h
index 34fac045a735b79ce77d98a5f8686f4838febbcb..0c386babdeeb3060be5aaa822822835f9c67ffad 100644
--- a/source/dnode/vnode/src/inc/vnodeInt.h
+++ b/source/dnode/vnode/src/inc/vnodeInt.h
@@ -49,17 +49,20 @@
extern "C" {
#endif
-typedef struct SVnodeInfo SVnodeInfo;
-typedef struct SMeta SMeta;
-typedef struct SSma SSma;
-typedef struct STsdb STsdb;
-typedef struct STQ STQ;
-typedef struct SVState SVState;
-typedef struct SVBufPool SVBufPool;
-typedef struct SQWorker SQHandle;
-typedef struct STsdbKeepCfg STsdbKeepCfg;
-typedef struct SMetaSnapshotReader SMetaSnapshotReader;
-typedef struct STsdbSnapshotReader STsdbSnapshotReader;
+typedef struct SVnodeInfo SVnodeInfo;
+typedef struct SMeta SMeta;
+typedef struct SSma SSma;
+typedef struct STsdb STsdb;
+typedef struct STQ STQ;
+typedef struct SVState SVState;
+typedef struct SVBufPool SVBufPool;
+typedef struct SQWorker SQHandle;
+typedef struct STsdbKeepCfg STsdbKeepCfg;
+typedef struct SMetaSnapReader SMetaSnapReader;
+typedef struct SMetaSnapWriter SMetaSnapWriter;
+typedef struct STsdbSnapReader STsdbSnapReader;
+typedef struct STsdbSnapWriter STsdbSnapWriter;
+typedef struct SSnapDataHdr SSnapDataHdr;
#define VNODE_META_DIR "meta"
#define VNODE_TSDB_DIR "tsdb"
@@ -72,10 +75,8 @@ typedef struct STsdbSnapshotReader STsdbSnapshotReader;
#define VNODE_RSMA2_DIR "rsma2"
// vnd.h
-void* vnodeBufPoolMalloc(SVBufPool* pPool, int size);
-void vnodeBufPoolFree(SVBufPool* pPool, void* p);
-int32_t vnodeRealloc(void** pp, int32_t size);
-void vnodeFree(void* p);
+void* vnodeBufPoolMalloc(SVBufPool* pPool, int size);
+void vnodeBufPoolFree(SVBufPool* pPool, void* p);
// meta
typedef struct SMCtbCursor SMCtbCursor;
@@ -109,9 +110,6 @@ STSma* metaGetSmaInfoByIndex(SMeta* pMeta, int64_t indexUid);
STSmaWrapper* metaGetSmaInfoByTable(SMeta* pMeta, tb_uid_t uid, bool deepCopy);
SArray* metaGetSmaIdsByTable(SMeta* pMeta, tb_uid_t uid);
SArray* metaGetSmaTbUids(SMeta* pMeta);
-int32_t metaSnapshotReaderOpen(SMeta* pMeta, SMetaSnapshotReader** ppReader, int64_t sver, int64_t ever);
-int32_t metaSnapshotReaderClose(SMetaSnapshotReader* pReader);
-int32_t metaSnapshotRead(SMetaSnapshotReader* pReader, void** ppData, uint32_t* nData);
void* metaGetIdx(SMeta* pMeta);
void* metaGetIvtIdx(SMeta* pMeta);
int metaTtlSmaller(SMeta* pMeta, uint64_t time, SArray* uidList);
@@ -131,9 +129,6 @@ int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* p
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
STsdbReader tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
void* pMemRef);
-int32_t tsdbSnapshotReaderOpen(STsdb* pTsdb, STsdbSnapshotReader** ppReader, int64_t sver, int64_t ever);
-int32_t tsdbSnapshotReaderClose(STsdbSnapshotReader* pReader);
-int32_t tsdbSnapshotRead(STsdbSnapshotReader* pReader, void** ppData, uint32_t* nData);
// tq
int tqInit();
@@ -183,6 +178,23 @@ int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore);
void tdUidStoreDestory(STbUidStore* pStore);
void* tdUidStoreFree(STbUidStore* pStore);
+// SMetaSnapReader ========================================
+int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapReader** ppReader);
+int32_t metaSnapReaderClose(SMetaSnapReader** ppReader);
+int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData);
+// SMetaSnapWriter ========================================
+int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWriter** ppWriter);
+int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
+int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback);
+// STsdbSnapReader ========================================
+int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapReader** ppReader);
+int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader);
+int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData);
+// STsdbSnapWriter ========================================
+int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter);
+int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
+int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback);
+
typedef struct {
int8_t streamType; // sma or other
int8_t dstType;
@@ -202,7 +214,9 @@ typedef struct {
struct SVState {
int64_t committed;
int64_t applied;
+ int64_t applyTerm;
int64_t commitID;
+ int64_t commitTerm;
};
struct SVnodeInfo {
@@ -291,6 +305,12 @@ struct SSma {
// sma
void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);
+struct SSnapDataHdr {
+ int8_t type;
+ int64_t size;
+ uint8_t data[];
+};
+
#ifdef __cplusplus
}
#endif
diff --git a/source/dnode/vnode/src/meta/metaSnapshot.c b/source/dnode/vnode/src/meta/metaSnapshot.c
index 5757039d55d410808b4eeb57d2e09286b7939004..ac84842e851f659d7d6b1a1c6373cc3aa3a99f39 100644
--- a/source/dnode/vnode/src/meta/metaSnapshot.c
+++ b/source/dnode/vnode/src/meta/metaSnapshot.c
@@ -15,53 +15,57 @@
#include "meta.h"
-struct SMetaSnapshotReader {
+// SMetaSnapReader ========================================
+struct SMetaSnapReader {
SMeta* pMeta;
- TBC* pTbc;
int64_t sver;
int64_t ever;
+ TBC* pTbc;
};
-int32_t metaSnapshotReaderOpen(SMeta* pMeta, SMetaSnapshotReader** ppReader, int64_t sver, int64_t ever) {
- int32_t code = 0;
- int32_t c = 0;
- SMetaSnapshotReader* pMetaReader = NULL;
+int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapReader** ppReader) {
+ int32_t code = 0;
+ int32_t c = 0;
+ SMetaSnapReader* pMetaSnapReader = NULL;
- pMetaReader = (SMetaSnapshotReader*)taosMemoryCalloc(1, sizeof(*pMetaReader));
- if (pMetaReader == NULL) {
+ // alloc
+ pMetaSnapReader = (SMetaSnapReader*)taosMemoryCalloc(1, sizeof(*pMetaSnapReader));
+ if (pMetaSnapReader == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
- pMetaReader->pMeta = pMeta;
- pMetaReader->sver = sver;
- pMetaReader->ever = ever;
- code = tdbTbcOpen(pMeta->pTbDb, &pMetaReader->pTbc, NULL);
+ pMetaSnapReader->pMeta = pMeta;
+ pMetaSnapReader->sver = sver;
+ pMetaSnapReader->ever = ever;
+
+ // impl
+ code = tdbTbcOpen(pMeta->pTbDb, &pMetaSnapReader->pTbc, NULL);
if (code) {
goto _err;
}
- code = tdbTbcMoveTo(pMetaReader->pTbc, &(STbDbKey){.version = sver, .uid = INT64_MIN}, sizeof(STbDbKey), &c);
+ code = tdbTbcMoveTo(pMetaSnapReader->pTbc, &(STbDbKey){.version = sver, .uid = INT64_MIN}, sizeof(STbDbKey), &c);
if (code) {
goto _err;
}
- *ppReader = pMetaReader;
+ *ppReader = pMetaSnapReader;
return code;
_err:
+ metaError("vgId:%d meta snap reader open failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
*ppReader = NULL;
return code;
}
-int32_t metaSnapshotReaderClose(SMetaSnapshotReader* pReader) {
- if (pReader) {
- tdbTbcClose(pReader->pTbc);
- taosMemoryFree(pReader);
- }
+int32_t metaSnapReaderClose(SMetaSnapReader** ppReader) {
+ tdbTbcClose((*ppReader)->pTbc);
+ taosMemoryFree(*ppReader);
+ *ppReader = NULL;
return 0;
}
-int32_t metaSnapshotRead(SMetaSnapshotReader* pReader, void** ppData, uint32_t* nDatap) {
+int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
const void* pKey = NULL;
const void* pData = NULL;
int32_t nKey = 0;
@@ -71,23 +75,110 @@ int32_t metaSnapshotRead(SMetaSnapshotReader* pReader, void** ppData, uint32_t*
for (;;) {
code = tdbTbcGet(pReader->pTbc, &pKey, &nKey, &pData, &nData);
if (code || ((STbDbKey*)pData)->version > pReader->ever) {
- return TSDB_CODE_VND_READ_END;
+ code = TSDB_CODE_VND_READ_END;
+ goto _exit;
}
if (((STbDbKey*)pData)->version < pReader->sver) {
+ tdbTbcMoveToNext(pReader->pTbc);
continue;
}
+ tdbTbcMoveToNext(pReader->pTbc);
break;
}
// copy the data
- if (vnodeRealloc(ppData, nData) < 0) {
+ if (tRealloc(ppData, sizeof(SSnapDataHdr) + nData) < 0) {
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
+ ((SSnapDataHdr*)(*ppData))->type = 0; // TODO: use macro
+ ((SSnapDataHdr*)(*ppData))->size = nData;
+ memcpy(((SSnapDataHdr*)(*ppData))->data, pData, nData);
+
+_exit:
+ return code;
+}
- memcpy(*ppData, pData, nData);
- *nDatap = nData;
+// SMetaSnapWriter ========================================
+struct SMetaSnapWriter {
+ SMeta* pMeta;
+ int64_t sver;
+ int64_t ever;
+};
+
+static int32_t metaSnapRollback(SMetaSnapWriter* pWriter) {
+ int32_t code = 0;
+ // TODO
+ return code;
+}
+
+static int32_t metaSnapCommit(SMetaSnapWriter* pWriter) {
+ int32_t code = 0;
+ // TODO
+ return code;
+}
+
+int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWriter** ppWriter) {
+ int32_t code = 0;
+ SMetaSnapWriter* pWriter;
+
+ // alloc
+ pWriter = (SMetaSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
+ if (pWriter == NULL) {
+ code = TSDB_CODE_OUT_OF_MEMORY;
+ goto _err;
+ }
+ pWriter->pMeta = pMeta;
+ pWriter->sver = sver;
+ pWriter->ever = ever;
+
+ *ppWriter = pWriter;
+ return code;
+
+_err:
+ metaError("vgId:%d meta snapshot writer open failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
+ *ppWriter = NULL;
+ return code;
+}
+
+int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback) {
+ int32_t code = 0;
+ SMetaSnapWriter* pWriter = *ppWriter;
+
+ if (rollback) {
+ code = metaSnapRollback(pWriter);
+ if (code) goto _err;
+ } else {
+ code = metaSnapCommit(pWriter);
+ if (code) goto _err;
+ }
+ taosMemoryFree(pWriter);
+ *ppWriter = NULL;
+
+ return code;
+
+_err:
+ metaError("vgId:%d meta snapshot writer close failed since %s", TD_VID(pWriter->pMeta->pVnode), tstrerror(code));
+ return code;
+}
+
+int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
+ int32_t code = 0;
+ SMeta* pMeta = pWriter->pMeta;
+ SMetaEntry metaEntry = {0};
+ SDecoder* pDecoder = &(SDecoder){0};
+
+ tDecoderInit(pDecoder, pData, nData);
+ metaDecodeEntry(pDecoder, &metaEntry);
+
+ code = metaHandleEntry(pMeta, &metaEntry);
+ if (code) goto _err;
+
+ return code;
+
+_err:
+ metaError("vgId:%d meta snapshot write failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
return code;
}
\ No newline at end of file
diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c
index 341173103c2aa2ce522a31ce7c80e27d62d60005..daf7ccb26ae1091e386dd038aec713f857cb39aa 100644
--- a/source/dnode/vnode/src/meta/metaTable.c
+++ b/source/dnode/vnode/src/meta/metaTable.c
@@ -17,7 +17,6 @@
static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema);
static int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema);
-static int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME);
static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME);
@@ -51,7 +50,7 @@ static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const
if (pMeta->pTagIvtIdx == NULL || pCtbEntry == NULL) {
return -1;
}
- void * data = pCtbEntry->ctbEntry.pTags;
+ void *data = pCtbEntry->ctbEntry.pTags;
const char *tagName = pSchema->name;
tb_uid_t suid = pCtbEntry->ctbEntry.suid;
@@ -70,7 +69,7 @@ static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const
STagVal *pTagVal = (STagVal *)taosArrayGet(pTagVals, i);
char type = pTagVal->type;
- char * key = pTagVal->pKey;
+ char *key = pTagVal->pKey;
int32_t nKey = strlen(key);
SIndexTerm *term = NULL;
@@ -78,7 +77,7 @@ static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const
term = indexTermCreate(suid, ADD_VALUE, TSDB_DATA_TYPE_VARCHAR, key, nKey, NULL, 0);
} else if (type == TSDB_DATA_TYPE_NCHAR) {
if (pTagVal->nData > 0) {
- char * val = taosMemoryCalloc(1, pTagVal->nData + VARSTR_HEADER_SIZE);
+ char *val = taosMemoryCalloc(1, pTagVal->nData + VARSTR_HEADER_SIZE);
int32_t len = taosUcs4ToMbs((TdUcs4 *)pTagVal->pData, pTagVal->nData, val + VARSTR_HEADER_SIZE);
memcpy(val, (uint16_t *)&len, VARSTR_HEADER_SIZE);
type = TSDB_DATA_TYPE_VARCHAR;
@@ -109,7 +108,7 @@ int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSche
if (pMeta->pTagIvtIdx == NULL || pCtbEntry == NULL) {
return -1;
}
- void * data = pCtbEntry->ctbEntry.pTags;
+ void *data = pCtbEntry->ctbEntry.pTags;
const char *tagName = pSchema->name;
tb_uid_t suid = pCtbEntry->ctbEntry.suid;
@@ -128,7 +127,7 @@ int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSche
STagVal *pTagVal = (STagVal *)taosArrayGet(pTagVals, i);
char type = pTagVal->type;
- char * key = pTagVal->pKey;
+ char *key = pTagVal->pKey;
int32_t nKey = strlen(key);
SIndexTerm *term = NULL;
@@ -136,7 +135,7 @@ int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSche
term = indexTermCreate(suid, DEL_VALUE, TSDB_DATA_TYPE_VARCHAR, key, nKey, NULL, 0);
} else if (type == TSDB_DATA_TYPE_NCHAR) {
if (pTagVal->nData > 0) {
- char * val = taosMemoryCalloc(1, pTagVal->nData + VARSTR_HEADER_SIZE);
+ char *val = taosMemoryCalloc(1, pTagVal->nData + VARSTR_HEADER_SIZE);
int32_t len = taosUcs4ToMbs((TdUcs4 *)pTagVal->pData, pTagVal->nData, val + VARSTR_HEADER_SIZE);
memcpy(val, (uint16_t *)&len, VARSTR_HEADER_SIZE);
type = TSDB_DATA_TYPE_VARCHAR;
@@ -169,9 +168,9 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
int vLen = 0;
const void *pKey = NULL;
const void *pVal = NULL;
- void * pBuf = NULL;
+ void *pBuf = NULL;
int32_t szBuf = 0;
- void * p = NULL;
+ void *p = NULL;
SMetaReader mr = {0};
// validate req
@@ -229,7 +228,7 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) {
}
// drop all child tables
- TBC * pCtbIdxc = NULL;
+ TBC *pCtbIdxc = NULL;
SArray *pArray = taosArrayInit(8, sizeof(tb_uid_t));
tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, &pMeta->txn);
@@ -285,8 +284,8 @@ _exit:
int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
SMetaEntry oStbEntry = {0};
SMetaEntry nStbEntry = {0};
- TBC * pUidIdxc = NULL;
- TBC * pTbDbc = NULL;
+ TBC *pUidIdxc = NULL;
+ TBC *pTbDbc = NULL;
const void *pData;
int nData;
int64_t oversion;
@@ -409,7 +408,7 @@ _err:
}
int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUids) {
- void * pData = NULL;
+ void *pData = NULL;
int nData = 0;
int rc = 0;
tb_uid_t uid;
@@ -477,7 +476,7 @@ static int metaDeleteTtlIdx(SMeta *pMeta, const SMetaEntry *pME) {
}
static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
- void * pData = NULL;
+ void *pData = NULL;
int nData = 0;
int rc = 0;
SMetaEntry e = {0};
@@ -538,14 +537,14 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
}
static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq, STableMetaRsp *pMetaRsp) {
- void * pVal = NULL;
+ void *pVal = NULL;
int nVal = 0;
- const void * pData = NULL;
+ const void *pData = NULL;
int nData = 0;
int ret = 0;
tb_uid_t uid;
int64_t oversion;
- SSchema * pColumn = NULL;
+ SSchema *pColumn = NULL;
SMetaEntry entry = {0};
SSchemaWrapper *pSchema;
int c;
@@ -699,7 +698,7 @@ _err:
static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) {
SMetaEntry ctbEntry = {0};
SMetaEntry stbEntry = {0};
- void * pVal = NULL;
+ void *pVal = NULL;
int nVal = 0;
int ret;
int c;
@@ -730,7 +729,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
oversion = *(int64_t *)pData;
// search table.db
- TBC * pTbDbc = NULL;
+ TBC *pTbDbc = NULL;
SDecoder dc1 = {0};
SDecoder dc2 = {0};
@@ -754,7 +753,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
metaDecodeEntry(&dc2, &stbEntry);
SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag;
- SSchema * pColumn = NULL;
+ SSchema *pColumn = NULL;
int32_t iCol = 0;
for (;;) {
pColumn = NULL;
@@ -784,8 +783,8 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
memcpy((void *)ctbEntry.ctbEntry.pTags, pAlterTbReq->pTagVal, pAlterTbReq->nTagVal);
} else {
const STag *pOldTag = (const STag *)ctbEntry.ctbEntry.pTags;
- STag * pNewTag = NULL;
- SArray * pTagArray = taosArrayInit(pTagSchema->nCols, sizeof(STagVal));
+ STag *pNewTag = NULL;
+ SArray *pTagArray = taosArrayInit(pTagSchema->nCols, sizeof(STagVal));
if (!pTagArray) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
@@ -844,7 +843,7 @@ _err:
}
static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) {
- void * pVal = NULL;
+ void *pVal = NULL;
int nVal = 0;
const void *pData = NULL;
int nData = 0;
@@ -948,8 +947,8 @@ int metaAlterTable(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMeta
static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) {
STbDbKey tbDbKey;
- void * pKey = NULL;
- void * pVal = NULL;
+ void *pKey = NULL;
+ void *pVal = NULL;
int kLen = 0;
int vLen = 0;
SEncoder coder = {0};
@@ -1055,14 +1054,14 @@ static void metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey) {
}
static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
- void * pData = NULL;
+ void *pData = NULL;
int nData = 0;
STbDbKey tbDbKey = {0};
SMetaEntry stbEntry = {0};
- STagIdxKey * pTagIdxKey = NULL;
+ STagIdxKey *pTagIdxKey = NULL;
int32_t nTagIdxKey;
const SSchema *pTagColumn; // = &stbEntry.stbEntry.schema.pSchema[0];
- const void * pTagData = NULL; //
+ const void *pTagData = NULL; //
int32_t nTagData = 0;
SDecoder dc = {0};
@@ -1109,7 +1108,7 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) {
SEncoder coder = {0};
- void * pVal = NULL;
+ void *pVal = NULL;
int vLen = 0;
int rcode = 0;
SSkmDbKey skmDbKey = {0};
@@ -1151,7 +1150,7 @@ _exit:
return rcode;
}
-static int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME) {
+int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME) {
metaWLock(pMeta);
// save to table.db
diff --git a/source/dnode/vnode/src/vnd/vnodeUtil.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c
similarity index 52%
rename from source/dnode/vnode/src/vnd/vnodeUtil.c
rename to source/dnode/vnode/src/tsdb/tsdbRetention.c
index cd942099bc8924fde06ea912b0eecdfbe72603cb..e73f3f947cfa54ade3104e9a80f5a24fca12e782 100644
--- a/source/dnode/vnode/src/vnd/vnodeUtil.c
+++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c
@@ -13,33 +13,30 @@
* along with this program. If not, see .
*/
-#include "vnd.h"
+#include "tsdb.h"
-int32_t vnodeRealloc(void** pp, int32_t size) {
- uint8_t* p = NULL;
- int32_t csize = 0;
+int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
+ int32_t code = 0;
- if (*pp) {
- p = (uint8_t*)(*pp) - sizeof(int32_t);
- csize = *(int32_t*)p;
- }
+ // begin
+ code = tsdbFSBegin(pTsdb->fs);
+ if (code) goto _err;
- if (csize >= size) {
- return 0;
- }
+ // do retention
+ for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs->nState->aDFileSet); iSet++) {
+ SDFileSet *pDFileSet = (SDFileSet *)taosArrayGet(pTsdb->fs->nState->aDFileSet, iSet);
- p = (uint8_t*)taosMemoryRealloc(p, size);
- if (p == NULL) {
- return TSDB_CODE_OUT_OF_MEMORY;
+ // TODO
}
- *(int32_t*)p = size;
- *pp = p + sizeof(int32_t);
- return 0;
-}
+ // commit
+ code = tsdbFSCommit(pTsdb->fs);
+ if (code) goto _err;
-void vnodeFree(void* p) {
- if (p) {
- taosMemoryFree(((uint8_t*)p) - sizeof(int32_t));
- }
+_exit:
+ return code;
+
+_err:
+ tsdbError("vgId:%d tsdb do retention failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
+ return code;
}
\ No newline at end of file
diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c
index 79989a55601b99e681c573cae1f5c26e38cd7421..54087a787128d2672e9d6d8f29976ffececf3348 100644
--- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c
+++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c
@@ -15,22 +15,686 @@
#include "tsdb.h"
-struct STsdbSnapshotReader {
- STsdb* pTsdb;
- // TODO
+// STsdbSnapReader ========================================
+struct STsdbSnapReader {
+ STsdb* pTsdb;
+ int64_t sver;
+ int64_t ever;
+ // for data file
+ int8_t dataDone;
+ int32_t fid;
+ SDataFReader* pDataFReader;
+ SArray* aBlockIdx; // SArray
+ int32_t iBlockIdx;
+ SBlockIdx* pBlockIdx;
+ SMapData mBlock; // SMapData
+ int32_t iBlock;
+ SBlockData blkData;
+ // for del file
+ int8_t delDone;
+ SDelFReader* pDelFReader;
+ int32_t iDelIdx;
+ SArray* aDelIdx; // SArray
+ SArray* aDelData; // SArray
};
-int32_t tsdbSnapshotReaderOpen(STsdb* pTsdb, STsdbSnapshotReader** ppReader, int64_t sver, int64_t ever) {
+static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
+ int32_t code = 0;
+
+ while (true) {
+ if (pReader->pDataFReader == NULL) {
+ SDFileSet* pSet = NULL;
+
+ // search the next data file set to read (todo)
+ if (0 /* TODO */) {
+ code = TSDB_CODE_VND_READ_END;
+ goto _exit;
+ }
+
+ // open
+ code = tsdbDataFReaderOpen(&pReader->pDataFReader, pReader->pTsdb, pSet);
+ if (code) goto _err;
+
+ // SBlockIdx
+ code = tsdbReadBlockIdx(pReader->pDataFReader, pReader->aBlockIdx, NULL);
+ if (code) goto _err;
+
+ pReader->iBlockIdx = 0;
+ pReader->pBlockIdx = NULL;
+ }
+
+ while (true) {
+ if (pReader->pBlockIdx == NULL) {
+ if (pReader->iBlockIdx >= taosArrayGetSize(pReader->aBlockIdx)) {
+ tsdbDataFReaderClose(&pReader->pDataFReader);
+ break;
+ }
+
+ pReader->pBlockIdx = (SBlockIdx*)taosArrayGet(pReader->aBlockIdx, pReader->iBlockIdx);
+ pReader->iBlockIdx++;
+
+ // SBlock
+ code = tsdbReadBlock(pReader->pDataFReader, pReader->pBlockIdx, &pReader->mBlock, NULL);
+ if (code) goto _err;
+
+ pReader->iBlock = 0;
+ }
+
+ while (true) {
+ SBlock block;
+ SBlock* pBlock = █
+
+ if (pReader->iBlock >= pReader->mBlock.nItem) {
+ pReader->pBlockIdx = NULL;
+ break;
+ }
+
+ tMapDataGetItemByIdx(&pReader->mBlock, pReader->iBlock, pBlock, tGetBlock);
+ pReader->iBlock++;
+
+ if ((pBlock->minVersion >= pReader->sver && pBlock->minVersion <= pReader->ever) ||
+ (pBlock->maxVersion >= pReader->sver && pBlock->maxVersion <= pReader->ever)) {
+ // overlap (todo)
+
+ code = tsdbReadBlockData(pReader->pDataFReader, pReader->pBlockIdx, pBlock, &pReader->blkData, NULL, NULL);
+ if (code) goto _err;
+
+ goto _exit;
+ }
+ }
+ }
+ }
+
+_exit:
+ return code;
+
+_err:
+ tsdbError("vgId:%d snap read data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
+ return code;
+}
+
+static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
+ int32_t code = 0;
+ STsdb* pTsdb = pReader->pTsdb;
+ SDelFile* pDelFile = pTsdb->fs->cState->pDelFile;
+
+ if (pReader->pDelFReader == NULL) {
+ if (pDelFile == NULL) {
+ code = TSDB_CODE_VND_READ_END;
+ goto _exit;
+ }
+
+ // open
+ code = tsdbDelFReaderOpen(&pReader->pDelFReader, pDelFile, pTsdb, NULL);
+ if (code) goto _err;
+
+ // read index
+ code = tsdbReadDelIdx(pReader->pDelFReader, pReader->aDelIdx, NULL);
+ if (code) goto _err;
+
+ pReader->iDelIdx = 0;
+ }
+
+ while (pReader->iDelIdx < taosArrayGetSize(pReader->aDelIdx)) {
+ SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pReader->aDelIdx, pReader->iDelIdx);
+ int32_t size = 0;
+
+ pReader->iDelIdx++;
+
+ code = tsdbReadDelData(pReader->pDelFReader, pDelIdx, pReader->aDelData, NULL);
+ if (code) goto _err;
+
+ for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); iDelData++) {
+ SDelData* pDelData = (SDelData*)taosArrayGet(pReader->aDelData, iDelData);
+
+ if (pDelData->version >= pReader->sver && pDelData->version <= pReader->ever) {
+ size += tPutDelData(NULL, pDelData);
+ }
+ }
+
+ if (size > 0) {
+ int64_t n = 0;
+
+ size = size + sizeof(SSnapDataHdr) + sizeof(TABLEID);
+ code = tRealloc(ppData, size);
+ if (code) goto _err;
+
+ // SSnapDataHdr
+ SSnapDataHdr* pSnapDataHdr = (SSnapDataHdr*)(*ppData + n);
+ pSnapDataHdr->type = 1;
+ pSnapDataHdr->size = size; // TODO: size here may incorrect
+ n += sizeof(SSnapDataHdr);
+
+ // TABLEID
+ TABLEID* pId = (TABLEID*)(*ppData + n);
+ pId->suid = pDelIdx->suid;
+ pId->uid = pDelIdx->uid;
+ n += sizeof(*pId);
+
+ // DATA
+ for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); iDelData++) {
+ SDelData* pDelData = (SDelData*)taosArrayGet(pReader->aDelData, iDelData);
+
+ if (pDelData->version >= pReader->sver && pDelData->version <= pReader->ever) {
+ n += tPutDelData(*ppData + n, pDelData);
+ }
+ }
+
+ goto _exit;
+ }
+ }
+
+ code = TSDB_CODE_VND_READ_END;
+ tsdbDelFReaderClose(&pReader->pDelFReader);
+
+_exit:
+ return code;
+
+_err:
+ tsdbError("vgId:%d snap read del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
+ return code;
+}
+
+int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapReader** ppReader) {
+ int32_t code = 0;
+ STsdbSnapReader* pReader = NULL;
+
+ // alloc
+ pReader = (STsdbSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
+ if (pReader == NULL) {
+ code = TSDB_CODE_OUT_OF_MEMORY;
+ goto _err;
+ }
+ pReader->pTsdb = pTsdb;
+ pReader->sver = sver;
+ pReader->ever = ever;
+
+ pReader->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
+ if (pReader->aBlockIdx == NULL) {
+ code = TSDB_CODE_OUT_OF_MEMORY;
+ goto _err;
+ }
+
+ pReader->mBlock = tMapDataInit();
+
+ code = tBlockDataInit(&pReader->blkData);
+ if (code) goto _err;
+
+ pReader->aDelIdx = taosArrayInit(0, sizeof(SDelIdx));
+ if (pReader->aDelIdx == NULL) {
+ code = TSDB_CODE_OUT_OF_MEMORY;
+ goto _err;
+ }
+
+ pReader->aDelData = taosArrayInit(0, sizeof(SDelData));
+ if (pReader->aDelData == NULL) {
+ code = TSDB_CODE_OUT_OF_MEMORY;
+ goto _err;
+ }
+
+ *ppReader = pReader;
+ return code;
+
+_err:
+ tsdbError("vgId:%d snapshot reader open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
+ *ppReader = NULL;
+ return code;
+}
+
+int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
+ int32_t code = 0;
+ STsdbSnapReader* pReader = *ppReader;
+
+ taosArrayDestroy(pReader->aDelData);
+ taosArrayDestroy(pReader->aDelIdx);
+ if (pReader->pDelFReader) {
+ tsdbDelFReaderClose(&pReader->pDelFReader);
+ }
+ tBlockDataClear(&pReader->blkData);
+ tMapDataClear(&pReader->mBlock);
+ taosArrayDestroy(pReader->aBlockIdx);
+ if (pReader->pDataFReader) {
+ tsdbDataFReaderClose(&pReader->pDataFReader);
+ }
+ taosMemoryFree(pReader);
+ *ppReader = NULL;
+
+ return code;
+}
+
+int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) {
+ int32_t code = 0;
+
+ // read data file
+ if (!pReader->dataDone) {
+ code = tsdbSnapReadData(pReader, ppData);
+ if (code) {
+ if (code == TSDB_CODE_VND_READ_END) {
+ pReader->dataDone = 1;
+ } else {
+ goto _err;
+ }
+ } else {
+ goto _exit;
+ }
+ }
+
+ // read del file
+ if (!pReader->delDone) {
+ code = tsdbSnapReadDel(pReader, ppData);
+ if (code) {
+ if (code == TSDB_CODE_VND_READ_END) {
+ pReader->delDone = 1;
+ } else {
+ goto _err;
+ }
+ } else {
+ goto _exit;
+ }
+ }
+
+ code = TSDB_CODE_VND_READ_END;
+
+_exit:
+ return code;
+
+_err:
+ tsdbError("vgId:%d snapshot read failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
+ return code;
+}
+
+// STsdbSnapWriter ========================================
+struct STsdbSnapWriter {
+ STsdb* pTsdb;
+ int64_t sver;
+ int64_t ever;
+
+ // config
+ int32_t minutes;
+ int8_t precision;
+
+ // for data file
+ int32_t fid;
+ SDataFReader* pDataFReader;
+ SArray* aBlockIdx;
+ int32_t iBlockIdx;
+ SBlockIdx* pBlockIdx;
+ SMapData mBlock;
+ int32_t iBlock;
+ SBlockData blockData;
+ int32_t iRow;
+
+ SDataFWriter* pDataFWriter;
+ SArray* aBlockIdxN;
+ SBlockIdx blockIdx;
+ SMapData mBlockN;
+ SBlock block;
+ SBlockData nBlockData;
+
+ // for del file
+ SDelFReader* pDelFReader;
+ SDelFWriter* pDelFWriter;
+ int32_t iDelIdx;
+ SArray* aDelIdx;
+ SArray* aDelData;
+ SArray* aDelIdxN;
+};
+
+static int32_t tsdbSnapRollback(STsdbSnapWriter* pWriter) {
+ int32_t code = 0;
// TODO
- return 0;
+ return code;
}
-int32_t tsdbSnapshotReaderClose(STsdbSnapshotReader* pReader) {
+static int32_t tsdbSnapCommit(STsdbSnapWriter* pWriter) {
+ int32_t code = 0;
// TODO
- return 0;
+ return code;
}
-int32_t tsdbSnapshotRead(STsdbSnapshotReader* pReader, void** ppData, uint32_t* nData) {
+static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) {
+ int32_t code = 0;
+ STsdb* pTsdb = pWriter->pTsdb;
+
+ if (pWriter->pDataFWriter == NULL) goto _exit;
+
// TODO
- return 0;
+
+ code = tsdbDataFWriterClose(&pWriter->pDataFWriter, 0);
+ if (code) goto _err;
+
+ if (pWriter->pDataFReader) {
+ code = tsdbDataFReaderClose(&pWriter->pDataFReader);
+ if (code) goto _err;
+ }
+
+_exit:
+ return code;
+
+_err:
+ tsdbError("vgId:%d tsdb snapshot writer data end failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
+ return code;
+}
+
+static int32_t tsdbSnapWriteAppendData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
+ int32_t code = 0;
+ int32_t iRow = 0; // todo
+ int32_t nRow = 0; // todo
+ SBlockData* pBlockData = NULL; // todo
+
+ while (iRow < nRow) {
+ code = tBlockDataAppendRow(&pWriter->nBlockData, &tsdbRowFromBlockData(pBlockData, iRow), NULL);
+ if (code) goto _err;
+ }
+
+ return code;
+
+_err:
+ tsdbError("vgId:%d tsdb snapshot write append data failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
+ return code;
+}
+
+static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
+ int32_t code = 0;
+ STsdb* pTsdb = pWriter->pTsdb;
+ int64_t suid = 0; // todo
+ int64_t uid = 0; // todo
+ int64_t skey; // todo
+ int64_t ekey; // todo
+
+ int32_t fid = tsdbKeyFid(skey, pWriter->minutes, pWriter->precision);
+ ASSERT(fid == tsdbKeyFid(ekey, pWriter->minutes, pWriter->precision));
+
+ // begin
+ if (pWriter->pDataFWriter == NULL || pWriter->fid != fid) {
+ code = tsdbSnapWriteDataEnd(pWriter);
+ if (code) goto _err;
+
+ pWriter->fid = fid;
+ SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, fid);
+ // reader
+ if (pSet) {
+ // open
+ code = tsdbDataFReaderOpen(&pWriter->pDataFReader, pTsdb, pSet);
+ if (code) goto _err;
+
+ // SBlockIdx
+ code = tsdbReadBlockIdx(pWriter->pDataFReader, pWriter->aBlockIdx, NULL);
+ if (code) goto _err;
+ } else {
+ taosArrayClear(pWriter->aBlockIdx);
+ }
+ pWriter->iBlockIdx = 0;
+
+ // writer
+ SDFileSet wSet = {0};
+ if (pSet == NULL) {
+ wSet = (SDFileSet){0}; // todo
+ } else {
+ wSet = (SDFileSet){0}; // todo
+ }
+
+ code = tsdbDataFWriterOpen(&pWriter->pDataFWriter, pTsdb, &wSet);
+ if (code) goto _err;
+
+ taosArrayClear(pWriter->aBlockIdxN);
+ }
+
+ // process
+ TABLEID id = {0}; // TODO
+ TSKEY minKey = 0; // TODO
+ TSKEY maxKey = 0; // TODO
+
+ while (true) {
+ if (pWriter->pBlockIdx) {
+ int32_t c = tTABLEIDCmprFn(&id, pWriter->pBlockIdx);
+
+ if (c == 0) {
+ } else if (c < 0) {
+ // keep merge
+ } else {
+ // code = tsdbSnapWriteTableDataEnd(pWriter);
+ if (code) goto _err;
+
+ pWriter->iBlockIdx++;
+ if (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) {
+ pWriter->pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx);
+ } else {
+ pWriter->pBlockIdx = NULL;
+ }
+
+ if (pWriter->pBlockIdx) {
+ code = tsdbReadBlock(pWriter->pDataFReader, pWriter->pBlockIdx, &pWriter->mBlock, NULL);
+ if (code) goto _err;
+ }
+ }
+ } else {
+ int32_t c = tTABLEIDCmprFn(&id, &pWriter->blockIdx);
+
+ if (c == 0) {
+ // merge commit the block data
+ } else if (c > 0) {
+ // code = tsdbSnapWriteTableDataEnd(pWriter);
+ if (code) goto _err;
+ } else {
+ ASSERT(0);
+ }
+ }
+ }
+
+ return code;
+
+_err:
+ tsdbError("vgId:%d tsdb snapshot write data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
+ return code;
+}
+
+static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
+ int32_t code = 0;
+ STsdb* pTsdb = pWriter->pTsdb;
+
+ if (pWriter->pDelFWriter == NULL) {
+ SDelFile* pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->nState);
+
+ // reader
+ if (pDelFile) {
+ code = tsdbDelFReaderOpen(&pWriter->pDelFReader, pDelFile, pTsdb, NULL);
+ if (code) goto _err;
+
+ code = tsdbReadDelIdx(pWriter->pDelFReader, pWriter->aDelIdx, NULL);
+ if (code) goto _err;
+ }
+
+ // writer
+ SDelFile delFile = {.commitID = pTsdb->pVnode->state.commitID, .offset = 0, .size = 0};
+ code = tsdbDelFWriterOpen(&pWriter->pDelFWriter, &delFile, pTsdb);
+ if (code) goto _err;
+ }
+
+ // process the del data
+ TABLEID id = {0}; // todo
+
+ while (true) {
+ SDelIdx* pDelIdx = NULL;
+ int64_t n = 0;
+ SDelData delData;
+ SDelIdx delIdx;
+ int8_t toBreak = 0;
+
+ if (pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdx)) {
+ pDelIdx = taosArrayGet(pWriter->aDelIdx, pWriter->iDelIdx);
+ }
+
+ if (pDelIdx) {
+ int32_t c = tTABLEIDCmprFn(&id, pDelIdx);
+ if (c < 0) {
+ goto _new_del;
+ } else {
+ code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData, NULL);
+ if (code) goto _err;
+
+ pWriter->iDelIdx++;
+ if (c == 0) {
+ toBreak = 1;
+ delIdx = (SDelIdx){.suid = id.suid, .uid = id.uid};
+ goto _merge_del;
+ } else {
+ delIdx = (SDelIdx){.suid = pDelIdx->suid, .uid = pDelIdx->uid};
+ goto _write_del;
+ }
+ }
+ }
+
+ _new_del:
+ toBreak = 1;
+ delIdx = (SDelIdx){.suid = id.suid, .uid = id.uid};
+ taosArrayClear(pWriter->aDelData);
+
+ _merge_del:
+ while (n < nData) {
+ n += tGetDelData(pData + n, &delData);
+ if (taosArrayPush(pWriter->aDelData, &delData) == NULL) {
+ code = TSDB_CODE_OUT_OF_MEMORY;
+ goto _err;
+ }
+ }
+
+ _write_del:
+ code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, NULL, &delIdx);
+ if (code) goto _err;
+
+ if (taosArrayPush(pWriter->aDelIdxN, &delIdx) == NULL) {
+ code = TSDB_CODE_OUT_OF_MEMORY;
+ goto _err;
+ }
+
+ if (toBreak) break;
+ }
+
+_exit:
+ return code;
+
+_err:
+ tsdbError("vgId:%d tsdb snapshot write del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
+ return code;
+}
+
+static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) {
+ int32_t code = 0;
+ STsdb* pTsdb = pWriter->pTsdb;
+
+ if (pWriter->pDelFWriter == NULL) goto _exit;
+ for (; pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdx); pWriter->iDelIdx++) {
+ SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdx, pWriter->iDelIdx);
+
+ code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData, NULL);
+ if (code) goto _err;
+
+ SDelIdx delIdx = (SDelIdx){.suid = pDelIdx->suid, .uid = pDelIdx->uid};
+ code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, NULL, &delIdx);
+ if (code) goto _err;
+
+ if (taosArrayPush(pWriter->aDelIdx, &delIdx) == NULL) {
+ code = TSDB_CODE_OUT_OF_MEMORY;
+ goto _err;
+ }
+ }
+
+ code = tsdbUpdateDelFileHdr(pWriter->pDelFWriter);
+ if (code) goto _err;
+
+ code = tsdbFSStateUpsertDelFile(pTsdb->fs->nState, &pWriter->pDelFWriter->fDel);
+ if (code) goto _err;
+
+ code = tsdbDelFWriterClose(&pWriter->pDelFWriter, 1);
+ if (code) goto _err;
+
+ if (pWriter->pDelFReader) {
+ code = tsdbDelFReaderClose(&pWriter->pDelFReader);
+ if (code) goto _err;
+ }
+
+_exit:
+ return code;
+
+_err:
+ tsdbError("vgId:%d tsdb snapshow write del end failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
+ return code;
+}
+
+int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter) {
+ int32_t code = 0;
+ STsdbSnapWriter* pWriter = NULL;
+
+ // alloc
+ pWriter = (STsdbSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
+ if (pWriter == NULL) {
+ code = TSDB_CODE_OUT_OF_MEMORY;
+ goto _err;
+ }
+ pWriter->pTsdb = pTsdb;
+ pWriter->sver = sver;
+ pWriter->ever = ever;
+
+ *ppWriter = pWriter;
+ return code;
+
+_err:
+ tsdbError("vgId:%d tsdb snapshot writer open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
+ *ppWriter = NULL;
+ return code;
+}
+
+int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
+ int32_t code = 0;
+ STsdbSnapWriter* pWriter = *ppWriter;
+
+ if (rollback) {
+ code = tsdbSnapRollback(pWriter);
+ if (code) goto _err;
+ } else {
+ code = tsdbSnapWriteDataEnd(pWriter);
+ if (code) goto _err;
+
+ code = tsdbSnapWriteDelEnd(pWriter);
+ if (code) goto _err;
+
+ code = tsdbSnapCommit(pWriter);
+ if (code) goto _err;
+ }
+
+ taosMemoryFree(pWriter);
+ *ppWriter = NULL;
+
+ return code;
+
+_err:
+ tsdbError("vgId:%d tsdb snapshot writer close failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
+ return code;
+}
+
+int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
+ int32_t code = 0;
+ int8_t type = pData[0];
+
+ // ts data
+ if (type == 0) {
+ code = tsdbSnapWriteData(pWriter, pData + 1, nData - 1);
+ if (code) goto _err;
+ } else {
+ code = tsdbSnapWriteDataEnd(pWriter);
+ if (code) goto _err;
+ }
+
+ // del data
+ if (type == 1) {
+ code = tsdbSnapWriteDel(pWriter, pData + 1, nData - 1);
+ if (code) goto _err;
+ }
+
+ return code;
+
+_err:
+ tsdbError("vgId:%d tsdb snapshow write failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
+ return code;
}
diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c
index 2e628edb7a2946521ed9f4aa5e4a4cdd173ee32a..415a674737b57bb3e0a50cfc377606d2f19d4867 100644
--- a/source/dnode/vnode/src/tsdb/tsdbUtil.c
+++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c
@@ -87,8 +87,10 @@ int32_t tPutMapData(uint8_t *p, SMapData *pMapData) {
n += tPutI32v(p ? p + n : p, pMapData->nItem);
if (pMapData->nItem) {
+ int32_t lOffset = 0;
for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) {
- n += tPutI32v(p ? p + n : p, pMapData->aOffset[iItem]);
+ n += tPutI32v(p ? p + n : p, pMapData->aOffset[iItem] - lOffset);
+ lOffset = pMapData->aOffset[iItem];
}
n += tPutI32v(p ? p + n : p, pMapData->nData);
@@ -111,8 +113,11 @@ int32_t tGetMapData(uint8_t *p, SMapData *pMapData) {
if (pMapData->nItem) {
if (tRealloc((uint8_t **)&pMapData->aOffset, sizeof(int32_t) * pMapData->nItem)) return -1;
+ int32_t lOffset = 0;
for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) {
n += tGetI32v(p + n, &pMapData->aOffset[iItem]);
+ pMapData->aOffset[iItem] += lOffset;
+ lOffset = pMapData->aOffset[iItem];
}
n += tGetI32v(p + n, &pMapData->nData);
diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c
index 21db14f0dfec36e24601295ac9784501fca2446a..ed829666cd0002d6b4a2d54e22d9052ac1ab7e44 100644
--- a/source/dnode/vnode/src/vnd/vnodeCommit.c
+++ b/source/dnode/vnode/src/vnd/vnodeCommit.c
@@ -223,6 +223,7 @@ int vnodeCommit(SVnode *pVnode) {
// save info
info.config = pVnode->config;
info.state.committed = pVnode->state.applied;
+ info.state.commitTerm = pVnode->state.applyTerm;
info.state.commitID = pVnode->state.commitID;
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
if (vnodeSaveInfo(dir, &info) < 0) {
@@ -270,7 +271,7 @@ int vnodeCommit(SVnode *pVnode) {
ASSERT(0);
return -1;
}
-
+
pVnode->state.committed = info.state.committed;
// postCommit
@@ -316,6 +317,7 @@ static int vnodeEncodeState(const void *pObj, SJson *pJson) {
if (tjsonAddIntegerToObject(pJson, "commit version", pState->committed) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "commit ID", pState->commitID) < 0) return -1;
+ if (tjsonAddIntegerToObject(pJson, "commit term", pState->commitTerm) < 0) return -1;
return 0;
}
@@ -328,6 +330,8 @@ static int vnodeDecodeState(const SJson *pJson, void *pObj) {
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "commit ID", pState->commitID, code);
if (code < 0) return -1;
+ tjsonGetNumberValue(pJson, "commit term", pState->commitTerm, code);
+ if (code < 0) return -1;
return 0;
}
diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c
index 4267dd9b1f560ae10f83fb8b27d79b211490961e..e59f8ae5587bee78606afdbea682440bde8b3515 100644
--- a/source/dnode/vnode/src/vnd/vnodeOpen.c
+++ b/source/dnode/vnode/src/vnd/vnodeOpen.c
@@ -79,8 +79,10 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
strcpy(pVnode->path, path);
pVnode->config = info.config;
pVnode->state.committed = info.state.committed;
+ pVnode->state.commitTerm = info.state.commitTerm;
pVnode->state.applied = info.state.committed;
pVnode->state.commitID = info.state.commitID;
+ pVnode->state.commitTerm = info.state.commitTerm;
pVnode->pTfs = pTfs;
pVnode->msgCb = msgCb;
pVnode->blockCount = 0;
@@ -194,4 +196,9 @@ void vnodeStop(SVnode *pVnode) {}
int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; }
-void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot) { pSnapshot->lastApplyIndex = pVnode->state.committed; }
+void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot) {
+ pSnapshot->data = NULL;
+ pSnapshot->lastApplyIndex = pVnode->state.committed;
+ pSnapshot->lastApplyTerm = pVnode->state.commitTerm;
+ pSnapshot->lastConfigIndex = -1;
+}
diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c
index baa8422307dd7785201bcc4b8b632bb3c05a37cb..27f30ec7878351f4e0a2d78d21da5a1fdc960873 100644
--- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c
+++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c
@@ -13,24 +13,27 @@
* along with this program. If not, see .
*/
-#include "vnodeInt.h"
-
-struct SVSnapshotReader {
- SVnode *pVnode;
- int64_t sver;
- int64_t ever;
- int8_t isMetaEnd;
- int8_t isTsdbEnd;
- SMetaSnapshotReader *pMetaReader;
- STsdbSnapshotReader *pTsdbReader;
- void *pData;
- int32_t nData;
+#include "vnd.h"
+
+// SVSnapReader ========================================================
+struct SVSnapReader {
+ SVnode *pVnode;
+ int64_t sver;
+ int64_t ever;
+ // meta
+ int8_t metaDone;
+ SMetaSnapReader *pMetaReader;
+ // tsdb
+ int8_t tsdbDone;
+ STsdbSnapReader *pTsdbReader;
+ uint8_t *pData;
};
-int32_t vnodeSnapshotReaderOpen(SVnode *pVnode, SVSnapshotReader **ppReader, int64_t sver, int64_t ever) {
- SVSnapshotReader *pReader = NULL;
+int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapReader **ppReader) {
+ int32_t code = 0;
+ SVSnapReader *pReader = NULL;
- pReader = (SVSnapshotReader *)taosMemoryCalloc(1, sizeof(*pReader));
+ pReader = (SVSnapReader *)taosMemoryCalloc(1, sizeof(*pReader));
if (pReader == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
@@ -38,72 +41,169 @@ int32_t vnodeSnapshotReaderOpen(SVnode *pVnode, SVSnapshotReader **ppReader, int
pReader->pVnode = pVnode;
pReader->sver = sver;
pReader->ever = ever;
- pReader->isMetaEnd = 0;
- pReader->isTsdbEnd = 0;
- if (metaSnapshotReaderOpen(pVnode->pMeta, &pReader->pMetaReader, sver, ever) < 0) {
- taosMemoryFree(pReader);
- goto _err;
- }
+ code = metaSnapReaderOpen(pVnode->pMeta, sver, ever, &pReader->pMetaReader);
+ if (code) goto _err;
- if (tsdbSnapshotReaderOpen(pVnode->pTsdb, &pReader->pTsdbReader, sver, ever) < 0) {
- metaSnapshotReaderClose(pReader->pMetaReader);
- taosMemoryFree(pReader);
- goto _err;
- }
+ code = tsdbSnapReaderOpen(pVnode->pTsdb, sver, ever, &pReader->pTsdbReader);
+ if (code) goto _err;
-_exit:
*ppReader = pReader;
- return 0;
+ return code;
_err:
+ vError("vgId:%d vnode snapshot reader open failed since %s", TD_VID(pVnode), tstrerror(code));
*ppReader = NULL;
- return -1;
+ return code;
}
-int32_t vnodeSnapshotReaderClose(SVSnapshotReader *pReader) {
- if (pReader) {
- vnodeFree(pReader->pData);
- tsdbSnapshotReaderClose(pReader->pTsdbReader);
- metaSnapshotReaderClose(pReader->pMetaReader);
- taosMemoryFree(pReader);
- }
- return 0;
+int32_t vnodeSnapReaderClose(SVSnapReader *pReader) {
+ int32_t code = 0;
+
+ tFree(pReader->pData);
+ if (pReader->pTsdbReader) tsdbSnapReaderClose(&pReader->pTsdbReader);
+ if (pReader->pMetaReader) metaSnapReaderClose(&pReader->pMetaReader);
+ taosMemoryFree(pReader);
+
+ return code;
}
-int32_t vnodeSnapshotRead(SVSnapshotReader *pReader, const void **ppData, uint32_t *nData) {
+int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) {
int32_t code = 0;
- if (!pReader->isMetaEnd) {
- code = metaSnapshotRead(pReader->pMetaReader, &pReader->pData, &pReader->nData);
+ if (!pReader->metaDone) {
+ code = metaSnapRead(pReader->pMetaReader, &pReader->pData);
if (code) {
if (code == TSDB_CODE_VND_READ_END) {
- pReader->isMetaEnd = 1;
+ pReader->metaDone = 1;
} else {
- return code;
+ goto _err;
}
} else {
*ppData = pReader->pData;
- *nData = pReader->nData;
- return code;
+ *nData = sizeof(SSnapDataHdr) + ((SSnapDataHdr *)pReader->pData)->size;
+ goto _exit;
}
}
- if (!pReader->isTsdbEnd) {
- code = tsdbSnapshotRead(pReader->pTsdbReader, &pReader->pData, &pReader->nData);
+ if (!pReader->tsdbDone) {
+ code = tsdbSnapRead(pReader->pTsdbReader, &pReader->pData);
if (code) {
if (code == TSDB_CODE_VND_READ_END) {
- pReader->isTsdbEnd = 1;
+ pReader->tsdbDone = 1;
} else {
- return code;
+ goto _err;
}
} else {
*ppData = pReader->pData;
- *nData = pReader->nData;
- return code;
+ *nData = sizeof(SSnapDataHdr) + ((SSnapDataHdr *)pReader->pData)->size;
+ goto _exit;
}
}
code = TSDB_CODE_VND_READ_END;
+
+_exit:
+ return code;
+
+_err:
+ vError("vgId:% snapshot read failed since %s", TD_VID(pReader->pVnode), tstrerror(code));
+ return code;
+}
+
+// SVSnapWriter ========================================================
+struct SVSnapWriter {
+ SVnode *pVnode;
+ int64_t sver;
+ int64_t ever;
+ // meta
+ SMetaSnapWriter *pMetaSnapWriter;
+ // tsdb
+ STsdbSnapWriter *pTsdbSnapWriter;
+};
+
+static int32_t vnodeSnapRollback(SVSnapWriter *pWriter) {
+ int32_t code = 0;
+ // TODO
+ return code;
+}
+
+static int32_t vnodeSnapCommit(SVSnapWriter *pWriter) {
+ int32_t code = 0;
+ // TODO
+ return code;
+}
+
+int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWriter **ppWriter) {
+ int32_t code = 0;
+ SVSnapWriter *pWriter = NULL;
+
+ // alloc
+ pWriter = (SVSnapWriter *)taosMemoryCalloc(1, sizeof(*pWriter));
+ if (pWriter == NULL) {
+ code = TSDB_CODE_OUT_OF_MEMORY;
+ goto _err;
+ }
+ pWriter->pVnode = pVnode;
+ pWriter->sver = sver;
+ pWriter->ever = ever;
+
+ return code;
+
+_err:
+ vError("vgId:%d vnode snapshot writer open failed since %s", TD_VID(pVnode), tstrerror(code));
+ return code;
+}
+
+int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback) {
+ int32_t code = 0;
+
+ if (rollback) {
+ code = vnodeSnapRollback(pWriter);
+ if (code) goto _err;
+ } else {
+ code = vnodeSnapCommit(pWriter);
+ if (code) goto _err;
+ }
+
+ taosMemoryFree(pWriter);
+ return code;
+
+_err:
+ vError("vgId:%d vnode snapshow writer close failed since %s", TD_VID(pWriter->pVnode), tstrerror(code));
+ return code;
+}
+
+int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
+ int32_t code = 0;
+ SSnapDataHdr *pSnapDataHdr = (SSnapDataHdr *)pData;
+ SVnode *pVnode = pWriter->pVnode;
+
+ ASSERT(pSnapDataHdr->size + sizeof(SSnapDataHdr) == nData);
+
+ if (pSnapDataHdr->type == 0) {
+ // meta
+ if (pWriter->pMetaSnapWriter == NULL) {
+ code = metaSnapWriterOpen(pVnode->pMeta, pWriter->sver, pWriter->ever, &pWriter->pMetaSnapWriter);
+ if (code) goto _err;
+ }
+
+ code = metaSnapWrite(pWriter->pMetaSnapWriter, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
+ if (code) goto _err;
+ } else {
+ // tsdb
+ if (pWriter->pTsdbSnapWriter == NULL) {
+ code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, &pWriter->pTsdbSnapWriter);
+ if (code) goto _err;
+ }
+
+ code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
+ if (code) goto _err;
+ }
+
+ return code;
+
+_err:
+ vError("vgId:%d vnode snapshot write failed since %s", TD_VID(pVnode), tstrerror(code));
return code;
}
\ No newline at end of file
diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c
index cd25707fce3288d1c79c47a53bf2b939e236e454..dceeb4c2828f864462ac73156e1254b46c4cb02e 100644
--- a/source/dnode/vnode/src/vnd/vnodeSvr.c
+++ b/source/dnode/vnode/src/vnd/vnodeSvr.c
@@ -143,6 +143,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
version);
pVnode->state.applied = version;
+ pVnode->state.applyTerm = pMsg->info.conn.applyTerm;
// skip header
pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c
index a16ba8b89e39882e5cbc99ae81a62e1ac3254504..a2f5fa6dbb1e5ba524cae3f1cf1d531a1505f345 100644
--- a/source/dnode/vnode/src/vnd/vnodeSync.c
+++ b/source/dnode/vnode/src/vnd/vnodeSync.c
@@ -453,20 +453,19 @@ static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta
static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) {
SVnode *pVnode = pFsm->data;
SSnapshotParam *pSnapshotParam = pParam;
- int32_t code =
- vnodeSnapshotReaderOpen(pVnode, (SVSnapshotReader **)ppReader, pSnapshotParam->start, pSnapshotParam->end);
+ int32_t code = vnodeSnapReaderOpen(pVnode, pSnapshotParam->start, pSnapshotParam->end, (SVSnapReader **)ppReader);
return code;
}
static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) {
SVnode *pVnode = pFsm->data;
- int32_t code = vnodeSnapshotReaderClose(pReader);
+ int32_t code = vnodeSnapReaderClose(pReader);
return code;
}
static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
SVnode *pVnode = pFsm->data;
- int32_t code = vnodeSnapshotRead(pReader, (const void **)ppBuf, len);
+ int32_t code = vnodeSnapRead(pReader, (uint8_t **)ppBuf, len);
return code;
}