提交 efb2cd27 编写于 作者: C Cary Xu

refactor: add tref to solve the timer not stopped problem

上级 c6be5197
...@@ -28,7 +28,8 @@ extern "C" { ...@@ -28,7 +28,8 @@ extern "C" {
#define smaError(...) do { if (smaDebugFlag & DEBUG_ERROR) { taosPrintLog("SMA ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) #define smaError(...) do { if (smaDebugFlag & DEBUG_ERROR) { taosPrintLog("SMA ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
#define smaWarn(...) do { if (smaDebugFlag & DEBUG_WARN) { taosPrintLog("SMA WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0) #define smaWarn(...) do { if (smaDebugFlag & DEBUG_WARN) { taosPrintLog("SMA WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
#define smaInfo(...) do { if (smaDebugFlag & DEBUG_INFO) { taosPrintLog("SMA ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0) #define smaInfo(...) do { if (smaDebugFlag & DEBUG_INFO) { taosPrintLog("SMA ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
#define smaDebug(...) do { if (smaDebugFlag & DEBUG_DEBUG) { taosPrintLog("SMA ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0) // #define smaDebug(...) do { if (smaDebugFlag & DEBUG_DEBUG) { taosPrintLog("SMA ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0)
#define smaDebug(...) do { if (smaDebugFlag & DEBUG_WARN) { taosPrintLog("SMA WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
#define smaTrace(...) do { if (smaDebugFlag & DEBUG_TRACE) { taosPrintLog("SMA ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0) #define smaTrace(...) do { if (smaDebugFlag & DEBUG_TRACE) { taosPrintLog("SMA ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on // clang-format on
...@@ -46,6 +47,11 @@ struct SSmaEnv { ...@@ -46,6 +47,11 @@ struct SSmaEnv {
SSmaStat *pStat; SSmaStat *pStat;
}; };
typedef struct {
int32_t smaRef;
int32_t refId;
} SSmaMgmt;
#define SMA_ENV_LOCK(env) ((env)->lock) #define SMA_ENV_LOCK(env) ((env)->lock)
#define SMA_ENV_TYPE(env) ((env)->type) #define SMA_ENV_TYPE(env) ((env)->type)
#define SMA_ENV_STAT(env) ((env)->pStat) #define SMA_ENV_STAT(env) ((env)->pStat)
...@@ -58,6 +64,7 @@ struct STSmaStat { ...@@ -58,6 +64,7 @@ struct STSmaStat {
struct SRSmaStat { struct SRSmaStat {
SSma *pSma; SSma *pSma;
int64_t refId;
void *tmrHandle; void *tmrHandle;
tmr_h tmrId; tmr_h tmrId;
int32_t tmrSeconds; int32_t tmrSeconds;
...@@ -73,6 +80,7 @@ struct SSmaStat { ...@@ -73,6 +80,7 @@ struct SSmaStat {
}; };
T_REF_DECLARE() T_REF_DECLARE()
}; };
#define SMA_TSMA_STAT(s) (&(s)->tsmaStat) #define SMA_TSMA_STAT(s) (&(s)->tsmaStat)
#define SMA_RSMA_STAT(s) (&(s)->rsmaStat) #define SMA_RSMA_STAT(s) (&(s)->rsmaStat)
#define RSMA_INFO_HASH(r) ((r)->rsmaInfoHash) #define RSMA_INFO_HASH(r) ((r)->rsmaInfoHash)
...@@ -80,6 +88,7 @@ struct SSmaStat { ...@@ -80,6 +88,7 @@ struct SSmaStat {
#define RSMA_TMR_HANDLE(r) ((r)->tmrHandle) #define RSMA_TMR_HANDLE(r) ((r)->tmrHandle)
#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat) #define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat)
#define RSMA_RUNNING_STAT(r) (&(r)->runningStat) #define RSMA_RUNNING_STAT(r) (&(r)->runningStat)
#define RSMA_REF_ID(r) ((r)->refId)
enum { enum {
TASK_TRIGGER_STAT_INIT = 0, TASK_TRIGGER_STAT_INIT = 0,
...@@ -192,10 +201,18 @@ typedef struct STFInfo STFInfo; ...@@ -192,10 +201,18 @@ typedef struct STFInfo STFInfo;
typedef struct STFile STFile; typedef struct STFile STFile;
struct STFInfo { struct STFInfo {
// common fields
uint32_t magic; uint32_t magic;
uint32_t ftype; uint32_t ftype;
uint32_t fver; uint32_t fver;
int64_t fsize; int64_t fsize;
// specific fields
union {
struct {
int64_t applyVer[2];
} qTaskInfo;
};
}; };
struct STFile { struct STFile {
...@@ -230,7 +247,7 @@ int32_t tdUpdateTFileHeader(STFile *pTFile); ...@@ -230,7 +247,7 @@ int32_t tdUpdateTFileHeader(STFile *pTFile);
void tdUpdateTFileMagic(STFile *pTFile, void *pCksm); void tdUpdateTFileMagic(STFile *pTFile, void *pCksm);
void tdCloseTFile(STFile *pTFile); void tdCloseTFile(STFile *pTFile);
void tdGetVndFileName(int32_t vid, const char *dname, const char *fname, char *outputName); void tdGetVndFileName(int32_t vgId, const char *dname, const char *fname, char *outputName);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -27,6 +27,7 @@ ...@@ -27,6 +27,7 @@
#include "tdatablock.h" #include "tdatablock.h"
#include "tdb.h" #include "tdb.h"
#include "tencode.h" #include "tencode.h"
#include "tref.h"
#include "tfs.h" #include "tfs.h"
#include "tglobal.h" #include "tglobal.h"
#include "tjson.h" #include "tjson.h"
......
...@@ -18,6 +18,9 @@ ...@@ -18,6 +18,9 @@
typedef struct SSmaStat SSmaStat; typedef struct SSmaStat SSmaStat;
#define RSMA_TASK_INFO_HASH_SLOT 8 #define RSMA_TASK_INFO_HASH_SLOT 8
#define SMA_MGMT_REF_NUM 1024
extern SSmaMgmt smaMgmt;
// declaration of static functions // declaration of static functions
...@@ -25,6 +28,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *p ...@@ -25,6 +28,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *p
static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path); static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path);
static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SSmaEnv **pEnv); static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SSmaEnv **pEnv);
static void *tdFreeTSmaStat(STSmaStat *pStat); static void *tdFreeTSmaStat(STSmaStat *pStat);
static void tdDestroyRSmaStat(void *pRSmaStat);
// implementation // implementation
...@@ -128,6 +132,22 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS ...@@ -128,6 +132,22 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
if (smaType == TSDB_SMA_TYPE_ROLLUP) { if (smaType == TSDB_SMA_TYPE_ROLLUP) {
SRSmaStat *pRSmaStat = (SRSmaStat *)(*pSmaStat); SRSmaStat *pRSmaStat = (SRSmaStat *)(*pSmaStat);
pRSmaStat->pSma = (SSma *)pSma; pRSmaStat->pSma = (SSma *)pSma;
// init smaMgmt
smaMgmt.smaRef = taosOpenRef(SMA_MGMT_REF_NUM, tdDestroyRSmaStat);
if (smaMgmt.refId < 0) {
smaError("init smaRef failed, num:%d", SMA_MGMT_REF_NUM);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
int64_t refId = taosAddRef(smaMgmt.smaRef, pRSmaStat);
if (refId < 0) {
smaError("taosAddRef smaRef failed, since:%s", tstrerror(terrno));
return TSDB_CODE_FAILED;
}
pRSmaStat->refId = refId;
// init timer // init timer
RSMA_TMR_HANDLE(pRSmaStat) = taosTmrInit(10000, 100, 10000, "RSMA"); RSMA_TMR_HANDLE(pRSmaStat) = taosTmrInit(10000, 100, 10000, "RSMA");
if (!RSMA_TMR_HANDLE(pRSmaStat)) { if (!RSMA_TMR_HANDLE(pRSmaStat)) {
...@@ -150,6 +170,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS ...@@ -150,6 +170,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
} else { } else {
ASSERT(0); ASSERT(0);
} }
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -169,9 +190,10 @@ static void *tdFreeTSmaStat(STSmaStat *pStat) { ...@@ -169,9 +190,10 @@ static void *tdFreeTSmaStat(STSmaStat *pStat) {
return NULL; return NULL;
} }
static void tdDestroyRSmaStat(SRSmaStat *pStat) { static void tdDestroyRSmaStat(void *pRSmaStat) {
if (pStat) { if (pRSmaStat) {
smaDebug("vgId:%d destroy rsma stat", SMA_VID(pStat->pSma)); SRSmaStat *pStat = (SRSmaStat *)pRSmaStat;
smaDebug("vgId:%d %s:%d destroy rsma stat %p", SMA_VID(pStat->pSma), __func__, __LINE__, pRSmaStat);
// step 1: set persistence task cancelled // step 1: set persistence task cancelled
atomic_store_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_CANCELLED); atomic_store_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_CANCELLED);
...@@ -183,9 +205,10 @@ static void tdDestroyRSmaStat(SRSmaStat *pStat) { ...@@ -183,9 +205,10 @@ static void tdDestroyRSmaStat(SRSmaStat *pStat) {
if (atomic_load_8(RSMA_RUNNING_STAT(pStat)) == 1) { if (atomic_load_8(RSMA_RUNNING_STAT(pStat)) == 1) {
while (1) { while (1) {
if (atomic_load_8(RSMA_TRIGGER_STAT(pStat)) == TASK_TRIGGER_STAT_FINISHED) { if (atomic_load_8(RSMA_TRIGGER_STAT(pStat)) == TASK_TRIGGER_STAT_FINISHED) {
smaDebug("rsma, persist task finished already");
break; break;
} else { } else {
smaDebug("not destroyed since rsma stat in %" PRIi8, atomic_load_8(RSMA_TRIGGER_STAT(pStat))); smaDebug("rsma, persist task not finished yet since rsma stat in %" PRIi8, atomic_load_8(RSMA_TRIGGER_STAT(pStat)));
} }
++nLoops; ++nLoops;
if (nLoops > 1000) { if (nLoops > 1000) {
...@@ -209,7 +232,10 @@ static void tdDestroyRSmaStat(SRSmaStat *pStat) { ...@@ -209,7 +232,10 @@ static void tdDestroyRSmaStat(SRSmaStat *pStat) {
nLoops = 0; nLoops = 0;
while (1) { while (1) {
if (T_REF_VAL_GET((SSmaStat *)pStat) == 0) { if (T_REF_VAL_GET((SSmaStat *)pStat) == 0) {
smaDebug("rsma, all fetch task finished already");
break; break;
} else {
smaDebug("rsma, fetch tasks not all finished yet");
} }
++nLoops; ++nLoops;
if (nLoops > 1000) { if (nLoops > 1000) {
...@@ -222,18 +248,17 @@ static void tdDestroyRSmaStat(SRSmaStat *pStat) { ...@@ -222,18 +248,17 @@ static void tdDestroyRSmaStat(SRSmaStat *pStat) {
if (RSMA_TMR_HANDLE(pStat)) { if (RSMA_TMR_HANDLE(pStat)) {
taosTmrCleanUp(RSMA_TMR_HANDLE(pStat)); taosTmrCleanUp(RSMA_TMR_HANDLE(pStat));
} }
}
}
static void *tdFreeRSmaStat(SRSmaStat *pStat) { }
tdDestroyRSmaStat(pStat);
taosMemoryFreeClear(pStat);
return NULL;
} }
void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) { void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) {
tdDestroySmaState(pSmaStat, smaType); tdDestroySmaState(pSmaStat, smaType);
taosMemoryFreeClear(pSmaStat); if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
taosMemoryFreeClear(pSmaStat);
}
// tref used to free rsma stat
return NULL; return NULL;
} }
...@@ -243,17 +268,21 @@ void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) { ...@@ -243,17 +268,21 @@ void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) {
* @param pSmaStat * @param pSmaStat
* @return int32_t * @return int32_t
*/ */
int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) { int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) {
if (pSmaStat) { if (pSmaStat) {
if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
tdDestroyTSmaStat(SMA_TSMA_STAT(pSmaStat)); tdDestroyTSmaStat(SMA_TSMA_STAT(pSmaStat));
} else if (smaType == TSDB_SMA_TYPE_ROLLUP) { } else if (smaType == TSDB_SMA_TYPE_ROLLUP) {
tdDestroyRSmaStat(SMA_RSMA_STAT(pSmaStat)); SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSmaStat);
if (taosRemoveRef(smaMgmt.smaRef, RSMA_REF_ID(pRSmaStat)) < 0) {
smaError("remove refId from smaRef failed, refId:0x%" PRIx64, RSMA_REF_ID(pRSmaStat));
}
} else { } else {
ASSERT(0); ASSERT(0);
} }
} }
return TSDB_CODE_SUCCESS; return 0;
} }
int32_t tdLockSma(SSma *pSma) { int32_t tdLockSma(SSma *pSma) {
......
...@@ -18,9 +18,13 @@ ...@@ -18,9 +18,13 @@
#define RSMA_QTASKINFO_PERSIST_MS 7200000 #define RSMA_QTASKINFO_PERSIST_MS 7200000
#define RSMA_QTASKINFO_BUFSIZE 32768 #define RSMA_QTASKINFO_BUFSIZE 32768
#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid #define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid
typedef enum { TD_QTASK_TMP_FILE = 0, TD_QTASK_CUR_FILE } TD_QTASK_FILE_T;
static const char *tdQTaskInfoFname[] = {"qtaskinfo.t", "qtaskinfo"};
SSmaMgmt smaMgmt = {
.smaRef = -1,
};
typedef enum { TD_QTASK_TMP_F = 0, TD_QTASK_CUR_F} TD_QTASK_FILE_T;
static const char *tdQTaskInfoFname[] = {"qtaskinfo.t", "qtaskinfo"};
typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem; typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem;
typedef struct SRSmaQTaskInfoIter SRSmaQTaskInfoIter; typedef struct SRSmaQTaskInfoIter SRSmaQTaskInfoIter;
...@@ -80,6 +84,10 @@ struct SRSmaQTaskInfoIter { ...@@ -80,6 +84,10 @@ struct SRSmaQTaskInfoIter {
int32_t nBufPos; int32_t nBufPos;
}; };
static void tdRSmaQTaskInfoGetFName(int32_t vgId, int8_t ftype, char *outputName) {
tdGetVndFileName(vgId, VNODE_RSMA_DIR, tdQTaskInfoFname[ftype], outputName);
}
static FORCE_INLINE int32_t tdRSmaQTaskInfoContLen(int32_t lenWithHead) { static FORCE_INLINE int32_t tdRSmaQTaskInfoContLen(int32_t lenWithHead) {
return lenWithHead - RSMA_QTASKINFO_HEAD_LEN; return lenWithHead - RSMA_QTASKINFO_HEAD_LEN;
} }
...@@ -761,7 +769,7 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma) { ...@@ -761,7 +769,7 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma) {
STFile tFile = {0}; STFile tFile = {0};
char qTaskInfoFName[TSDB_FILENAME_LEN]; char qTaskInfoFName[TSDB_FILENAME_LEN];
tdRSmaQTaskInfoGetFName(TD_VID(pVnode), TD_QTASK_CUR_FILE, qTaskInfoFName); tdRSmaQTaskInfoGetFName(TD_VID(pVnode), TD_QTASK_TMP_F, qTaskInfoFName);
if (tdInitTFile(&tFile, pVnode->pTfs, qTaskInfoFName) < 0) { if (tdInitTFile(&tFile, pVnode->pTfs, qTaskInfoFName) < 0) {
goto _err; goto _err;
} }
...@@ -1003,10 +1011,6 @@ static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter) { ...@@ -1003,10 +1011,6 @@ static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void tdRSmaQTaskInfoGetFName(int32_t vid, int8_t ftype, char *outputName) {
tdGetVndFileName(vid, VNODE_RSMA_DIR, tdQTaskInfoFname[ftype], outputName);
}
static void *tdRSmaPersistExec(void *param) { static void *tdRSmaPersistExec(void *param) {
setThreadName("rsma-task-persist"); setThreadName("rsma-task-persist");
SRSmaStat *pRSmaStat = param; SRSmaStat *pRSmaStat = param;
...@@ -1063,7 +1067,7 @@ static void *tdRSmaPersistExec(void *param) { ...@@ -1063,7 +1067,7 @@ static void *tdRSmaPersistExec(void *param) {
if (!isFileCreated) { if (!isFileCreated) {
char qTaskInfoFName[TSDB_FILENAME_LEN]; char qTaskInfoFName[TSDB_FILENAME_LEN];
tdRSmaQTaskInfoGetFName(vid, TD_QTASK_TMP_FILE, qTaskInfoFName); tdRSmaQTaskInfoGetFName(vid, TD_QTASK_TMP_F, qTaskInfoFName);
tdInitTFile(&tFile, pTfs, qTaskInfoFName); tdInitTFile(&tFile, pTfs, qTaskInfoFName);
tdCreateTFile(&tFile, pTfs, true, -1); tdCreateTFile(&tFile, pTfs, true, -1);
...@@ -1079,8 +1083,8 @@ static void *tdRSmaPersistExec(void *param) { ...@@ -1079,8 +1083,8 @@ static void *tdRSmaPersistExec(void *param) {
ASSERT(headLen <= RSMA_QTASKINFO_HEAD_LEN); ASSERT(headLen <= RSMA_QTASKINFO_HEAD_LEN);
tdAppendTFile(&tFile, (void *)&tmpBuf, headLen, &toffset); tdAppendTFile(&tFile, (void *)&tmpBuf, headLen, &toffset);
smaDebug("vgId:%d, table %" PRIi64 " level %d head part len:%d appended to offset:%" PRIi64, vid, pRSmaInfo->suid, smaDebug("vgId:%d, table %" PRIi64 " level %d head part(len:%d) appended to offset:%" PRIi64, vid,
i + 1, headLen, toffset); pRSmaInfo->suid, i + 1, headLen, toffset);
tdAppendTFile(&tFile, pOutput, len, &toffset); tdAppendTFile(&tFile, pOutput, len, &toffset);
smaDebug("vgId:%d, table %" PRIi64 " level %d body part len:%d appended to offset:%" PRIi64, vid, pRSmaInfo->suid, smaDebug("vgId:%d, table %" PRIi64 " level %d body part len:%d appended to offset:%" PRIi64, vid, pRSmaInfo->suid,
i + 1, len, toffset); i + 1, len, toffset);
...@@ -1106,8 +1110,8 @@ _normal: ...@@ -1106,8 +1110,8 @@ _normal:
char newFName[TSDB_FILENAME_LEN]; char newFName[TSDB_FILENAME_LEN];
strncpy(newFName, TD_TFILE_FULL_NAME(&tFile), TSDB_FILENAME_LEN); strncpy(newFName, TD_TFILE_FULL_NAME(&tFile), TSDB_FILENAME_LEN);
char *pos = strstr(newFName, tdQTaskInfoFname[TD_QTASK_TMP_FILE]); char *pos = strstr(newFName, tdQTaskInfoFname[TD_QTASK_TMP_F]);
strncpy(pos, tdQTaskInfoFname[TD_QTASK_CUR_FILE], TSDB_FILENAME_LEN - POINTER_DISTANCE(pos, newFName)); strncpy(pos, tdQTaskInfoFname[TD_QTASK_TMP_F], TSDB_FILENAME_LEN - POINTER_DISTANCE(pos, newFName));
if (taosRenameFile(TD_TFILE_FULL_NAME(&tFile), newFName) != 0) { if (taosRenameFile(TD_TFILE_FULL_NAME(&tFile), newFName) != 0) {
smaError("vgId:%d, failed to rename %s to %s", vid, TD_TFILE_FULL_NAME(&tFile), newFName); smaError("vgId:%d, failed to rename %s to %s", vid, TD_TFILE_FULL_NAME(&tFile), newFName);
goto _err; goto _err;
...@@ -1134,6 +1138,7 @@ _end: ...@@ -1134,6 +1138,7 @@ _end:
ASSERT(0); ASSERT(0);
} }
atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0); atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0);
taosReleaseRef(smaMgmt.smaRef, pRSmaStat->refId);
taosThreadExit(NULL); taosThreadExit(NULL);
return NULL; return NULL;
} }
...@@ -1159,6 +1164,7 @@ static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) { ...@@ -1159,6 +1164,7 @@ static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) {
ASSERT(0); ASSERT(0);
} }
atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0); atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0);
taosReleaseRef(smaMgmt.smaRef, pRSmaStat->refId);
} }
taosThreadAttrDestroy(&thAttr); taosThreadAttrDestroy(&thAttr);
...@@ -1171,9 +1177,16 @@ static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) { ...@@ -1171,9 +1177,16 @@ static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) {
* @param tmrId * @param tmrId
*/ */
static void tdRSmaPersistTrigger(void *param, void *tmrId) { static void tdRSmaPersistTrigger(void *param, void *tmrId) {
SRSmaStat *pRSmaStat = param; SRSmaStat *rsmaStat = param;
int64_t refId = rsmaStat->refId;
SRSmaStat *pRSmaStat = (SRSmaStat *)taosAcquireRef(smaMgmt.smaRef, refId);
if(!pRSmaStat) {
smaDebug("rsma persistence task not start since already destroyed");
return;
}
int8_t tmrStat = int8_t tmrStat =
atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE); atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE);
switch (tmrStat) { switch (tmrStat) {
case TASK_TRIGGER_STAT_ACTIVE: { case TASK_TRIGGER_STAT_ACTIVE: {
...@@ -1191,6 +1204,7 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) { ...@@ -1191,6 +1204,7 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) {
} else { } else {
atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0); atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0);
} }
return;
} break; } break;
case TASK_TRIGGER_STAT_CANCELLED: { case TASK_TRIGGER_STAT_CANCELLED: {
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_FINISHED); atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_FINISHED);
...@@ -1206,4 +1220,5 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) { ...@@ -1206,4 +1220,5 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) {
smaWarn("rsma persistence not start since unknown stat %" PRIi8, tmrStat); smaWarn("rsma persistence not start since unknown stat %" PRIi8, tmrStat);
} break; } break;
} }
taosReleaseRef(smaMgmt.smaRef, refId);
} }
...@@ -141,8 +141,8 @@ int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset) ...@@ -141,8 +141,8 @@ int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset)
} }
#if 1 #if 1
smaDebug("append to file %s, offset:%" PRIi64 " + nbyte:%" PRIi64 " =%" PRIi64, TD_TFILE_FULL_NAME(pTFile), toffset, smaDebug("append to file %s, offset:%" PRIi64 " nbyte:%" PRIi64 " fsize:%" PRIi64, TD_TFILE_FULL_NAME(pTFile),
nbyte, toffset + nbyte); toffset, nbyte, toffset + nbyte);
#endif #endif
ASSERT(pTFile->info.fsize == toffset); ASSERT(pTFile->info.fsize == toffset);
...@@ -179,8 +179,8 @@ void tdCloseTFile(STFile *pTFile) { ...@@ -179,8 +179,8 @@ void tdCloseTFile(STFile *pTFile) {
} }
} }
void tdGetVndFileName(int32_t vid, const char *dname, const char *fname, char *outputName) { void tdGetVndFileName(int32_t vgId, const char *dname, const char *fname, char *outputName) {
snprintf(outputName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s/%s", vid, dname, fname); snprintf(outputName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s/%s", vgId, dname, fname);
} }
int32_t tdInitTFile(STFile *pTFile, STfs *pTfs, const char *fname) { int32_t tdInitTFile(STFile *pTFile, STfs *pTfs, const char *fname) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册