提交 cb9aa7dc 编写于 作者: K kailixu

chore: rsmaFS process

上级 64e907b9
......@@ -85,7 +85,8 @@ struct STSmaStat {
struct SQTaskFile {
volatile int32_t nRef;
int32_t padding;
int8_t level;
int64_t suid;
int64_t version;
int64_t size;
};
......@@ -214,19 +215,21 @@ static FORCE_INLINE void tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) {
// rsma
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree);
int32_t tdRSmaFSOpen(SSma *pSma, int64_t version);
int32_t tdRSmaFSOpen(SSma *pSma, int64_t version, int8_t rollback);
void tdRSmaFSClose(SRSmaFS *fs);
int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t version);
void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t version);
int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int64_t version);
void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int64_t version);
int64_t tdRSmaFSMaxVer(SSma *pSma, SRSmaStat *pStat);
int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile);
int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer);
int32_t tdRSmaFSRollback(SSma *pSma);
int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback);
int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type);
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer);
void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t version, char *outputName);
void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t version, const char *path, char *outputName);
int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback);
void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t suid, int8_t level, int64_t version, char *outputName);
void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t suid, int8_t level, int64_t version, const char *path,
char *outputName);
void tdRSmaQTaskInfoGetFullPath(int32_t vgId, int8_t level, const char *path, char *outputName);
void tdRSmaQTaskInfoGetFullPathEx(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName);
......@@ -290,8 +293,8 @@ void tdCloseTFile(STFile *pTFile);
void tdDestroyTFile(STFile *pTFile);
#endif
void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t version,
char *outputName);
void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t suid,
int8_t level, int64_t version, char *outputName);
void tdGetVndDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName);
#ifdef __cplusplus
......
......@@ -203,10 +203,7 @@ void smaCleanUp();
int32_t smaOpen(SVnode* pVnode, int8_t rollback);
int32_t smaClose(SSma* pSma);
int32_t smaBegin(SSma* pSma);
int32_t smaSyncPreCommit(SSma* pSma);
int32_t smaSyncCommit(SSma* pSma);
int32_t smaSyncPostCommit(SSma* pSma);
int32_t smaPrepareAsyncCommit(SSma* pSma);
int32_t smaPrepareCommit(SSma* pSma);
int32_t smaCommit(SSma* pSma, SCommitInfo* pInfo);
int32_t smaFinishCommit(SSma* pSma);
int32_t smaPostCommit(SSma* pSma);
......
......@@ -17,49 +17,18 @@
extern SSmaMgmt smaMgmt;
#if 0
static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma);
static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma);
static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma);
#endif
static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma);
static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma, SCommitInfo *pInfo);
static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma);
static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat);
#if 0
/**
* @brief Only applicable to Rollup SMA
*
* @param pSma
* @return int32_t
*/
int32_t smaSyncPreCommit(SSma *pSma) { return tdProcessRSmaSyncPreCommitImpl(pSma); }
/**
* @brief Only applicable to Rollup SMA
*
* @param pSma
* @return int32_t
*/
int32_t smaSyncCommit(SSma *pSma) { return tdProcessRSmaSyncCommitImpl(pSma); }
/**
* @brief Only applicable to Rollup SMA
*
* @param pSma
* @return int32_t
*/
int32_t smaSyncPostCommit(SSma *pSma) { return tdProcessRSmaSyncPostCommitImpl(pSma); }
#endif
/**
* @brief async commit, only applicable to Rollup SMA
*
* @param pSma
* @return int32_t
*/
int32_t smaPrepareAsyncCommit(SSma *pSma) { return tdProcessRSmaAsyncPreCommitImpl(pSma); }
int32_t smaPrepareCommit(SSma *pSma) { return tdProcessRSmaAsyncPreCommitImpl(pSma); }
/**
* @brief async commit, only applicable to Rollup SMA
......@@ -143,70 +112,6 @@ _exit:
return code;
}
#if 0
/**
* @brief pre-commit for rollup sma(sync commit).
* 1) set trigger stat of rsma timer TASK_TRIGGER_STAT_PAUSED.
* 2) wait for all triggered fetch tasks to finish
* 3) perform persist task for qTaskInfo
*
* @param pSma
* @return int32_t
*/
static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma) {
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
if (!pSmaEnv) {
return TSDB_CODE_SUCCESS;
}
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv);
SRSmaStat *pRSmaStat = SMA_STAT_RSMA(pStat);
// step 1: set rsma stat paused
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);
// step 2: wait for all triggered fetch tasks to finish
int32_t nLoops = 0;
while (1) {
if (T_REF_VAL_GET(pStat) == 0) {
smaDebug("vgId:%d, rsma fetch tasks are all finished", SMA_VID(pSma));
break;
} else {
smaDebug("vgId:%d, rsma fetch tasks are not all finished yet", SMA_VID(pSma));
}
++nLoops;
if (nLoops > 1000) {
sched_yield();
nLoops = 0;
}
}
// step 3: perform persist task for qTaskInfo
pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat));
smaDebug("vgId:%d, rsma pre commit success", SMA_VID(pSma));
return TSDB_CODE_SUCCESS;
}
/**
* @brief commit for rollup sma
*
* @param pSma
* @return int32_t
*/
static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma) {
#if 0
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
if (!pSmaEnv) {
return TSDB_CODE_SUCCESS;
}
#endif
return TSDB_CODE_SUCCESS;
}
#endif
// SQTaskFile ======================================================
/**
......@@ -218,6 +123,7 @@ static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma) {
* @return int32_t
*/
static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) {
#if 0
SVnode *pVnode = pSma->pVnode;
SRSmaFS *pFS = RSMA_FS(pStat);
int64_t committed = pStat->commitAppliedVer;
......@@ -264,31 +170,10 @@ static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) {
}
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
#endif
return TSDB_CODE_SUCCESS;
}
#if 0
/**
* @brief post-commit for rollup sma
* 1) clean up the outdated qtaskinfo files
*
* @param pSma
* @return int32_t
*/
static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) {
SVnode *pVnode = pSma->pVnode;
if (!VND_IS_RSMA(pVnode)) {
return TSDB_CODE_SUCCESS;
}
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
tdUpdateQTaskInfoFiles(pSma, pRSmaStat);
return TSDB_CODE_SUCCESS;
}
#endif
/**
* @brief Rsma async commit implementation(only do some necessary light weighted task)
* 1) set rsma stat TASK_TRIGGER_STAT_PAUSED
......
......@@ -20,6 +20,123 @@
static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **output);
static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2);
static int32_t tdQTaskInfCmprFn2(const void *p1, const void *p2);
static FORCE_INLINE int32_t tPutQTaskF(uint8_t *p, SQTaskFile *pFile) {
int32_t n = 0;
n += tPutI8(p ? p + n : p, pFile->level);
n += tPutI64v(p ? p + n : p, pFile->size);
n += tPutI64v(p ? p + n : p, pFile->suid);
n += tPutI64v(p ? p + n : p, pFile->version);
return n;
}
static int32_t tdRSmaFSToBinary(uint8_t *p, SRSmaFS *pFS) {
int32_t n = 0;
uint32_t size = taosArrayGetSize(pFS->aQTaskInf);
// version
n += tPutI8(p ? p + n : p, 0);
// SArray<SQTaskFile>
n += tPutU32v(p ? p + n : p, size);
for (uint32_t i = 0; i < size; ++i) {
n += tPutQTaskF(p ? p + n : p, taosArrayGet(pFS->aQTaskInf, i));
}
return n;
}
int32_t tdRSmaGetQTaskF(uint8_t *p, SQTaskFile *pFile) {
int32_t n = 0;
n += tGetI8(p + n, &pFile->level);
n += tGetI64v(p + n, &pFile->size);
n += tGetI64v(p + n, &pFile->suid);
n += tGetI64v(p + n, &pFile->version);
return n;
}
static int32_t tsdbBinaryToFS(uint8_t *pData, int64_t nData, SRSmaFS *pFS) {
int32_t code = 0;
int32_t n = 0;
int8_t version = 0;
// version
n += tGetI8(pData + n, &version);
// SArray<SQTaskFile>
taosArrayClear(pFS->aQTaskInf);
uint32_t size = 0;
n += tGetU32v(pData + n, &size);
for (uint32_t i = 0; i < size; ++i) {
SQTaskFile qTaskF = {0};
int32_t nt = tdRSmaGetQTaskF(pData + n, &qTaskF);
if (nt < 0) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
n += nt;
if (taosArrayPush(pFS->aQTaskInf, &qTaskF) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
}
ASSERT(n + sizeof(TSCKSUM) == nData);
_exit:
return code;
}
static int32_t tdRSmaSaveFSToFile(SRSmaFS *pFS, const char *fname) {
int32_t code = 0;
int32_t lino = 0;
// encode to binary
int32_t size = tdRSmaFSToBinary(NULL, pFS) + sizeof(TSCKSUM);
uint8_t *pData = taosMemoryMalloc(size);
if (pData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
tdRSmaFSToBinary(pData, pFS);
taosCalcChecksumAppend(0, pData, size);
// save to file
TdFilePtr pFD = taosCreateFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
if (pFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
int64_t n = taosWriteFile(pFD, pData, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (taosFsyncFile(pFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
}
taosCloseFile(&pFD);
_exit:
if (pData) taosMemoryFree(pData);
if (code) {
smaError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname);
}
return code;
}
/**
* @brief Open RSma FS from qTaskInfo files
*
......@@ -27,6 +144,7 @@ static int32_t tdQTaskInfCmprFn2(const void *p1, const void *p2);
* @param version
* @return int32_t
*/
#if 0
int32_t tdRSmaFSOpen(SSma *pSma, int64_t version) {
SVnode *pVnode = pSma->pVnode;
int64_t commitID = pVnode->state.commitID;
......@@ -70,67 +188,190 @@ _end:
}
return TSDB_CODE_SUCCESS;
}
#endif
void tdRSmaFSClose(SRSmaFS *fs) { taosArrayDestroy(fs->aQTaskInf); }
static int32_t tdRSmaFSCreate(SRSmaFS *pFS) {
int32_t code = 0;
static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2) {
if (*(int64_t *)p1 < ((SQTaskFile *)p2)->version) {
return -1;
} else if (*(int64_t *)p1 > ((SQTaskFile *)p2)->version) {
return 1;
pFS->aQTaskInf = taosArrayInit(0, sizeof(SQTaskFile));
if (pFS->aQTaskInf == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
return 0;
_exit:
return code;
}
int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t version) {
SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf;
SQTaskFile *pTaskF = NULL;
int32_t oldVal = 0;
static void tdRSmaGetCurrentFName(SSma *pSma, char *current, char *current_t) {
SVnode *pVnode = pSma->pVnode;
if (pVnode->pTfs) {
if (current) {
snprintf(current, TSDB_FILENAME_LEN - 1, "%s%svnode%svnode%d%srsma%sCURRENT", tfsGetPrimaryPath(pVnode->pTfs),
TD_DIRSEP, TD_VID(pVnode), TD_DIRSEP);
}
if (current_t) {
snprintf(current, TSDB_FILENAME_LEN - 1, "%s%svnode%svnode%d%srsma%sCURRENT.t", tfsGetPrimaryPath(pVnode->pTfs),
TD_DIRSEP, TD_VID(pVnode), TD_DIRSEP);
}
} else {
#if 0
if (current) {
snprintf(current, TSDB_FILENAME_LEN - 1, "%s%sCURRENT", pTsdb->path, TD_DIRSEP);
}
if (current_t) {
snprintf(current_t, TSDB_FILENAME_LEN - 1, "%s%sCURRENT.t", pTsdb->path, TD_DIRSEP);
}
#endif
}
}
taosRLockLatch(RSMA_FS_LOCK(pStat));
if ((pTaskF = taosArraySearch(aQTaskInf, &version, tdQTaskInfCmprFn1, TD_EQ))) {
oldVal = atomic_fetch_add_32(&pTaskF->nRef, 1);
ASSERT(oldVal > 0);
static int32_t tdRSmaLoadFSFromFile(const char *fname, SRSmaFS *pFS) {
int32_t code = 0;
int32_t lino = 0;
uint8_t *pData = NULL;
// load binary
TdFilePtr pFD = taosOpenFile(fname, TD_FILE_READ);
if (pFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
taosRUnLockLatch(RSMA_FS_LOCK(pStat));
return oldVal;
int64_t size;
if (taosFStatFile(pFD, &size, NULL) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
}
pData = taosMemoryMalloc(size);
if (pData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (taosReadFile(pFD, pData, size) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (!taosCheckChecksumWhole(pData, size)) {
code = TSDB_CODE_FILE_CORRUPTED;
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
}
taosCloseFile(&pFD);
// decode binary
code = tsdbBinaryToFS(pData, size, pFS);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (pData) taosMemoryFree(pData);
if (code) {
smaError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname);
}
return code;
}
int64_t tdRSmaFSMaxVer(SSma *pSma, SRSmaStat *pStat) {
SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf;
int64_t version = -1;
taosRLockLatch(RSMA_FS_LOCK(pStat));
if (taosArrayGetSize(aQTaskInf) > 0) {
version = ((SQTaskFile *)taosArrayGetLast(aQTaskInf))->version;
static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2) {
const SQTaskFile *q1 = (const SQTaskFile *)p1;
const SQTaskFile *q2 = (const SQTaskFile *)p2;
if (q1->suid < q2->suid) {
return -1;
} else if (q1->suid > q2->suid) {
return 1;
}
taosRUnLockLatch(RSMA_FS_LOCK(pStat));
return version;
if (q1->level < q2->level) {
return -1;
} else if (q1->level > q2->level) {
return 1;
}
if (q1->version < q2->version) {
return -1;
} else if (q1->version > q2->version) {
return 1;
}
return 0;
}
void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t version) {
SVnode *pVnode = pSma->pVnode;
SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf;
char qTaskFullName[TSDB_FILENAME_LEN];
SQTaskFile *pTaskF = NULL;
int32_t idx = -1;
static int32_t tdRSmaFSApplyChange(SSma *pSma, SRSmaFS *pFS) {
int32_t code = 0;
int32_t lino = 0;
taosWLockLatch(RSMA_FS_LOCK(pStat));
if ((idx = taosArraySearchIdx(aQTaskInf, &version, tdQTaskInfCmprFn1, TD_EQ)) >= 0) {
ASSERT(idx < taosArrayGetSize(aQTaskInf));
pTaskF = taosArrayGet(aQTaskInf, idx);
if (atomic_sub_fetch_32(&pTaskF->nRef, 1) <= 0) {
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->version, tfsGetPrimaryPath(pVnode->pTfs), qTaskFullName);
if (taosRemoveFile(qTaskFullName) < 0) {
smaWarn("vgId:%d, failed to remove %s since %s", TD_VID(pVnode), qTaskFullName,
tstrerror(TAOS_SYSTEM_ERROR(errno)));
int32_t nRef = 0;
char fname[TSDB_FILENAME_LEN] = {0};
// SQTaskFile
int32_t iOld = 0;
int32_t iNew = 0;
while (true) {
int32_t nOld = taosArrayGetSize(pTsdb->fs.aDFileSet);
int32_t nNew = taosArrayGetSize(pFS->aDFileSet);
SDFileSet fSet = {0};
int8_t sameDisk = 0;
if (iOld >= nOld && iNew >= nNew) break;
SDFileSet *pSetOld = (iOld < nOld) ? taosArrayGet(pTsdb->fs.aDFileSet, iOld) : NULL;
SDFileSet *pSetNew = (iNew < nNew) ? taosArrayGet(pFS->aDFileSet, iNew) : NULL;
if (pSetOld && pSetNew) {
if (pSetOld->fid == pSetNew->fid) {
code = tsdbMergeFileSet(pTsdb, pSetOld, pSetNew);
TSDB_CHECK_CODE(code, lino, _exit);
iOld++;
iNew++;
} else if (pSetOld->fid < pSetNew->fid) {
code = tsdbRemoveFileSet(pTsdb, pSetOld);
TSDB_CHECK_CODE(code, lino, _exit);
taosArrayRemove(pTsdb->fs.aDFileSet, iOld);
} else {
smaDebug("vgId:%d, success to remove %s", TD_VID(pVnode), qTaskFullName);
code = tsdbNewFileSet(pTsdb, &fSet, pSetNew);
TSDB_CHECK_CODE(code, lino, _exit)
if (taosArrayInsert(pTsdb->fs.aDFileSet, iOld, &fSet) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
iOld++;
iNew++;
}
taosArrayRemove(aQTaskInf, idx);
} else if (pSetOld) {
code = tsdbRemoveFileSet(pTsdb, pSetOld);
TSDB_CHECK_CODE(code, lino, _exit);
taosArrayRemove(pTsdb->fs.aDFileSet, iOld);
} else {
code = tsdbNewFileSet(pTsdb, &fSet, pSetNew);
TSDB_CHECK_CODE(code, lino, _exit)
if (taosArrayInsert(pTsdb->fs.aDFileSet, iOld, &fSet) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
iOld++;
iNew++;
}
}
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code));
}
return code;
}
/**
......@@ -229,29 +470,128 @@ _end:
return terrno == 0 ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED;
}
static int32_t tdQTaskFileCmprFn2(const void *p1, const void *p2) {
if (((SQTaskFile *)p1)->version < ((SQTaskFile *)p2)->version) {
return -1;
} else if (((SQTaskFile *)p1)->version > ((SQTaskFile *)p2)->version) {
return 1;
// EXPOSED APIS ====================================================================================
int32_t tdRSmaFSCommit(SSma *pSma) {
int32_t code = 0;
int32_t lino = 0;
SRSmaFS fs = {0};
char current[TSDB_FILENAME_LEN] = {0};
char current_t[TSDB_FILENAME_LEN] = {0};
tdRSmaGetCurrentFName(pSma, current, current_t);
if (!taosCheckExistFile(current_t)) goto _exit;
// rename the file
if (taosRenameFile(current_t, current) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
return 0;
// Load the new FS
code = tdRSmaFSCreate(&fs);
TSDB_CHECK_CODE(code, lino, _exit);
code = tdRSmaLoadFSFromFile(current, &fs);
TSDB_CHECK_CODE(code, lino, _exit);
// apply file change
code = tdRSmaFSApplyChange(pSma, &fs);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
tdRSmaFSClose(&fs);
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code));
}
return code;
}
int32_t tdRSmaFSRollback(SSma *pSma) {
int32_t code = 0;
int32_t lino = 0;
char current_t[TSDB_FILENAME_LEN] = {0};
tdRSmaGetCurrentFName(pSma, NULL, current_t);
(void)taosRemoveFile(current_t);
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(errno));
}
return code;
}
int32_t tdRSmaFSOpen(SSma *pSma, int64_t version, int8_t rollback) {
int32_t code = 0;
int32_t lino = 0;
SVnode *pVnode = pSma->pVnode;
int64_t commitID = pVnode->state.commitID;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = NULL;
if (!pEnv) {
return TSDB_CODE_SUCCESS;
}
// open handle
code = tdRSmaFSCreate(&pStat->fs);
TSDB_CHECK_CODE(code, lino, _exit);
// open impl
char current[TSDB_FILENAME_LEN] = {0};
char current_t[TSDB_FILENAME_LEN] = {0};
tdRSmaGetCurrentFName(pSma, current, current_t);
if (taosCheckExistFile(current)) {
code = tdRSmaLoadFSFromFile(current, &pStat->fs);
TSDB_CHECK_CODE(code, lino, _exit);
if (taosCheckExistFile(current_t)) {
if (rollback) {
code = tdRSmaFSRollback(pSma);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
code = tsdbFSCommit(pTsdb);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
} else {
// empty one
code = tsdbSaveFSToFile(&pTsdb->fs, current);
TSDB_CHECK_CODE(code, lino, _exit);
ASSERT(!rollback);
}
// scan and fix FS
code = tsdbScanAndTryFixFS(pTsdb);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
void tdRSmaFSClose(SRSmaFS *pFS) { taosArrayDestroy(pFS->aQTaskInf); }
int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile) {
int32_t code = 0;
int32_t idx = taosArraySearchIdx(pFS->aQTaskInf, qTaskFile, tdQTaskFileCmprFn2, TD_GE);
int32_t idx = taosArraySearchIdx(pFS->aQTaskInf, qTaskFile, tdQTaskInfCmprFn1, TD_GE);
if (idx < 0) {
idx = taosArrayGetSize(pFS->aQTaskInf);
} else {
SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, idx);
int32_t c = tdQTaskFileCmprFn2(pTaskF, qTaskFile);
int32_t c = tdQTaskInfCmprFn1(pTaskF, qTaskFile);
if (c == 0) {
pTaskF->nRef = qTaskFile->nRef;
pTaskF->version = qTaskFile->version;
pTaskF->size = qTaskFile->size;
ASSERT(pTaskF->size == qTaskFile->size);
goto _exit;
}
}
......@@ -263,4 +603,95 @@ int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile) {
_exit:
return code;
}
\ No newline at end of file
}
int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int64_t version) {
SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf;
SQTaskFile qTaskF = {.level = level, .suid = suid, .version = version};
SQTaskFile *pTaskF = NULL;
int32_t oldVal = 0;
taosRLockLatch(RSMA_FS_LOCK(pStat));
if (suid > 0 && level > 0) {
if ((pTaskF = taosArraySearch(aQTaskInf, &qTaskF, tdQTaskInfCmprFn1, TD_EQ))) {
oldVal = atomic_fetch_add_32(&pTaskF->nRef, 1);
ASSERT(oldVal > 0);
}
} else {
int32_t size = taosArrayGetSize(aQTaskInf);
for (int32_t i = 0; i < size; ++i) {
pTaskF = TARRAY_GET_ELEM(aQTaskInf, i);
if (pTaskF->version == version) {
oldVal = atomic_fetch_add_32(&pTaskF->nRef, 1);
}
ASSERT(oldVal > 0);
}
}
taosRUnLockLatch(RSMA_FS_LOCK(pStat));
return oldVal;
}
int64_t tdRSmaFSMaxVer(SSma *pSma, SRSmaStat *pStat) {
SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf;
int64_t version = -1;
taosRLockLatch(RSMA_FS_LOCK(pStat));
if (taosArrayGetSize(aQTaskInf) > 0) {
version = ((SQTaskFile *)taosArrayGetLast(aQTaskInf))->version;
}
taosRUnLockLatch(RSMA_FS_LOCK(pStat));
return version;
}
void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int64_t version) {
SVnode *pVnode = pSma->pVnode;
SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf;
char qTaskFullName[TSDB_FILENAME_LEN];
SQTaskFile qTaskF = {.level = level, .suid = suid, .version = version};
SQTaskFile *pTaskF = NULL;
int32_t idx = -1;
taosWLockLatch(RSMA_FS_LOCK(pStat));
if (suid > 0 && level > 0) {
if ((idx = taosArraySearchIdx(aQTaskInf, &qTaskF, tdQTaskInfCmprFn1, TD_EQ)) >= 0) {
ASSERT(idx < taosArrayGetSize(aQTaskInf));
pTaskF = taosArrayGet(aQTaskInf, idx);
if (atomic_sub_fetch_32(&pTaskF->nRef, 1) <= 0) {
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, level, pTaskF->version, tfsGetPrimaryPath(pVnode->pTfs),
qTaskFullName);
if (taosRemoveFile(qTaskFullName) < 0) {
smaWarn("vgId:%d, failed to remove %s since %s", TD_VID(pVnode), qTaskFullName,
tstrerror(TAOS_SYSTEM_ERROR(errno)));
} else {
smaDebug("vgId:%d, success to remove %s", TD_VID(pVnode), qTaskFullName);
}
taosArrayRemove(aQTaskInf, idx);
}
}
} else {
for (int32_t i = 0; i < taosArrayGetSize(aQTaskInf);) {
pTaskF = TARRAY_GET_ELEM(aQTaskInf, i);
int32_t nRef = INT32_MAX;
if (pTaskF->version == version) {
nRef = atomic_sub_fetch_32(&pTaskF->nRef, 1);
} else if (pTaskF->version < version) {
nRef = atomic_load_32(&pTaskF->nRef);
}
if (nRef <= 0) {
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, pTaskF->version,
tfsGetPrimaryPath(pVnode->pTfs), qTaskFullName);
if (taosRemoveFile(qTaskFullName) < 0) {
smaWarn("vgId:%d, failed to remove %s since %s", TD_VID(pVnode), qTaskFullName,
tstrerror(TAOS_SYSTEM_ERROR(errno)));
} else {
smaDebug("vgId:%d, success to remove %s", TD_VID(pVnode), qTaskFullName);
}
taosArrayRemove(aQTaskInf, i);
continue;
}
++i;
}
}
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
}
......@@ -150,7 +150,7 @@ int32_t smaOpen(SVnode *pVnode, int8_t rollback) {
}
// restore the rsma
if (tdRSmaRestore(pSma, RSMA_RESTORE_REBOOT, pVnode->state.committed) < 0) {
if (tdRSmaRestore(pSma, RSMA_RESTORE_REBOOT, pVnode->state.committed, rollback) < 0) {
goto _err;
}
}
......@@ -181,8 +181,8 @@ int32_t smaClose(SSma *pSma) {
* @param committedVer
* @return int32_t
*/
int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer) {
int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback) {
ASSERT(VND_IS_RSMA(pSma->pVnode));
return tdRSmaProcessRestoreImpl(pSma, type, committedVer);
return tdRSmaProcessRestoreImpl(pSma, type, committedVer, rollback);
}
......@@ -28,7 +28,7 @@ SSmaMgmt smaMgmt = {
.rsetId = -1,
};
#define TD_QTASKINFO_FNAME_PREFIX "qinf.v"
#define TD_QTASKINFO_FNAME_PREFIX "main.db"
typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem;
......@@ -59,12 +59,12 @@ struct SRSmaQTaskInfoItem {
void *qTaskInfo;
};
void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t version, char *outputName) {
tdGetVndFileName(vgId, NULL, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName);
void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t suid, int8_t level, int64_t version, char *outputName) {
tdGetVndFileName(vgId, NULL, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, suid, level, version, outputName);
}
void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t version, const char *path, char *outputName) {
tdGetVndFileName(vgId, path, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName);
void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t suid, int8_t level, int64_t version, const char *path, char *outputName) {
tdGetVndFileName(vgId, path, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, suid, level, version, outputName);
}
void tdRSmaQTaskInfoGetFullPath(int32_t vgId, int8_t level, const char *path, char *outputName) {
......@@ -311,8 +311,8 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
pItem->triggerStat = TASK_TRIGGER_STAT_ACTIVE; // fetch the data when reboot
pItem->pStreamState = pStreamState;
if (param->maxdelay[idx] < TSDB_MIN_ROLLUP_MAX_DELAY) {
int64_t msInterval =
convertTimeFromPrecisionToUnit(pRetention[idx + 1].freq, pTsdbCfg->precision, TIME_UNIT_MILLISECOND);
int64_t msInterval =
convertTimeFromPrecisionToUnit(pRetention[idx + 1].freq, pTsdbCfg->precision, TIME_UNIT_MILLISECOND);
pItem->maxDelay = (int32_t)msInterval;
} else {
pItem->maxDelay = (int32_t)param->maxdelay[idx];
......@@ -1190,7 +1190,7 @@ static int32_t tdRSmaRestoreTSDataReload(SSma *pSma) {
}
#endif
int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer) {
int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback) {
// step 1: iterate all stables to restore the rsma env
int64_t nTables = 0;
if (tdRSmaRestoreQTaskInfoInit(pSma, &nTables) < 0) {
......@@ -1209,7 +1209,7 @@ int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer)
#endif
// step 3: open SRSmaFS for qTaskFiles
if (tdRSmaFSOpen(pSma, qtaskFileVer) < 0) {
if (tdRSmaFSOpen(pSma, qtaskFileVer, rollback) < 0) {
goto _err;
}
......
......@@ -77,6 +77,7 @@ _err:
}
static int32_t rsmaQTaskInfSnapReaderOpen(SRSmaSnapReader* pReader, int64_t version) {
#if 0
int32_t code = 0;
SSma* pSma = pReader->pSma;
SVnode* pVnode = pSma->pVnode;
......@@ -133,10 +134,12 @@ _end:
}
smaInfo("vgId:%d, vnode snapshot rsma reader open %s succeed", TD_VID(pVnode), qTaskInfoFullName);
#endif
return TSDB_CODE_SUCCESS;
}
static int32_t rsmaQTaskInfSnapReaderClose(SQTaskFReader** ppReader) {
#if 0
if (!(*ppReader)) {
return TSDB_CODE_SUCCESS;
}
......@@ -149,7 +152,7 @@ static int32_t rsmaQTaskInfSnapReaderClose(SQTaskFReader** ppReader) {
tdRSmaFSUnRef(pSma, pStat, version);
taosMemoryFreeClear(*ppReader);
smaInfo("vgId:%d, vnode snapshot rsma reader closed for qTaskInfo version %" PRIi64, SMA_VID(pSma), version);
#endif
return TSDB_CODE_SUCCESS;
}
......@@ -307,6 +310,7 @@ struct SRSmaSnapWriter {
int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWriter** ppWriter) {
int32_t code = 0;
#if 0
SRSmaSnapWriter* pWriter = NULL;
SVnode* pVnode = pSma->pVnode;
......@@ -361,11 +365,13 @@ _err:
smaError("vgId:%d, rsma snapshot writer open failed since %s", TD_VID(pSma->pVnode), tstrerror(code));
if (pWriter) rsmaSnapWriterClose(&pWriter, 0);
*ppWriter = NULL;
#endif
return code;
}
int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) {
int32_t code = 0;
#if 0
SRSmaSnapWriter* pWriter = *ppWriter;
SVnode* pVnode = pWriter->pSma->pVnode;
......@@ -399,7 +405,8 @@ int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) {
pWriter->pQTaskFWriter->fname, qTaskInfoFullName);
// rsma restore
if ((code = tdRSmaRestore(pWriter->pSma, RSMA_RESTORE_SYNC, pWriter->ever)) < 0) {
int8_t rollback = 0;
if ((code = tdRSmaRestore(pWriter->pSma, RSMA_RESTORE_SYNC, pWriter->ever, rollback)) < 0) {
goto _err;
}
smaInfo("vgId:%d, vnode snapshot rsma writer restore from %s succeed", SMA_VID(pWriter->pSma), qTaskInfoFullName);
......@@ -413,6 +420,7 @@ int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) {
_err:
smaError("vgId:%d, vnode snapshot rsma writer close failed since %s", SMA_VID(pWriter->pSma), tstrerror(code));
#endif
return code;
}
......
......@@ -184,15 +184,15 @@ void tdDestroyTFile(STFile *pTFile) { taosMemoryFreeClear(TD_TFILE_FULL_NAME(pTF
#endif
void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t version,
char *outputName) {
if (version < 0) {
void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t suid,
int8_t level, int64_t version, char *outputName) {
if (version >= 0 && level > 0 && suid > 0) {
if (pdname) {
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%sv%d%s", pdname, TD_DIRSEP, TD_DIRSEP, vgId,
TD_DIRSEP, dname, TD_DIRSEP, vgId, fname);
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%s%" PRIi64 "%s%" PRIi8 "%s%s.%" PRIi64, pdname,
TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP, dname, TD_DIRSEP, suid, level, fname, version);
} else {
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%sv%d%s", TD_DIRSEP, vgId, TD_DIRSEP, dname, TD_DIRSEP,
vgId, fname);
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%s%" PRIi64 "%s%" PRIi8 "%s%s." PRIi64, TD_DIRSEP,
vgId, TD_DIRSEP, dname, TD_DIRSEP, suid, level, fname, version);
}
} else {
if (pdname) {
......
......@@ -191,8 +191,7 @@ static void vnodePrepareCommit(SVnode *pVnode) {
tsdbPrepareCommit(pVnode->pTsdb);
metaPrepareAsyncCommit(pVnode->pMeta);
smaPrepareAsyncCommit(pVnode->pSma);
smaPrepareCommit(pVnode->pSma);
vnodeBufPoolUnRef(pVnode->inUse);
pVnode->inUse = NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册