diff --git a/source/dnode/vnode/src/sma/smaFS.c b/source/dnode/vnode/src/sma/smaFS.c index a315e1ccb38c71d025ff9c68872ea05eb619600f..a515674dd1219d5620d10a8a197f44acf60e85d8 100644 --- a/source/dnode/vnode/src/sma/smaFS.c +++ b/source/dnode/vnode/src/sma/smaFS.c @@ -320,106 +320,9 @@ _exit: return code; } -/** - * @brief Fetch qtaskfiles LE than version - * - * @param pSma - * @param version - * @param output - * @return int32_t - */ -#if 0 -static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **output) { - SVnode *pVnode = pSma->pVnode; - TdDirPtr pDir = NULL; - TdDirEntryPtr pDirEntry = NULL; - char dir[TSDB_FILENAME_LEN]; - const char *pattern = "v[0-9]+qinf\\.v([0-9]+)?$"; - regex_t regex; - int code = 0; - - terrno = TSDB_CODE_SUCCESS; - - tdRSmaGetDirName(TD_VID(pVnode), tfsGetPrimaryPath(pVnode->pTfs), VNODE_RSMA_DIR, true, dir); - - if (!taosCheckExistFile(dir)) { - smaDebug("vgId:%d, fetch qtask files, no need as dir %s not exist", TD_VID(pVnode), dir); - return TSDB_CODE_SUCCESS; - } - - // Resource allocation and init - if ((code = regcomp(®ex, pattern, REG_EXTENDED)) != 0) { - terrno = TSDB_CODE_RSMA_REGEX_MATCH; - char errbuf[128]; - regerror(code, ®ex, errbuf, sizeof(errbuf)); - smaWarn("vgId:%d, fetch qtask files, regcomp for %s failed since %s", TD_VID(pVnode), dir, errbuf); - return TSDB_CODE_FAILED; - } - - if (!(pDir = taosOpenDir(dir))) { - regfree(®ex); - terrno = TAOS_SYSTEM_ERROR(errno); - smaError("vgId:%d, fetch qtask files, open dir %s failed since %s", TD_VID(pVnode), dir, terrstr()); - return TSDB_CODE_FAILED; - } - - int32_t dirLen = strlen(dir); - char *dirEnd = POINTER_SHIFT(dir, dirLen); - regmatch_t regMatch[2]; - while ((pDirEntry = taosReadDir(pDir))) { - char *entryName = taosGetDirEntryName(pDirEntry); - if (!entryName) { - continue; - } - - code = regexec(®ex, entryName, 2, regMatch, 0); - - if (code == 0) { - // match - smaInfo("vgId:%d, fetch qtask files, max ver:%" PRIi64 ", %s found", TD_VID(pVnode), version, entryName); - - int64_t ver = -1; - sscanf((const char *)POINTER_SHIFT(entryName, regMatch[1].rm_so), "%" PRIi64, &ver); - if ((ver <= version) && (ver > -1)) { - if (!(*output)) { - if (!(*output = taosArrayInit(1, POINTER_BYTES))) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _end; - } - } - char *entryDup = strdup(entryName); - if (!entryDup) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _end; - } - if (!taosArrayPush(*output, &entryDup)) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _end; - } - } else { - } - } else if (code == REG_NOMATCH) { - // not match - smaTrace("vgId:%d, fetch qtask files, not match %s", TD_VID(pVnode), entryName); - continue; - } else { - // has other error - char errbuf[128]; - regerror(code, ®ex, errbuf, sizeof(errbuf)); - smaWarn("vgId:%d, fetch qtask files, regexec failed since %s", TD_VID(pVnode), errbuf); - terrno = TSDB_CODE_RSMA_REGEX_MATCH; - goto _end; - } - } -_end: - taosCloseDir(&pDir); - regfree(®ex); - return terrno == 0 ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED; -} -#endif - static int32_t tdRSmaFSScanAndTryFix(SSma *pSma) { int32_t code = 0; +#if 0 int32_t lino = 0; SVnode *pVnode = pSma->pVnode; SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); @@ -462,6 +365,7 @@ _exit: if (code) { smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); } +#endif return code; } @@ -497,10 +401,9 @@ int32_t tdRSmaFSOpen(SSma *pSma, int64_t version, int8_t rollback) { } } } else { - // 1st open with empty current/qTaskInfoFile + // 1st time open with empty current/qTaskInfoFile code = tdRSmaSaveFSToFile(RSMA_FS(pStat), current); TSDB_CHECK_CODE(code, lino, _exit); - ASSERT(!rollback); } // scan and try fix(remove main.db/main.db.xxx and use the one with version) diff --git a/source/dnode/vnode/src/sma/smaSnapshot.c b/source/dnode/vnode/src/sma/smaSnapshot.c index 356a52e9964368f296b49b176ec84094ddf7fc8c..c68525a493b83a8baadeb86c8133c00f2dad09a0 100644 --- a/source/dnode/vnode/src/sma/smaSnapshot.c +++ b/source/dnode/vnode/src/sma/smaSnapshot.c @@ -101,18 +101,18 @@ static int32_t rsmaSnapReadQTaskInfo(SRSmaSnapReader* pReader, uint8_t** ppBuf) if (!qReader) { *ppBuf = NULL; - smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, qTaskReader is NULL", TD_VID(pVnode)); - return 0; + smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, not needed since qTaskReader is NULL", TD_VID(pVnode)); + goto _exit; } if (pReader->fsIter >= taosArrayGetSize(pFS->aQTaskInf)) { *ppBuf = NULL; smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, fsIter reach end", TD_VID(pVnode)); - return 0; + goto _exit; } while (pReader->fsIter < taosArrayGetSize(pFS->aQTaskInf)) { - SQTaskFile* qTaskF = taosArrayGet(pFS->aQTaskInf, pReader->fsIter); + SQTaskFile* qTaskF = taosArrayGet(pFS->aQTaskInf, pReader->fsIter++); if (qTaskF->version != version) { continue; } @@ -120,10 +120,11 @@ static int32_t rsmaSnapReadQTaskInfo(SRSmaSnapReader* pReader, uint8_t** ppBuf) tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), qTaskF->suid, qTaskF->level, version, tfsGetPrimaryPath(pVnode->pTfs), fname); if (!taosCheckExistFile(fname)) { - smaWarn("vgId:%d, vnode snapshot rsma reader for qtaskinfo, table %" PRIi64 ", level %" PRIi8 ", version %" PRIi64 - " is not needed as %s not exist", - TD_VID(pVnode), qTaskF->suid, qTaskF->level, version, fname); - continue; + smaError("vgId:%d, vnode snapshot rsma reader for qtaskinfo, table %" PRIi64 ", level %" PRIi8 + ", version %" PRIi64 " failed since %s not exist", + TD_VID(pVnode), qTaskF->suid, qTaskF->level, version, fname); + code = TSDB_CODE_RSMA_FS_SYNC; + TSDB_CHECK_CODE(code, lino, _exit); } TdFilePtr fp = taosOpenFile(fname, TD_FILE_READ); @@ -139,7 +140,7 @@ static int32_t rsmaSnapReadQTaskInfo(SRSmaSnapReader* pReader, uint8_t** ppBuf) if (!qReader->pReadH) { *ppBuf = NULL; smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, not needed since readh is NULL", TD_VID(pVnode)); - return 0; + goto _exit; } int64_t size = 0; @@ -184,6 +185,8 @@ static int32_t rsmaSnapReadQTaskInfo(SRSmaSnapReader* pReader, uint8_t** ppBuf) pHdr->size = size; _exit: + if (qReader) taosCloseFile(&qReader->pReadH); + if (code) { *ppBuf = NULL; smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); @@ -234,6 +237,7 @@ int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData) { _exit: if (code) { + rsmaSnapReaderClose(&pReader); smaError("vgId:%d, vnode snapshot rsma read failed since %s", SMA_VID(pReader->pSma), tstrerror(code)); } else { smaInfo("vgId:%d, vnode snapshot rsma read succeed", SMA_VID(pReader->pSma)); @@ -246,10 +250,7 @@ int32_t rsmaSnapReaderClose(SRSmaSnapReader** ppReader) { SRSmaSnapReader* pReader = *ppReader; tdRSmaFSUnRef(pReader->pSma, &pReader->fs); - if (pReader->pQTaskFReader) { - taosCloseFile(&pReader->pQTaskFReader->pReadH); - taosMemoryFree(pReader->pQTaskFReader); - } + taosMemoryFreeClear(pReader->pQTaskFReader); for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { if (pReader->pDataReader[i]) { @@ -319,10 +320,10 @@ int32_t rsmaSnapWriterPrepareClose(SRSmaSnapWriter* pWriter) { int32_t code = 0; int32_t lino = 0; - // rsmaSnapWriterClose - - code = tdRSmaFSPrepareCommit(pWriter->pSma, &pWriter->fs); - TSDB_CHECK_CODE(code, lino, _exit); + if (pWriter) { + code = tdRSmaFSPrepareCommit(pWriter->pSma, &pWriter->fs); + TSDB_CHECK_CODE(code, lino, _exit); + } _exit: if (code) { @@ -335,17 +336,25 @@ int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) { int32_t code = 0; int32_t lino = 0; SSma* pSma = NULL; + SVnode* pVnode = NULL; SSmaEnv* pEnv = NULL; SRSmaStat* pStat = NULL; SRSmaSnapWriter* pWriter = *ppWriter; + const char* primaryPath = NULL; + char fname[TSDB_FILENAME_LEN] = {0}; + char fnameVer[TSDB_FILENAME_LEN] = {0}; + TdFilePtr pOutFD = NULL; + TdFilePtr pInFD = NULL; if (!pWriter) { goto _exit; } pSma = pWriter->pSma; + pVnode = pSma->pVnode; pEnv = SMA_RSMA_ENV(pSma); pStat = (SRSmaStat*)SMA_ENV_STAT(pEnv); + primaryPath = tfsGetPrimaryPath(pVnode->pTfs); // rsma1/rsma2 for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { @@ -360,6 +369,45 @@ int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) { tdRSmaFSRollback(pSma); // remove qTaskFiles } else { + // sendFile from fname.Ver to fname + SRSmaFS* pFS = &pWriter->fs; + int32_t size = taosArrayGetSize(pFS->aQTaskInf); + for (int32_t i = 0; i < size; ++i) { + SQTaskFile* pTaskF = TARRAY_GET_ELEM(pFS->aQTaskInf, i); + if (pTaskF->version == pWriter->ever) { + tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, pTaskF->version, primaryPath, fnameVer); + tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, -1, primaryPath, fname); + + pInFD = taosOpenFile(fnameVer, TD_FILE_READ); + if (pInFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + TSDB_CHECK_CODE(code, lino, _exit); + } + + pOutFD = taosCreateFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); + if (pOutFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + TSDB_CHECK_CODE(code, lino, _exit); + } + + int64_t size = 0; + if (taosFStatFile(pInFD, &size, NULL) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + TSDB_CHECK_CODE(code, lino, _exit); + } + + int64_t offset = 0; + if (taosFSendFile(pOutFD, pInFD, &offset, size) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + smaError("vgId:%d, vnode snapshot rsma writer, send qtaskinfo file %s to %s failed since %s", TD_VID(pVnode), + fnameVer, fname, tstrerror(code)); + TSDB_CHECK_CODE(code, lino, _exit); + } + taosCloseFile(&pOutFD); + taosCloseFile(&pInFD); + } + } + // lock taosWLockLatch(RSMA_FS_LOCK(pStat)); code = tdRSmaFSCommit(pSma); @@ -380,6 +428,8 @@ _exit: if (pWriter) taosMemoryFree(pWriter); *ppWriter = NULL; if (code) { + if (pOutFD) taosCloseFile(&pOutFD); + if (pInFD) taosCloseFile(&pInFD); smaError("vgId:%d, vnode snapshot rsma writer close failed since %s", SMA_VID(pSma), tstrerror(code)); } else { smaInfo("vgId:%d, vnode snapshot rsma writer close succeed", SMA_VID(pSma)); @@ -452,7 +502,10 @@ static int32_t rsmaSnapWriteQTaskInfo(SRSmaSnapWriter* pWriter, uint8_t* pData, } uint32_t mtime = 0; - if (taosFStatFile(fp, NULL, &mtime) == 0) { + if (taosFStatFile(fp, NULL, &mtime) != 0) { + code = TAOS_SYSTEM_ERROR(errno); + TSDB_CHECK_CODE(code, lino, _exit); + } else { qTaskFile.mtime = mtime; }