提交 c41364d5 编写于 作者: C Cary Xu

refactor: rsma commit and recovery

上级 5b1f3dc5
......@@ -22,19 +22,13 @@
extern "C" {
#endif
#undef SMA_DEBUG_MODE // TODO: remove when release
// smaDebug ================
// clang-format off
#define smaFatal(...) do { if (smaDebugFlag & DEBUG_FATAL) { taosPrintLog("SMA FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
#define smaError(...) do { if (smaDebugFlag & DEBUG_ERROR) { taosPrintLog("SMA ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
#define smaWarn(...) do { if (smaDebugFlag & DEBUG_WARN) { taosPrintLog("SMA WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
#define smaInfo(...) do { if (smaDebugFlag & DEBUG_INFO) { taosPrintLog("SMA ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
#ifdef SMA_DEBUG_MODE
#define smaDebug(...) do { if (smaDebugFlag & DEBUG_WARN) { taosPrintLog("SMA WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
#else
#define smaDebug(...) do { if (smaDebugFlag & DEBUG_DEBUG) { taosPrintLog("SMA ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0)
#endif
#define smaTrace(...) do { if (smaDebugFlag & DEBUG_TRACE) { taosPrintLog("SMA ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
......
......@@ -163,8 +163,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool
// sma
int32_t smaOpen(SVnode* pVnode);
int32_t smaCloseEnv(SSma* pSma);
int32_t smaCloseEx(SSma* pSma);
int32_t smaClose(SSma* pSma);
int32_t smaBegin(SSma* pSma);
int32_t smaPreCommit(SSma* pSma);
int32_t smaCommit(SSma* pSma);
......
......@@ -98,7 +98,6 @@ static int32_t tdProcessRSmaPreCommitImpl(SSma *pSma) {
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv);
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
smaDebug("vgId:%d, rsma pre commit", SMA_VID(pSma));
// step 1: set persistence task paused
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);
......@@ -122,6 +121,8 @@ static int32_t tdProcessRSmaPreCommitImpl(SSma *pSma) {
}
}
smaDebug("vgId:%d, rsma pre commit succeess", SMA_VID(pSma));
return TSDB_CODE_SUCCESS;
}
......
......@@ -135,17 +135,11 @@ _err:
return -1;
}
int32_t smaCloseEnv(SSma *pSma) {
int32_t smaClose(SSma *pSma) {
if (pSma) {
taosThreadMutexDestroy(&pSma->mutex);
SMA_TSMA_ENV(pSma) = tdFreeSmaEnv(SMA_TSMA_ENV(pSma));
SMA_RSMA_ENV(pSma) = tdFreeSmaEnv(SMA_RSMA_ENV(pSma));
}
return 0;
}
int32_t smaCloseEx(SSma *pSma) {
if (pSma) {
taosThreadMutexDestroy(&pSma->mutex);
if SMA_RSMA_TSDB0 (pSma) tsdbClose(&SMA_RSMA_TSDB0(pSma));
if SMA_RSMA_TSDB1 (pSma) tsdbClose(&SMA_RSMA_TSDB1(pSma));
if SMA_RSMA_TSDB2 (pSma) tsdbClose(&SMA_RSMA_TSDB2(pSma));
......
......@@ -43,8 +43,8 @@ static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter);
static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *infoItem);
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables);
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma);
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma);
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed);
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma, int64_t committed);
struct SRSmaInfoItem {
SRSmaInfo *pRsmaInfo;
......@@ -803,7 +803,7 @@ _err:
return TSDB_CODE_FAILED;
}
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma) {
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed) {
SVnode *pVnode = pSma->pVnode;
STFile tFile = {0};
char qTaskInfoFName[TSDB_FILENAME_LEN] = {0};
......@@ -814,13 +814,14 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma) {
}
if (!taosCheckExistFile(TD_TFILE_FULL_NAME(&tFile))) {
if (pVnode->state.committed) {
goto _err;
if (pVnode->state.committed > 0) {
smaWarn("vgId:%d, rsma restore for version %" PRIi64 ", not start as %s not exist", TD_VID(pVnode),
pVnode->state.committed, TD_TFILE_FULL_NAME(&tFile));
} else {
smaDebug("vgId:%d, rsma restore for version %" PRIi64 ", no need as %s not exist", TD_VID(pVnode),
pVnode->state.committed, TD_TFILE_FULL_NAME(&tFile));
return TSDB_CODE_SUCCESS;
}
return TSDB_CODE_SUCCESS;
}
if (tdOpenTFile(&tFile, TD_FILE_READ) < 0) {
......@@ -845,6 +846,10 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma) {
tdRSmaQTaskInfoIterDestroy(&fIter);
tdCloseTFile(&tFile);
tdDestroyTFile(&tFile);
// restored successfully from committed
*committed = pVnode->state.committed;
return TSDB_CODE_SUCCESS;
_err:
smaError("vgId:%d, rsma restore for version %" PRIi64 ", qtaskinfo reload failed since %s", TD_VID(pVnode),
......@@ -856,34 +861,39 @@ _err:
* @brief reload ts data from checkpoint
*
* @param pSma
* @param committed restore from committed version
* @return int32_t
*/
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma) {
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma, int64_t committed) {
// TODO
smaDebug("vgId:%d, rsma restore from %" PRIi64 ", ts data reload success", SMA_VID(pSma), committed);
return TSDB_CODE_SUCCESS;
_err:
smaError("rsma restore, ts data reload failed since %s", terrstr());
smaError("vgId:%d, rsma restore from %" PRIi64 ", ts data reload failed since %s", SMA_VID(pSma), committed,
terrstr());
return TSDB_CODE_FAILED;
}
int32_t tdProcessRSmaRestoreImpl(SSma *pSma) {
int64_t nTables = 0;
// step 1: iterate all stables to restore the rsma env
int64_t nTables = 0;
if (tdRSmaRestoreQTaskInfoInit(pSma, &nTables) < 0) {
goto _err;
}
if (nTables <= 0) {
smaDebug("vgId:%d, no need to restore rsma task since no tables", SMA_VID(pSma));
return TSDB_CODE_SUCCESS;
}
// step 2: retrieve qtaskinfo items from the persistence file(rsma/qtaskinfo) and restore
if (tdRSmaRestoreQTaskInfoReload(pSma) < 0) {
int64_t committed = -1;
if (tdRSmaRestoreQTaskInfoReload(pSma, &committed) < 0) {
goto _err;
}
// step 3: reload ts data from checkpoint
if (tdRSmaRestoreTSDataReload(pSma) < 0) {
if ((committed > 0) && (tdRSmaRestoreTSDataReload(pSma, committed)) < 0) {
goto _err;
}
......@@ -1112,11 +1122,15 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) {
char qTaskInfoFName[TSDB_FILENAME_LEN];
tdRSmaQTaskInfoGetFName(vid, pSma->pVnode->state.applied, qTaskInfoFName);
if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
smaError("vgId:%d, rsma persit, init %s failed since %s", vid, qTaskInfoFName, terrstr());
goto _err;
}
if (tdCreateTFile(&tFile, true, -1) < 0) {
smaError("vgId:%d, rsma persit, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr());
goto _err;
}
smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d serialize qTaskInfo, file %s created", vid, pRSmaInfo->suid,
i + 1, TD_TFILE_FULL_NAME(&tFile));
isFileCreated = true;
}
......@@ -1156,6 +1170,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) {
}
return TSDB_CODE_SUCCESS;
_err:
smaError("vgId:%d, rsma persit failed since %s", vid, terrstr());
if (isFileCreated) {
tdRemoveTFile(&tFile);
tdDestroyTFile(&tFile);
......
......@@ -140,7 +140,7 @@ int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset)
return -1;
}
#if 1
#if 0
smaDebug("append to file %s, offset:%" PRIi64 " nbyte:%" PRIi64 " fsize:%" PRIi64, TD_TFILE_FULL_NAME(pTFile),
toffset, nbyte, toffset + nbyte);
#endif
......@@ -242,35 +242,36 @@ int32_t tdInitTFile(STFile *pTFile, const char *dname, const char *fname) {
int32_t tdCreateTFile(STFile *pTFile, bool updateHeader, int8_t fType) {
ASSERT(pTFile->info.fsize == 0 && pTFile->info.magic == TD_FILE_INIT_MAGIC);
pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pTFile->pFile == NULL) {
if (errno == ENOENT) {
// Try to create directory recursively
if (taosMulMkDir(taosDirName(TD_TFILE_FULL_NAME(pTFile))) != 0) {
char *s = strdup(TD_TFILE_FULL_NAME(pTFile));
if (taosMulMkDir(taosDirName(s)) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
taosMemoryFree(s);
return -1;
}
taosMemoryFree(s);
pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pTFile->pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
} else {
pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pTFile->pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
}
}
}
if (!updateHeader) {
return 0;
}
if (!updateHeader) {
return 0;
}
pTFile->info.fsize += TD_FILE_HEAD_SIZE;
pTFile->info.fver = 0;
pTFile->info.fsize += TD_FILE_HEAD_SIZE;
pTFile->info.fver = 0;
if (tdUpdateTFileHeader(pTFile) < 0) {
tdCloseTFile(pTFile);
tdRemoveTFile(pTFile);
return -1;
}
if (tdUpdateTFileHeader(pTFile) < 0) {
tdCloseTFile(pTFile);
tdRemoveTFile(pTFile);
return -1;
}
return 0;
......
......@@ -152,12 +152,11 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
return pVnode;
_err:
if (pVnode->pSma) smaCloseEnv(pVnode->pSma);
if (pVnode->pQuery) vnodeQueryClose(pVnode);
if (pVnode->pTq) tqClose(pVnode->pTq);
if (pVnode->pWal) walClose(pVnode->pWal);
if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
if (pVnode->pSma) smaCloseEx(pVnode->pSma);
if (pVnode->pSma) smaClose(pVnode->pSma);
if (pVnode->pMeta) metaClose(pVnode->pMeta);
tsem_destroy(&(pVnode->canCommit));
......@@ -167,14 +166,13 @@ _err:
void vnodeClose(SVnode *pVnode) {
if (pVnode) {
smaCloseEnv(pVnode->pSma);
vnodeCommit(pVnode);
vnodeSyncClose(pVnode);
vnodeQueryClose(pVnode);
walClose(pVnode->pWal);
tqClose(pVnode->pTq);
if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
smaCloseEx(pVnode->pSma);
smaClose(pVnode->pSma);
metaClose(pVnode->pMeta);
vnodeCloseBufPool(pVnode);
// destroy handle
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册