提交 e8cdf132 编写于 作者: K kailixu

chore: rsma code refactor

上级 61380b78
......@@ -32,7 +32,6 @@ target_sources(
# sma
"src/sma/smaEnv.c"
"src/sma/smaUtil.c"
"src/sma/smaFS.c"
"src/sma/smaOpen.c"
"src/sma/smaCommit.c"
"src/sma/smaRollup.c"
......
......@@ -159,11 +159,6 @@ struct SRSmaInfo {
void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t
STaosQueue *queue; // buffer queue of SubmitReq
STaosQall *qall; // buffer qall of SubmitReq
#if 0
void *iTaskInfo[TSDB_RETENTION_L2]; // immutable qTaskInfo_t
STaosQueue *iQueue; // immutable buffer queue of SubmitReq
STaosQall *iQall; // immutable buffer qall of SubmitReq
#endif
};
#define RSMA_INFO_HEAD_LEN offsetof(SRSmaInfo, items)
......@@ -223,27 +218,12 @@ int32_t smaPreClose(SSma *pSma);
// rsma
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree);
// int32_t tdRSmaFSOpen(SSma *pSma, int64_t version, int8_t rollback);
// void tdRSmaFSClose(SRSmaFS *fs);
// int32_t tdRSmaFSPrepareCommit(SSma *pSma, SRSmaFS *pFSNew);
// int32_t tdRSmaFSCommit(SSma *pSma);
// int32_t tdRSmaFSFinishCommit(SSma *pSma);
// int32_t tdRSmaFSCopy(SSma *pSma, SRSmaFS *pFS);
// int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFS);
// int32_t tdRSmaFSRef(SSma *pSma, SRSmaFS *pFS);
// void tdRSmaFSUnRef(SSma *pSma, SRSmaFS *pFS);
// int32_t tdRSmaFSUpsertQTaskFile(SSma *pSma, SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t nSize);
// int32_t tdRSmaFSRollback(SSma *pSma);
int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback);
int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type);
// int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback);
void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t suid, int8_t level, int64_t version, char *outputName);
void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t suid, int8_t level, int64_t version, const char *path,
char *outputName);
void tdRSmaQTaskInfoGetFullPath(int32_t vgId, int8_t level, const char *path, char *outputName);
void tdRSmaQTaskInfoGetFullPathEx(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName);
void tdRSmaQTaskInfoGetFullPath(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName);
static FORCE_INLINE void tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
int32_t ref = T_REF_INC(pRSmaInfo);
......@@ -254,8 +234,7 @@ static FORCE_INLINE void tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
smaTrace("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref);
}
void tdRSmaGetFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t suid,
int8_t level, int64_t version, char *outputName);
void tdRSmaGetDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName);
#ifdef __cplusplus
......
......@@ -318,7 +318,6 @@ int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData);
// SRSmaSnapWriter ========================================
int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWriter** ppWriter);
int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
// int32_t rsmaSnapWriterPrepareClose(SRSmaSnapWriter* pWriter);
int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback);
typedef struct {
......@@ -467,7 +466,7 @@ enum {
SNAP_DATA_DEL = 3,
SNAP_DATA_RSMA1 = 4,
SNAP_DATA_RSMA2 = 5,
SNAP_DATA_QTASK = 6,
SNAP_DATA_QTASK = 6, // obsolete
SNAP_DATA_TQ_HANDLE = 7,
SNAP_DATA_TQ_OFFSET = 8,
SNAP_DATA_STREAM_TASK = 9,
......
......@@ -108,9 +108,6 @@ int32_t smaFinishCommit(SSma *pSma) {
int32_t lino = 0;
SVnode *pVnode = pSma->pVnode;
// code = tdRSmaFSFinishCommit(pSma);
// TSDB_CHECK_CODE(code, lino, _exit);
if (VND_RSMA1(pVnode) && (code = tsdbFinishCommit(VND_RSMA1(pVnode))) < 0) {
TSDB_CHECK_CODE(code, lino, _exit);
}
......@@ -187,30 +184,6 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) {
if (!isCommit) goto _exit;
smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
// code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat));
// TSDB_CHECK_CODE(code, lino, _exit);
// smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
#if 0 // consuming task of qTaskInfo clone
// step 4: swap queue/qall and iQueue/iQall
// lock
taosWLockLatch(SMA_ENV_LOCK(pEnv));
void *pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL);
while (pIter) {
SRSmaInfo *pInfo = *(SRSmaInfo **)pIter;
TSWAP(pInfo->iQall, pInfo->qall);
TSWAP(pInfo->iQueue, pInfo->queue);
TSWAP(pInfo->iTaskInfo[0], pInfo->taskInfo[0]);
TSWAP(pInfo->iTaskInfo[1], pInfo->taskInfo[1]);
pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter);
}
// unlock
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
#endif
// all rsma results are written completely
STsdb *pTsdb = NULL;
......@@ -246,9 +219,6 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma, SCommitInfo *pInfo) {
goto _exit;
}
// code = tdRSmaFSCommit(pSma);
// TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbCommit(VND_RSMA1(pVnode), pInfo);
TSDB_CHECK_CODE(code, lino, _exit);
......
......@@ -287,10 +287,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
// step 4: destroy the rsma info and associated fetch tasks
taosHashCleanup(RSMA_INFO_HASH(pStat));
// step 5:
// tdRSmaFSClose(RSMA_FS(pStat));
// step 6: free pStat
// step 5: free pStat
tsem_destroy(&(pStat->notEmpty));
taosMemoryFreeClear(pStat);
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sma.h"
// =================================================================================================
#if 0
// static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **output);
static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2);
static FORCE_INLINE int32_t tPutQTaskF(uint8_t *p, SQTaskFile *pFile) {
int32_t n = 0;
n += tPutI8(p ? p + n : p, pFile->level);
n += tPutI64v(p ? p + n : p, pFile->size);
n += tPutI64v(p ? p + n : p, pFile->suid);
n += tPutI64v(p ? p + n : p, pFile->version);
n += tPutI64v(p ? p + n : p, pFile->mtime);
return n;
}
static int32_t tdRSmaFSToBinary(uint8_t *p, SRSmaFS *pFS) {
int32_t n = 0;
uint32_t size = taosArrayGetSize(pFS->aQTaskInf);
// version
n += tPutI8(p ? p + n : p, 0);
// SArray<SQTaskFile>
n += tPutU32v(p ? p + n : p, size);
for (uint32_t i = 0; i < size; ++i) {
n += tPutQTaskF(p ? p + n : p, taosArrayGet(pFS->aQTaskInf, i));
}
return n;
}
int32_t tdRSmaGetQTaskF(uint8_t *p, SQTaskFile *pFile) {
int32_t n = 0;
n += tGetI8(p + n, &pFile->level);
n += tGetI64v(p + n, &pFile->size);
n += tGetI64v(p + n, &pFile->suid);
n += tGetI64v(p + n, &pFile->version);
n += tGetI64v(p + n, &pFile->mtime);
return n;
}
static int32_t tsdbBinaryToFS(uint8_t *pData, int64_t nData, SRSmaFS *pFS) {
int32_t code = 0;
int32_t n = 0;
int8_t version = 0;
// version
n += tGetI8(pData + n, &version);
// SArray<SQTaskFile>
taosArrayClear(pFS->aQTaskInf);
uint32_t size = 0;
n += tGetU32v(pData + n, &size);
for (uint32_t i = 0; i < size; ++i) {
SQTaskFile qTaskF = {0};
int32_t nt = tdRSmaGetQTaskF(pData + n, &qTaskF);
if (nt < 0) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _exit;
}
n += nt;
if (taosArrayPush(pFS->aQTaskInf, &qTaskF) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
}
if (ASSERTS(n + sizeof(TSCKSUM) == nData, "n:%d + sizeof(TSCKSUM):%d != nData:%d", n, (int32_t)sizeof(TSCKSUM),
nData)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _exit;
}
_exit:
return code;
}
static int32_t tdRSmaSaveFSToFile(SRSmaFS *pFS, const char *fname) {
int32_t code = 0;
int32_t lino = 0;
TdFilePtr pFD = NULL;
// encode to binary
int32_t size = tdRSmaFSToBinary(NULL, pFS) + sizeof(TSCKSUM);
uint8_t *pData = taosMemoryMalloc(size);
if (pData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
tdRSmaFSToBinary(pData, pFS);
taosCalcChecksumAppend(0, pData, size);
// save to file
pFD = taosCreateFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
if (!pFD) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
int64_t n = taosWriteFile(pFD, pData, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (taosFsyncFile(pFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
taosCloseFile(&pFD);
if (pData) taosMemoryFree(pData);
if (code) {
smaError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname);
}
return code;
}
static int32_t tdRSmaFSCreate(SRSmaFS *pFS, int32_t size) {
int32_t code = 0;
pFS->aQTaskInf = taosArrayInit(size, sizeof(SQTaskFile));
if (pFS->aQTaskInf == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
_exit:
return code;
}
static void tdRSmaGetCurrentFName(SSma *pSma, char *current, char *current_t) {
SVnode *pVnode = pSma->pVnode;
if (pVnode->pTfs) {
if (current) {
snprintf(current, TSDB_FILENAME_LEN - 1, "%s%svnode%svnode%d%srsma%sPRESENT", tfsGetPrimaryPath(pVnode->pTfs),
TD_DIRSEP, TD_DIRSEP, TD_VID(pVnode), TD_DIRSEP, TD_DIRSEP);
}
if (current_t) {
snprintf(current_t, TSDB_FILENAME_LEN - 1, "%s%svnode%svnode%d%srsma%sPRESENT.t", tfsGetPrimaryPath(pVnode->pTfs),
TD_DIRSEP, TD_DIRSEP, TD_VID(pVnode), TD_DIRSEP, TD_DIRSEP);
}
} else {
#if 0
if (current) {
snprintf(current, TSDB_FILENAME_LEN - 1, "%s%sPRESENT", pTsdb->path, TD_DIRSEP);
}
if (current_t) {
snprintf(current_t, TSDB_FILENAME_LEN - 1, "%s%sPRESENT.t", pTsdb->path, TD_DIRSEP);
}
#endif
}
}
static int32_t tdRSmaLoadFSFromFile(const char *fname, SRSmaFS *pFS) {
int32_t code = 0;
int32_t lino = 0;
uint8_t *pData = NULL;
// load binary
TdFilePtr pFD = taosOpenFile(fname, TD_FILE_READ);
if (pFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
int64_t size;
if (taosFStatFile(pFD, &size, NULL) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
pData = taosMemoryMalloc(size);
if (pData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (taosReadFile(pFD, pData, size) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (!taosCheckChecksumWhole(pData, size)) {
code = TSDB_CODE_FILE_CORRUPTED;
TSDB_CHECK_CODE(code, lino, _exit);
}
// decode binary
code = tsdbBinaryToFS(pData, size, pFS);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
taosCloseFile(&pFD);
if (pData) taosMemoryFree(pData);
if (code) {
smaError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname);
}
return code;
}
static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2) {
const SQTaskFile *q1 = (const SQTaskFile *)p1;
const SQTaskFile *q2 = (const SQTaskFile *)p2;
if (q1->suid < q2->suid) {
return -1;
} else if (q1->suid > q2->suid) {
return 1;
}
if (q1->level < q2->level) {
return -1;
} else if (q1->level > q2->level) {
return 1;
}
if (q1->version < q2->version) {
return -2;
} else if (q1->version > q2->version) {
return 1;
}
return 0;
}
static int32_t tdRSmaFSApplyChange(SSma *pSma, SRSmaFS *pFSNew) {
int32_t code = 0;
int32_t lino = 0;
int32_t nRef = 0;
SVnode *pVnode = pSma->pVnode;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
SRSmaFS *pFSOld = RSMA_FS(pStat);
int64_t version = pStat->commitAppliedVer;
char fname[TSDB_FILENAME_LEN] = {0};
// SQTaskFile
int32_t nNew = taosArrayGetSize(pFSNew->aQTaskInf);
int32_t iNew = 0;
while (iNew < nNew) {
SQTaskFile *pQTaskFNew = TARRAY_GET_ELEM(pFSNew->aQTaskInf, iNew++);
int32_t idx = taosArraySearchIdx(pFSOld->aQTaskInf, pQTaskFNew, tdQTaskInfCmprFn1, TD_GE);
if (idx < 0) {
idx = taosArrayGetSize(pFSOld->aQTaskInf);
pQTaskFNew->nRef = 1;
} else {
SQTaskFile *pTaskF = TARRAY_GET_ELEM(pFSOld->aQTaskInf, idx);
int32_t c1 = tdQTaskInfCmprFn1(pQTaskFNew, pTaskF);
if (c1 == 0) {
// utilize the item in pFSOld->qQTaskInf, instead of pFSNew
continue;
} else if (c1 < 0) {
// NOTHING TODO
} else {
code = TSDB_CODE_RSMA_FS_UPDATE;
TSDB_CHECK_CODE(code, lino, _exit);
}
}
if (taosArrayInsert(pFSOld->aQTaskInf, idx, pQTaskFNew) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
// remove previous version
while (--idx >= 0) {
SQTaskFile *preTaskF = TARRAY_GET_ELEM(pFSOld->aQTaskInf, idx);
int32_t c2 = tdQTaskInfCmprFn1(preTaskF, pQTaskFNew);
if (c2 == 0) {
code = TSDB_CODE_RSMA_FS_UPDATE;
TSDB_CHECK_CODE(code, lino, _exit);
} else if (c2 != -2) {
break;
}
nRef = atomic_sub_fetch_32(&preTaskF->nRef, 1);
if (nRef <= 0) {
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), preTaskF->suid, preTaskF->level, preTaskF->version,
tfsGetPrimaryPath(pVnode->pTfs), fname);
(void)taosRemoveFile(fname);
taosArrayRemove(pFSOld->aQTaskInf, idx);
}
}
}
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
}
return code;
}
static int32_t tdRSmaFSScanAndTryFix(SSma *pSma) {
int32_t code = 0;
#if 0
int32_t lino = 0;
SVnode *pVnode = pSma->pVnode;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
SRSmaFS *pFS = RSMA_FS(pStat);
char fname[TSDB_FILENAME_LEN] = {0};
char fnameVer[TSDB_FILENAME_LEN] = {0};
// SArray<SQTaskFile>
int32_t size = taosArrayGetSize(pFS->aQTaskInf);
for (int32_t i = 0; i < size; ++i) {
SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, i);
// main.tdb =========
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, pTaskF->version,
tfsGetPrimaryPath(pVnode->pTfs), fnameVer);
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, -1, tfsGetPrimaryPath(pVnode->pTfs), fname);
if (taosCheckExistFile(fnameVer)) {
if (taosRenameFile(fnameVer, fname) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
smaDebug("vgId:%d, %s:%d succeed to to rename %s to %s", TD_VID(pVnode), __func__, lino, fnameVer, fname);
} else if (taosCheckExistFile(fname)) {
if (taosRemoveFile(fname) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
smaDebug("vgId:%d, %s:%d succeed to to remove %s", TD_VID(pVnode), __func__, lino, fname);
}
}
{
// remove those invalid files (todo)
// main.tdb-journal.5 // TDB should handle its clear for kill -9
}
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
}
#endif
return code;
}
// EXPOSED APIS ====================================================================================
int32_t tdRSmaFSOpen(SSma *pSma, int64_t version, int8_t rollback) {
int32_t code = 0;
int32_t lino = 0;
SVnode *pVnode = pSma->pVnode;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
// open handle
code = tdRSmaFSCreate(RSMA_FS(pStat), 0);
TSDB_CHECK_CODE(code, lino, _exit);
// open impl
char current[TSDB_FILENAME_LEN] = {0};
char current_t[TSDB_FILENAME_LEN] = {0};
tdRSmaGetCurrentFName(pSma, current, current_t);
if (taosCheckExistFile(current)) {
code = tdRSmaLoadFSFromFile(current, RSMA_FS(pStat));
TSDB_CHECK_CODE(code, lino, _exit);
if (taosCheckExistFile(current_t)) {
if (rollback) {
code = tdRSmaFSRollback(pSma);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
code = tdRSmaFSCommit(pSma);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
} else {
// 1st time open with empty current/qTaskInfoFile
code = tdRSmaSaveFSToFile(RSMA_FS(pStat), current);
TSDB_CHECK_CODE(code, lino, _exit);
}
// scan and try fix(remove main.db/main.db.xxx and use the one with version)
code = tdRSmaFSScanAndTryFix(pSma);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
}
return code;
}
void tdRSmaFSClose(SRSmaFS *pFS) { pFS->aQTaskInf = taosArrayDestroy(pFS->aQTaskInf); }
int32_t tdRSmaFSPrepareCommit(SSma *pSma, SRSmaFS *pFSNew) {
int32_t code = 0;
int32_t lino = 0;
char tfname[TSDB_FILENAME_LEN];
tdRSmaGetCurrentFName(pSma, NULL, tfname);
// generate PRESENT.t
code = tdRSmaSaveFSToFile(pFSNew, tfname);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
int32_t tdRSmaFSCommit(SSma *pSma) {
int32_t code = 0;
int32_t lino = 0;
SRSmaFS fs = {0};
char current[TSDB_FILENAME_LEN] = {0};
char current_t[TSDB_FILENAME_LEN] = {0};
tdRSmaGetCurrentFName(pSma, current, current_t);
if (!taosCheckExistFile(current_t)) {
goto _exit;
}
// rename the file
if (taosRenameFile(current_t, current) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
// load the new FS
code = tdRSmaFSCreate(&fs, 1);
TSDB_CHECK_CODE(code, lino, _exit);
code = tdRSmaLoadFSFromFile(current, &fs);
TSDB_CHECK_CODE(code, lino, _exit);
// apply file change
code = tdRSmaFSApplyChange(pSma, &fs);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
tdRSmaFSClose(&fs);
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code));
}
return code;
}
int32_t tdRSmaFSFinishCommit(SSma *pSma) {
int32_t code = 0;
int32_t lino = 0;
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
taosWLockLatch(RSMA_FS_LOCK(pStat));
code = tdRSmaFSCommit(pSma);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code));
} else {
smaInfo("vgId:%d, rsmaFS finish commit", SMA_VID(pSma));
}
return code;
}
int32_t tdRSmaFSRollback(SSma *pSma) {
int32_t code = 0;
int32_t lino = 0;
char current_t[TSDB_FILENAME_LEN] = {0};
tdRSmaGetCurrentFName(pSma, NULL, current_t);
(void)taosRemoveFile(current_t);
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(errno));
}
return code;
}
int32_t tdRSmaFSUpsertQTaskFile(SSma *pSma, SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t nSize) {
int32_t code = 0;
for (int32_t i = 0; i < nSize; ++i) {
SQTaskFile *qTaskF = qTaskFile + i;
int32_t idx = taosArraySearchIdx(pFS->aQTaskInf, qTaskF, tdQTaskInfCmprFn1, TD_GE);
if (idx < 0) {
idx = taosArrayGetSize(pFS->aQTaskInf);
} else {
SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, idx);
int32_t c = tdQTaskInfCmprFn1(pTaskF, qTaskF);
if (c == 0) {
if (pTaskF->size != qTaskF->size) {
code = TSDB_CODE_RSMA_FS_UPDATE;
smaError("vgId:%d, %s failed at line %d since %s, level:%" PRIi8 ", suid:%" PRIi64 ", version:%" PRIi64
", size:%" PRIi64 " != %" PRIi64,
SMA_VID(pSma), __func__, __LINE__, tstrerror(code), pTaskF->level, pTaskF->suid, pTaskF->version,
pTaskF->size, qTaskF->size);
goto _exit;
}
continue;
}
}
if (!taosArrayInsert(pFS->aQTaskInf, idx, qTaskF)) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
}
_exit:
return code;
}
int32_t tdRSmaFSRef(SSma *pSma, SRSmaFS *pFS) {
int32_t code = 0;
int32_t lino = 0;
int32_t nRef = 0;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
SRSmaFS *qFS = RSMA_FS(pStat);
int32_t size = taosArrayGetSize(qFS->aQTaskInf);
pFS->aQTaskInf = taosArrayInit_s(sizeof(SQTaskFile), size);
if (pFS->aQTaskInf == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
for (int32_t i = 0; i < size; ++i) {
SQTaskFile *qTaskF = (SQTaskFile *)taosArrayGet(qFS->aQTaskInf, i);
nRef = atomic_fetch_add_32(&qTaskF->nRef, 1);
if (nRef <= 0) {
code = TSDB_CODE_RSMA_FS_REF;
TSDB_CHECK_CODE(code, lino, _exit);
}
}
memcpy(pFS->aQTaskInf->pData, qFS->aQTaskInf->pData, size * sizeof(SQTaskFile));
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s, nRef %d", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code),
nRef);
}
return code;
}
void tdRSmaFSUnRef(SSma *pSma, SRSmaFS *pFS) {
int32_t nRef = 0;
char fname[TSDB_FILENAME_LEN];
SVnode *pVnode = pSma->pVnode;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
int32_t size = taosArrayGetSize(pFS->aQTaskInf);
for (int32_t i = 0; i < size; ++i) {
SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, i);
nRef = atomic_sub_fetch_32(&pTaskF->nRef, 1);
if (nRef == 0) {
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, pTaskF->version,
tfsGetPrimaryPath(pVnode->pTfs), fname);
if (taosRemoveFile(fname) < 0) {
smaWarn("vgId:%d, failed to remove %s since %s", TD_VID(pVnode), fname, tstrerror(TAOS_SYSTEM_ERROR(errno)));
} else {
smaDebug("vgId:%d, success to remove %s", TD_VID(pVnode), fname);
}
} else if (nRef < 0) {
smaWarn("vgId:%d, abnormal unref %s since %s", TD_VID(pVnode), fname, tstrerror(TSDB_CODE_RSMA_FS_REF));
}
}
taosArrayDestroy(pFS->aQTaskInf);
}
int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFS) {
int32_t code = 0;
int32_t lino = 0;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
taosRLockLatch(RSMA_FS_LOCK(pStat));
code = tdRSmaFSRef(pSma, pFS);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
taosRUnLockLatch(RSMA_FS_LOCK(pStat));
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
int32_t tdRSmaFSCopy(SSma *pSma, SRSmaFS *pFS) {
int32_t code = 0;
int32_t lino = 0;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
SRSmaFS *qFS = RSMA_FS(pStat);
int32_t size = taosArrayGetSize(qFS->aQTaskInf);
code = tdRSmaFSCreate(pFS, size);
TSDB_CHECK_CODE(code, lino, _exit);
taosArrayAddBatch(pFS->aQTaskInf, qFS->aQTaskInf->pData, size);
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
#endif
\ No newline at end of file
......@@ -47,7 +47,6 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo);
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
int64_t suid);
static void tdRSmaFetchTrigger(void *param, void *tmrId);
// static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo);
static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level);
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables);
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer);
......@@ -96,11 +95,6 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
if (isDeepFree && pInfo->taskInfo[i]) {
tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1);
}
#if 0
if (pInfo->iTaskInfo[i]) {
tdRSmaQTaskInfoFree(&pInfo->iTaskInfo[i], SMA_VID(pSma), i + 1);
}
#endif
}
if (isDeepFree) {
taosMemoryFreeClear(pInfo->pTSchema);
......@@ -115,16 +109,6 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
taosFreeQall(pInfo->qall);
pInfo->qall = NULL;
}
#if 0
if (pInfo->iQueue) {
taosCloseQueue(pInfo->iQueue);
pInfo->iQueue = NULL;
}
if (pInfo->iQall) {
taosFreeQall(pInfo->iQall);
pInfo->iQall = NULL;
}
#endif
}
taosMemoryFree(pInfo);
......@@ -260,7 +244,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
void *pStreamState = NULL;
// set the backend of stream state
tdRSmaQTaskInfoGetFullPathEx(TD_VID(pVnode), pRSmaInfo->suid, idx + 1, tfsGetPrimaryPath(pVnode->pTfs), taskInfDir);
tdRSmaQTaskInfoGetFullPath(TD_VID(pVnode), pRSmaInfo->suid, idx + 1, tfsGetPrimaryPath(pVnode->pTfs), taskInfDir);
if (!taosCheckExistFile(taskInfDir)) {
char *s = taosStrdup(taskInfDir);
if (taosMulMkDir(taosDirName(s)) != 0) {
......@@ -371,15 +355,6 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
goto _err;
}
#if 0
if (!(pRSmaInfo->iQueue = taosOpenQueue())) {
goto _err;
}
if (!(pRSmaInfo->iQall = taosAllocateQall())) {
goto _err;
}
#endif
if (taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) < 0) {
goto _err;
}
......@@ -605,15 +580,6 @@ _end:
return code;
}
#if 0
static void tdBlockDataDestroy(SArray *pBlockArr) {
for (int32_t i = 0; i < taosArrayGetSize(pBlockArr); ++i) {
blockDataDestroy(taosArrayGetP(pBlockArr, i));
}
taosArrayDestroy(pBlockArr);
}
#endif
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
int64_t suid) {
int32_t code = 0;
......@@ -780,11 +746,8 @@ static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) {
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo,
ERsmaExecType type, int8_t level) {
int32_t idx = level - 1;
#if 0
void *qTaskInfo = (type == RSMA_EXEC_COMMIT) ? RSMA_INFO_IQTASK(pInfo, idx) : RSMA_INFO_QTASK(pInfo, idx);
#else
void *qTaskInfo = RSMA_INFO_QTASK(pInfo, idx);
#endif
void *qTaskInfo = RSMA_INFO_QTASK(pInfo, idx);
if (!qTaskInfo) {
smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level,
pInfo->suid);
......@@ -817,115 +780,6 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize,
return TSDB_CODE_SUCCESS;
}
#if 0
static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t srcTaskInfo, SRSmaParam *param,
tb_uid_t suid, int8_t idx) {
int32_t code = 0;
int32_t lino = 0;
SVnode *pVnode = pSma->pVnode;
char *pOutput = NULL;
int32_t len = 0;
if (!srcTaskInfo) {
code = TSDB_CODE_INVALID_PTR;
smaWarn("vgId:%d, rsma clone, table %" PRIi64 ", no need since srcTaskInfo is NULL", TD_VID(pVnode), suid);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = qSerializeTaskStatus(srcTaskInfo, &pOutput, &len);
TSDB_CHECK_CODE(code, lino, _exit);
SReadHandle handle = { .vnode = pVnode, .initTqReader = 1 };
initStorageAPI(&handle.api);
if (ASSERTS(!dstTaskInfo, "dstTaskInfo:%p is not NULL", dstTaskInfo)) {
code = TSDB_CODE_APP_ERROR;
TSDB_CHECK_CODE(code, lino, _exit);
}
dstTaskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode));
if (!dstTaskInfo) {
code = TSDB_CODE_RSMA_QTASKINFO_CREATE;
TSDB_CHECK_CODE(code, lino, _exit);
}
code = qDeserializeTaskStatus(dstTaskInfo, pOutput, len);
TSDB_CHECK_CODE(code, lino, _exit);
smaDebug("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " succeed", TD_VID(pVnode), suid);
_exit:
taosMemoryFreeClear(pOutput);
if (code) {
tdRSmaQTaskInfoFree(dstTaskInfo, TD_VID(pVnode), idx + 1);
smaError("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " failed since %s", TD_VID(pVnode), suid,
terrstr());
}
return code;
}
#endif
/**
* @brief Clone qTaskInfo of SRSmaInfo
*
* @param pSma
* @param pInfo
* @return int32_t
*/
#if 0
static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo) {
int32_t code = 0;
int32_t lino = 0;
SRSmaParam *param = NULL;
SMetaReader mr = {0};
if (!pInfo) {
return TSDB_CODE_SUCCESS;
}
metaReaderDoInit(&mr, SMA_META(pSma), 0);
smaDebug("vgId:%d, rsma clone qTaskInfo for suid:%" PRIi64, SMA_VID(pSma), pInfo->suid);
if (metaReaderGetTableEntryByUidCache(&mr, pInfo->suid) < 0) {
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (mr.me.type != TSDB_SUPER_TABLE) {
code = TSDB_CODE_RSMA_INVALID_SCHEMA;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (mr.me.uid != pInfo->suid) {
code = TSDB_CODE_RSMA_INVALID_SCHEMA;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (TABLE_IS_ROLLUP(mr.me.flags)) {
param = &mr.me.stbEntry.rsmaParam;
#if 0
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (!pInfo->iTaskInfo[i]) {
continue;
}
code = tdCloneQTaskInfo(pSma, pInfo->taskInfo[i], pInfo->iTaskInfo[i], param, pInfo->suid, i);
TSDB_CHECK_CODE(code, lino, _exit);
}
smaDebug("vgId:%d, rsma clone env success for %" PRIi64, SMA_VID(pSma), pInfo->suid);
#endif
} else {
code = TSDB_CODE_RSMA_INVALID_SCHEMA;
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s, suid:%" PRIi64 ", flags:%" PRIi8 ",type:%" PRIi8 ", uid:%" PRIi64,
SMA_VID(pSma), __func__, lino, tstrerror(code), pInfo->suid, mr.me.flags, mr.me.type, mr.me.uid);
}
metaReaderClear(&mr);
return code;
}
#endif
/**
* @brief During async commit, the SRSmaInfo object would be COW from iRSmaInfoHash and write lock should be applied.
*
......@@ -960,14 +814,7 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) {
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
return NULL;
}
#if 0
if (!pRSmaInfo->taskInfo[0]) {
if ((terrno = tdRSmaInfoClone(pSma, pRSmaInfo)) < 0) {
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
return NULL;
}
}
#endif
tdRefRSmaInfo(pSma, pRSmaInfo);
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
if (ASSERTS(pRSmaInfo->suid == suid, "suid:%" PRIi64 " != %" PRIi64, pRSmaInfo->suid, suid)) {
......@@ -1188,12 +1035,7 @@ int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer,
goto _err;
}
// step 2: open SRSmaFS for qTaskFiles
// if ((code = tdRSmaFSOpen(pSma, qtaskFileVer, rollback)) < 0) {
// goto _err;
// }
// step 3: iterate all stables to restore the rsma env
// step 2: iterate all stables to restore the rsma env
if ((code = tdRSmaRestoreQTaskInfoInit(pSma, &nTables)) < 0) {
goto _err;
}
......@@ -1209,117 +1051,7 @@ _err:
return code;
}
#if 0
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
int32_t code = 0;
int32_t lino = 0;
SSma *pSma = pRSmaStat->pSma;
SVnode *pVnode = pSma->pVnode;
SArray *qTaskFArray = NULL;
int64_t version = pRSmaStat->commitAppliedVer;
TdFilePtr pOutFD = NULL;
TdFilePtr pInFD = NULL;
char fname[TSDB_FILENAME_LEN];
char fnameVer[TSDB_FILENAME_LEN];
SRSmaFS fs = {0};
if (taosHashGetSize(pInfoHash) <= 0) {
return TSDB_CODE_SUCCESS;
}
qTaskFArray = taosArrayInit(taosHashGetSize(pInfoHash) << 1, sizeof(SQTaskFile));
if (!qTaskFArray) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
void *infoHash = NULL;
while ((infoHash = taosHashIterate(pInfoHash, infoHash))) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
continue;
}
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, i);
if (pItem && pItem->pStreamState) {
if (streamStateCommit(pItem->pStreamState) < 0) {
code = TSDB_CODE_RSMA_STREAM_STATE_COMMIT;
TSDB_CHECK_CODE(code, lino, _exit);
}
smaDebug("vgId:%d, rsma persist, stream state commit success, table %" PRIi64 ", level %d", TD_VID(pVnode),
pRSmaInfo->suid, i + 1);
// qTaskInfo file
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pRSmaInfo->suid, i + 1, -1, tfsGetPrimaryPath(pVnode->pTfs), fname);
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pRSmaInfo->suid, i + 1, version, tfsGetPrimaryPath(pVnode->pTfs),
fnameVer);
if (taosCheckExistFile(fnameVer)) {
smaWarn("vgId:%d, rsma persist, duplicate file %s exist", TD_VID(pVnode), fnameVer);
}
pOutFD = taosCreateFile(fnameVer, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
if (pOutFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
pInFD = taosOpenFile(fname, TD_FILE_READ);
if (pInFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
int64_t size = 0;
uint32_t mtime = 0;
if (taosFStatFile(pInFD, &size, &mtime) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
int64_t offset = 0;
if (taosFSendFile(pOutFD, pInFD, &offset, size) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
smaError("vgId:%d, rsma persist, send qtaskinfo file %s to %s failed since %s", TD_VID(pVnode), fname,
fnameVer, tstrerror(code));
TSDB_CHECK_CODE(code, lino, _exit);
}
taosCloseFile(&pOutFD);
taosCloseFile(&pInFD);
SQTaskFile qTaskF = {
.nRef = 1, .level = i + 1, .suid = pRSmaInfo->suid, .version = version, .size = size, .mtime = mtime};
taosArrayPush(qTaskFArray, &qTaskF);
}
}
}
// prepare
code = tdRSmaFSCopy(pSma, &fs);
TSDB_CHECK_CODE(code, lino, _exit);
code = tdRSmaFSUpsertQTaskFile(pSma, &fs, qTaskFArray->pData, taosArrayGetSize(qTaskFArray));
TSDB_CHECK_CODE(code, lino, _exit);
code = tdRSmaFSPrepareCommit(pSma, &fs);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
taosArrayDestroy(fs.aQTaskInf);
taosArrayDestroy(qTaskFArray);
if (code) {
if (pOutFD) taosCloseFile(&pOutFD);
if (pInFD) taosCloseFile(&pInFD);
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
}
terrno = code;
return code;
}
#endif
/**
* @brief trigger to get rsma result in async mode
*
......
......@@ -15,9 +15,6 @@
#include "sma.h"
// static int32_t rsmaSnapReadQTaskInfo(SRSmaSnapReader* pReader, uint8_t** ppData);
// static int32_t rsmaSnapWriteQTaskInfo(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
// SRSmaSnapReader ========================================
struct SRSmaSnapReader {
SSma* pSma;
......@@ -28,11 +25,6 @@ struct SRSmaSnapReader {
// for data file
int8_t rsmaDataDone[TSDB_RETENTION_L2];
STsdbSnapReader* pDataReader[TSDB_RETENTION_L2];
// for qtaskinfo file
int8_t qTaskDone;
int32_t fsIter;
SQTaskFReader* pQTaskFReader;
};
int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapReader** ppReader) {
......@@ -62,22 +54,6 @@ int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapRead
}
}
// open qtaskinfo
// taosRLockLatch(RSMA_FS_LOCK(pStat));
// code = tdRSmaFSRef(pSma, &pReader->fs);
// taosRUnLockLatch(RSMA_FS_LOCK(pStat));
// TSDB_CHECK_CODE(code, lino, _exit);
if (taosArrayGetSize(pReader->fs.aQTaskInf) > 0) {
pReader->pQTaskFReader = taosMemoryCalloc(1, sizeof(SQTaskFReader));
if (!pReader->pQTaskFReader) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
pReader->pQTaskFReader->pSma = pSma;
pReader->pQTaskFReader->version = pReader->ever;
}
*ppReader = pReader;
_exit:
if (code) {
......@@ -88,114 +64,6 @@ _exit:
return code;
}
static int32_t rsmaSnapReadQTaskInfo(SRSmaSnapReader* pReader, uint8_t** ppBuf) {
int32_t code = 0;
int32_t lino = 0;
SVnode* pVnode = pReader->pSma->pVnode;
SQTaskFReader* qReader = pReader->pQTaskFReader;
SRSmaFS* pFS = &pReader->fs;
int64_t n = 0;
uint8_t* pBuf = NULL;
int64_t version = pReader->ever;
char fname[TSDB_FILENAME_LEN];
if (!qReader) {
*ppBuf = NULL;
smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, not needed since qTaskReader is NULL", TD_VID(pVnode));
goto _exit;
}
if (pReader->fsIter >= taosArrayGetSize(pFS->aQTaskInf)) {
*ppBuf = NULL;
smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, fsIter reach end", TD_VID(pVnode));
goto _exit;
}
while (pReader->fsIter < taosArrayGetSize(pFS->aQTaskInf)) {
SQTaskFile* qTaskF = taosArrayGet(pFS->aQTaskInf, pReader->fsIter++);
if (qTaskF->version != version) {
continue;
}
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), qTaskF->suid, qTaskF->level, version, tfsGetPrimaryPath(pVnode->pTfs),
fname);
if (!taosCheckExistFile(fname)) {
smaError("vgId:%d, vnode snapshot rsma reader for qtaskinfo, table %" PRIi64 ", level %" PRIi8
", version %" PRIi64 " failed since %s not exist",
TD_VID(pVnode), qTaskF->suid, qTaskF->level, version, fname);
code = TSDB_CODE_RSMA_FS_SYNC;
TSDB_CHECK_CODE(code, lino, _exit);
}
TdFilePtr fp = taosOpenFile(fname, TD_FILE_READ);
if (!fp) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
qReader->pReadH = fp;
qReader->level = qTaskF->level;
qReader->suid = qTaskF->suid;
}
if (!qReader->pReadH) {
*ppBuf = NULL;
smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, not needed since readh is NULL", TD_VID(pVnode));
goto _exit;
}
int64_t size = 0;
if (taosFStatFile(qReader->pReadH, &size, NULL) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
// seek
if (taosLSeekFile(qReader->pReadH, 0, SEEK_SET) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (*ppBuf) {
*ppBuf = taosMemoryRealloc(*ppBuf, sizeof(SSnapDataHdr) + size);
} else {
*ppBuf = taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
}
if (!(*ppBuf)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
// read
n = taosReadFile(qReader->pReadH, POINTER_SHIFT(*ppBuf, sizeof(SSnapDataHdr)), size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
} else if (n != size) {
code = TSDB_CODE_FILE_CORRUPTED;
TSDB_CHECK_CODE(code, lino, _exit);
}
smaInfo("vgId:%d, vnode snapshot rsma read qtaskinfo, version:%" PRIi64 ", size:%" PRIi64, TD_VID(pVnode), version,
size);
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppBuf);
pHdr->type = SNAP_DATA_QTASK;
pHdr->flag = qReader->level;
pHdr->index = qReader->suid;
pHdr->size = size;
_exit:
if (qReader) taosCloseFile(&qReader->pReadH);
if (code) {
*ppBuf = NULL;
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
} else {
smaInfo("vgId:%d, vnode snapshot rsma read qtaskinfo succeed", TD_VID(pVnode));
}
return code;
}
int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData) {
int32_t code = 0;
int32_t lino = 0;
......@@ -223,18 +91,6 @@ int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData) {
}
}
// read qtaskinfo file
if (!pReader->qTaskDone) {
smaInfo("vgId:%d, vnode snapshot rsma qtaskinfo not done", SMA_VID(pReader->pSma));
code = rsmaSnapReadQTaskInfo(pReader, ppData);
TSDB_CHECK_CODE(code, lino, _exit);
if (*ppData) {
goto _exit;
} else {
pReader->qTaskDone = 1;
}
}
_exit:
if (code) {
smaError("vgId:%d, vnode snapshot rsma read failed since %s", SMA_VID(pReader->pSma), tstrerror(code));
......@@ -249,9 +105,6 @@ int32_t rsmaSnapReaderClose(SRSmaSnapReader** ppReader) {
int32_t code = 0;
SRSmaSnapReader* pReader = *ppReader;
// tdRSmaFSUnRef(pReader->pSma, &pReader->fs);
taosMemoryFreeClear(pReader->pQTaskFReader);
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pReader->pDataReader[i]) {
tsdbSnapReaderClose(&pReader->pDataReader[i]);
......@@ -299,10 +152,6 @@ int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWrit
}
}
// qtaskinfo
// code = tdRSmaFSCopy(pSma, &pWriter->fs);
// TSDB_CHECK_CODE(code, lino, _exit);
// snapWriter
*ppWriter = pWriter;
_exit:
......@@ -316,22 +165,6 @@ _exit:
return code;
}
// int32_t rsmaSnapWriterPrepareClose(SRSmaSnapWriter* pWriter) {
// int32_t code = 0;
// int32_t lino = 0;
// if (pWriter) {
// code = tdRSmaFSPrepareCommit(pWriter->pSma, &pWriter->fs);
// TSDB_CHECK_CODE(code, lino, _exit);
// }
// _exit:
// if (code) {
// smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pWriter->pSma), __func__, lino, tstrerror(code));
// }
// return code;
// }
int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) {
int32_t code = 0;
int32_t lino = 0;
......@@ -364,62 +197,6 @@ int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) {
}
}
#if 0
// qtaskinfo
if (rollback) {
// tdRSmaFSRollback(pSma);
// remove qTaskFiles
} else {
// sendFile from fname.Ver to fname
SRSmaFS* pFS = &pWriter->fs;
int32_t size = taosArrayGetSize(pFS->aQTaskInf);
for (int32_t i = 0; i < size; ++i) {
SQTaskFile* pTaskF = TARRAY_GET_ELEM(pFS->aQTaskInf, i);
if (pTaskF->version == pWriter->ever) {
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, pTaskF->version, primaryPath, fnameVer);
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, -1, primaryPath, fname);
pInFD = taosOpenFile(fnameVer, TD_FILE_READ);
if (pInFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
pOutFD = taosCreateFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
if (pOutFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
int64_t size = 0;
if (taosFStatFile(pInFD, &size, NULL) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
int64_t offset = 0;
if (taosFSendFile(pOutFD, pInFD, &offset, size) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
smaError("vgId:%d, vnode snapshot rsma writer, send qtaskinfo file %s to %s failed since %s", TD_VID(pVnode),
fnameVer, fname, tstrerror(code));
TSDB_CHECK_CODE(code, lino, _exit);
}
taosCloseFile(&pOutFD);
taosCloseFile(&pInFD);
}
}
// lock
taosWLockLatch(RSMA_FS_LOCK(pStat));
code = tdRSmaFSCommit(pSma);
if (code) {
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
goto _exit;
}
// unlock
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
}
#endif
// rsma restore
code = tdRSmaRestore(pWriter->pSma, RSMA_RESTORE_SYNC, pWriter->ever, rollback);
TSDB_CHECK_CODE(code, lino, _exit);
......@@ -451,8 +228,6 @@ int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData)
} else if (pHdr->type == SNAP_DATA_RSMA2) {
pHdr->type = SNAP_DATA_TSDB;
code = tsdbSnapWrite(pWriter->pDataWriter[1], pHdr);
} else if (pHdr->type == SNAP_DATA_QTASK) {
// code = rsmaSnapWriteQTaskInfo(pWriter, pData, nData);
} else {
code = TSDB_CODE_RSMA_FS_SYNC;
}
......@@ -466,70 +241,4 @@ _exit:
smaInfo("vgId:%d, rsma snapshot write for data type %" PRIi8 " succeed", SMA_VID(pWriter->pSma), pHdr->type);
}
return code;
}
#if 0
static int32_t rsmaSnapWriteQTaskInfo(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
int32_t lino = 0;
SSma* pSma = pWriter->pSma;
SVnode* pVnode = pSma->pVnode;
char fname[TSDB_FILENAME_LEN];
TdFilePtr fp = NULL;
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
fname[0] = '\0';
if (pHdr->size != (nData - sizeof(SSnapDataHdr))) {
code = TSDB_CODE_RSMA_FS_SYNC;
TSDB_CHECK_CODE(code, lino, _exit);
}
SQTaskFile qTaskFile = {
.nRef = 1, .level = pHdr->flag, .suid = pHdr->index, .version = pWriter->ever, .size = pHdr->size};
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pHdr->index, pHdr->flag, qTaskFile.version,
tfsGetPrimaryPath(pVnode->pTfs), fname);
fp = taosCreateFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (!fp) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
int64_t contLen = taosWriteFile(fp, pHdr->data, pHdr->size);
if (contLen != pHdr->size) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
uint32_t mtime = 0;
if (taosFStatFile(fp, NULL, &mtime) != 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
qTaskFile.mtime = mtime;
}
if (taosFsyncFile(fp) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
taosCloseFile(&fp);
// code = tdRSmaFSUpsertQTaskFile(pSma, &pWriter->fs, &qTaskFile, 1);
// TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
if (fp) {
(void)taosRemoveFile(fname);
}
smaError("vgId:%d, %s failed at line %d since %s, file:%s", TD_VID(pVnode), __func__, lino, tstrerror(code), fname);
} else {
smaInfo("vgId:%d, vnode snapshot rsma write qtaskinfo %s succeed", TD_VID(pVnode), fname);
}
return code;
}
#endif
\ No newline at end of file
}
\ No newline at end of file
......@@ -17,75 +17,12 @@
#define TD_QTASKINFO_FNAME_PREFIX "main.tdb"
#if 0
void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t suid, int8_t level, int64_t version, char *outputName) {
tdRSmaGetFileName(vgId, NULL, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, suid, level, version, outputName);
}
#endif
void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t suid, int8_t level, int64_t version, const char *path,
char *outputName) {
tdRSmaGetFileName(vgId, path, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, suid, level, version, outputName);
}
#if 0
void tdRSmaQTaskInfoGetFullPath(int32_t vgId, int8_t level, const char *path, char *outputName) {
tdRSmaGetDirName(vgId, path, VNODE_RSMA_DIR, true, outputName);
int32_t rsmaLen = strlen(outputName);
snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi8, level);
}
#endif
void tdRSmaQTaskInfoGetFullPathEx(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName) {
void tdRSmaQTaskInfoGetFullPath(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName) {
tdRSmaGetDirName(vgId, path, VNODE_RSMA_DIR, true, outputName);
int32_t rsmaLen = strlen(outputName);
snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi8 "%s%" PRIi64, level, TD_DIRSEP, suid);
}
void tdRSmaGetFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t suid,
int8_t level, int64_t version, char *outputName) {
if (level >= 0 && suid > 0) {
if (version >= 0) {
if (pdname) {
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%s%" PRIi8 "%s%" PRIi64 "%s%s.%" PRIi64, pdname,
TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP, dname, TD_DIRSEP, level, TD_DIRSEP, suid, TD_DIRSEP, fname,
version);
} else {
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%s%" PRIi8 "%s%" PRIi64 "%s%s.%" PRIi64, TD_DIRSEP,
vgId, TD_DIRSEP, dname, TD_DIRSEP, level, TD_DIRSEP, suid, TD_DIRSEP, fname, version);
}
} else {
if (pdname) {
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%s%" PRIi8 "%s%" PRIi64 "%s%s", pdname,
TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP, dname, TD_DIRSEP, level, TD_DIRSEP, suid, TD_DIRSEP, fname);
} else {
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%s%" PRIi8 "%s%" PRIi64 "%s%s", TD_DIRSEP, vgId,
TD_DIRSEP, dname, TD_DIRSEP, level, TD_DIRSEP, suid, TD_DIRSEP, fname);
}
}
} else {
#if 0
if (version >= 0) {
if (pdname) {
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%sv%d%s%" PRIi64, pdname, TD_DIRSEP, TD_DIRSEP,
vgId, TD_DIRSEP, dname, TD_DIRSEP, vgId, fname, version);
} else {
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%sv%d%s%" PRIi64, TD_DIRSEP, vgId, TD_DIRSEP, dname,
TD_DIRSEP, vgId, fname, version);
}
} else {
if (pdname) {
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%sv%d%s", pdname, TD_DIRSEP, TD_DIRSEP, vgId,
TD_DIRSEP, dname, TD_DIRSEP, vgId, fname);
} else {
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%sv%d%s", TD_DIRSEP, vgId, TD_DIRSEP, dname,
TD_DIRSEP, vgId, fname);
}
}
#endif
}
}
void tdRSmaGetDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName) {
if (pdname) {
if (endWithSep) {
......
......@@ -3705,15 +3705,15 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* ret
if (level == TSDB_RETENTION_L0) {
*pLevel = TSDB_RETENTION_L0;
tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L0, str);
tsdbError("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L0, str);
return VND_RSMA0(pVnode);
} else if (level == TSDB_RETENTION_L1) {
*pLevel = TSDB_RETENTION_L1;
tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L1, str);
tsdbError("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L1, str);
return VND_RSMA1(pVnode);
} else {
*pLevel = TSDB_RETENTION_L2;
tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L2, str);
tsdbError("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L2, str);
return VND_RSMA2(pVnode);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册