提交 b084dd8e 编写于 作者: L Liu Jicong

feat(tq): transport snapshot

上级 bb6fabcc
...@@ -203,6 +203,7 @@ SWalRef *walRefCommittedVer(SWal *); ...@@ -203,6 +203,7 @@ SWalRef *walRefCommittedVer(SWal *);
SWalRef *walOpenRef(SWal *); SWalRef *walOpenRef(SWal *);
void walCloseRef(SWal *pWal, int64_t refId); void walCloseRef(SWal *pWal, int64_t refId);
int32_t walRefVer(SWalRef *, int64_t ver); int32_t walRefVer(SWalRef *, int64_t ver);
int32_t walPreRefVer(SWalRef *pRef, int64_t ver);
void walUnrefVer(SWalRef *); void walUnrefVer(SWalRef *);
// helper function for raft // helper function for raft
......
...@@ -58,6 +58,8 @@ target_sources( ...@@ -58,6 +58,8 @@ target_sources(
"src/tq/tqPush.c" "src/tq/tqPush.c"
"src/tq/tqSink.c" "src/tq/tqSink.c"
"src/tq/tqCommit.c" "src/tq/tqCommit.c"
"src/tq/tqSnapshot.c"
"src/tq/tqOffsetSnapshot.c"
) )
target_include_directories( target_include_directories(
vnode vnode
......
...@@ -80,7 +80,8 @@ struct SMeta { ...@@ -80,7 +80,8 @@ struct SMeta {
TTB* pSmaIdx; TTB* pSmaIdx;
TTB* pTaskIdx; // stream
TTB* pStreamDb;
SMetaIdx* pIdx; SMetaIdx* pIdx;
}; };
......
...@@ -133,6 +133,9 @@ typedef struct { ...@@ -133,6 +133,9 @@ typedef struct {
static STqMgmt tqMgmt = {0}; static STqMgmt tqMgmt = {0};
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle);
int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle);
// tqRead // tqRead
int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* offset); int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* offset);
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum); int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum);
...@@ -146,6 +149,7 @@ int32_t tqMetaOpen(STQ* pTq); ...@@ -146,6 +149,7 @@ int32_t tqMetaOpen(STQ* pTq);
int32_t tqMetaClose(STQ* pTq); int32_t tqMetaClose(STQ* pTq);
int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle); int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle);
int32_t tqMetaDeleteHandle(STQ* pTq, const char* key); int32_t tqMetaDeleteHandle(STQ* pTq, const char* key);
int32_t tqMetaRestoreHandle(STQ* pTq);
typedef struct { typedef struct {
int32_t size; int32_t size;
...@@ -156,11 +160,15 @@ void tqOffsetClose(STqOffsetStore*); ...@@ -156,11 +160,15 @@ void tqOffsetClose(STqOffsetStore*);
STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey); STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey);
int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset); int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset);
int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey); int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey);
int32_t tqOffsetSnapshot(STqOffsetStore* pStore); int32_t tqOffsetCommitFile(STqOffsetStore* pStore);
// tqSink // tqSink
void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data); void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
// tqOffset
char* tqOffsetBuildFName(const char* path, int32_t ver);
int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
static FORCE_INLINE void tqOffsetResetToData(STqOffsetVal* pOffsetVal, int64_t uid, int64_t ts) { static FORCE_INLINE void tqOffsetResetToData(STqOffsetVal* pOffsetVal, int64_t uid, int64_t ts) {
pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_DATA; pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_DATA;
pOffsetVal->uid = uid; pOffsetVal->uid = uid;
......
...@@ -49,22 +49,30 @@ ...@@ -49,22 +49,30 @@
extern "C" { extern "C" {
#endif #endif
typedef struct SVnodeInfo SVnodeInfo; typedef struct SVnodeInfo SVnodeInfo;
typedef struct SMeta SMeta; typedef struct SMeta SMeta;
typedef struct SSma SSma; typedef struct SSma SSma;
typedef struct STsdb STsdb; typedef struct STsdb STsdb;
typedef struct STQ STQ; typedef struct STQ STQ;
typedef struct SVState SVState; typedef struct SVState SVState;
typedef struct SVBufPool SVBufPool; typedef struct SVBufPool SVBufPool;
typedef struct SQWorker SQHandle; typedef struct SQWorker SQHandle;
typedef struct STsdbKeepCfg STsdbKeepCfg; typedef struct STsdbKeepCfg STsdbKeepCfg;
typedef struct SMetaSnapReader SMetaSnapReader; typedef struct SMetaSnapReader SMetaSnapReader;
typedef struct SMetaSnapWriter SMetaSnapWriter; typedef struct SMetaSnapWriter SMetaSnapWriter;
typedef struct STsdbSnapReader STsdbSnapReader; typedef struct STsdbSnapReader STsdbSnapReader;
typedef struct STsdbSnapWriter STsdbSnapWriter; typedef struct STsdbSnapWriter STsdbSnapWriter;
typedef struct SRsmaSnapReader SRsmaSnapReader; typedef struct STqSnapReader STqSnapReader;
typedef struct SRsmaSnapWriter SRsmaSnapWriter; typedef struct STqSnapWriter STqSnapWriter;
typedef struct SSnapDataHdr SSnapDataHdr; typedef struct STqOffsetReader STqOffsetReader;
typedef struct STqOffsetWriter STqOffsetWriter;
typedef struct SStreamTaskReader SStreamTaskReader;
typedef struct SStreamTaskWriter SStreamTaskWriter;
typedef struct SStreamStateReader SStreamStateReader;
typedef struct SStreamStateWriter SStreamStateWriter;
typedef struct SRsmaSnapReader SRsmaSnapReader;
typedef struct SRsmaSnapWriter SRsmaSnapWriter;
typedef struct SSnapDataHdr SSnapDataHdr;
#define VNODE_META_DIR "meta" #define VNODE_META_DIR "meta"
#define VNODE_TSDB_DIR "tsdb" #define VNODE_TSDB_DIR "tsdb"
...@@ -205,6 +213,26 @@ int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData); ...@@ -205,6 +213,26 @@ int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData);
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter); int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter);
int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData); int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback); int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback);
// STqSnapshotReader ==
int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapReader** ppReader);
int32_t tqSnapReaderClose(STqSnapReader** ppReader);
int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData);
// STqSnapshotWriter ======================================
int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter);
int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback);
int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
// STqOffsetReader ========================================
int32_t tqOffsetReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqOffsetReader** ppReader);
int32_t tqOffsetReaderClose(STqOffsetReader** ppReader);
int32_t tqOffsetSnapRead(STqOffsetReader* pReader, uint8_t** ppData);
// STqOffsetWriter ========================================
int32_t tqOffsetWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqOffsetWriter** ppWriter);
int32_t tqOffsetWriterClose(STqOffsetWriter** ppWriter, int8_t rollback);
int32_t tqOffsetSnapWrite(STqOffsetWriter* pWriter, uint8_t* pData, uint32_t nData);
// SStreamTaskWriter ======================================
// SStreamTaskReader ======================================
// SStreamStateWriter =====================================
// SStreamStateReader =====================================
// SRsmaSnapReader ======================================== // SRsmaSnapReader ========================================
int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapReader** ppReader); int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapReader** ppReader);
int32_t rsmaSnapReaderClose(SRsmaSnapReader** ppReader); int32_t rsmaSnapReaderClose(SRsmaSnapReader** ppReader);
...@@ -331,6 +359,10 @@ enum { ...@@ -331,6 +359,10 @@ enum {
SNAP_DATA_RSMA1 = 3, SNAP_DATA_RSMA1 = 3,
SNAP_DATA_RSMA2 = 4, SNAP_DATA_RSMA2 = 4,
SNAP_DATA_QTASK = 5, SNAP_DATA_QTASK = 5,
SNAP_DATA_TQ_HANDLE = 6,
SNAP_DATA_TQ_OFFSET = 7,
SNAP_DATA_STREAM_TASK = 8,
SNAP_DATA_STREAM_STATE = 9,
}; };
struct SSnapDataHdr { struct SSnapDataHdr {
......
...@@ -131,7 +131,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) { ...@@ -131,7 +131,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
goto _err; goto _err;
} }
ret = tdbTbOpen("stream.task.db", sizeof(int64_t), -1, taskIdxKeyCmpr, pMeta->pEnv, &pMeta->pTaskIdx); ret = tdbTbOpen("stream.task.db", sizeof(int64_t), -1, taskIdxKeyCmpr, pMeta->pEnv, &pMeta->pStreamDb);
if (ret < 0) { if (ret < 0) {
metaError("vgId: %d, failed to open meta stream task index since %s", TD_VID(pVnode), tstrerror(terrno)); metaError("vgId: %d, failed to open meta stream task index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
...@@ -150,7 +150,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) { ...@@ -150,7 +150,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
_err: _err:
if (pMeta->pIdx) metaCloseIdx(pMeta); if (pMeta->pIdx) metaCloseIdx(pMeta);
if (pMeta->pTaskIdx) tdbTbClose(pMeta->pTaskIdx); if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb);
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx); if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
if (pMeta->pTtlIdx) tdbTbClose(pMeta->pTtlIdx); if (pMeta->pTtlIdx) tdbTbClose(pMeta->pTtlIdx);
if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx); if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx);
...@@ -170,7 +170,7 @@ _err: ...@@ -170,7 +170,7 @@ _err:
int metaClose(SMeta *pMeta) { int metaClose(SMeta *pMeta) {
if (pMeta) { if (pMeta) {
if (pMeta->pIdx) metaCloseIdx(pMeta); if (pMeta->pIdx) metaCloseIdx(pMeta);
if (pMeta->pTaskIdx) tdbTbClose(pMeta->pTaskIdx); if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb);
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx); if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
if (pMeta->pTtlIdx) tdbTbClose(pMeta->pTtlIdx); if (pMeta->pTtlIdx) tdbTbClose(pMeta->pTtlIdx);
if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx); if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "meta.h"
...@@ -548,6 +548,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -548,6 +548,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
SWalRef* pRef = walRefCommittedVer(pTq->pVnode->pWal); SWalRef* pRef = walRefCommittedVer(pTq->pVnode->pWal);
if (pRef == NULL) { if (pRef == NULL) {
ASSERT(0); ASSERT(0);
return -1;
} }
int64_t ver = pRef->refVer; int64_t ver = pRef->refVer;
pHandle->pRef = pRef; pHandle->pRef = pRef;
......
...@@ -15,4 +15,4 @@ ...@@ -15,4 +15,4 @@
#include "tq.h" #include "tq.h"
int tqCommit(STQ* pTq) { return tqOffsetSnapshot(pTq->pOffsetStore); } int tqCommit(STQ* pTq) { return tqOffsetCommitFile(pTq->pOffsetStore); }
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#include "tdbInt.h" #include "tdbInt.h"
#include "tq.h" #include "tq.h"
static int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) { int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) {
if (tStartEncode(pEncoder) < 0) return -1; if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeCStr(pEncoder, pHandle->subKey) < 0) return -1; if (tEncodeCStr(pEncoder, pHandle->subKey) < 0) return -1;
if (tEncodeI64(pEncoder, pHandle->consumerId) < 0) return -1; if (tEncodeI64(pEncoder, pHandle->consumerId) < 0) return -1;
...@@ -29,7 +29,7 @@ static int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) { ...@@ -29,7 +29,7 @@ static int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) {
return pEncoder->pos; return pEncoder->pos;
} }
static int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) { int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
if (tStartDecode(pDecoder) < 0) return -1; if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pHandle->subKey) < 0) return -1; if (tDecodeCStrTo(pDecoder, pHandle->subKey) < 0) return -1;
if (tDecodeI64(pDecoder, &pHandle->consumerId) < 0) return -1; if (tDecodeI64(pDecoder, &pHandle->consumerId) < 0) return -1;
...@@ -43,33 +43,20 @@ static int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) { ...@@ -43,33 +43,20 @@ static int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
return 0; return 0;
} }
int32_t tqMetaOpen(STQ* pTq) { int32_t tqMetaRestoreHandle(STQ* pTq) {
if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaStore) < 0) { TBC* pCur = NULL;
if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) {
ASSERT(0); ASSERT(0);
return -1;
} }
if (tdbTbOpen("handles", -1, -1, NULL, pTq->pMetaStore, &pTq->pExecStore) < 0) { void* pKey = NULL;
ASSERT(0); int kLen = 0;
} void* pVal = NULL;
int vLen = 0;
TXN txn = {0}; SDecoder decoder;
if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
ASSERT(0);
}
TBC* pCur;
if (tdbTbcOpen(pTq->pExecStore, &pCur, &txn) < 0) {
ASSERT(0);
}
void* pKey = NULL;
int kLen = 0;
void* pVal = NULL;
int vLen = 0;
tdbTbcMoveToFirst(pCur); tdbTbcMoveToFirst(pCur);
SDecoder decoder;
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
STqHandle handle; STqHandle handle;
...@@ -79,6 +66,7 @@ int32_t tqMetaOpen(STQ* pTq) { ...@@ -79,6 +66,7 @@ int32_t tqMetaOpen(STQ* pTq) {
handle.pRef = walOpenRef(pTq->pVnode->pWal); handle.pRef = walOpenRef(pTq->pVnode->pWal);
if (handle.pRef == NULL) { if (handle.pRef == NULL) {
ASSERT(0); ASSERT(0);
return -1;
} }
walRefVer(handle.pRef, handle.snapshotVer); walRefVer(handle.pRef, handle.snapshotVer);
...@@ -109,9 +97,24 @@ int32_t tqMetaOpen(STQ* pTq) { ...@@ -109,9 +97,24 @@ int32_t tqMetaOpen(STQ* pTq) {
} }
tdbTbcClose(pCur); tdbTbcClose(pCur);
if (tdbTxnClose(&txn) < 0) { return 0;
}
int32_t tqMetaOpen(STQ* pTq) {
if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaStore) < 0) {
ASSERT(0); ASSERT(0);
return -1;
} }
if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaStore, &pTq->pExecStore) < 0) {
ASSERT(0);
return -1;
}
if (tqMetaRestoreHandle(pTq) < 0) {
return -1;
}
return 0; return 0;
} }
......
...@@ -22,29 +22,15 @@ struct STqOffsetStore { ...@@ -22,29 +22,15 @@ struct STqOffsetStore {
SHashObj* pHash; // SHashObj<subscribeKey, offset> SHashObj* pHash; // SHashObj<subscribeKey, offset>
}; };
static char* buildFileName(const char* path) { char* tqOffsetBuildFName(const char* path, int32_t ver) {
int32_t len = strlen(path); int32_t len = strlen(path);
char* fname = taosMemoryCalloc(1, len + 20); char* fname = taosMemoryCalloc(1, len + 40);
snprintf(fname, len + 20, "%s/offset", path); snprintf(fname, len + 40, "%s/offset-ver%d", path, ver);
return fname; return fname;
} }
STqOffsetStore* tqOffsetOpen(STQ* pTq) { int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) {
STqOffsetStore* pStore = taosMemoryCalloc(1, sizeof(STqOffsetStore));
if (pStore == NULL) {
return NULL;
}
pStore->pTq = pTq;
pTq->pOffsetStore = pStore;
pStore->pHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
if (pStore->pHash == NULL) {
if (pStore->pHash) taosHashCleanup(pStore->pHash);
return NULL;
}
char* fname = buildFileName(pStore->pTq->path);
TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ); TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ);
taosMemoryFree(fname);
if (pFile != NULL) { if (pFile != NULL) {
STqOffsetHead head = {0}; STqOffsetHead head = {0};
int64_t code; int64_t code;
...@@ -79,11 +65,32 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) { ...@@ -79,11 +65,32 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) {
taosCloseFile(&pFile); taosCloseFile(&pFile);
} }
return 0;
}
STqOffsetStore* tqOffsetOpen(STQ* pTq) {
STqOffsetStore* pStore = taosMemoryCalloc(1, sizeof(STqOffsetStore));
if (pStore == NULL) {
return NULL;
}
pStore->pTq = pTq;
pTq->pOffsetStore = pStore;
pStore->pHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
if (pStore->pHash == NULL) {
taosMemoryFree(pStore);
return NULL;
}
char* fname = tqOffsetBuildFName(pStore->pTq->path, 0);
if (tqOffsetRestoreFromFile(pStore, fname) < 0) {
ASSERT(0);
}
taosMemoryFree(fname);
return pStore; return pStore;
} }
void tqOffsetClose(STqOffsetStore* pStore) { void tqOffsetClose(STqOffsetStore* pStore) {
tqOffsetSnapshot(pStore); tqOffsetCommitFile(pStore);
taosHashCleanup(pStore->pHash); taosHashCleanup(pStore->pHash);
taosMemoryFree(pStore); taosMemoryFree(pStore);
} }
...@@ -93,8 +100,6 @@ STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey) { ...@@ -93,8 +100,6 @@ STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey) {
} }
int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset) { int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset) {
/*ASSERT(pOffset->val.type == TMQ_OFFSET__LOG);*/
/*ASSERT(pOffset->val.version >= 0);*/
return taosHashPut(pStore->pHash, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset)); return taosHashPut(pStore->pHash, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset));
} }
...@@ -102,10 +107,9 @@ int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey) { ...@@ -102,10 +107,9 @@ int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey) {
return taosHashRemove(pStore->pHash, subscribeKey, strlen(subscribeKey)); return taosHashRemove(pStore->pHash, subscribeKey, strlen(subscribeKey));
} }
int32_t tqOffsetSnapshot(STqOffsetStore* pStore) { int32_t tqOffsetCommitFile(STqOffsetStore* pStore) {
// open file // TODO file name should be with a newer version
// TODO file name should be with a version char* fname = tqOffsetBuildFName(pStore->pTq->path, 0);
char* fname = buildFileName(pStore->pTq->path);
TdFilePtr pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); TdFilePtr pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
taosMemoryFree(fname); taosMemoryFree(fname);
if (pFile == NULL) { if (pFile == NULL) {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "meta.h"
#include "tdbInt.h"
#include "tq.h"
// STqOffsetReader ========================================
struct STqOffsetReader {
STQ* pTq;
int64_t sver;
int64_t ever;
int8_t readEnd;
};
int32_t tqOffsetReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqOffsetReader** ppReader) {
STqOffsetReader* pReader = NULL;
pReader = taosMemoryCalloc(1, sizeof(STqOffsetReader));
if (pReader == NULL) {
*ppReader = NULL;
return -1;
}
pReader->pTq = pTq;
pReader->sver = sver;
pReader->ever = ever;
tqInfo("vgId:%d vnode snapshot tq offset reader opened", TD_VID(pTq->pVnode));
*ppReader = pReader;
return 0;
}
int32_t tqOffsetReaderClose(STqOffsetReader** ppReader) {
taosMemoryFree(*ppReader);
*ppReader = NULL;
return 0;
}
int32_t tqOffsetSnapRead(STqOffsetReader* pReader, uint8_t** ppData) {
if (pReader->readEnd != 0) return 0;
char* fname = tqOffsetBuildFName(pReader->pTq->path, 0);
TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ);
taosMemoryFree(fname);
if (pFile != NULL) {
return 0;
}
int64_t sz = 0;
if (taosStatFile(fname, &sz, NULL) < 0) {
ASSERT(0);
}
SSnapDataHdr* buf = taosMemoryCalloc(1, sz + sizeof(SSnapDataHdr));
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
void* abuf = POINTER_SHIFT(buf, sizeof(SSnapDataHdr));
int64_t contLen = taosReadFile(pFile, abuf, sz);
if (contLen != sz) {
ASSERT(0);
return -1;
}
buf->size = sz;
buf->type = SNAP_DATA_TQ_OFFSET;
*ppData = (uint8_t*)buf;
pReader->readEnd = 1;
return 0;
}
// STqOffseWriter ========================================
struct STqOffsetWriter {
STQ* pTq;
int64_t sver;
int64_t ever;
int32_t tmpFileVer;
char* fname;
};
int32_t tqOffsetWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqOffsetWriter** ppWriter) {
int32_t code = 0;
STqOffsetWriter* pWriter;
pWriter = (STqOffsetWriter*)taosMemoryCalloc(1, sizeof(STqOffsetWriter));
if (pWriter == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pWriter->pTq = pTq;
pWriter->sver = sver;
pWriter->ever = ever;
*ppWriter = pWriter;
return code;
_err:
tqError("vgId:%d tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
*ppWriter = NULL;
return code;
}
int32_t tqOffsetWriterClose(STqOffsetWriter** ppWriter, int8_t rollback) {
STqOffsetWriter* pWriter = *ppWriter;
STQ* pTq = pWriter->pTq;
char* fname = tqOffsetBuildFName(pTq->path, 0);
if (rollback) {
taosRemoveFile(pWriter->fname);
} else {
taosRenameFile(pWriter->fname, fname);
if (tqOffsetRestoreFromFile(pTq->pOffsetStore, fname) < 0) {
ASSERT(0);
}
}
taosMemoryFree(fname);
taosMemoryFree(pWriter->fname);
taosMemoryFree(pWriter);
*ppWriter = NULL;
return 0;
}
int32_t tqOffsetSnapWrite(STqOffsetWriter* pWriter, uint8_t* pData, uint32_t nData) {
STQ* pTq = pWriter->pTq;
pWriter->tmpFileVer = 1;
pWriter->fname = tqOffsetBuildFName(pTq->path, pWriter->tmpFileVer);
TdFilePtr pFile = taosOpenFile(pWriter->fname, TD_FILE_CREATE | TD_FILE_WRITE);
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
int64_t size = pHdr->size;
ASSERT(size == nData - sizeof(SSnapDataHdr));
if (pFile) {
int64_t contLen = taosWriteFile(pFile, pHdr->data, size);
if (contLen != size) {
ASSERT(0);
}
} else {
ASSERT(0);
return -1;
}
return 0;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "meta.h"
#include "tdbInt.h"
#include "tq.h"
// STqSnapReader ========================================
struct STqSnapReader {
STQ* pTq;
int64_t sver;
int64_t ever;
TBC* pCur;
};
int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapReader** ppReader) {
int32_t code = 0;
STqSnapReader* pReader = NULL;
// alloc
pReader = (STqSnapReader*)taosMemoryCalloc(1, sizeof(STqSnapReader));
if (pReader == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pReader->pTq = pTq;
pReader->sver = sver;
pReader->ever = ever;
// impl
code = tdbTbcOpen(pTq->pExecStore, &pReader->pCur, NULL);
if (code) {
taosMemoryFree(pReader);
goto _err;
}
code = tdbTbcMoveToFirst(pReader->pCur);
if (code) {
taosMemoryFree(pReader);
goto _err;
}
tqInfo("vgId:%d vnode snapshot tq reader opened", TD_VID(pTq->pVnode));
*ppReader = pReader;
return code;
_err:
tqError("vgId:%d vnode snapshot tq reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
*ppReader = NULL;
return code;
}
int32_t tqSnapReaderClose(STqSnapReader** ppReader) {
int32_t code = 0;
tdbTbcClose((*ppReader)->pCur);
taosMemoryFree(*ppReader);
*ppReader = NULL;
return code;
}
int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) {
int32_t code = 0;
const void* pKey = NULL;
const void* pVal = NULL;
int32_t kLen = 0;
int32_t vLen = 0;
SDecoder decoder;
STqHandle handle;
*ppData = NULL;
for (;;) {
if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) {
goto _exit;
}
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
tDecodeSTqHandle(&decoder, &handle);
tDecoderClear(&decoder);
if (handle.snapshotVer <= pReader->sver && handle.snapshotVer >= pReader->ever) {
tdbTbcMoveToNext(pReader->pCur);
break;
} else {
tdbTbcMoveToNext(pReader->pCur);
}
}
ASSERT(pVal && vLen);
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen);
if (*ppData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
pHdr->type = SNAP_DATA_TQ_HANDLE;
pHdr->size = vLen;
memcpy(pHdr->data, pVal, vLen);
tqInfo("vgId:%d vnode snapshot tq read data, version:%" PRId64 " subKey: %s vLen:%d", TD_VID(pReader->pTq->pVnode),
handle.snapshotVer, handle.subKey, vLen);
_exit:
return code;
_err:
tqError("vgId:%d vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code));
return code;
}
// STqSnapWriter ========================================
struct STqSnapWriter {
STQ* pTq;
int64_t sver;
int64_t ever;
TXN txn;
};
int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter) {
int32_t code = 0;
STqSnapWriter* pWriter;
// alloc
pWriter = (STqSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
if (pWriter == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pWriter->pTq = pTq;
pWriter->sver = sver;
pWriter->ever = ever;
if (tdbTxnOpen(&pWriter->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
ASSERT(0);
}
*ppWriter = pWriter;
return code;
_err:
tqError("vgId:%d tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
*ppWriter = NULL;
return code;
}
int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
int32_t code = 0;
STqSnapWriter* pWriter = *ppWriter;
STQ* pTq = pWriter->pTq;
if (rollback) {
ASSERT(0);
} else {
code = tdbCommit(pWriter->pTq->pMetaStore, &pWriter->txn);
if (code) goto _err;
}
taosMemoryFree(pWriter);
*ppWriter = NULL;
// restore from metastore
if (tqMetaRestoreHandle(pTq) < 0) {
goto _err;
}
return code;
_err:
tqError("vgId:%d tq snapshot writer close failed since %s", TD_VID(pWriter->pTq->pVnode), tstrerror(code));
return code;
}
int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
STQ* pTq = pWriter->pTq;
SDecoder decoder = {0};
SDecoder* pDecoder = &decoder;
STqHandle handle;
tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
code = tDecodeSTqHandle(pDecoder, &handle);
if (code) goto _err;
code = tqMetaSaveHandle(pTq, handle.subKey, &handle);
if (code < 0) goto _err;
tDecoderClear(pDecoder);
return code;
_err:
tDecoderClear(pDecoder);
tqError("vgId:%d vnode snapshot tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
return code;
}
...@@ -27,6 +27,16 @@ struct SVSnapReader { ...@@ -27,6 +27,16 @@ struct SVSnapReader {
// tsdb // tsdb
int8_t tsdbDone; int8_t tsdbDone;
STsdbSnapReader *pTsdbReader; STsdbSnapReader *pTsdbReader;
// tq
int8_t tqHandleDone;
STqSnapReader *pTqSnapReader;
int8_t tqOffsetDone;
STqOffsetReader *pTqOffsetReader;
// stream
int8_t streamTaskDone;
SStreamTaskReader *pStreamTaskReader;
int8_t streamStateDone;
SStreamStateReader *pStreamStateReader;
// rsma // rsma
int8_t rsmaDone; int8_t rsmaDone;
SRsmaSnapReader *pRsmaReader; SRsmaSnapReader *pRsmaReader;
...@@ -104,7 +114,8 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) ...@@ -104,7 +114,8 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
if (!pReader->tsdbDone) { if (!pReader->tsdbDone) {
// open if not // open if not
if (pReader->pTsdbReader == NULL) { if (pReader->pTsdbReader == NULL) {
code = tsdbSnapReaderOpen(pReader->pVnode->pTsdb, pReader->sver, pReader->ever, SNAP_DATA_TSDB, &pReader->pTsdbReader); code = tsdbSnapReaderOpen(pReader->pVnode->pTsdb, pReader->sver, pReader->ever, SNAP_DATA_TSDB,
&pReader->pTsdbReader);
if (code) goto _err; if (code) goto _err;
} }
...@@ -122,6 +133,52 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) ...@@ -122,6 +133,52 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
} }
} }
// TQ ================
if (!pReader->tqHandleDone) {
if (pReader->pTqSnapReader == NULL) {
code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, &pReader->pTqSnapReader);
if (code < 0) goto _err;
}
code = tqSnapRead(pReader->pTqSnapReader, ppData);
if (code) {
goto _err;
} else {
if (*ppData) {
goto _exit;
} else {
pReader->tqHandleDone = 1;
code = tqSnapReaderClose(&pReader->pTqSnapReader);
if (code) goto _err;
}
}
}
if (!pReader->tqOffsetDone) {
if (pReader->pTqOffsetReader == NULL) {
code = tqOffsetReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, &pReader->pTqOffsetReader);
if (code < 0) goto _err;
}
code = tqOffsetSnapRead(pReader->pTqOffsetReader, ppData);
if (code) {
goto _err;
} else {
if (*ppData) {
goto _exit;
} else {
pReader->tqHandleDone = 1;
code = tqOffsetReaderClose(&pReader->pTqOffsetReader);
if (code) goto _err;
}
}
}
// STREAM ============
if (!pReader->streamTaskDone) {
}
if (!pReader->streamStateDone) {
}
// RSMA ============== // RSMA ==============
if (VND_IS_RSMA(pReader->pVnode) && !pReader->rsmaDone) { if (VND_IS_RSMA(pReader->pVnode) && !pReader->rsmaDone) {
// open if not // open if not
...@@ -177,6 +234,12 @@ struct SVSnapWriter { ...@@ -177,6 +234,12 @@ struct SVSnapWriter {
SMetaSnapWriter *pMetaSnapWriter; SMetaSnapWriter *pMetaSnapWriter;
// tsdb // tsdb
STsdbSnapWriter *pTsdbSnapWriter; STsdbSnapWriter *pTsdbSnapWriter;
// tq
STqSnapWriter *pTqSnapWriter;
STqOffsetWriter *pTqOffsetWriter;
// stream
SStreamTaskWriter *pStreamTaskWriter;
SStreamStateWriter *pStreamStateWriter;
// rsma // rsma
SRsmaSnapWriter *pRsmaSnapWriter; SRsmaSnapWriter *pRsmaSnapWriter;
}; };
...@@ -301,6 +364,14 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { ...@@ -301,6 +364,14 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pData, nData); code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pData, nData);
if (code) goto _err; if (code) goto _err;
} break; } break;
case SNAP_DATA_TQ_HANDLE: {
} break;
case SNAP_DATA_TQ_OFFSET: {
} break;
case SNAP_DATA_STREAM_TASK: {
} break;
case SNAP_DATA_STREAM_STATE: {
} break;
case SNAP_DATA_RSMA1: case SNAP_DATA_RSMA1:
case SNAP_DATA_RSMA2: { case SNAP_DATA_RSMA2: {
// rsma1/rsma2 // rsma1/rsma2
...@@ -332,4 +403,4 @@ _err: ...@@ -332,4 +403,4 @@ _err:
vError("vgId:%d vnode snapshot write failed since %s, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode), vError("vgId:%d vnode snapshot write failed since %s, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode),
tstrerror(code), pHdr->index, pHdr->type, nData); tstrerror(code), pHdr->index, pHdr->type, nData);
return code; return code;
} }
\ No newline at end of file
...@@ -62,6 +62,11 @@ int32_t walRefVer(SWalRef *pRef, int64_t ver) { ...@@ -62,6 +62,11 @@ int32_t walRefVer(SWalRef *pRef, int64_t ver) {
return 0; return 0;
} }
int32_t walPreRefVer(SWalRef *pRef, int64_t ver) {
pRef->refVer = ver;
return 0;
}
void walUnrefVer(SWalRef *pRef) { void walUnrefVer(SWalRef *pRef) {
pRef->refId = -1; pRef->refId = -1;
pRef->refFile = -1; pRef->refFile = -1;
......
...@@ -26,7 +26,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { ...@@ -26,7 +26,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
pIter = taosHashIterate(pWal->pRefHash, pIter); pIter = taosHashIterate(pWal->pRefHash, pIter);
if (pIter == NULL) break; if (pIter == NULL) break;
SWalRef *pRef = (SWalRef *)pIter; SWalRef *pRef = (SWalRef *)pIter;
if (pRef->refVer != -1) { if (pRef->refVer != -1 && pRef->refVer <= ver) {
taosHashCancelIterate(pWal->pRefHash, pIter); taosHashCancelIterate(pWal->pRefHash, pIter);
return -1; return -1;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册