提交 9ae0e15d 编写于 作者: C Cary Xu

refactor: rsma code refactor and use ref for qtaskf

上级 fca65df5
...@@ -65,13 +65,6 @@ typedef enum { ...@@ -65,13 +65,6 @@ typedef enum {
TSDB_STATIS_NONE = 1, // statis part not exist TSDB_STATIS_NONE = 1, // statis part not exist
} ETsdbStatisStatus; } ETsdbStatisStatus;
typedef enum {
TSDB_SMA_STAT_UNKNOWN = -1, // unknown
TSDB_SMA_STAT_OK = 0, // ready to provide service
TSDB_SMA_STAT_EXPIRED = 1, // not ready or expired
TSDB_SMA_STAT_DROPPED = 2, // sma dropped
} ETsdbSmaStat; // bit operation
typedef enum { typedef enum {
TSDB_SMA_TYPE_BLOCK = 0, // Block-wise SMA TSDB_SMA_TYPE_BLOCK = 0, // Block-wise SMA
TSDB_SMA_TYPE_TIME_RANGE = 1, // Time-range-wise SMA TSDB_SMA_TYPE_TIME_RANGE = 1, // Time-range-wise SMA
......
...@@ -616,6 +616,7 @@ int32_t* taosGetErrno(); ...@@ -616,6 +616,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP TAOS_DEF_ERROR_CODE(0, 0x3155) #define TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP TAOS_DEF_ERROR_CODE(0, 0x3155)
#define TSDB_CODE_RSMA_EMPTY_INFO TAOS_DEF_ERROR_CODE(0, 0x3156) #define TSDB_CODE_RSMA_EMPTY_INFO TAOS_DEF_ERROR_CODE(0, 0x3156)
#define TSDB_CODE_RSMA_INVALID_SCHEMA TAOS_DEF_ERROR_CODE(0, 0x3157) #define TSDB_CODE_RSMA_INVALID_SCHEMA TAOS_DEF_ERROR_CODE(0, 0x3157)
#define TSDB_CODE_RSMA_REGEX_MATCH TAOS_DEF_ERROR_CODE(0, 0x3158)
//index //index
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200) #define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)
......
...@@ -29,6 +29,7 @@ target_sources( ...@@ -29,6 +29,7 @@ target_sources(
# sma # sma
"src/sma/smaEnv.c" "src/sma/smaEnv.c"
"src/sma/smaUtil.c" "src/sma/smaUtil.c"
"src/sma/smaFS.c"
"src/sma/smaOpen.c" "src/sma/smaOpen.c"
"src/sma/smaCommit.c" "src/sma/smaCommit.c"
"src/sma/smaRollup.c" "src/sma/smaRollup.c"
......
...@@ -41,6 +41,7 @@ typedef struct SRSmaStat SRSmaStat; ...@@ -41,6 +41,7 @@ typedef struct SRSmaStat SRSmaStat;
typedef struct SSmaKey SSmaKey; typedef struct SSmaKey SSmaKey;
typedef struct SRSmaInfo SRSmaInfo; typedef struct SRSmaInfo SRSmaInfo;
typedef struct SRSmaInfoItem SRSmaInfoItem; typedef struct SRSmaInfoItem SRSmaInfoItem;
typedef struct SRSmaFS SRSmaFS;
typedef struct SQTaskFile SQTaskFile; typedef struct SQTaskFile SQTaskFile;
typedef struct SQTaskFReader SQTaskFReader; typedef struct SQTaskFReader SQTaskFReader;
typedef struct SQTaskFWriter SQTaskFWriter; typedef struct SQTaskFWriter SQTaskFWriter;
...@@ -73,7 +74,8 @@ struct STSmaStat { ...@@ -73,7 +74,8 @@ struct STSmaStat {
struct SQTaskFile { struct SQTaskFile {
volatile int32_t nRef; volatile int32_t nRef;
int64_t commitID; int32_t padding;
int64_t version;
int64_t size; int64_t size;
}; };
...@@ -89,6 +91,10 @@ struct SQTaskFWriter { ...@@ -89,6 +91,10 @@ struct SQTaskFWriter {
char *fname; char *fname;
}; };
struct SRSmaFS {
SArray *aQTaskInf; // array of SQTaskFile
};
struct SRSmaStat { struct SRSmaStat {
SSma *pSma; SSma *pSma;
int64_t commitAppliedVer; // vnode applied version for async commit int64_t commitAppliedVer; // vnode applied version for async commit
...@@ -98,7 +104,7 @@ struct SRSmaStat { ...@@ -98,7 +104,7 @@ struct SRSmaStat {
volatile int32_t nFetchAll; // active number of fetch all volatile int32_t nFetchAll; // active number of fetch all
int8_t triggerStat; // shared by fetch tasks int8_t triggerStat; // shared by fetch tasks
int8_t commitStat; // 0 not in committing, 1 in committing int8_t commitStat; // 0 not in committing, 1 in committing
SArray *aTaskFile; // qTaskFiles committed recently(for recovery/snapshot r/w) SRSmaFS fs; // for recovery/snapshot r/w
SHashObj *infoHash; // key: suid, value: SRSmaInfo SHashObj *infoHash; // key: suid, value: SRSmaInfo
tsem_t notEmpty; // has items in queue buffer tsem_t notEmpty; // has items in queue buffer
}; };
...@@ -163,14 +169,6 @@ enum { ...@@ -163,14 +169,6 @@ enum {
TASK_TRIGGER_STAT_DROPPED = 5, TASK_TRIGGER_STAT_DROPPED = 5,
}; };
enum {
RSMA_ROLE_CREATE = 0,
RSMA_ROLE_DROP = 1,
RSMA_ROLE_SUBMIT = 2,
RSMA_ROLE_FETCH = 3,
RSMA_ROLE_ITERATE = 4,
};
enum { enum {
RSMA_RESTORE_REBOOT = 1, RSMA_RESTORE_REBOOT = 1,
RSMA_RESTORE_SYNC = 2, RSMA_RESTORE_SYNC = 2,
...@@ -182,88 +180,32 @@ typedef enum { ...@@ -182,88 +180,32 @@ typedef enum {
RSMA_EXEC_COMMIT = 3, // triggered by commit RSMA_EXEC_COMMIT = 3, // triggered by commit
} ERsmaExecType; } ERsmaExecType;
void tdDestroySmaEnv(SSmaEnv *pSmaEnv); // sma
void *tdFreeSmaEnv(SSmaEnv *pSmaEnv); int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType);
void tdDestroySmaEnv(SSmaEnv *pSmaEnv);
int32_t tdDropTSma(SSma *pSma, char *pMsg); void *tdFreeSmaEnv(SSmaEnv *pSmaEnv);
int32_t tdDropTSmaData(SSma *pSma, int64_t indexUid);
int32_t tdInsertRSmaData(SSma *pSma, char *msg);
int32_t tdRefSmaStat(SSma *pSma, SSmaStat *pStat); int32_t tdRefSmaStat(SSma *pSma, SSmaStat *pStat);
int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat); int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat);
int32_t tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo);
int32_t tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo);
void *tdAcquireSmaRef(int32_t rsetId, int64_t refId);
int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId);
int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType);
int32_t tdLockSma(SSma *pSma); int32_t tdLockSma(SSma *pSma);
int32_t tdUnLockSma(SSma *pSma); int32_t tdUnLockSma(SSma *pSma);
void *tdAcquireSmaRef(int32_t rsetId, int64_t refId);
int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId);
static FORCE_INLINE int8_t tdSmaStat(STSmaStat *pTStat) { // rsma
if (pTStat) { int32_t tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo);
return atomic_load_8(&pTStat->state); int32_t tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo);
} void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree);
return TSDB_SMA_STAT_UNKNOWN; int32_t tdRSmaFSOpen(SSma *pSma, int64_t version);
} void tdRSmaFSClose(SRSmaFS *fs);
int32_t tdRSmaFSUpsertQFile(SRSmaFS *fs, SQTaskFile *pTaskF);
static FORCE_INLINE bool tdSmaStatIsOK(STSmaStat *pTStat, int8_t *state) { int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer);
if (!pTStat) { int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
return false; int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type);
} int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer);
if (state) {
*state = atomic_load_8(&pTStat->state); void tdRSmaQTaskInfoGetFileName(int32_t vid, int64_t version, char *outputName);
return *state == TSDB_SMA_STAT_OK; void tdRSmaQTaskInfoGetFullName(int32_t vid, int64_t version, const char *path, char *outputName);
}
return atomic_load_8(&pTStat->state) == TSDB_SMA_STAT_OK;
}
static FORCE_INLINE bool tdSmaStatIsExpired(STSmaStat *pTStat) {
return pTStat ? (atomic_load_8(&pTStat->state) & TSDB_SMA_STAT_EXPIRED) : true;
}
static FORCE_INLINE bool tdSmaStatIsDropped(STSmaStat *pTStat) {
return pTStat ? (atomic_load_8(&pTStat->state) & TSDB_SMA_STAT_DROPPED) : true;
}
static FORCE_INLINE void tdSmaStatSetOK(STSmaStat *pTStat) {
if (pTStat) {
atomic_store_8(&pTStat->state, TSDB_SMA_STAT_OK);
}
}
static FORCE_INLINE void tdSmaStatSetExpired(STSmaStat *pTStat) {
if (pTStat) {
atomic_or_fetch_8(&pTStat->state, TSDB_SMA_STAT_EXPIRED);
}
}
static FORCE_INLINE void tdSmaStatSetDropped(STSmaStat *pTStat) {
if (pTStat) {
atomic_or_fetch_8(&pTStat->state, TSDB_SMA_STAT_DROPPED);
}
}
void tdRSmaQTaskInfoGetFileName(int32_t vid, int64_t version, char *outputName);
void tdRSmaQTaskInfoGetFullName(int32_t vid, int64_t version, const char *path, char *outputName);
int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level);
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType);
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree);
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type);
int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
int32_t tdProcessRSmaRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer);
int32_t tdRsmaRestore(SSma *pSma, int8_t type, int64_t committedVer);
int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg);
int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg);
int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
// smaFileUtil ================ // smaFileUtil ================
......
...@@ -21,7 +21,7 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma); ...@@ -21,7 +21,7 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma);
static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma); static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma);
static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma); static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma);
static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma); static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma);
static int32_t tdCleanupQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat); static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat);
/** /**
* @brief Only applicable to Rollup SMA * @brief Only applicable to Rollup SMA
...@@ -166,114 +166,45 @@ static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma) { ...@@ -166,114 +166,45 @@ static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tdCleanupQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat) {
SVnode *pVnode = pSma->pVnode;
int64_t committed = pRSmaStat->commitAppliedVer;
TdDirPtr pDir = NULL;
TdDirEntryPtr pDirEntry = NULL;
char dir[TSDB_FILENAME_LEN];
const char *pattern = "v[0-9]+qinf\\.v([0-9]+)?$";
regex_t regex;
int code = 0;
tdGetVndDirName(TD_VID(pVnode), tfsGetPrimaryPath(pVnode->pTfs), VNODE_RSMA_DIR, true, dir);
// Resource allocation and init
if ((code = regcomp(&regex, pattern, REG_EXTENDED)) != 0) {
char errbuf[128];
regerror(code, &regex, errbuf, sizeof(errbuf));
smaWarn("vgId:%d, rsma post commit, regcomp for %s failed since %s", TD_VID(pVnode), dir, errbuf);
return TSDB_CODE_FAILED;
}
if ((pDir = taosOpenDir(dir)) == NULL) {
regfree(&regex);
terrno = TAOS_SYSTEM_ERROR(errno);
smaDebug("vgId:%d, rsma post commit, open dir %s failed since %s", TD_VID(pVnode), dir, terrstr());
return TSDB_CODE_FAILED;
}
int32_t dirLen = strlen(dir);
char *dirEnd = POINTER_SHIFT(dir, dirLen);
regmatch_t regMatch[2];
while ((pDirEntry = taosReadDir(pDir)) != NULL) {
char *entryName = taosGetDirEntryName(pDirEntry);
if (!entryName) {
continue;
}
code = regexec(&regex, entryName, 2, regMatch, 0);
if (code == 0) {
// match
int64_t version = -1;
sscanf((const char *)POINTER_SHIFT(entryName, regMatch[1].rm_so), "%" PRIi64, &version);
if ((version < committed) && (version > -1)) {
strncpy(dirEnd, entryName, TSDB_FILENAME_LEN - dirLen);
if (taosRemoveFile(dir) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
smaWarn("vgId:%d, committed version:%" PRIi64 ", failed to remove %s since %s", TD_VID(pVnode), committed,
dir, terrstr());
} else {
smaDebug("vgId:%d, committed version:%" PRIi64 ", success to remove %s", TD_VID(pVnode), committed, dir);
}
}
} else if (code == REG_NOMATCH) {
// not match
smaTrace("vgId:%d, rsma post commit, not match %s", TD_VID(pVnode), entryName);
continue;
} else {
// has other error
char errbuf[128];
regerror(code, &regex, errbuf, sizeof(errbuf));
smaWarn("vgId:%d, rsma post commit, regexec failed since %s", TD_VID(pVnode), errbuf);
taosCloseDir(&pDir);
regfree(&regex);
return TSDB_CODE_FAILED;
}
}
taosCloseDir(&pDir);
regfree(&regex);
return TSDB_CODE_SUCCESS;
}
// SQTaskFile ====================================================== // SQTaskFile ======================================================
// int32_t tCmprQTaskFile(void const *lhs, void const *rhs) {
// int64_t *lCommitted = *(int64_t *)lhs;
// SQTaskFile *rQTaskF = (SQTaskFile *)rhs;
// if (lCommitted < rQTaskF->commitID) {
// return -1;
// } else if (lCommitted > rQTaskF->commitID) {
// return 1;
// }
// return 0;
// }
#if 0
/** /**
* @brief At most time, there is only one qtaskinfo file committed latest in aTaskFile. Sometimes, there would be * @brief At most time, there is only one qtaskinfo file committed latest in aTaskFile. Sometimes, there would be
* multiple qtaskinfo files supporting snapshot replication. * multiple qtaskinfo files supporting snapshot replication.
* *
* @param pSma * @param pSma
* @param pRSmaStat * @param pStat
* @return int32_t * @return int32_t
*/ */
static int32_t tdCleanupQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat) { static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) {
SVnode *pVnode = pSma->pVnode; SVnode *pVnode = pSma->pVnode;
int64_t committed = pRSmaStat->commitAppliedVer; SRSmaFS *pFS = &pStat->fs;
SArray *aTaskFile = pRSmaStat->aTaskFile; int64_t committed = pStat->commitAppliedVer;
char qTaskInfoFullName[TSDB_FILENAME_LEN];
for (int32_t i = 0; i < taosArrayGetSize(pFS->aQTaskInf);) {
SQTaskFile *pTaskF = taosArrayGet(pFS->aQTaskInf, i);
if (atomic_sub_fetch_32(&pTaskF->nRef, 1) <= 0) {
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->version, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName);
if (taosRemoveFile(qTaskInfoFullName) < 0) {
smaWarn("vgId:%d, cleanup qinf, failed to remove %s since %s", TD_VID(pVnode), qTaskInfoFullName,
tstrerror(TAOS_SYSTEM_ERROR(errno)));
} else {
smaDebug("vgId:%d, cleanup qinf, success to remove %s", TD_VID(pVnode), qTaskInfoFullName);
}
taosArrayRemove(pFS->aQTaskInf, i);
continue;
}
++i;
}
void *qTaskFile = taosArraySearch(aTaskFile, committed, tCmprQTaskFile, TD_LE); SQTaskFile qFile = {.nRef = 1, .padding = 0, .version = committed, .size = 0};
if (tdRSmaFSUpsertQFile(pFS, &qFile) < 0) {
return TSDB_CODE_FAILED;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
#endif
/** /**
* @brief post-commit for rollup sma * @brief post-commit for rollup sma
...@@ -290,8 +221,7 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) { ...@@ -290,8 +221,7 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) {
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma); SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
// cleanup outdated qtaskinfo files tdUpdateQTaskInfoFiles(pSma, pRSmaStat);
tdCleanupQTaskInfoFiles(pSma, pRSmaStat);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -488,8 +418,7 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { ...@@ -488,8 +418,7 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
// unlock // unlock
// taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); // taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
// step 2: cleanup outdated qtaskinfo files tdUpdateQTaskInfoFiles(pSma, pRSmaStat);
tdCleanupQTaskInfoFiles(pSma, pRSmaStat);
atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0); atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0);
......
...@@ -28,6 +28,8 @@ static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv); ...@@ -28,6 +28,8 @@ static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv);
static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma); static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma);
static int32_t tdRsmaStartExecutor(const SSma *pSma); static int32_t tdRsmaStartExecutor(const SSma *pSma);
static int32_t tdRsmaStopExecutor(const SSma *pSma); static int32_t tdRsmaStopExecutor(const SSma *pSma);
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
static void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType);
static void *tdFreeTSmaStat(STSmaStat *pStat); static void *tdFreeTSmaStat(STSmaStat *pStat);
static void tdDestroyRSmaStat(void *pRSmaStat); static void tdDestroyRSmaStat(void *pRSmaStat);
...@@ -244,6 +246,11 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS ...@@ -244,6 +246,11 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
if (tdRsmaStartExecutor(pSma) < 0) { if (tdRsmaStartExecutor(pSma) < 0) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
if (!(pRSmaStat->fs.aQTaskInf = taosArrayInit(1, sizeof(SQTaskFile)))) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
} else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { } else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
// TODO // TODO
} else { } else {
...@@ -307,12 +314,15 @@ static void tdDestroyRSmaStat(void *pRSmaStat) { ...@@ -307,12 +314,15 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
// step 4: // step 4:
tdRsmaStopExecutor(pSma); tdRsmaStopExecutor(pSma);
// step 5: free pStat // step 5:
tdRSmaFSClose(&pStat->fs);
// step 6: free pStat
taosMemoryFreeClear(pStat); taosMemoryFreeClear(pStat);
} }
} }
void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) { static void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) {
tdDestroySmaState(pSmaStat, smaType); tdDestroySmaState(pSmaStat, smaType);
if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
taosMemoryFreeClear(pSmaStat); taosMemoryFreeClear(pSmaStat);
...@@ -329,7 +339,7 @@ void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) { ...@@ -329,7 +339,7 @@ void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) {
* @return int32_t * @return int32_t
*/ */
int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) { static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) {
if (pSmaStat) { if (pSmaStat) {
if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
tdDestroyTSmaStat(SMA_STAT_TSMA(pSmaStat)); tdDestroyTSmaStat(SMA_STAT_TSMA(pSmaStat));
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sma.h"
// =================================================================================================
static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **output);
/**
* @brief Open RSma FS from qTaskInfo files
*
* @param pSma
* @param version
* @return int32_t
*/
int32_t tdRSmaFSOpen(SSma *pSma, int64_t version) {
SVnode *pVnode = pSma->pVnode;
int64_t commitID = pVnode->state.commitID;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = NULL;
SArray *output = NULL;
SRSmaFS *fs = NULL;
if (!pEnv) {
return TSDB_CODE_SUCCESS;
}
if (tdFetchQTaskInfoFiles(pSma, version, &output) < 0) {
goto _end;
}
pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
fs = &pStat->fs;
for (int32_t i = 0; i < taosArrayGetSize(output); ++i) {
int32_t vid = 0;
int64_t version = -1;
sscanf((const char *)taosArrayGetP(output, i), "v%dqinfo.v%" PRIi64, &vid, &version);
SQTaskFile qTaskFile = {.version = version, .nRef = 1};
if ((terrno = tdRSmaFSUpsertQFile(fs, &qTaskFile)) < 0) {
goto _end;
}
smaInfo("vgId:%d, open fs, version:%" PRIi64 ", ref:%" PRIi64, TD_VID(pVnode), qTaskFile.version, qTaskFile.nRef);
}
_end:
for (int32_t i = 0; i < taosArrayGetSize(output); ++i) {
void *ptr = taosArrayGetP(output, i);
taosMemoryFreeClear(ptr);
}
taosArrayDestroy(output);
if (terrno != 0) {
smaError("vgId:%d, open rsma fs failed since %s", TD_VID(pVnode), terrstr());
return TSDB_CODE_FAILED;
}
return TSDB_CODE_SUCCESS;
}
void tdRSmaFSClose(SRSmaFS *fs) { taosArrayDestroy(fs->aQTaskInf); }
/**
* @brief Fetch qtaskfiles no more than version
*
* @param pSma
* @param version
* @param output
* @return int32_t
*/
static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **output) {
SVnode *pVnode = pSma->pVnode;
TdDirPtr pDir = NULL;
TdDirEntryPtr pDirEntry = NULL;
char dir[TSDB_FILENAME_LEN];
const char *pattern = "v[0-9]+qinf\\.v([0-9]+)?$";
regex_t regex;
int code = 0;
tdGetVndDirName(TD_VID(pVnode), tfsGetPrimaryPath(pVnode->pTfs), VNODE_RSMA_DIR, true, dir);
// Resource allocation and init
if ((code = regcomp(&regex, pattern, REG_EXTENDED)) != 0) {
terrno = TSDB_CODE_RSMA_REGEX_MATCH;
char errbuf[128];
regerror(code, &regex, errbuf, sizeof(errbuf));
smaWarn("vgId:%d, fetch qtask files, regcomp for %s failed since %s", TD_VID(pVnode), dir, errbuf);
return TSDB_CODE_FAILED;
}
if ((pDir = taosOpenDir(dir)) == NULL) {
regfree(&regex);
terrno = TAOS_SYSTEM_ERROR(errno);
smaDebug("vgId:%d, fetch qtask files, open dir %s failed since %s", TD_VID(pVnode), dir, terrstr());
return TSDB_CODE_FAILED;
}
int32_t dirLen = strlen(dir);
char *dirEnd = POINTER_SHIFT(dir, dirLen);
regmatch_t regMatch[2];
while ((pDirEntry = taosReadDir(pDir)) != NULL) {
char *entryName = taosGetDirEntryName(pDirEntry);
if (!entryName) {
continue;
}
code = regexec(&regex, entryName, 2, regMatch, 0);
if (code == 0) {
// match
smaInfo("vgId:%d, fetch qtask files, max ver:%" PRIi64 ", %s found", TD_VID(pVnode), version, entryName);
int64_t ver = -1;
sscanf((const char *)POINTER_SHIFT(entryName, regMatch[1].rm_so), "%" PRIi64, &ver);
if ((ver <= version) && (ver > -1)) {
if (!(*output)) {
if (!(*output = taosArrayInit(1, POINTER_BYTES))) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
}
char *entryDup = strdup(entryName);
if (!entryDup) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
if (!taosArrayPush(*output, &entryDup)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
}
} else if (code == REG_NOMATCH) {
// not match
smaTrace("vgId:%d, fetch qtask files, not match %s", TD_VID(pVnode), entryName);
continue;
} else {
// has other error
char errbuf[128];
regerror(code, &regex, errbuf, sizeof(errbuf));
smaWarn("vgId:%d, fetch qtask files, regexec failed since %s", TD_VID(pVnode), errbuf);
terrno = TSDB_CODE_RSMA_REGEX_MATCH;
goto _end;
}
}
_end:
taosCloseDir(&pDir);
regfree(&regex);
return terrno == 0 ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED;
}
static int32_t tdQTaskFileCmprFn(const void *p1, const void *p2) {
if (((SQTaskFile *)p1)->version < ((SQTaskFile *)p2)->version) {
return -1;
} else if (((SQTaskFile *)p1)->version > ((SQTaskFile *)p2)->version) {
return 1;
}
return 0;
}
int32_t tdRSmaFSUpsertQFile(SRSmaFS *pFS, SQTaskFile *qTaskFile) {
int32_t code = 0;
int32_t idx = taosArraySearchIdx(pFS->aQTaskInf, qTaskFile, tdQTaskFileCmprFn, TD_GE);
if (idx < 0) {
idx = taosArrayGetSize(pFS->aQTaskInf);
} else {
SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, idx);
int32_t c = tdQTaskFileCmprFn(pTaskF, qTaskFile);
if (c == 0) {
pTaskF->nRef = qTaskFile->nRef;
pTaskF->version = qTaskFile->version;
pTaskF->size = qTaskFile->size;
goto _exit;
}
}
if (taosArrayInsert(pFS->aQTaskInf, idx, qTaskFile) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
_exit:
return code;
}
...@@ -150,7 +150,7 @@ int32_t smaOpen(SVnode *pVnode) { ...@@ -150,7 +150,7 @@ int32_t smaOpen(SVnode *pVnode) {
} }
// restore the rsma // restore the rsma
if (tdRsmaRestore(pSma, RSMA_RESTORE_REBOOT, pVnode->state.committed) < 0) { if (tdRSmaRestore(pSma, RSMA_RESTORE_REBOOT, pVnode->state.committed) < 0) {
goto _err; goto _err;
} }
} }
...@@ -181,8 +181,8 @@ int32_t smaClose(SSma *pSma) { ...@@ -181,8 +181,8 @@ int32_t smaClose(SSma *pSma) {
* @param committedVer * @param committedVer
* @return int32_t * @return int32_t
*/ */
int32_t tdRsmaRestore(SSma *pSma, int8_t type, int64_t committedVer) { int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer) {
ASSERT(VND_IS_RSMA(pSma->pVnode)); ASSERT(VND_IS_RSMA(pSma->pVnode));
return tdProcessRSmaRestoreImpl(pSma, type, committedVer); return tdRSmaProcessRestoreImpl(pSma, type, committedVer);
} }
\ No newline at end of file
...@@ -46,6 +46,8 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo, SArray *pSu ...@@ -46,6 +46,8 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo, SArray *pSu
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
int64_t suid); int64_t suid);
static void tdRSmaFetchTrigger(void *param, void *tmrId); static void tdRSmaFetchTrigger(void *param, void *tmrId);
static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo);
static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level);
static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile); static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile);
static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish); static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish);
static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIter *pIter); static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIter *pIter);
...@@ -96,7 +98,7 @@ static FORCE_INLINE int32_t tdRSmaQTaskInfoContLen(int32_t lenWithHead) { ...@@ -96,7 +98,7 @@ static FORCE_INLINE int32_t tdRSmaQTaskInfoContLen(int32_t lenWithHead) {
static FORCE_INLINE void tdRSmaQTaskInfoIterDestroy(SRSmaQTaskInfoIter *pIter) { taosMemoryFreeClear(pIter->pBuf); } static FORCE_INLINE void tdRSmaQTaskInfoIterDestroy(SRSmaQTaskInfoIter *pIter) { taosMemoryFreeClear(pIter->pBuf); }
void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) { static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) {
// Note: free/kill may in RC // Note: free/kill may in RC
if (!taskHandle || !(*taskHandle)) return; if (!taskHandle || !(*taskHandle)) return;
qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle); qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle);
...@@ -129,14 +131,14 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) { ...@@ -129,14 +131,14 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
} }
if (isDeepFree && pInfo->taskInfo[i]) { if (isDeepFree && pInfo->taskInfo[i]) {
tdFreeQTaskInfo(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1); tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1);
} else { } else {
smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", SMA_VID(pSma), smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", SMA_VID(pSma),
pInfo->suid, i + 1); pInfo->suid, i + 1);
} }
if (pInfo->iTaskInfo[i]) { if (pInfo->iTaskInfo[i]) {
tdFreeQTaskInfo(&pInfo->iTaskInfo[i], SMA_VID(pSma), i + 1); tdRSmaQTaskInfoFree(&pInfo->iTaskInfo[i], SMA_VID(pSma), i + 1);
} else { } else {
smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty iTaskInfo", smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty iTaskInfo",
SMA_VID(pSma), pInfo->suid, i + 1); SMA_VID(pSma), pInfo->suid, i + 1);
...@@ -330,7 +332,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat ...@@ -330,7 +332,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
* @param tbName * @param tbName
* @return int32_t * @return int32_t
*/ */
int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName) { int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName) {
if ((param->qmsgLen[0] == 0) && (param->qmsgLen[1] == 0)) { if ((param->qmsgLen[0] == 0) && (param->qmsgLen[1] == 0)) {
smaDebug("vgId:%d, no qmsg1/qmsg2 for rollup table %s %" PRIi64, SMA_VID(pSma), tbName, suid); smaDebug("vgId:%d, no qmsg1/qmsg2 for rollup table %s %" PRIi64, SMA_VID(pSma), tbName, suid);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -427,7 +429,7 @@ int32_t tdProcessRSmaCreate(SSma *pSma, SVCreateStbReq *pReq) { ...@@ -427,7 +429,7 @@ int32_t tdProcessRSmaCreate(SSma *pSma, SVCreateStbReq *pReq) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
return tdProcessRSmaCreateImpl(pSma, &pReq->rsmaParam, pReq->suid, pReq->name); return tdRSmaProcessCreateImpl(pSma, &pReq->rsmaParam, pReq->suid, pReq->name);
} }
/** /**
...@@ -817,6 +819,95 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, ...@@ -817,6 +819,95 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t srcTaskInfo, SRSmaParam *param,
tb_uid_t suid, int8_t idx) {
SVnode *pVnode = pSma->pVnode;
char *pOutput = NULL;
int32_t len = 0;
if ((terrno = qSerializeTaskStatus(srcTaskInfo, &pOutput, &len)) < 0) {
smaError("vgId:%d, rsma clone, table %" PRIi64 " serialize qTaskInfo failed since %s", TD_VID(pVnode), suid,
terrstr());
goto _err;
}
SReadHandle handle = {
.meta = pVnode->pMeta,
.vnode = pVnode,
.initTqReader = 1,
};
ASSERT(!dstTaskInfo);
dstTaskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle);
if (!dstTaskInfo) {
terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE;
goto _err;
}
if (qDeserializeTaskStatus(dstTaskInfo, pOutput, len) < 0) {
smaError("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " failed since %s", TD_VID(pVnode), suid,
terrstr());
goto _err;
}
smaDebug("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " succeed", TD_VID(pVnode), suid);
taosMemoryFreeClear(pOutput);
return TSDB_CODE_SUCCESS;
_err:
taosMemoryFreeClear(pOutput);
tdRSmaQTaskInfoFree(dstTaskInfo, TD_VID(pVnode), idx + 1);
smaError("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " failed since %s", TD_VID(pVnode), suid,
terrstr());
return TSDB_CODE_FAILED;
}
/**
* @brief Clone qTaskInfo of SRSmaInfo
*
* @param pSma
* @param pInfo
* @return int32_t
*/
static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo) {
SRSmaParam *param = NULL;
if (!pInfo) {
return TSDB_CODE_SUCCESS;
}
SMetaReader mr = {0};
metaReaderInit(&mr, SMA_META(pSma), 0);
smaDebug("vgId:%d, rsma clone qTaskInfo for suid:%" PRIi64, SMA_VID(pSma), pInfo->suid);
if (metaGetTableEntryByUid(&mr, pInfo->suid) < 0) {
smaError("vgId:%d, rsma clone, failed to get table meta for %" PRIi64 " since %s", SMA_VID(pSma), pInfo->suid,
terrstr());
goto _err;
}
ASSERT(mr.me.type == TSDB_SUPER_TABLE);
ASSERT(mr.me.uid == pInfo->suid);
if (TABLE_IS_ROLLUP(mr.me.flags)) {
param = &mr.me.stbEntry.rsmaParam;
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (!pInfo->iTaskInfo[i]) {
continue;
}
if (tdCloneQTaskInfo(pSma, pInfo->taskInfo[i], pInfo->iTaskInfo[i], param, pInfo->suid, i) < 0) {
goto _err;
}
}
smaDebug("vgId:%d, rsma clone env success for %" PRIi64, SMA_VID(pSma), pInfo->suid);
} else {
terrno = TSDB_CODE_RSMA_INVALID_SCHEMA;
goto _err;
}
metaReaderClear(&mr);
return TSDB_CODE_SUCCESS;
_err:
metaReaderClear(&mr);
smaError("vgId:%d, rsma clone env failed for %" PRIi64 " since %s", SMA_VID(pSma), pInfo->suid, terrstr());
return TSDB_CODE_FAILED;
}
/** /**
* @brief During async commit, the SRSmaInfo object would be COW from iRSmaInfoHash and write lock should be applied. * @brief During async commit, the SRSmaInfo object would be COW from iRSmaInfoHash and write lock should be applied.
* *
...@@ -848,7 +939,7 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) { ...@@ -848,7 +939,7 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) {
return NULL; return NULL;
} }
if (!pRSmaInfo->taskInfo[0]) { if (!pRSmaInfo->taskInfo[0]) {
if (tdCloneRSmaInfo(pSma, pRSmaInfo) < 0) { if (tdRSmaInfoClone(pSma, pRSmaInfo) < 0) {
// taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); // taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
return NULL; return NULL;
} }
...@@ -1006,7 +1097,7 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) { ...@@ -1006,7 +1097,7 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) {
" qmsgLen:%" PRIi32, " qmsgLen:%" PRIi32,
TD_VID(pVnode), suid, i, param->maxdelay[i], param->watermark[i], param->qmsgLen[i]); TD_VID(pVnode), suid, i, param->maxdelay[i], param->watermark[i], param->qmsgLen[i]);
} }
if (tdProcessRSmaCreateImpl(pSma, &mr.me.stbEntry.rsmaParam, suid, mr.me.name) < 0) { if (tdRSmaProcessCreateImpl(pSma, &mr.me.stbEntry.rsmaParam, suid, mr.me.name) < 0) {
smaError("vgId:%d, rsma restore env failed for %" PRIi64 " since %s", TD_VID(pVnode), suid, terrstr()); smaError("vgId:%d, rsma restore env failed for %" PRIi64 " since %s", TD_VID(pVnode), suid, terrstr());
goto _err; goto _err;
} }
...@@ -1118,7 +1209,7 @@ static int32_t tdRSmaRestoreTSDataReload(SSma *pSma) { ...@@ -1118,7 +1209,7 @@ static int32_t tdRSmaRestoreTSDataReload(SSma *pSma) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tdProcessRSmaRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer) { int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer) {
// step 1: iterate all stables to restore the rsma env // step 1: iterate all stables to restore the rsma env
int64_t nTables = 0; int64_t nTables = 0;
if (tdRSmaRestoreQTaskInfoInit(pSma, &nTables) < 0) { if (tdRSmaRestoreQTaskInfoInit(pSma, &nTables) < 0) {
...@@ -1139,6 +1230,12 @@ int32_t tdProcessRSmaRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer) ...@@ -1139,6 +1230,12 @@ int32_t tdProcessRSmaRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer)
if (tdRSmaRestoreTSDataReload(pSma) < 0) { if (tdRSmaRestoreTSDataReload(pSma) < 0) {
goto _err; goto _err;
} }
// step 4: open SRSmaFS for qTaskFiles
if (tdRSmaFSOpen(pSma, qtaskFileVer) < 0) {
goto _err;
}
smaInfo("vgId:%d, restore rsma task %" PRIi8 " from qtaskf %" PRIi64 " succeed", SMA_VID(pSma), type, qtaskFileVer); smaInfo("vgId:%d, restore rsma task %" PRIi8 " from qtaskf %" PRIi64 " succeed", SMA_VID(pSma), type, qtaskFileVer);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_err: _err:
......
...@@ -332,7 +332,7 @@ int32_t rsmaSnapWriterClose(SRsmaSnapWriter** ppWriter, int8_t rollback) { ...@@ -332,7 +332,7 @@ int32_t rsmaSnapWriterClose(SRsmaSnapWriter** ppWriter, int8_t rollback) {
pWriter->pQTaskFWriter->fname, qTaskInfoFullName); pWriter->pQTaskFWriter->fname, qTaskInfoFullName);
// rsma restore // rsma restore
if ((code = tdRsmaRestore(pWriter->pSma, RSMA_RESTORE_SYNC, pWriter->ever)) < 0) { if ((code = tdRSmaRestore(pWriter->pSma, RSMA_RESTORE_SYNC, pWriter->ever)) < 0) {
goto _err; goto _err;
} }
smaInfo("vgId:%d, vnode snapshot rsma writer restore from %s succeed", SMA_VID(pWriter->pSma), qTaskInfoFullName); smaInfo("vgId:%d, vnode snapshot rsma writer restore from %s succeed", SMA_VID(pWriter->pSma), qTaskInfoFullName);
......
...@@ -20,6 +20,10 @@ ...@@ -20,6 +20,10 @@
#define SMA_STORAGE_MINUTES_DAY 1440 #define SMA_STORAGE_MINUTES_DAY 1440
#define SMA_STORAGE_SPLIT_FACTOR 14400 // least records in tsma file #define SMA_STORAGE_SPLIT_FACTOR 14400 // least records in tsma file
static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg);
static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg);
static int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
// TODO: Who is responsible for resource allocate and release? // TODO: Who is responsible for resource allocate and release?
int32_t tdProcessTSmaInsert(SSma *pSma, int64_t indexUid, const char *msg) { int32_t tdProcessTSmaInsert(SSma *pSma, int64_t indexUid, const char *msg) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -59,7 +63,7 @@ int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t * ...@@ -59,7 +63,7 @@ int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *
* @param days unit is minute * @param days unit is minute
* @return int32_t * @return int32_t
*/ */
int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days) { static int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days) {
SDecoder coder = {0}; SDecoder coder = {0};
tDecoderInit(&coder, pCont, contLen); tDecoderInit(&coder, pCont, contLen);
...@@ -106,7 +110,7 @@ _err: ...@@ -106,7 +110,7 @@ _err:
* @param pMsg * @param pMsg
* @return int32_t * @return int32_t
*/ */
int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) { static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) {
SSmaCfg *pCfg = (SSmaCfg *)pMsg; SSmaCfg *pCfg = (SSmaCfg *)pMsg;
if (TD_VID(pSma->pVnode) == pCfg->dstVgId) { if (TD_VID(pSma->pVnode) == pCfg->dstVgId) {
...@@ -145,7 +149,7 @@ int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) { ...@@ -145,7 +149,7 @@ int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) {
* @param msg * @param msg
* @return int32_t * @return int32_t
*/ */
int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
const SArray *pDataBlocks = (const SArray *)msg; const SArray *pDataBlocks = (const SArray *)msg;
// TODO: destroy SSDataBlocks(msg) // TODO: destroy SSDataBlocks(msg)
if (!pDataBlocks) { if (!pDataBlocks) {
......
...@@ -305,93 +305,4 @@ int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId) { ...@@ -305,93 +305,4 @@ int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId) {
smaDebug("rsma release ref for rsetId:%" PRIi64 " refId:%d success", rsetId, refId); smaDebug("rsma release ref for rsetId:%" PRIi64 " refId:%d success", rsetId, refId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
}
static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t srcTaskInfo, SRSmaParam *param,
tb_uid_t suid, int8_t idx) {
SVnode *pVnode = pSma->pVnode;
char *pOutput = NULL;
int32_t len = 0;
if ((terrno = qSerializeTaskStatus(srcTaskInfo, &pOutput, &len)) < 0) {
smaError("vgId:%d, rsma clone, table %" PRIi64 " serialize qTaskInfo failed since %s", TD_VID(pVnode), suid,
terrstr());
goto _err;
}
SReadHandle handle = {
.meta = pVnode->pMeta,
.vnode = pVnode,
.initTqReader = 1,
};
ASSERT(!dstTaskInfo);
dstTaskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle);
if (!dstTaskInfo) {
terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE;
goto _err;
}
if (qDeserializeTaskStatus(dstTaskInfo, pOutput, len) < 0) {
smaError("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " failed since %s", TD_VID(pVnode), suid,
terrstr());
goto _err;
}
smaDebug("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " succeed", TD_VID(pVnode), suid);
taosMemoryFreeClear(pOutput);
return TSDB_CODE_SUCCESS;
_err:
taosMemoryFreeClear(pOutput);
tdFreeQTaskInfo(dstTaskInfo, TD_VID(pVnode), idx + 1);
smaError("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " failed since %s", TD_VID(pVnode), suid,
terrstr());
return TSDB_CODE_FAILED;
}
/**
* @brief Clone qTaskInfo of SRSmaInfo
*
* @param pSma
* @param pInfo
* @return int32_t
*/
int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
SRSmaParam *param = NULL;
if (!pInfo) {
return TSDB_CODE_SUCCESS;
}
SMetaReader mr = {0};
metaReaderInit(&mr, SMA_META(pSma), 0);
smaDebug("vgId:%d, rsma clone qTaskInfo for suid:%" PRIi64, SMA_VID(pSma), pInfo->suid);
if (metaGetTableEntryByUid(&mr, pInfo->suid) < 0) {
smaError("vgId:%d, rsma clone, failed to get table meta for %" PRIi64 " since %s", SMA_VID(pSma), pInfo->suid,
terrstr());
goto _err;
}
ASSERT(mr.me.type == TSDB_SUPER_TABLE);
ASSERT(mr.me.uid == pInfo->suid);
if (TABLE_IS_ROLLUP(mr.me.flags)) {
param = &mr.me.stbEntry.rsmaParam;
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (!pInfo->iTaskInfo[i]) {
continue;
}
if (tdCloneQTaskInfo(pSma, pInfo->taskInfo[i], pInfo->iTaskInfo[i], param, pInfo->suid, i) < 0) {
goto _err;
}
}
smaDebug("vgId:%d, rsma clone env success for %" PRIi64, SMA_VID(pSma), pInfo->suid);
} else {
terrno = TSDB_CODE_RSMA_INVALID_SCHEMA;
goto _err;
}
metaReaderClear(&mr);
return TSDB_CODE_SUCCESS;
_err:
metaReaderClear(&mr);
smaError("vgId:%d, rsma clone env failed for %" PRIi64 " since %s", SMA_VID(pSma), pInfo->suid, terrstr());
return TSDB_CODE_FAILED;
} }
\ No newline at end of file
...@@ -618,6 +618,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_REMOVE_EXISTS, "Rsma remove exists" ...@@ -618,6 +618,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_REMOVE_EXISTS, "Rsma remove exists"
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP, "Rsma fetch msg is messed up") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP, "Rsma fetch msg is messed up")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_EMPTY_INFO, "Rsma info is empty") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_EMPTY_INFO, "Rsma info is empty")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_SCHEMA, "Rsma invalid schema") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_SCHEMA, "Rsma invalid schema")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_REGEX_MATCH, "Rsma regex match")
//index //index
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding") TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册