提交 f9bd3580 编写于 作者: K kailixu

chore: rsma refact

上级 723ac3ce
......@@ -217,9 +217,12 @@ static FORCE_INLINE void tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) {
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree);
int32_t tdRSmaFSOpen(SSma *pSma, int64_t version, int8_t rollback);
void tdRSmaFSClose(SRSmaFS *fs);
int32_t tdRSmaFSPrepareCommit(SSma *pSma, SRSmaFS *pFSNew);
int32_t tdRSmaFSCommit(SSma *pSma);
int32_t tdRSmaFSCopy(SSma *pSma, SRSmaFS *pFSOut);
int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFSOut);
int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int64_t version);
void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int64_t version);
int64_t tdRSmaFSMaxVer(SSma *pSma, SRSmaStat *pStat);
int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile);
int32_t tdRSmaFSRollback(SSma *pSma);
int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback);
......@@ -293,9 +296,9 @@ void tdCloseTFile(STFile *pTFile);
void tdDestroyTFile(STFile *pTFile);
#endif
void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t suid,
void tdRSmaGetFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t suid,
int8_t level, int64_t version, char *outputName);
void tdGetVndDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName);
void tdRSmaGetDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName);
#ifdef __cplusplus
}
......
......@@ -203,7 +203,7 @@ void smaCleanUp();
int32_t smaOpen(SVnode* pVnode, int8_t rollback);
int32_t smaClose(SSma* pSma);
int32_t smaBegin(SSma* pSma);
int32_t smaPrepareCommit(SSma* pSma);
int32_t smaPrepareAsyncCommit(SSma* pSma);
int32_t smaCommit(SSma* pSma, SCommitInfo* pInfo);
int32_t smaFinishCommit(SSma* pSma);
int32_t smaPostCommit(SSma* pSma);
......
......@@ -28,7 +28,7 @@ static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat);
* @param pSma
* @return int32_t
*/
int32_t smaPrepareCommit(SSma *pSma) { return tdProcessRSmaAsyncPreCommitImpl(pSma); }
int32_t smaPrepareAsyncCommit(SSma *pSma) { return tdProcessRSmaAsyncPreCommitImpl(pSma); }
/**
* @brief async commit, only applicable to Rollup SMA
......@@ -280,14 +280,9 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma, SCommitInfo *pInfo) {
int32_t code = 0;
SVnode *pVnode = pSma->pVnode;
#if 0
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
// perform persist task for qTaskInfo operator
if (tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)) < 0) {
return TSDB_CODE_FAILED;
}
#endif
if ((code = tsdbCommit(VND_RSMA1(pVnode), pInfo)) < 0) {
smaError("vgId:%d, failed to commit tsdb rsma1 since %s", TD_VID(pVnode), tstrerror(code));
......
......@@ -137,63 +137,10 @@ _exit:
return code;
}
/**
* @brief Open RSma FS from qTaskInfo files
*
* @param pSma
* @param version
* @return int32_t
*/
#if 0
int32_t tdRSmaFSOpen(SSma *pSma, int64_t version) {
SVnode *pVnode = pSma->pVnode;
int64_t commitID = pVnode->state.commitID;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = NULL;
SArray *output = NULL;
terrno = TSDB_CODE_SUCCESS;
if (!pEnv) {
return TSDB_CODE_SUCCESS;
}
if (tdFetchQTaskInfoFiles(pSma, version, &output) < 0) {
goto _end;
}
pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
for (int32_t i = 0; i < taosArrayGetSize(output); ++i) {
int32_t vid = 0;
int64_t version = -1;
sscanf((const char *)taosArrayGetP(output, i), "v%dqinf.v%" PRIi64, &vid, &version);
SQTaskFile qTaskFile = {.version = version, .nRef = 1};
if ((terrno = tdRSmaFSUpsertQTaskFile(RSMA_FS(pStat), &qTaskFile)) < 0) {
goto _end;
}
smaInfo("vgId:%d, open fs, version:%" PRIi64 ", ref:%d", TD_VID(pVnode), qTaskFile.version, qTaskFile.nRef);
}
_end:
for (int32_t i = 0; i < taosArrayGetSize(output); ++i) {
void *ptr = taosArrayGetP(output, i);
taosMemoryFreeClear(ptr);
}
taosArrayDestroy(output);
if (terrno != TSDB_CODE_SUCCESS) {
smaError("vgId:%d, open rsma fs failed since %s", TD_VID(pVnode), terrstr());
return TSDB_CODE_FAILED;
}
return TSDB_CODE_SUCCESS;
}
#endif
static int32_t tdRSmaFSCreate(SRSmaFS *pFS) {
static int32_t tdRSmaFSCreate(SRSmaFS *pFS, int32_t size) {
int32_t code = 0;
pFS->aQTaskInf = taosArrayInit(0, sizeof(SQTaskFile));
pFS->aQTaskInf = taosArrayInit(size, sizeof(SQTaskFile));
if (pFS->aQTaskInf == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
......@@ -207,20 +154,20 @@ static void tdRSmaGetCurrentFName(SSma *pSma, char *current, char *current_t) {
SVnode *pVnode = pSma->pVnode;
if (pVnode->pTfs) {
if (current) {
snprintf(current, TSDB_FILENAME_LEN - 1, "%s%svnode%svnode%d%srsma%sCURRENT", tfsGetPrimaryPath(pVnode->pTfs),
TD_DIRSEP, TD_VID(pVnode), TD_DIRSEP);
snprintf(current, TSDB_FILENAME_LEN - 1, "%s%svnode%svnode%d%srsma%sPRESENT", tfsGetPrimaryPath(pVnode->pTfs),
TD_DIRSEP, TD_DIRSEP, TD_VID(pVnode), TD_DIRSEP, TD_DIRSEP);
}
if (current_t) {
snprintf(current, TSDB_FILENAME_LEN - 1, "%s%svnode%svnode%d%srsma%sCURRENT.t", tfsGetPrimaryPath(pVnode->pTfs),
TD_DIRSEP, TD_VID(pVnode), TD_DIRSEP);
snprintf(current, TSDB_FILENAME_LEN - 1, "%s%svnode%svnode%d%srsma%sPRESENT.t", tfsGetPrimaryPath(pVnode->pTfs),
TD_DIRSEP, TD_DIRSEP, TD_VID(pVnode), TD_DIRSEP, TD_DIRSEP);
}
} else {
#if 0
if (current) {
snprintf(current, TSDB_FILENAME_LEN - 1, "%s%sCURRENT", pTsdb->path, TD_DIRSEP);
snprintf(current, TSDB_FILENAME_LEN - 1, "%s%sPRESENT", pTsdb->path, TD_DIRSEP);
}
if (current_t) {
snprintf(current_t, TSDB_FILENAME_LEN - 1, "%s%sCURRENT.t", pTsdb->path, TD_DIRSEP);
snprintf(current_t, TSDB_FILENAME_LEN - 1, "%s%sPRESENT.t", pTsdb->path, TD_DIRSEP);
}
#endif
}
......@@ -278,7 +225,6 @@ _exit:
return code;
}
static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2) {
const SQTaskFile *q1 = (const SQTaskFile *)p1;
const SQTaskFile *q2 = (const SQTaskFile *)p2;
......@@ -306,12 +252,15 @@ static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2) {
static int32_t tdRSmaFSApplyChange(SSma *pSma, SRSmaFS *pFS) {
int32_t code = 0;
#if 0
int32_t lino = 0;
SVnode *pVnode = pSma->pVnode;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
int32_t nRef = 0;
char fname[TSDB_FILENAME_LEN] = {0};
// SQTaskFile
int32_t iOld = 0;
int32_t iNew = 0;
......@@ -371,6 +320,7 @@ _exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code));
}
#endif
return code;
}
......@@ -393,7 +343,7 @@ static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **outpu
terrno = TSDB_CODE_SUCCESS;
tdGetVndDirName(TD_VID(pVnode), tfsGetPrimaryPath(pVnode->pTfs), VNODE_RSMA_DIR, true, dir);
tdRSmaGetDirName(TD_VID(pVnode), tfsGetPrimaryPath(pVnode->pTfs), VNODE_RSMA_DIR, true, dir);
if (!taosCheckExistFile(dir)) {
smaDebug("vgId:%d, fetch qtask files, no need as dir %s not exist", TD_VID(pVnode), dir);
......@@ -471,72 +421,65 @@ _end:
}
// EXPOSED APIS ====================================================================================
int32_t tdRSmaFSCommit(SSma *pSma) {
static int32_t tdRSmaFSScanAndTryFix(SSma *pSma) {
int32_t code = 0;
int32_t lino = 0;
SRSmaFS fs = {0};
SVnode *pVnode = pSma->pVnode;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
SRSmaFS *pFS = RSMA_FS(pStat);
char fname[TSDB_FILENAME_LEN] = {0};
char fnameVer[TSDB_FILENAME_LEN] = {0};
char current[TSDB_FILENAME_LEN] = {0};
char current_t[TSDB_FILENAME_LEN] = {0};
tdRSmaGetCurrentFName(pSma, current, current_t);
// SArray<SQTaskFile>
int32_t size = taosArrayGetSize(pFS->aQTaskInf);
for (int32_t i = 0; i < size; ++i) {
SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, i);
if (!taosCheckExistFile(current_t)) goto _exit;
// main.tdb =========
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, pTaskF->version,
tfsGetPrimaryPath(pVnode->pTfs), fnameVer);
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, -1, tfsGetPrimaryPath(pVnode->pTfs), fname);
// rename the file
if (taosRenameFile(current_t, current) < 0) {
if (taosCheckExistFile(fnameVer)) {
if (taosRenameFile(fnameVer, fname) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
// Load the new FS
code = tdRSmaFSCreate(&fs);
TSDB_CHECK_CODE(code, lino, _exit);
code = tdRSmaLoadFSFromFile(current, &fs);
smaDebug("vgId:%d, %s:%d succeed to to rename %s to %s", TD_VID(pVnode), __func__, lino, fnameVer, fname);
} else if (taosCheckExistFile(fname)) {
if (taosRemoveFile(fname) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
smaDebug("vgId:%d, %s:%d succeed to to remove %s", TD_VID(pVnode), __func__, lino, fname);
}
}
// apply file change
code = tdRSmaFSApplyChange(pSma, &fs);
TSDB_CHECK_CODE(code, lino, _exit);
{
// remove those invalid files (todo)
// main.tdb-journal.5 // TDB should handle its clear for kill -9
}
_exit:
tdRSmaFSClose(&fs);
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code));
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
}
return code;
}
int32_t tdRSmaFSRollback(SSma *pSma) {
int32_t code = 0;
int32_t lino = 0;
char current_t[TSDB_FILENAME_LEN] = {0};
tdRSmaGetCurrentFName(pSma, NULL, current_t);
(void)taosRemoveFile(current_t);
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(errno));
}
return code;
}
// EXPOSED APIS ====================================================================================
int32_t tdRSmaFSOpen(SSma *pSma, int64_t version, int8_t rollback) {
int32_t code = 0;
int32_t lino = 0;
SVnode *pVnode = pSma->pVnode;
int64_t commitID = pVnode->state.commitID;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = NULL;
if (!pEnv) {
return TSDB_CODE_SUCCESS;
}
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
// open handle
code = tdRSmaFSCreate(&pStat->fs);
code = tdRSmaFSCreate(RSMA_FS(pStat), 0);
TSDB_CHECK_CODE(code, lino, _exit);
// open impl
......@@ -545,7 +488,7 @@ int32_t tdRSmaFSOpen(SSma *pSma, int64_t version, int8_t rollback) {
tdRSmaGetCurrentFName(pSma, current, current_t);
if (taosCheckExistFile(current)) {
code = tdRSmaLoadFSFromFile(current, &pStat->fs);
code = tdRSmaLoadFSFromFile(current, RSMA_FS(pStat));
TSDB_CHECK_CODE(code, lino, _exit);
if (taosCheckExistFile(current_t)) {
......@@ -553,53 +496,125 @@ int32_t tdRSmaFSOpen(SSma *pSma, int64_t version, int8_t rollback) {
code = tdRSmaFSRollback(pSma);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
code = tsdbFSCommit(pTsdb);
code = tdRSmaFSCommit(pSma);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
} else {
// empty one
code = tsdbSaveFSToFile(&pTsdb->fs, current);
// 1st open with empty current/qTaskInfoFile
code = tdRSmaSaveFSToFile(RSMA_FS(pStat), current);
TSDB_CHECK_CODE(code, lino, _exit);
ASSERT(!rollback);
}
// scan and fix FS
code = tsdbScanAndTryFixFS(pTsdb);
// scan and try fix(remove main.db/main.db.xxx and use the one with version)
code = tdRSmaFSScanAndTryFix(pSma);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
}
return code;
}
void tdRSmaFSClose(SRSmaFS *pFS) { taosArrayDestroy(pFS->aQTaskInf); }
int32_t tdRSmaFSPrepareCommit(SSma *pSma, SRSmaFS *pFSNew) {
int32_t code = 0;
int32_t lino = 0;
char tfname[TSDB_FILENAME_LEN];
tdRSmaGetCurrentFName(pSma, NULL, tfname);
// gnrt PRESENT.t
code = tdRSmaSaveFSToFile(pFSNew, tfname);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
int32_t tdRSmaFSCommit(SSma *pSma) {
int32_t code = 0;
int32_t lino = 0;
SRSmaFS fs = {0};
char current[TSDB_FILENAME_LEN] = {0};
char current_t[TSDB_FILENAME_LEN] = {0};
tdRSmaGetCurrentFName(pSma, current, current_t);
if (!taosCheckExistFile(current_t)) goto _exit;
// rename the file
if (taosRenameFile(current_t, current) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
// Load the new FS
code = tdRSmaFSCreate(&fs, 0);
TSDB_CHECK_CODE(code, lino, _exit);
code = tdRSmaLoadFSFromFile(current, &fs);
TSDB_CHECK_CODE(code, lino, _exit);
// apply file change
code = tdRSmaFSApplyChange(pSma, &fs);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
tdRSmaFSClose(&fs);
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code));
}
return code;
}
int32_t tdRSmaFSRollback(SSma *pSma) {
int32_t code = 0;
int32_t lino = 0;
char current_t[TSDB_FILENAME_LEN] = {0};
tdRSmaGetCurrentFName(pSma, NULL, current_t);
(void)taosRemoveFile(current_t);
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(errno));
}
return code;
}
int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile) {
int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t size) {
int32_t code = 0;
int32_t idx = taosArraySearchIdx(pFS->aQTaskInf, qTaskFile, tdQTaskInfCmprFn1, TD_GE);
for (int32_t i = 0; i < size; ++i) {
SQTaskFile *qTaskF = qTaskFile + i;
int32_t idx = taosArraySearchIdx(pFS->aQTaskInf, qTaskF, tdQTaskInfCmprFn1, TD_GE);
if (idx < 0) {
idx = taosArrayGetSize(pFS->aQTaskInf);
} else {
SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, idx);
int32_t c = tdQTaskInfCmprFn1(pTaskF, qTaskFile);
int32_t c = tdQTaskInfCmprFn1(pTaskF, qTaskF);
if (c == 0) {
pTaskF->nRef = qTaskFile->nRef;
ASSERT(pTaskF->size == qTaskFile->size);
ASSERT(0);
pTaskF->nRef = qTaskF->nRef;
ASSERT(pTaskF->size == qTaskF->size);
goto _exit;
}
}
if (taosArrayInsert(pFS->aQTaskInf, idx, qTaskFile) == NULL) {
if (taosArrayInsert(pFS->aQTaskInf, idx, qTaskF) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
}
_exit:
return code;
......@@ -613,17 +628,17 @@ int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, in
taosRLockLatch(RSMA_FS_LOCK(pStat));
if (suid > 0 && level > 0) {
ASSERT(version > 0);
if ((pTaskF = taosArraySearch(aQTaskInf, &qTaskF, tdQTaskInfCmprFn1, TD_EQ))) {
oldVal = atomic_fetch_add_32(&pTaskF->nRef, 1);
ASSERT(oldVal > 0);
}
} else {
// ref all
int32_t size = taosArrayGetSize(aQTaskInf);
for (int32_t i = 0; i < size; ++i) {
pTaskF = TARRAY_GET_ELEM(aQTaskInf, i);
if (pTaskF->version == version) {
oldVal = atomic_fetch_add_32(&pTaskF->nRef, 1);
}
ASSERT(oldVal > 0);
}
}
......@@ -631,18 +646,6 @@ int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, in
return oldVal;
}
int64_t tdRSmaFSMaxVer(SSma *pSma, SRSmaStat *pStat) {
SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf;
int64_t version = -1;
taosRLockLatch(RSMA_FS_LOCK(pStat));
if (taosArrayGetSize(aQTaskInf) > 0) {
version = ((SQTaskFile *)taosArrayGetLast(aQTaskInf))->version;
}
taosRUnLockLatch(RSMA_FS_LOCK(pStat));
return version;
}
void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int64_t version) {
SVnode *pVnode = pSma->pVnode;
SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf;
......@@ -653,12 +656,13 @@ void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int
taosWLockLatch(RSMA_FS_LOCK(pStat));
if (suid > 0 && level > 0) {
ASSERT(version > 0);
if ((idx = taosArraySearchIdx(aQTaskInf, &qTaskF, tdQTaskInfCmprFn1, TD_EQ)) >= 0) {
ASSERT(idx < taosArrayGetSize(aQTaskInf));
pTaskF = taosArrayGet(aQTaskInf, idx);
if (atomic_sub_fetch_32(&pTaskF->nRef, 1) <= 0) {
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, level, pTaskF->version, tfsGetPrimaryPath(pVnode->pTfs),
qTaskFullName);
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, level, pTaskF->version,
tfsGetPrimaryPath(pVnode->pTfs), qTaskFullName);
if (taosRemoveFile(qTaskFullName) < 0) {
smaWarn("vgId:%d, failed to remove %s since %s", TD_VID(pVnode), qTaskFullName,
tstrerror(TAOS_SYSTEM_ERROR(errno)));
......@@ -695,3 +699,39 @@ void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
}
int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFSOut) {
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
int32_t code = 0;
taosRLockLatch(RSMA_FS_LOCK(pStat));
code = tdRSmaFSCopy(pSma, pFSOut);
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
int32_t tdRSmaFSCopy(SSma *pSma, SRSmaFS *pFSOut) {
int32_t code = 0;
int32_t lino = 0;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
SRSmaFS *pFS = RSMA_FS(pStat);
int32_t size = 0;
code = tdRSmaFSCreate(pFSOut, size);
TSDB_CHECK_CODE(code, lino, _exit);
taosArraySetSize(pFSOut->aQTaskInf, size);
memcpy(TARRAY_GET_ELEM(pFSOut->aQTaskInf, 0), TARRAY_GET_ELEM(pFS->aQTaskInf, 0), size * sizeof(SQTaskFile));
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
......@@ -28,8 +28,6 @@ SSmaMgmt smaMgmt = {
.rsetId = -1,
};
#define TD_QTASKINFO_FNAME_PREFIX "main.db"
typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem;
static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
......@@ -59,26 +57,6 @@ struct SRSmaQTaskInfoItem {
void *qTaskInfo;
};
void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t suid, int8_t level, int64_t version, char *outputName) {
tdGetVndFileName(vgId, NULL, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, suid, level, version, outputName);
}
void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t suid, int8_t level, int64_t version, const char *path, char *outputName) {
tdGetVndFileName(vgId, path, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, suid, level, version, outputName);
}
void tdRSmaQTaskInfoGetFullPath(int32_t vgId, int8_t level, const char *path, char *outputName) {
tdGetVndDirName(vgId, path, VNODE_RSMA_DIR, true, outputName);
int32_t rsmaLen = strlen(outputName);
snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi8, level);
}
void tdRSmaQTaskInfoGetFullPathEx(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName) {
tdGetVndDirName(vgId, path, VNODE_RSMA_DIR, true, outputName);
int32_t rsmaLen = strlen(outputName);
snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi64 "%s%" PRIi8, suid, TD_DIRSEP, level);
}
static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) {
// Note: free/kill may in RC
if (!taskHandle || !(*taskHandle)) return;
......@@ -353,10 +331,12 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
return TSDB_CODE_SUCCESS;
}
#if 0
if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_ROLLUP) != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TDB_INIT_FAILED;
return TSDB_CODE_FAILED;
}
#endif
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
......@@ -1178,20 +1158,21 @@ _err:
}
/**
* @brief reload ts data from checkpoint
*
* @param pSma
* @return int32_t
* N.B. the data would be restored from the unified WAL replay procedure
*/
#if 0
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma) {
// NOTHING TODO: the data would be restored from the unified WAL replay procedure
return TSDB_CODE_SUCCESS;
}
#endif
int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback) {
// step 1: iterate all stables to restore the rsma env
// step 1: init env
if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_ROLLUP) != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TDB_INIT_FAILED;
return TSDB_CODE_FAILED;
}
// step 2: open SRSmaFS for qTaskFiles
if (tdRSmaFSOpen(pSma, qtaskFileVer, rollback) < 0) {
goto _err;
}
// step 3: iterate all stables to restore the rsma env
int64_t nTables = 0;
if (tdRSmaRestoreQTaskInfoInit(pSma, &nTables) < 0) {
goto _err;
......@@ -1201,18 +1182,6 @@ int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer,
return TSDB_CODE_SUCCESS;
}
// step 2: reload ts data from checkpoint
#if 0
if (tdRSmaRestoreTSDataReload(pSma) < 0) {
goto _err;
}
#endif
// step 3: open SRSmaFS for qTaskFiles
if (tdRSmaFSOpen(pSma, qtaskFileVer, rollback) < 0) {
goto _err;
}
smaInfo("vgId:%d, restore rsma task %" PRIi8 " from qtaskf %" PRIi64 " succeed", SMA_VID(pSma), type, qtaskFileVer);
return TSDB_CODE_SUCCESS;
_err:
......@@ -1222,19 +1191,26 @@ _err:
}
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
int32_t code = 0;
int32_t lino = 0;
SSma *pSma = pRSmaStat->pSma;
SVnode *pVnode = pSma->pVnode;
int32_t vid = SMA_VID(pSma);
SArray *qTaskFArray = NULL;
int64_t version = pRSmaStat->commitAppliedVer;
TdFilePtr pOutFD = NULL;
TdFilePtr pInFD = NULL;
char fname[TSDB_FILENAME_LEN];
char fnameVer[TSDB_FILENAME_LEN];
SRSmaFS fs = {0};
if (taosHashGetSize(pInfoHash) <= 0) {
return TSDB_CODE_SUCCESS;
}
int64_t fsMaxVer = tdRSmaFSMaxVer(pSma, pRSmaStat);
if (pRSmaStat->commitAppliedVer <= fsMaxVer) {
smaDebug("vgId:%d, rsma persist, no need as applied %" PRIi64 " not larger than fsMaxVer %" PRIi64, vid,
pRSmaStat->commitAppliedVer, fsMaxVer);
return TSDB_CODE_SUCCESS;
qTaskFArray = taosArrayInit(taosHashGetSize(pInfoHash) << 1, sizeof(SQTaskFile));
if (!qTaskFArray) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
void *infoHash = NULL;
......@@ -1249,19 +1225,69 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, i);
if (pItem && pItem->pStreamState) {
if (streamStateCommit(pItem->pStreamState) < 0) {
terrno = TSDB_CODE_RSMA_STREAM_STATE_COMMIT;
goto _err;
code = TSDB_CODE_RSMA_STREAM_STATE_COMMIT;
TSDB_CHECK_CODE(code, lino, _exit);
}
smaDebug("vgId:%d, rsma persist, stream state commit success, table %" PRIi64 " level %d", vid, pRSmaInfo->suid,
i + 1);
smaDebug("vgId:%d, rsma persist, stream state commit success, table %" PRIi64 " level %d", TD_VID(pVnode),
pRSmaInfo->suid, i + 1);
// qTaskInfo file
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pRSmaInfo->suid, i + 1, -1, tfsGetPrimaryPath(pVnode->pTfs), fname);
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pRSmaInfo->suid, i + 1, version, tfsGetPrimaryPath(pVnode->pTfs),
fnameVer);
if (taosCheckExistFile(fnameVer)) {
smaWarn("vgId:%d, rsma persist, duplicate file %s exist", TD_VID(pVnode), fnameVer);
}
pOutFD = taosCreateFile(fnameVer, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
if (pOutFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
pInFD = taosOpenFile(fname, TD_FILE_READ);
if (pInFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
return TSDB_CODE_SUCCESS;
_err:
smaError("vgId:%d, rsma persist failed since %s", vid, terrstr());
return TSDB_CODE_FAILED;
int64_t size = 0;
if (taosFStatFile(pInFD, &size, NULL) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
int64_t offset = 0;
if (taosFSendFile(pOutFD, pInFD, &offset, size) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
smaError("vgId:%d, rsma persist, send qtaskinfo file %s failed since %s", TD_VID(pVnode), fname, tstrerror(code));
TSDB_CHECK_CODE(code, lino, _exit);
}
taosCloseFile(&pOutFD);
taosCloseFile(&pInFD);
SQTaskFile qTaskF = {.nRef = 1, .level = i + 1, .suid = pRSmaInfo->suid, .version = version, .size = size};
taosArrayPush(qTaskFArray, &qTaskF);
}
}
}
code = tdRSmaFSTakeSnapshot(pSma, &fs);
TSDB_CHECK_CODE(code, lino, _exit);
code = tdRSmaFSPrepareCommit(pSma, &fs);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
taosArrayDestroy(fs.aQTaskInf);
taosArrayDestroy(qTaskFArray);
if (code) {
if (pOutFD) taosCloseFile(&pOutFD);
if (pInFD) taosCloseFile(&pInFD);
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
}
terrno = code;
return code;
}
/**
......
......@@ -15,186 +15,52 @@
#include "sma.h"
// smaFileUtil ================
#if 0
#define TD_FILE_STATE_OK 0
#define TD_FILE_STATE_BAD 1
#define TD_FILE_INIT_MAGIC 0xFFFFFFFF
static int32_t tdEncodeTFInfo(void **buf, STFInfo *pInfo);
static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo);
static int32_t tdEncodeTFInfo(void **buf, STFInfo *pInfo) {
int32_t tlen = 0;
tlen += taosEncodeFixedU32(buf, pInfo->magic);
tlen += taosEncodeFixedU32(buf, pInfo->ftype);
tlen += taosEncodeFixedU32(buf, pInfo->fver);
tlen += taosEncodeFixedI64(buf, pInfo->fsize);
return tlen;
}
static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo) {
buf = taosDecodeFixedU32(buf, &(pInfo->magic));
buf = taosDecodeFixedU32(buf, &(pInfo->ftype));
buf = taosDecodeFixedU32(buf, &(pInfo->fver));
buf = taosDecodeFixedI64(buf, &(pInfo->fsize));
return buf;
}
int64_t tdWriteTFile(STFile *pTFile, void *buf, int64_t nbyte) {
ASSERT(TD_TFILE_OPENED(pTFile));
int64_t nwrite = taosWriteFile(pTFile->pFile, buf, nbyte);
if (nwrite < nbyte) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return nwrite;
}
int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence) {
ASSERT(TD_TFILE_OPENED(pTFile));
#define TD_QTASKINFO_FNAME_PREFIX "main.db"
int64_t loffset = taosLSeekFile(TD_TFILE_PFILE(pTFile), offset, whence);
if (loffset < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return loffset;
void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t suid, int8_t level, int64_t version, char *outputName) {
tdRSmaGetFileName(vgId, NULL, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, suid, level, version, outputName);
}
int64_t tdGetTFileSize(STFile *pTFile, int64_t *size) {
ASSERT(TD_TFILE_OPENED(pTFile));
return taosFStatFile(pTFile->pFile, size, NULL);
void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t suid, int8_t level, int64_t version, const char *path,
char *outputName) {
tdRSmaGetFileName(vgId, path, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, suid, level, version, outputName);
}
int64_t tdReadTFile(STFile *pTFile, void *buf, int64_t nbyte) {
ASSERT(TD_TFILE_OPENED(pTFile));
int64_t nread = taosReadFile(pTFile->pFile, buf, nbyte);
if (nread < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return nread;
void tdRSmaQTaskInfoGetFullPath(int32_t vgId, int8_t level, const char *path, char *outputName) {
tdRSmaGetDirName(vgId, path, VNODE_RSMA_DIR, true, outputName);
int32_t rsmaLen = strlen(outputName);
snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi8, level);
}
int32_t tdUpdateTFileHeader(STFile *pTFile) {
char buf[TD_FILE_HEAD_SIZE] = "\0";
if (tdSeekTFile(pTFile, 0, SEEK_SET) < 0) {
return -1;
}
void *ptr = buf;
tdEncodeTFInfo(&ptr, &(pTFile->info));
taosCalcChecksumAppend(0, (uint8_t *)buf, TD_FILE_HEAD_SIZE);
if (tdWriteTFile(pTFile, buf, TD_FILE_HEAD_SIZE) < 0) {
return -1;
}
return 0;
void tdRSmaQTaskInfoGetFullPathEx(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName) {
tdRSmaGetDirName(vgId, path, VNODE_RSMA_DIR, true, outputName);
int32_t rsmaLen = strlen(outputName);
snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi64 "%s%" PRIi8, suid, TD_DIRSEP, level);
}
int32_t tdLoadTFileHeader(STFile *pTFile, STFInfo *pInfo) {
char buf[TD_FILE_HEAD_SIZE] = "\0";
uint32_t _version;
ASSERT(TD_TFILE_OPENED(pTFile));
if (tdSeekTFile(pTFile, 0, SEEK_SET) < 0) {
return -1;
}
if (tdReadTFile(pTFile, buf, TD_FILE_HEAD_SIZE) < 0) {
return -1;
}
if (!taosCheckChecksumWhole((uint8_t *)buf, TD_FILE_HEAD_SIZE)) {
terrno = TSDB_CODE_FILE_CORRUPTED;
return -1;
}
void *pBuf = buf;
pBuf = tdDecodeTFInfo(pBuf, pInfo);
return 0;
}
void tdUpdateTFileMagic(STFile *pTFile, void *pCksm) {
pTFile->info.magic = taosCalcChecksum(pTFile->info.magic, (uint8_t *)(pCksm), sizeof(TSCKSUM));
}
int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset) {
ASSERT(TD_TFILE_OPENED(pTFile));
int64_t toffset;
if ((toffset = tdSeekTFile(pTFile, 0, SEEK_END)) < 0) {
return -1;
}
#if 0
smaDebug("append to file %s, offset:%" PRIi64 " nbyte:%" PRIi64 " fsize:%" PRIi64, TD_TFILE_FULL_NAME(pTFile),
toffset, nbyte, toffset + nbyte);
#endif
ASSERT(pTFile->info.fsize == toffset);
if (offset) {
*offset = toffset;
}
if (tdWriteTFile(pTFile, buf, nbyte) < 0) {
return -1;
}
pTFile->info.fsize += nbyte;
return nbyte;
}
int32_t tdOpenTFile(STFile *pTFile, int flags) {
ASSERT(!TD_TFILE_OPENED(pTFile));
pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), flags);
if (pTFile->pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return 0;
}
void tdCloseTFile(STFile *pTFile) {
if (TD_TFILE_OPENED(pTFile)) {
taosCloseFile(&pTFile->pFile);
TD_TFILE_SET_CLOSED(pTFile);
}
}
void tdDestroyTFile(STFile *pTFile) { taosMemoryFreeClear(TD_TFILE_FULL_NAME(pTFile)); }
#endif
void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t suid,
void tdRSmaGetFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t suid,
int8_t level, int64_t version, char *outputName) {
if (version >= 0 && level > 0 && suid > 0) {
if (level >= 0 && suid > 0) {
if (version >= 0) {
if (pdname) {
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%s%" PRIi64 "%s%" PRIi8 "%s%s.%" PRIi64, pdname,
TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP, dname, TD_DIRSEP, suid, level, fname, version);
TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP, dname, TD_DIRSEP, suid, TD_DIRSEP, level, TD_DIRSEP, fname,
version);
} else {
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%s%" PRIi64 "%s%" PRIi8 "%s%s.%" PRIi64, TD_DIRSEP,
vgId, TD_DIRSEP, dname, TD_DIRSEP, suid, TD_DIRSEP, level, TD_DIRSEP, fname, version);
}
} else {
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%s%" PRIi64 "%s%" PRIi8 "%s%s." PRIi64, TD_DIRSEP,
vgId, TD_DIRSEP, dname, TD_DIRSEP, suid, level, fname, version);
if (pdname) {
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%s%" PRIi64 "%s%" PRIi8 "%s%s", pdname,
TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP, dname, TD_DIRSEP, suid, TD_DIRSEP, level, TD_DIRSEP, fname);
} else {
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%s%" PRIi64 "%s%" PRIi8 "%s%s", TD_DIRSEP, vgId,
TD_DIRSEP, dname, TD_DIRSEP, suid, TD_DIRSEP, level, TD_DIRSEP, fname);
}
}
} else {
if (version >= 0) {
if (pdname) {
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%sv%d%s%" PRIi64, pdname, TD_DIRSEP, TD_DIRSEP,
vgId, TD_DIRSEP, dname, TD_DIRSEP, vgId, fname, version);
......@@ -202,10 +68,19 @@ void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%sv%d%s%" PRIi64, TD_DIRSEP, vgId, TD_DIRSEP, dname,
TD_DIRSEP, vgId, fname, version);
}
} else {
if (pdname) {
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%sv%d%s", pdname, TD_DIRSEP, TD_DIRSEP, vgId,
TD_DIRSEP, dname, TD_DIRSEP, vgId, fname);
} else {
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%sv%d%s", TD_DIRSEP, vgId, TD_DIRSEP, dname,
TD_DIRSEP, vgId, fname);
}
}
}
}
void tdGetVndDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName) {
void tdRSmaGetDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName) {
if (pdname) {
if (endWithSep) {
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%s", pdname, TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册