提交 6d01e18f 编写于 作者: K kailixu

chore: rsma fs and vnode commit optimization

上级 c90a41a8
...@@ -700,7 +700,7 @@ int32_t* taosGetErrno(); ...@@ -700,7 +700,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RSMA_INVALID_ENV TAOS_DEF_ERROR_CODE(0, 0x3150) #define TSDB_CODE_RSMA_INVALID_ENV TAOS_DEF_ERROR_CODE(0, 0x3150)
#define TSDB_CODE_RSMA_INVALID_STAT TAOS_DEF_ERROR_CODE(0, 0x3151) #define TSDB_CODE_RSMA_INVALID_STAT TAOS_DEF_ERROR_CODE(0, 0x3151)
#define TSDB_CODE_RSMA_QTASKINFO_CREATE TAOS_DEF_ERROR_CODE(0, 0x3152) #define TSDB_CODE_RSMA_QTASKINFO_CREATE TAOS_DEF_ERROR_CODE(0, 0x3152)
// #define TSDB_CODE_RSMA_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x3153) #define TSDB_CODE_RSMA_FS_COMMIT TAOS_DEF_ERROR_CODE(0, 0x3153)
#define TSDB_CODE_RSMA_REMOVE_EXISTS TAOS_DEF_ERROR_CODE(0, 0x3154) #define TSDB_CODE_RSMA_REMOVE_EXISTS TAOS_DEF_ERROR_CODE(0, 0x3154)
#define TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP TAOS_DEF_ERROR_CODE(0, 0x3155) #define TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP TAOS_DEF_ERROR_CODE(0, 0x3155)
#define TSDB_CODE_RSMA_EMPTY_INFO TAOS_DEF_ERROR_CODE(0, 0x3156) #define TSDB_CODE_RSMA_EMPTY_INFO TAOS_DEF_ERROR_CODE(0, 0x3156)
......
...@@ -89,6 +89,7 @@ struct SQTaskFile { ...@@ -89,6 +89,7 @@ struct SQTaskFile {
int64_t suid; int64_t suid;
int64_t version; int64_t version;
int64_t size; int64_t size;
int64_t mtime;
}; };
struct SQTaskFReader { struct SQTaskFReader {
...@@ -246,57 +247,6 @@ static FORCE_INLINE void tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) { ...@@ -246,57 +247,6 @@ static FORCE_INLINE void tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
smaTrace("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref); smaTrace("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref);
} }
// smaFileUtil ================
#define TD_FILE_HEAD_SIZE 512
typedef struct STFInfo STFInfo;
typedef struct STFile STFile;
struct STFInfo {
// common fields
uint32_t magic;
uint32_t ftype;
uint32_t fver;
int64_t fsize;
};
enum {
TD_FTYPE_RSMA_QTASKINFO = 0,
};
#if 0
struct STFile {
uint8_t state;
STFInfo info;
char *fname;
TdFilePtr pFile;
};
#define TD_TFILE_PFILE(tf) ((tf)->pFile)
#define TD_TFILE_OPENED(tf) (TD_TFILE_PFILE(tf) != NULL)
#define TD_TFILE_FULL_NAME(tf) ((tf)->fname)
#define TD_TFILE_OPENED(tf) (TD_TFILE_PFILE(tf) != NULL)
#define TD_TFILE_CLOSED(tf) (!TD_TFILE_OPENED(tf))
#define TD_TFILE_SET_CLOSED(f) (TD_TFILE_PFILE(f) = NULL)
#define TD_TFILE_SET_STATE(tf, s) ((tf)->state = (s))
int32_t tdInitTFile(STFile *pTFile, const char *dname, const char *fname);
int32_t tdCreateTFile(STFile *pTFile, bool updateHeader, int8_t fType);
int32_t tdOpenTFile(STFile *pTFile, int flags);
int64_t tdReadTFile(STFile *pTFile, void *buf, int64_t nbyte);
int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence);
int64_t tdWriteTFile(STFile *pTFile, void *buf, int64_t nbyte);
int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset);
int64_t tdGetTFileSize(STFile *pTFile, int64_t *size);
int32_t tdRemoveTFile(STFile *pTFile);
int32_t tdLoadTFileHeader(STFile *pTFile, STFInfo *pInfo);
int32_t tdUpdateTFileHeader(STFile *pTFile);
void tdUpdateTFileMagic(STFile *pTFile, void *pCksm);
void tdCloseTFile(STFile *pTFile);
void tdDestroyTFile(STFile *pTFile);
#endif
void tdRSmaGetFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t suid, void tdRSmaGetFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t suid,
int8_t level, int64_t version, char *outputName); int8_t level, int64_t version, char *outputName);
void tdRSmaGetDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName); void tdRSmaGetDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName);
......
...@@ -187,9 +187,11 @@ static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) { ...@@ -187,9 +187,11 @@ static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) {
* @return int32_t * @return int32_t
*/ */
static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
int32_t code = 0;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
if (!pEnv) { if (!pEnv) {
return TSDB_CODE_SUCCESS; return code;
} }
SSmaStat *pStat = SMA_ENV_STAT(pEnv); SSmaStat *pStat = SMA_ENV_STAT(pEnv);
...@@ -240,8 +242,8 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { ...@@ -240,8 +242,8 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
} }
} }
smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
if (tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)) < 0) { if ((code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat))) != 0) {
return TSDB_CODE_FAILED; return code;
} }
smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
...@@ -272,7 +274,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { ...@@ -272,7 +274,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
if ((pTsdb = VND_RSMA1(pSma->pVnode))) tsdbPrepareCommit(pTsdb); if ((pTsdb = VND_RSMA1(pSma->pVnode))) tsdbPrepareCommit(pTsdb);
if ((pTsdb = VND_RSMA2(pSma->pVnode))) tsdbPrepareCommit(pTsdb); if ((pTsdb = VND_RSMA2(pSma->pVnode))) tsdbPrepareCommit(pTsdb);
return TSDB_CODE_SUCCESS; return code;
} }
/** /**
......
...@@ -247,6 +247,8 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS ...@@ -247,6 +247,8 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
taosInitRWLatch(RSMA_FS_LOCK(pRSmaStat));
} else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { } else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
// TODO // TODO
} else { } else {
......
...@@ -27,6 +27,7 @@ static FORCE_INLINE int32_t tPutQTaskF(uint8_t *p, SQTaskFile *pFile) { ...@@ -27,6 +27,7 @@ static FORCE_INLINE int32_t tPutQTaskF(uint8_t *p, SQTaskFile *pFile) {
n += tPutI64v(p ? p + n : p, pFile->size); n += tPutI64v(p ? p + n : p, pFile->size);
n += tPutI64v(p ? p + n : p, pFile->suid); n += tPutI64v(p ? p + n : p, pFile->suid);
n += tPutI64v(p ? p + n : p, pFile->version); n += tPutI64v(p ? p + n : p, pFile->version);
n += tPutI64v(p ? p + n : p, pFile->mtime);
return n; return n;
} }
...@@ -54,6 +55,7 @@ int32_t tdRSmaGetQTaskF(uint8_t *p, SQTaskFile *pFile) { ...@@ -54,6 +55,7 @@ int32_t tdRSmaGetQTaskF(uint8_t *p, SQTaskFile *pFile) {
n += tGetI64v(p + n, &pFile->size); n += tGetI64v(p + n, &pFile->size);
n += tGetI64v(p + n, &pFile->suid); n += tGetI64v(p + n, &pFile->suid);
n += tGetI64v(p + n, &pFile->version); n += tGetI64v(p + n, &pFile->version);
n += tGetI64v(p + n, &pFile->mtime);
return n; return n;
} }
...@@ -157,7 +159,7 @@ static void tdRSmaGetCurrentFName(SSma *pSma, char *current, char *current_t) { ...@@ -157,7 +159,7 @@ static void tdRSmaGetCurrentFName(SSma *pSma, char *current, char *current_t) {
TD_DIRSEP, TD_DIRSEP, TD_VID(pVnode), TD_DIRSEP, TD_DIRSEP); TD_DIRSEP, TD_DIRSEP, TD_VID(pVnode), TD_DIRSEP, TD_DIRSEP);
} }
if (current_t) { if (current_t) {
snprintf(current, TSDB_FILENAME_LEN - 1, "%s%svnode%svnode%d%srsma%sPRESENT.t", tfsGetPrimaryPath(pVnode->pTfs), snprintf(current_t, TSDB_FILENAME_LEN - 1, "%s%svnode%svnode%d%srsma%sPRESENT.t", tfsGetPrimaryPath(pVnode->pTfs),
TD_DIRSEP, TD_DIRSEP, TD_VID(pVnode), TD_DIRSEP, TD_DIRSEP); TD_DIRSEP, TD_DIRSEP, TD_VID(pVnode), TD_DIRSEP, TD_DIRSEP);
} }
} else { } else {
...@@ -249,7 +251,7 @@ static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2) { ...@@ -249,7 +251,7 @@ static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2) {
return 0; return 0;
} }
static int32_t tdRSmaFSApplyChange(SSma *pSma, SRSmaFS *pFS) { static int32_t tdRSmaFSApplyChange(SSma *pSma, SRSmaFS *pFSNew) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
SVnode *pVnode = pSma->pVnode; SVnode *pVnode = pSma->pVnode;
...@@ -260,10 +262,10 @@ static int32_t tdRSmaFSApplyChange(SSma *pSma, SRSmaFS *pFS) { ...@@ -260,10 +262,10 @@ static int32_t tdRSmaFSApplyChange(SSma *pSma, SRSmaFS *pFS) {
char fname[TSDB_FILENAME_LEN] = {0}; char fname[TSDB_FILENAME_LEN] = {0};
// SQTaskFile // SQTaskFile
int32_t nNew = taosArrayGetSize(pFS->aQTaskInf); int32_t nNew = taosArrayGetSize(pFSNew->aQTaskInf);
int32_t iNew = 0; int32_t iNew = 0;
while (iNew < nNew) { while (iNew < nNew) {
SQTaskFile *pQTaskFNew = TARRAY_GET_ELEM(pFS->aQTaskInf, iNew); SQTaskFile *pQTaskFNew = TARRAY_GET_ELEM(pFSNew->aQTaskInf, iNew++);
int32_t idx = taosArraySearchIdx(pFSOld->aQTaskInf, pQTaskFNew, tdQTaskInfCmprFn1, TD_GE); int32_t idx = taosArraySearchIdx(pFSOld->aQTaskInf, pQTaskFNew, tdQTaskInfCmprFn1, TD_GE);
...@@ -272,22 +274,22 @@ static int32_t tdRSmaFSApplyChange(SSma *pSma, SRSmaFS *pFS) { ...@@ -272,22 +274,22 @@ static int32_t tdRSmaFSApplyChange(SSma *pSma, SRSmaFS *pFS) {
pQTaskFNew->nRef = 1; pQTaskFNew->nRef = 1;
} else { } else {
SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFSOld->aQTaskInf, idx); SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFSOld->aQTaskInf, idx);
int32_t c = tdQTaskInfCmprFn1(pTaskF, pQTaskFNew); int32_t c = tdQTaskInfCmprFn1(pQTaskFNew, pTaskF);
if (c < 0) { if (c == 0) {
l // utilize the item in pFSOld->qQTaskInf, instead of pFSNew
} else if(c == 0) {
++iNew;
continue; continue;
} else if (c < 0) {
// NOTHING TODO
} else { } else {
ASSERT(0); ASSERT(0);
continue;
} }
} }
if (taosArrayInsert(pFS->aQTaskInf, idx, qTaskF) == NULL) { if (taosArrayInsert(pFSOld->aQTaskInf, idx, pQTaskFNew) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
} }
} }
_exit: _exit:
...@@ -500,7 +502,7 @@ int32_t tdRSmaFSPrepareCommit(SSma *pSma, SRSmaFS *pFSNew) { ...@@ -500,7 +502,7 @@ int32_t tdRSmaFSPrepareCommit(SSma *pSma, SRSmaFS *pFSNew) {
tdRSmaGetCurrentFName(pSma, NULL, tfname); tdRSmaGetCurrentFName(pSma, NULL, tfname);
// gnrt PRESENT.t // generate PRESENT.t
code = tdRSmaSaveFSToFile(pFSNew, tfname); code = tdRSmaSaveFSToFile(pFSNew, tfname);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
...@@ -600,13 +602,12 @@ int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t siz ...@@ -600,13 +602,12 @@ int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t siz
SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, idx); SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, idx);
int32_t c = tdQTaskInfCmprFn1(pTaskF, qTaskF); int32_t c = tdQTaskInfCmprFn1(pTaskF, qTaskF);
if (c == 0) { if (c == 0) {
ASSERT(pTaskF->size == qTaskF->size);
ASSERT(0); ASSERT(0);
goto _exit; continue;
} }
} }
if (taosArrayInsert(pFS->aQTaskInf, idx, qTaskF) == NULL) { if (!taosArrayInsert(pFS->aQTaskInf, idx, qTaskF)) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
} }
...@@ -705,10 +706,10 @@ int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFSOut) { ...@@ -705,10 +706,10 @@ int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFSOut) {
taosRLockLatch(RSMA_FS_LOCK(pStat)); taosRLockLatch(RSMA_FS_LOCK(pStat));
code = tdRSmaFSCopy(pSma, pFSOut); code = tdRSmaFSCopy(pSma, pFSOut);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
taosWUnLockLatch(RSMA_FS_LOCK(pStat)); taosRUnLockLatch(RSMA_FS_LOCK(pStat));
_exit: _exit:
if (code) { if (code) {
taosWUnLockLatch(RSMA_FS_LOCK(pStat)); taosRUnLockLatch(RSMA_FS_LOCK(pStat));
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code)); smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code));
} }
return code; return code;
...@@ -720,13 +721,13 @@ int32_t tdRSmaFSCopy(SSma *pSma, SRSmaFS *pFSOut) { ...@@ -720,13 +721,13 @@ int32_t tdRSmaFSCopy(SSma *pSma, SRSmaFS *pFSOut) {
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
SRSmaFS *pFS = RSMA_FS(pStat); SRSmaFS *pFS = RSMA_FS(pStat);
int32_t size = 0; int32_t size = taosArrayGetSize(pFS->aQTaskInf);
code = tdRSmaFSCreate(pFSOut, size); code = tdRSmaFSCreate(pFSOut, size);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
taosArraySetSize(pFSOut->aQTaskInf, size); taosArraySetSize(pFSOut->aQTaskInf, size);
memcpy(TARRAY_GET_ELEM(pFSOut->aQTaskInf, 0), TARRAY_GET_ELEM(pFS->aQTaskInf, 0), size * sizeof(SQTaskFile)); memcpy(pFSOut->aQTaskInf->pData, pFS->aQTaskInf->pData, size * sizeof(SQTaskFile));
_exit: _exit:
if (code) { if (code) {
......
...@@ -344,13 +344,8 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con ...@@ -344,13 +344,8 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
if (pRSmaInfo) { if (pRSmaInfo) {
// TODO: free original pRSmaInfo if exists abnormally smaInfo("vgId:%d, rsma info already exists for table %s, %" PRIi64, SMA_VID(pSma), tbName, suid);
tdFreeRSmaInfo(pSma, *(SRSmaInfo **)pRSmaInfo, true); return TSDB_CODE_SUCCESS;
if (taosHashRemove(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)) < 0) {
terrno = TSDB_CODE_RSMA_REMOVE_EXISTS;
goto _err;
}
smaWarn("vgId:%d, remove the rsma info already exists for table %s, %" PRIi64, SMA_VID(pSma), tbName, suid);
} }
// from write queue: single thead // from write queue: single thead
...@@ -1228,7 +1223,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { ...@@ -1228,7 +1223,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
code = TSDB_CODE_RSMA_STREAM_STATE_COMMIT; code = TSDB_CODE_RSMA_STREAM_STATE_COMMIT;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
smaDebug("vgId:%d, rsma persist, stream state commit success, table %" PRIi64 " level %d", TD_VID(pVnode), smaDebug("vgId:%d, rsma persist, stream state commit success, table %" PRIi64 ", level %d", TD_VID(pVnode),
pRSmaInfo->suid, i + 1); pRSmaInfo->suid, i + 1);
// qTaskInfo file // qTaskInfo file
...@@ -1250,22 +1245,27 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { ...@@ -1250,22 +1245,27 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
int64_t size = 0; int64_t size = 0;
if (taosFStatFile(pInFD, &size, NULL) < 0) { uint32_t mtime = 0;
if (taosFStatFile(pInFD, &size, &mtime) < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
ASSERT(size > 0);
int64_t offset = 0; int64_t offset = 0;
if (taosFSendFile(pOutFD, pInFD, &offset, size) < 0) { if (taosFSendFile(pOutFD, pInFD, &offset, size) < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
smaError("vgId:%d, rsma persist, send qtaskinfo file %s failed since %s", TD_VID(pVnode), fname, tstrerror(code)); smaError("vgId:%d, rsma persist, send qtaskinfo file %s to %s failed since %s", TD_VID(pVnode), fname,
fnameVer, tstrerror(code));
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
taosCloseFile(&pOutFD); taosCloseFile(&pOutFD);
taosCloseFile(&pInFD); taosCloseFile(&pInFD);
SQTaskFile qTaskF = {.nRef = 1, .level = i + 1, .suid = pRSmaInfo->suid, .version = version, .size = size}; SQTaskFile qTaskF = {
.nRef = 1, .level = i + 1, .suid = pRSmaInfo->suid, .version = version, .size = size, .mtime = mtime};
taosArrayPush(qTaskFArray, &qTaskF); taosArrayPush(qTaskFArray, &qTaskF);
} }
} }
...@@ -1274,8 +1274,10 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { ...@@ -1274,8 +1274,10 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
// prepare // prepare
code = tdRSmaFSTakeSnapshot(pSma, &fs); code = tdRSmaFSTakeSnapshot(pSma, &fs);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = tdRSmaFSUpsertQTaskFile(&fs, qTaskFArray->pData, taosArrayGetSize(qTaskFArray)); code = tdRSmaFSUpsertQTaskFile(&fs, qTaskFArray->pData, taosArrayGetSize(qTaskFArray));
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = tdRSmaFSPrepareCommit(pSma, &fs); code = tdRSmaFSPrepareCommit(pSma, &fs);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
...@@ -1283,11 +1285,13 @@ _exit: ...@@ -1283,11 +1285,13 @@ _exit:
taosArrayDestroy(fs.aQTaskInf); taosArrayDestroy(fs.aQTaskInf);
taosArrayDestroy(qTaskFArray); taosArrayDestroy(qTaskFArray);
if (code) { if (code) {
if (pOutFD) taosCloseFile(&pOutFD); if (pOutFD) taosCloseFile(&pOutFD);
if (pInFD) taosCloseFile(&pInFD); if (pInFD) taosCloseFile(&pInFD);
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
} }
terrno = code; terrno = code;
return code; return code;
} }
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#include "sma.h" #include "sma.h"
#define TD_QTASKINFO_FNAME_PREFIX "main.db" #define TD_QTASKINFO_FNAME_PREFIX "main.tdb"
void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t suid, int8_t level, int64_t version, char *outputName) { void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t suid, int8_t level, int64_t version, char *outputName) {
tdRSmaGetFileName(vgId, NULL, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, suid, level, version, outputName); tdRSmaGetFileName(vgId, NULL, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, suid, level, version, outputName);
......
...@@ -184,16 +184,21 @@ _err: ...@@ -184,16 +184,21 @@ _err:
return -1; return -1;
} }
static void vnodePrepareCommit(SVnode *pVnode) { static int32_t vnodePrepareCommit(SVnode *pVnode) {
int32_t code = 0;
tsem_wait(&pVnode->canCommit); tsem_wait(&pVnode->canCommit);
tsdbPrepareCommit(pVnode->pTsdb); tsdbPrepareCommit(pVnode->pTsdb);
metaPrepareAsyncCommit(pVnode->pMeta); metaPrepareAsyncCommit(pVnode->pMeta);
smaPrepareAsyncCommit(pVnode->pSma); code = smaPrepareAsyncCommit(pVnode->pSma);
if (code) goto _exit;
_exit:
vnodeBufPoolUnRef(pVnode->inUse); vnodeBufPoolUnRef(pVnode->inUse);
pVnode->inUse = NULL; pVnode->inUse = NULL;
return code;
} }
static int32_t vnodeCommitTask(void *arg) { static int32_t vnodeCommitTask(void *arg) {
int32_t code = 0; int32_t code = 0;
...@@ -203,10 +208,9 @@ static int32_t vnodeCommitTask(void *arg) { ...@@ -203,10 +208,9 @@ static int32_t vnodeCommitTask(void *arg) {
code = vnodeCommitImpl(pInfo); code = vnodeCommitImpl(pInfo);
if (code) goto _exit; if (code) goto _exit;
_exit:
// end commit // end commit
tsem_post(&pInfo->pVnode->canCommit); tsem_post(&pInfo->pVnode->canCommit);
_exit:
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
return code; return code;
} }
...@@ -214,7 +218,8 @@ int vnodeAsyncCommit(SVnode *pVnode) { ...@@ -214,7 +218,8 @@ int vnodeAsyncCommit(SVnode *pVnode) {
int32_t code = 0; int32_t code = 0;
// prepare to commit // prepare to commit
vnodePrepareCommit(pVnode); code = vnodePrepareCommit(pVnode);
if (code) goto _exit;
// schedule the task // schedule the task
pVnode->state.commitTerm = pVnode->state.applyTerm; pVnode->state.commitTerm = pVnode->state.applyTerm;
...@@ -230,14 +235,15 @@ int vnodeAsyncCommit(SVnode *pVnode) { ...@@ -230,14 +235,15 @@ int vnodeAsyncCommit(SVnode *pVnode) {
pInfo->info.state.commitID = pVnode->state.commitID; pInfo->info.state.commitID = pVnode->state.commitID;
pInfo->pVnode = pVnode; pInfo->pVnode = pVnode;
pInfo->txn = metaGetTxn(pVnode->pMeta); pInfo->txn = metaGetTxn(pVnode->pMeta);
vnodeScheduleTask(vnodeCommitTask, pInfo); code = vnodeScheduleTask(vnodeCommitTask, pInfo);
_exit: _exit:
if (code) { if (code) {
vError("vgId:%d %s failed since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), tsem_post(&pVnode->canCommit);
vError("vgId:%d, %s failed since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code),
pVnode->state.commitID); pVnode->state.commitID);
} else { } else {
vDebug("vgId:%d %s done", TD_VID(pVnode), __func__); vDebug("vgId:%d, %s done", TD_VID(pVnode), __func__);
} }
return code; return code;
} }
......
...@@ -15,13 +15,13 @@ ...@@ -15,13 +15,13 @@
#include "vnd.h" #include "vnd.h"
#define VNODE_GET_LOAD_RESET_VALS(pVar, oVal, vType, tags) \ #define VNODE_GET_LOAD_RESET_VALS(pVar, oVal, vType, tags) \
do { \ do { \
int##vType##_t newVal = atomic_sub_fetch_##vType(&(pVar), (oVal)); \ int##vType##_t newVal = atomic_sub_fetch_##vType(&(pVar), (oVal)); \
ASSERT(newVal >= 0); \ ASSERT(newVal >= 0); \
if (newVal < 0) { \ if (newVal < 0) { \
vWarn("vgId:%d %s, abnormal val:%" PRIi64 ", old val:%" PRIi64, TD_VID(pVnode), tags, newVal, (oVal)); \ vWarn("vgId:%d, %s, abnormal val:%" PRIi64 ", old val:%" PRIi64, TD_VID(pVnode), tags, newVal, (oVal)); \
} \ } \
} while (0) } while (0)
int vnodeQueryOpen(SVnode *pVnode) { int vnodeQueryOpen(SVnode *pVnode) {
......
...@@ -317,7 +317,10 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp ...@@ -317,7 +317,10 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
// commit if need // commit if need
if (vnodeShouldCommit(pVnode)) { if (vnodeShouldCommit(pVnode)) {
vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version); vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version);
vnodeAsyncCommit(pVnode); if (vnodeAsyncCommit(pVnode) < 0) {
vError("vgId:%d, failed to vnode async commit since %s.", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// start a new one // start a new one
if (vnodeBegin(pVnode) < 0) { if (vnodeBegin(pVnode) < 0) {
......
...@@ -585,6 +585,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_NO_INDEX_IN_CACHE, "No tsma index in ca ...@@ -585,6 +585,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_NO_INDEX_IN_CACHE, "No tsma index in ca
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_ENV, "Invalid rsma env") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_ENV, "Invalid rsma env")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_STAT, "Invalid rsma state") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_STAT, "Invalid rsma state")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_QTASKINFO_CREATE, "Rsma qtaskinfo creation error") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_QTASKINFO_CREATE, "Rsma qtaskinfo creation error")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FS_COMMIT, "Invalid rsma fs commit")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_REMOVE_EXISTS, "Rsma remove exists") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_REMOVE_EXISTS, "Rsma remove exists")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP, "Rsma fetch msg is messed up") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP, "Rsma fetch msg is messed up")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_EMPTY_INFO, "Rsma info is empty") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_EMPTY_INFO, "Rsma info is empty")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册