提交 afe16332 编写于 作者: C Cary Xu

feat: rsma snapshot

上级 cc3967b9
...@@ -273,8 +273,8 @@ struct SVnode { ...@@ -273,8 +273,8 @@ struct SVnode {
#define VND_TSDB(vnd) ((vnd)->pTsdb) #define VND_TSDB(vnd) ((vnd)->pTsdb)
#define VND_RSMA0(vnd) ((vnd)->pTsdb) #define VND_RSMA0(vnd) ((vnd)->pTsdb)
#define VND_RSMA1(vnd) ((vnd)->pSma->pRSmaTsdb1) #define VND_RSMA1(vnd) ((vnd)->pSma->pRSmaTsdb[TSDB_RETENTION_L0])
#define VND_RSMA2(vnd) ((vnd)->pSma->pRSmaTsdb2) #define VND_RSMA2(vnd) ((vnd)->pSma->pRSmaTsdb[TSDB_RETENTION_L1])
#define VND_RETENTIONS(vnd) (&(vnd)->config.tsdbCfg.retentions) #define VND_RETENTIONS(vnd) (&(vnd)->config.tsdbCfg.retentions)
#define VND_IS_RSMA(v) ((v)->config.isRsma == 1) #define VND_IS_RSMA(v) ((v)->config.isRsma == 1)
#define VND_IS_TSMA(v) ((v)->config.isTsma == 1) #define VND_IS_TSMA(v) ((v)->config.isTsma == 1)
...@@ -289,8 +289,7 @@ struct SSma { ...@@ -289,8 +289,7 @@ struct SSma {
bool locked; bool locked;
TdThreadMutex mutex; TdThreadMutex mutex;
SVnode* pVnode; SVnode* pVnode;
STsdb* pRSmaTsdb1; STsdb* pRSmaTsdb[TSDB_RETENTION_L2];
STsdb* pRSmaTsdb2;
void* pTSmaEnv; void* pTSmaEnv;
void* pRSmaEnv; void* pRSmaEnv;
}; };
...@@ -305,8 +304,8 @@ struct SSma { ...@@ -305,8 +304,8 @@ struct SSma {
#define SMA_TSMA_ENV(s) ((s)->pTSmaEnv) #define SMA_TSMA_ENV(s) ((s)->pTSmaEnv)
#define SMA_RSMA_ENV(s) ((s)->pRSmaEnv) #define SMA_RSMA_ENV(s) ((s)->pRSmaEnv)
#define SMA_RSMA_TSDB0(s) ((s)->pVnode->pTsdb) #define SMA_RSMA_TSDB0(s) ((s)->pVnode->pTsdb)
#define SMA_RSMA_TSDB1(s) ((s)->pRSmaTsdb1) #define SMA_RSMA_TSDB1(s) ((s)->pRSmaTsdb[TSDB_RETENTION_L0])
#define SMA_RSMA_TSDB2(s) ((s)->pRSmaTsdb2) #define SMA_RSMA_TSDB2(s) ((s)->pRSmaTsdb[TSDB_RETENTION_L1])
// sma // sma
void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data); void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);
......
...@@ -123,11 +123,9 @@ int32_t smaOpen(SVnode *pVnode) { ...@@ -123,11 +123,9 @@ int32_t smaOpen(SVnode *pVnode) {
} }
// restore the rsma // restore the rsma
#if 1
if (rsmaRestore(pSma) < 0) { if (rsmaRestore(pSma) < 0) {
goto _err; goto _err;
} }
#endif
} }
return 0; return 0;
......
...@@ -605,7 +605,7 @@ static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSche ...@@ -605,7 +605,7 @@ static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSche
snprintf(flag, 10, "level %" PRIi8, pItem->level); snprintf(flag, 10, "level %" PRIi8, pItem->level);
blockDebugShowDataBlocks(pResult, flag); blockDebugShowDataBlocks(pResult, flag);
#endif #endif
STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb1 : pSma->pRSmaTsdb2); STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]);
SSubmitReq *pReq = NULL; SSubmitReq *pReq = NULL;
// TODO: the schema update should be handled // TODO: the schema update should be handled
if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), suid) < 0) { if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), suid) < 0) {
...@@ -949,13 +949,8 @@ _err: ...@@ -949,13 +949,8 @@ _err:
* @return int32_t * @return int32_t
*/ */
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma, int64_t committed) { static int32_t tdRSmaRestoreTSDataReload(SSma *pSma, int64_t committed) {
// TODO // NOTHING TODO: the data would be restored from the unified WAL replay procedure
smaDebug("vgId:%d, rsma restore from %" PRIi64 ", ts data reload success", SMA_VID(pSma), committed);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_err:
smaError("vgId:%d, rsma restore from %" PRIi64 ", ts data reload failed since %s", SMA_VID(pSma), committed,
terrstr());
return TSDB_CODE_FAILED;
} }
int32_t tdProcessRSmaRestoreImpl(SSma *pSma) { int32_t tdProcessRSmaRestoreImpl(SSma *pSma) {
......
...@@ -27,6 +27,8 @@ struct SVSnapReader { ...@@ -27,6 +27,8 @@ struct SVSnapReader {
// tsdb // tsdb
int8_t tsdbDone; int8_t tsdbDone;
STsdbSnapReader *pTsdbReader; STsdbSnapReader *pTsdbReader;
// rsma
int8_t rsmaDone[TSDB_RETENTION_L2];
}; };
int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapReader **ppReader) { int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapReader **ppReader) {
...@@ -115,6 +117,42 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) ...@@ -115,6 +117,42 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
} }
} }
// RSMA ==============
#if 0
if (VND_IS_RSMA(pReader->pVnode)) {
// RSMA1/RSMA2
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (!pReader->rsmaDone[i]) {
if (!pReader->pVnode->pSma->pRSmaTsdb[i]) {
// no valid tsdb
pReader->rsmaDone[i] = 1;
continue;
}
if (pReader->pTsdbReader == NULL) {
code = tsdbSnapReaderOpen(pReader->pVnode->pSma->pRSmaTsdb[i], pReader->sver, pReader->ever,
&pReader->pTsdbReader);
if (code) goto _err;
}
code = tsdbSnapRead(pReader->pTsdbReader, ppData);
if (code) {
goto _err;
} else {
if (*ppData) {
goto _exit;
} else {
pReader->tsdbDone = 1;
code = tsdbSnapReaderClose(&pReader->pTsdbReader);
if (code) goto _err;
}
}
}
}
// QTaskInfoFile
// TODO ...
}
#endif
*ppData = NULL; *ppData = NULL;
*nData = 0; *nData = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册