提交 12eed5d0 编写于 作者: K kailixu

chore: more code

上级 fb265ff9
...@@ -219,11 +219,12 @@ int32_t tdRSmaFSOpen(SSma *pSma, int64_t version, int8_t rollback); ...@@ -219,11 +219,12 @@ int32_t tdRSmaFSOpen(SSma *pSma, int64_t version, int8_t rollback);
void tdRSmaFSClose(SRSmaFS *fs); void tdRSmaFSClose(SRSmaFS *fs);
int32_t tdRSmaFSPrepareCommit(SSma *pSma, SRSmaFS *pFSNew); int32_t tdRSmaFSPrepareCommit(SSma *pSma, SRSmaFS *pFSNew);
int32_t tdRSmaFSCommit(SSma *pSma); int32_t tdRSmaFSCommit(SSma *pSma);
int32_t tdRSmaFSFinishCommit(SSma *pSma);
int32_t tdRSmaFSCopy(SSma *pSma, SRSmaFS *pFSOut); int32_t tdRSmaFSCopy(SSma *pSma, SRSmaFS *pFSOut);
int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFSOut); int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFSOut);
int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int64_t version); int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int64_t version);
void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int64_t version); void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int64_t version);
int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile); int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t size);
int32_t tdRSmaFSRollback(SSma *pSma); int32_t tdRSmaFSRollback(SSma *pSma);
int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback); int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback);
int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName); int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
......
...@@ -97,18 +97,22 @@ _exit: ...@@ -97,18 +97,22 @@ _exit:
int32_t smaFinishCommit(SSma *pSma) { int32_t smaFinishCommit(SSma *pSma) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
SVnode *pVnode = pSma->pVnode; SVnode *pVnode = pSma->pVnode;
code = tdRSmaFSFinishCommit(pSma);
TSDB_CHECK_CODE(code, lino, _exit);
if (VND_RSMA1(pVnode) && (code = tsdbFinishCommit(VND_RSMA1(pVnode))) < 0) { if (VND_RSMA1(pVnode) && (code = tsdbFinishCommit(VND_RSMA1(pVnode))) < 0) {
smaError("vgId:%d, failed to finish commit tsdb rsma1 since %s", TD_VID(pVnode), tstrerror(code)); TSDB_CHECK_CODE(code, lino, _exit);
goto _exit;
} }
if (VND_RSMA2(pVnode) && (code = tsdbFinishCommit(VND_RSMA2(pVnode))) < 0) { if (VND_RSMA2(pVnode) && (code = tsdbFinishCommit(VND_RSMA2(pVnode))) < 0) {
smaError("vgId:%d, failed to finish commit tsdb rsma2 since %s", TD_VID(pVnode), tstrerror(code)); TSDB_CHECK_CODE(code, lino, _exit);
goto _exit;
} }
_exit: _exit:
terrno = code; if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
}
return code; return code;
} }
...@@ -123,7 +127,7 @@ _exit: ...@@ -123,7 +127,7 @@ _exit:
* @return int32_t * @return int32_t
*/ */
static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) { static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) {
#if 0 #if 0
SVnode *pVnode = pSma->pVnode; SVnode *pVnode = pSma->pVnode;
SRSmaFS *pFS = RSMA_FS(pStat); SRSmaFS *pFS = RSMA_FS(pStat);
int64_t committed = pStat->commitAppliedVer; int64_t committed = pStat->commitAppliedVer;
...@@ -170,7 +174,7 @@ static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) { ...@@ -170,7 +174,7 @@ static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) {
} }
taosWUnLockLatch(RSMA_FS_LOCK(pStat)); taosWUnLockLatch(RSMA_FS_LOCK(pStat));
#endif #endif
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -279,21 +283,22 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { ...@@ -279,21 +283,22 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
*/ */
static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma, SCommitInfo *pInfo) { static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma, SCommitInfo *pInfo) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
SVnode *pVnode = pSma->pVnode; SVnode *pVnode = pSma->pVnode;
code = tdRSmaFSCommit(pSma);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbCommit(VND_RSMA1(pVnode), pInfo);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbCommit(VND_RSMA2(pVnode), pInfo);
TSDB_CHECK_CODE(code, lino, _exit);
if ((code = tsdbCommit(VND_RSMA1(pVnode), pInfo)) < 0) {
smaError("vgId:%d, failed to commit tsdb rsma1 since %s", TD_VID(pVnode), tstrerror(code));
goto _exit;
}
if ((code = tsdbCommit(VND_RSMA2(pVnode), pInfo)) < 0) {
smaError("vgId:%d, failed to commit tsdb rsma2 since %s", TD_VID(pVnode), tstrerror(code));
goto _exit;
}
_exit: _exit:
terrno = code; if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
}
return code; return code;
} }
......
...@@ -17,9 +17,8 @@ ...@@ -17,9 +17,8 @@
// ================================================================================================= // =================================================================================================
static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **output); // static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **output);
static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2); static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2);
static int32_t tdQTaskInfCmprFn2(const void *p1, const void *p2);
static FORCE_INLINE int32_t tPutQTaskF(uint8_t *p, SQTaskFile *pFile) { static FORCE_INLINE int32_t tPutQTaskF(uint8_t *p, SQTaskFile *pFile) {
int32_t n = 0; int32_t n = 0;
...@@ -252,75 +251,49 @@ static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2) { ...@@ -252,75 +251,49 @@ static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2) {
static int32_t tdRSmaFSApplyChange(SSma *pSma, SRSmaFS *pFS) { static int32_t tdRSmaFSApplyChange(SSma *pSma, SRSmaFS *pFS) {
int32_t code = 0; int32_t code = 0;
#if 0
int32_t lino = 0; int32_t lino = 0;
SVnode *pVnode = pSma->pVnode; SVnode *pVnode = pSma->pVnode;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
SRSmaFS *pFSOld = RSMA_FS(pStat);
int32_t nRef = 0; int64_t version = pStat->commitAppliedVer;
char fname[TSDB_FILENAME_LEN] = {0}; char fname[TSDB_FILENAME_LEN] = {0};
// SQTaskFile // SQTaskFile
int32_t iOld = 0; int32_t nNew = taosArrayGetSize(pFS->aQTaskInf);
int32_t iNew = 0; int32_t iNew = 0;
while (true) { while (iNew < nNew) {
int32_t nOld = taosArrayGetSize(pTsdb->fs.aDFileSet); SQTaskFile *pQTaskFNew = TARRAY_GET_ELEM(pFS->aQTaskInf, iNew);
int32_t nNew = taosArrayGetSize(pFS->aDFileSet);
SDFileSet fSet = {0};
int8_t sameDisk = 0;
if (iOld >= nOld && iNew >= nNew) break;
SDFileSet *pSetOld = (iOld < nOld) ? taosArrayGet(pTsdb->fs.aDFileSet, iOld) : NULL;
SDFileSet *pSetNew = (iNew < nNew) ? taosArrayGet(pFS->aDFileSet, iNew) : NULL;
if (pSetOld && pSetNew) {
if (pSetOld->fid == pSetNew->fid) {
code = tsdbMergeFileSet(pTsdb, pSetOld, pSetNew);
TSDB_CHECK_CODE(code, lino, _exit);
iOld++; int32_t idx = taosArraySearchIdx(pFSOld->aQTaskInf, pQTaskFNew, tdQTaskInfCmprFn1, TD_GE);
iNew++;
} else if (pSetOld->fid < pSetNew->fid) {
code = tsdbRemoveFileSet(pTsdb, pSetOld);
TSDB_CHECK_CODE(code, lino, _exit);
taosArrayRemove(pTsdb->fs.aDFileSet, iOld);
} else {
code = tsdbNewFileSet(pTsdb, &fSet, pSetNew);
TSDB_CHECK_CODE(code, lino, _exit)
if (taosArrayInsert(pTsdb->fs.aDFileSet, iOld, &fSet) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
iOld++; if (idx < 0) {
iNew++; idx = taosArrayGetSize(pFSOld->aQTaskInf);
} pQTaskFNew->nRef = 1;
} else if (pSetOld) {
code = tsdbRemoveFileSet(pTsdb, pSetOld);
TSDB_CHECK_CODE(code, lino, _exit);
taosArrayRemove(pTsdb->fs.aDFileSet, iOld);
} else { } else {
code = tsdbNewFileSet(pTsdb, &fSet, pSetNew); SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFSOld->aQTaskInf, idx);
TSDB_CHECK_CODE(code, lino, _exit) int32_t c = tdQTaskInfCmprFn1(pTaskF, pQTaskFNew);
if (c < 0) {
if (taosArrayInsert(pTsdb->fs.aDFileSet, iOld, &fSet) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; } else if(c == 0) {
TSDB_CHECK_CODE(code, lino, _exit); ++iNew;
continue;
} else {
ASSERT(0);
} }
}
iOld++; if (taosArrayInsert(pFS->aQTaskInf, idx, qTaskF) == NULL) {
iNew++; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
} }
} }
_exit: _exit:
if (code) { if (code) {
smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code)); smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
} }
#endif
return code; return code;
} }
...@@ -332,6 +305,7 @@ _exit: ...@@ -332,6 +305,7 @@ _exit:
* @param output * @param output
* @return int32_t * @return int32_t
*/ */
#if 0
static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **output) { static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **output) {
SVnode *pVnode = pSma->pVnode; SVnode *pVnode = pSma->pVnode;
TdDirPtr pDir = NULL; TdDirPtr pDir = NULL;
...@@ -419,7 +393,7 @@ _end: ...@@ -419,7 +393,7 @@ _end:
regfree(&regex); regfree(&regex);
return terrno == 0 ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED; return terrno == 0 ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED;
} }
#endif
static int32_t tdRSmaFSScanAndTryFix(SSma *pSma) { static int32_t tdRSmaFSScanAndTryFix(SSma *pSma) {
int32_t code = 0; int32_t code = 0;
...@@ -468,7 +442,6 @@ _exit: ...@@ -468,7 +442,6 @@ _exit:
return code; return code;
} }
// EXPOSED APIS ==================================================================================== // EXPOSED APIS ====================================================================================
int32_t tdRSmaFSOpen(SSma *pSma, int64_t version, int8_t rollback) { int32_t tdRSmaFSOpen(SSma *pSma, int64_t version, int8_t rollback) {
...@@ -547,7 +520,9 @@ int32_t tdRSmaFSCommit(SSma *pSma) { ...@@ -547,7 +520,9 @@ int32_t tdRSmaFSCommit(SSma *pSma) {
char current_t[TSDB_FILENAME_LEN] = {0}; char current_t[TSDB_FILENAME_LEN] = {0};
tdRSmaGetCurrentFName(pSma, current, current_t); tdRSmaGetCurrentFName(pSma, current, current_t);
if (!taosCheckExistFile(current_t)) goto _exit; if (!taosCheckExistFile(current_t)) {
goto _exit;
}
// rename the file // rename the file
if (taosRenameFile(current_t, current) < 0) { if (taosRenameFile(current_t, current) < 0) {
...@@ -556,7 +531,7 @@ int32_t tdRSmaFSCommit(SSma *pSma) { ...@@ -556,7 +531,7 @@ int32_t tdRSmaFSCommit(SSma *pSma) {
} }
// Load the new FS // Load the new FS
code = tdRSmaFSCreate(&fs, 0); code = tdRSmaFSCreate(&fs, 1);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = tdRSmaLoadFSFromFile(current, &fs); code = tdRSmaLoadFSFromFile(current, &fs);
...@@ -574,6 +549,28 @@ _exit: ...@@ -574,6 +549,28 @@ _exit:
return code; return code;
} }
int32_t tdRSmaFSFinishCommit(SSma *pSma) {
int32_t code = 0;
int32_t lino = 0;
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
taosWLockLatch(RSMA_FS_LOCK(pStat));
if ((code = tdRSmaFSCommit(pSma)) < 0) {
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
TSDB_CHECK_CODE(code, lino, _exit);
}
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code));
} else {
smaInfo("vgId:%d, rsmaFS finish commit", SMA_VID(pSma));
}
return code;
}
int32_t tdRSmaFSRollback(SSma *pSma) { int32_t tdRSmaFSRollback(SSma *pSma) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
...@@ -603,9 +600,8 @@ int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t siz ...@@ -603,9 +600,8 @@ int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t siz
SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, idx); SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, idx);
int32_t c = tdQTaskInfCmprFn1(pTaskF, qTaskF); int32_t c = tdQTaskInfCmprFn1(pTaskF, qTaskF);
if (c == 0) { if (c == 0) {
ASSERT(0);
pTaskF->nRef = qTaskF->nRef;
ASSERT(pTaskF->size == qTaskF->size); ASSERT(pTaskF->size == qTaskF->size);
ASSERT(0);
goto _exit; goto _exit;
} }
} }
...@@ -634,7 +630,7 @@ int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, in ...@@ -634,7 +630,7 @@ int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, in
ASSERT(oldVal > 0); ASSERT(oldVal > 0);
} }
} else { } else {
// ref all // ref all
int32_t size = taosArrayGetSize(aQTaskInf); int32_t size = taosArrayGetSize(aQTaskInf);
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
pTaskF = TARRAY_GET_ELEM(aQTaskInf, i); pTaskF = TARRAY_GET_ELEM(aQTaskInf, i);
...@@ -701,15 +697,18 @@ void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int ...@@ -701,15 +697,18 @@ void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int
} }
int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFSOut) { int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFSOut) {
int32_t code = 0;
int32_t lino = 0;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
int32_t code = 0;
taosRLockLatch(RSMA_FS_LOCK(pStat)); taosRLockLatch(RSMA_FS_LOCK(pStat));
code = tdRSmaFSCopy(pSma, pFSOut); code = tdRSmaFSCopy(pSma, pFSOut);
TSDB_CHECK_CODE(code, lino, _exit);
taosWUnLockLatch(RSMA_FS_LOCK(pStat)); taosWUnLockLatch(RSMA_FS_LOCK(pStat));
_exit: _exit:
if (code) { if (code) {
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code)); smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code));
} }
return code; return code;
......
...@@ -1271,9 +1271,11 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { ...@@ -1271,9 +1271,11 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
} }
} }
// prepare
code = tdRSmaFSTakeSnapshot(pSma, &fs); code = tdRSmaFSTakeSnapshot(pSma, &fs);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = tdRSmaFSUpsertQTaskFile(&fs, qTaskFArray->pData, taosArrayGetSize(qTaskFArray));
TSDB_CHECK_CODE(code, lino, _exit);
code = tdRSmaFSPrepareCommit(pSma, &fs); code = tdRSmaFSPrepareCommit(pSma, &fs);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册