提交 f78c61aa 编写于 作者: C Cary Xu

refactor: rsma fetch trigger refactor and use ref to manage qtaskf

上级 c0fd55f7
......@@ -538,12 +538,12 @@ bool tdSTSRowIterGetTpVal(STSRowIter *pIter, col_type_t colType, int32_t offset,
} else {
pVal->val = POINTER_SHIFT(TD_ROW_DATA(pRow), offset);
}
return TSDB_CODE_SUCCESS;
return true;
}
if (tdGetBitmapValType(pIter->pBitmap, pIter->colIdx - 1, &pVal->valType, 0) != TSDB_CODE_SUCCESS) {
pVal->valType = TD_VTYPE_NONE;
return terrno;
return true;
}
if (pVal->valType == TD_VTYPE_NORM) {
......
......@@ -38,7 +38,7 @@ typedef struct SSmaEnv SSmaEnv;
typedef struct SSmaStat SSmaStat;
typedef struct STSmaStat STSmaStat;
typedef struct SRSmaStat SRSmaStat;
typedef struct SSmaKey SSmaKey;
typedef struct SRSmaRef SRSmaRef;
typedef struct SRSmaInfo SRSmaInfo;
typedef struct SRSmaInfoItem SRSmaInfoItem;
typedef struct SRSmaFS SRSmaFS;
......@@ -55,10 +55,21 @@ struct SSmaEnv {
#define SMA_ENV_FLG_CLOSE ((int8_t)0x1)
struct SRSmaRef {
int64_t refId; // for SRSmaStat
int64_t suid;
};
typedef struct {
int8_t inited;
int32_t rsetId;
void *tmrHandle; // shared by all fetch tasks
/**
* @brief key: void* of SRSmaInfoItem, value: SRSmaRef
* N.B. Although there is a very small possibility that "void*" point to different objects while with the same
* address after release/renew, the functionality is not affected as it just used to fetch the rsma results.
*/
SHashObj *refHash; // shared by all vgroups
} SSmaMgmt;
#define SMA_ENV_LOCK(env) (&(env)->lock)
......@@ -81,12 +92,12 @@ struct SQTaskFile {
struct SQTaskFReader {
SSma *pSma;
SQTaskFile fTask;
int64_t version;
TdFilePtr pReadH;
};
struct SQTaskFWriter {
SSma *pSma;
SQTaskFile fTask;
int64_t version;
TdFilePtr pWriteH;
char *fname;
};
......@@ -124,6 +135,7 @@ struct SSmaStat {
#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat)
#define RSMA_COMMIT_STAT(r) (&(r)->commitStat)
#define RSMA_REF_ID(r) ((r)->refId)
#define RSMA_FS(r) (&(r)->fs)
#define RSMA_FS_LOCK(r) (&(r)->lock)
struct SRSmaInfoItem {
......@@ -138,7 +150,6 @@ struct SRSmaInfoItem {
struct SRSmaInfo {
STSchema *pTSchema;
int64_t suid;
int64_t refId; // refId of SRSmaStat
int64_t lastRecv; // ms
int8_t assigned; // 0 idle, 1 assgined for exec
int8_t delFlag;
......@@ -197,7 +208,9 @@ int32_t tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo);
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree);
int32_t tdRSmaFSOpen(SSma *pSma, int64_t version);
void tdRSmaFSClose(SRSmaFS *fs);
int32_t tdRSmaFSUpsertQFile(SRSmaFS *fs, SQTaskFile *pTaskF);
int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t version);
void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t version);
int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile);
int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer);
int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type);
......
......@@ -70,8 +70,8 @@ typedef struct SStreamTaskReader SStreamTaskReader;
typedef struct SStreamTaskWriter SStreamTaskWriter;
typedef struct SStreamStateReader SStreamStateReader;
typedef struct SStreamStateWriter SStreamStateWriter;
typedef struct SRsmaSnapReader SRsmaSnapReader;
typedef struct SRsmaSnapWriter SRsmaSnapWriter;
typedef struct SRSmaSnapReader SRSmaSnapReader;
typedef struct SRSmaSnapWriter SRSmaSnapWriter;
typedef struct SSnapDataHdr SSnapDataHdr;
#define VNODE_META_DIR "meta"
......@@ -246,14 +246,14 @@ int32_t tqOffsetSnapWrite(STqOffsetWriter* pWriter, uint8_t* pData, uint32_t nDa
// SStreamTaskReader ======================================
// SStreamStateWriter =====================================
// SStreamStateReader =====================================
// SRsmaSnapReader ========================================
int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapReader** ppReader);
int32_t rsmaSnapReaderClose(SRsmaSnapReader** ppReader);
int32_t rsmaSnapRead(SRsmaSnapReader* pReader, uint8_t** ppData);
// SRsmaSnapWriter ========================================
int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapWriter** ppWriter);
int32_t rsmaSnapWrite(SRsmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
int32_t rsmaSnapWriterClose(SRsmaSnapWriter** ppWriter, int8_t rollback);
// SRSmaSnapReader ========================================
int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapReader** ppReader);
int32_t rsmaSnapReaderClose(SRSmaSnapReader** ppReader);
int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData);
// SRSmaSnapWriter ========================================
int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWriter** ppWriter);
int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback);
typedef struct {
int8_t streamType; // sma or other
......
......@@ -15,6 +15,8 @@
#include "sma.h"
extern SSmaMgmt smaMgmt;
static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma);
static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma);
static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma);
......@@ -178,10 +180,12 @@ static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma) {
*/
static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) {
SVnode *pVnode = pSma->pVnode;
SRSmaFS *pFS = &pStat->fs;
SRSmaFS *pFS = RSMA_FS(pStat);
int64_t committed = pStat->commitAppliedVer;
char qTaskInfoFullName[TSDB_FILENAME_LEN];
taosWLockLatch(RSMA_FS_LOCK(pStat));
for (int32_t i = 0; i < taosArrayGetSize(pFS->aQTaskInf);) {
SQTaskFile *pTaskF = taosArrayGet(pFS->aQTaskInf, i);
if (atomic_sub_fetch_32(&pTaskF->nRef, 1) <= 0) {
......@@ -199,10 +203,12 @@ static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) {
}
SQTaskFile qFile = {.nRef = 1, .padding = 0, .version = committed, .size = 0};
if (tdRSmaFSUpsertQFile(pFS, &qFile) < 0) {
if (tdRSmaFSUpsertQTaskFile(pFS, &qFile) < 0) {
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
return TSDB_CODE_FAILED;
}
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
return TSDB_CODE_SUCCESS;
}
......@@ -358,7 +364,7 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
}
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
SArray *rsmaDeleted = NULL;
SRSmaInfoItem *pItem = NULL;
// step 1: merge qTaskInfo and iQTaskInfo
// lock
......@@ -371,11 +377,7 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
int32_t refVal = T_REF_VAL_GET(pRSmaInfo);
if (refVal == 0) {
if (!rsmaDeleted) {
if ((rsmaDeleted = taosArrayInit(1, sizeof(tb_uid_t)))) {
taosArrayPush(rsmaDeleted, pSuid);
}
}
taosHashRemove(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(*pSuid));
} else {
smaDebug(
"vgId:%d, rsma async post commit, not free rsma info since ref is %d although already deleted for "
......@@ -401,20 +403,6 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
#endif
}
for (int32_t i = 0; i < taosArrayGetSize(rsmaDeleted); ++i) {
tb_uid_t *pSuid = taosArrayGet(rsmaDeleted, i);
void *pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t));
if ((pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
tdFreeRSmaInfo(pSma, pRSmaInfo, true);
smaDebug(
"vgId:%d, rsma async post commit, free rsma info since already deleted and ref is 0 for "
"table:%" PRIi64,
SMA_VID(pSma), *pSuid);
}
taosHashRemove(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t));
}
taosArrayDestroy(rsmaDeleted);
// unlock
// taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
......
......@@ -61,12 +61,23 @@ int32_t smaInit() {
return TSDB_CODE_FAILED;
}
int32_t type = (8 == POINTER_BYTES) ? TSDB_DATA_TYPE_UBIGINT : TSDB_DATA_TYPE_UINT;
smaMgmt.refHash = taosHashInit(1, taosGetDefaultHashFunction(type), true, HASH_ENTRY_LOCK);
if (!smaMgmt.refHash) {
taosCloseRef(smaMgmt.rsetId);
atomic_store_8(&smaMgmt.inited, 0);
smaError("failed to init sma tmr hanle since %s", terrstr());
return TSDB_CODE_FAILED;
}
// init fetch timer handle
smaMgmt.tmrHandle = taosTmrInit(10000, 100, 10000, "RSMA");
if (!smaMgmt.tmrHandle) {
taosCloseRef(smaMgmt.rsetId);
taosHashCleanup(smaMgmt.refHash);
smaMgmt.refHash = NULL;
atomic_store_8(&smaMgmt.inited, 0);
smaError("failed to init sma tmr hanle since %s", terrstr());
smaError("failed to init sma tmr handle since %s", terrstr());
return TSDB_CODE_FAILED;
}
......@@ -95,6 +106,7 @@ void smaCleanUp() {
if (old == 1) {
taosCloseRef(smaMgmt.rsetId);
taosHashCleanup(smaMgmt.refHash);
taosTmrCleanUp(smaMgmt.tmrHandle);
smaInfo("sma mgmt env is cleaned up, rsetId:%d, tmrHandle:%p", smaMgmt.rsetId, smaMgmt.tmrHandle);
atomic_store_8(&smaMgmt.inited, 0);
......@@ -197,6 +209,21 @@ int32_t tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
return 0;
}
static void tRSmaInfoHashFreeNode(void *data) {
SRSmaInfo *pRSmaInfo = NULL;
SRSmaInfoItem *pItem = NULL;
if ((pRSmaInfo = *(SRSmaInfo **)data)) {
if ((pItem = RSMA_INFO_ITEM((SRSmaInfo *)pRSmaInfo, 0)) && pItem->level) {
taosHashRemove(smaMgmt.refHash, &pItem, POINTER_BYTES);
}
if ((pItem = RSMA_INFO_ITEM((SRSmaInfo *)pRSmaInfo, 1)) && pItem->level) {
taosHashRemove(smaMgmt.refHash, &pItem, POINTER_BYTES);
}
tdFreeRSmaInfo(NULL, pRSmaInfo, true);
}
}
static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma) {
ASSERT(pSmaStat != NULL);
......@@ -242,12 +269,13 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
if (!RSMA_INFO_HASH(pRSmaStat)) {
return TSDB_CODE_FAILED;
}
taosHashSetFreeFp(RSMA_INFO_HASH(pRSmaStat), tRSmaInfoHashFreeNode);
if (tdRsmaStartExecutor(pSma) < 0) {
return TSDB_CODE_FAILED;
}
if (!(pRSmaStat->fs.aQTaskInf = taosArrayInit(1, sizeof(SQTaskFile)))) {
if (!(RSMA_FS(pRSmaStat)->aQTaskInf = taosArrayInit(1, sizeof(SQTaskFile)))) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
......@@ -285,14 +313,6 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
tsem_destroy(&(pStat->notEmpty));
// step 2: destroy the rsma info and associated fetch tasks
if (taosHashGetSize(RSMA_INFO_HASH(pStat)) > 0) {
void *infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), NULL);
while (infoHash) {
SRSmaInfo *pSmaInfo = *(SRSmaInfo **)infoHash;
tdFreeRSmaInfo(pSma, pSmaInfo, true);
infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), infoHash);
}
}
taosHashCleanup(RSMA_INFO_HASH(pStat));
// step 3: wait for all triggered fetch tasks to finish
......@@ -315,7 +335,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
tdRsmaStopExecutor(pSma);
// step 5:
tdRSmaFSClose(&pStat->fs);
tdRSmaFSClose(RSMA_FS(pStat));
// step 6: free pStat
taosMemoryFreeClear(pStat);
......@@ -347,7 +367,7 @@ static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) {
SRSmaStat *pRSmaStat = &pSmaStat->rsmaStat;
int32_t vid = SMA_VID(pRSmaStat->pSma);
int64_t refId = RSMA_REF_ID(pRSmaStat);
if (taosRemoveRef(smaMgmt.rsetId, RSMA_REF_ID(pRSmaStat)) < 0) {
if (taosRemoveRef(smaMgmt.rsetId, refId) < 0) {
smaError("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " failed since %s", vid, refId,
smaMgmt.rsetId, terrstr());
} else {
......
......@@ -18,7 +18,8 @@
// =================================================================================================
static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **output);
static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2);
static int32_t tdQTaskInfCmprFn2(const void *p1, const void *p2);
/**
* @brief Open RSma FS from qTaskInfo files
*
......@@ -32,7 +33,6 @@ int32_t tdRSmaFSOpen(SSma *pSma, int64_t version) {
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = NULL;
SArray *output = NULL;
SRSmaFS *fs = NULL;
if (!pEnv) {
return TSDB_CODE_SUCCESS;
......@@ -43,14 +43,13 @@ int32_t tdRSmaFSOpen(SSma *pSma, int64_t version) {
}
pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
fs = &pStat->fs;
for (int32_t i = 0; i < taosArrayGetSize(output); ++i) {
int32_t vid = 0;
int64_t version = -1;
sscanf((const char *)taosArrayGetP(output, i), "v%dqinfo.v%" PRIi64, &vid, &version);
SQTaskFile qTaskFile = {.version = version, .nRef = 1};
if ((terrno = tdRSmaFSUpsertQFile(fs, &qTaskFile)) < 0) {
if ((terrno = tdRSmaFSUpsertQTaskFile(RSMA_FS(pStat), &qTaskFile)) < 0) {
goto _end;
}
smaInfo("vgId:%d, open fs, version:%" PRIi64 ", ref:%" PRIi64, TD_VID(pVnode), qTaskFile.version, qTaskFile.nRef);
......@@ -72,8 +71,56 @@ _end:
void tdRSmaFSClose(SRSmaFS *fs) { taosArrayDestroy(fs->aQTaskInf); }
static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2) {
if (*(int64_t *)p1 < ((SQTaskFile *)p2)->version) {
return -1;
} else if (*(int64_t *)p1 > ((SQTaskFile *)p2)->version) {
return 1;
}
return 0;
}
int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t version) {
SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf;
SQTaskFile *pTaskF = NULL;
int32_t oldVal = 0;
taosRLockLatch(RSMA_FS_LOCK(pStat));
if ((pTaskF = taosArraySearch(aQTaskInf, &version, tdQTaskInfCmprFn1, TD_EQ))) {
oldVal = atomic_fetch_add_32(&pTaskF->nRef, 1);
ASSERT(oldVal > 0);
}
taosRUnLockLatch(RSMA_FS_LOCK(pStat));
return oldVal;
}
void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t version) {
SVnode *pVnode = pSma->pVnode;
SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf;
char qTaskFullName[TSDB_FILENAME_LEN];
SQTaskFile *pTaskF = NULL;
int32_t idx = -1;
taosWLockLatch(RSMA_FS_LOCK(pStat));
if ((idx = taosArraySearchIdx(aQTaskInf, &version, tdQTaskInfCmprFn1, TD_EQ)) >= 0) {
ASSERT(idx < taosArrayGetSize(aQTaskInf));
pTaskF = taosArrayGet(aQTaskInf, idx);
if (atomic_sub_fetch_32(&pTaskF->nRef, 1) <= 0) {
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->version, tfsGetPrimaryPath(pVnode->pTfs), qTaskFullName);
if (taosRemoveFile(qTaskFullName) < 0) {
smaWarn("vgId:%d, failed to remove %s since %s", TD_VID(pVnode), qTaskFullName,
tstrerror(TAOS_SYSTEM_ERROR(errno)));
} else {
smaDebug("vgId:%d, success to remove %s", TD_VID(pVnode), qTaskFullName);
}
taosArrayRemove(aQTaskInf, idx);
}
}
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
}
/**
* @brief Fetch qtaskfiles no more than version
* @brief Fetch qtaskfiles LE than version
*
* @param pSma
* @param version
......@@ -100,7 +147,7 @@ static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **outpu
return TSDB_CODE_FAILED;
}
if ((pDir = taosOpenDir(dir)) == NULL) {
if (!(pDir = taosOpenDir(dir))) {
regfree(&regex);
terrno = TAOS_SYSTEM_ERROR(errno);
smaDebug("vgId:%d, fetch qtask files, open dir %s failed since %s", TD_VID(pVnode), dir, terrstr());
......@@ -110,7 +157,7 @@ static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **outpu
int32_t dirLen = strlen(dir);
char *dirEnd = POINTER_SHIFT(dir, dirLen);
regmatch_t regMatch[2];
while ((pDirEntry = taosReadDir(pDir)) != NULL) {
while ((pDirEntry = taosReadDir(pDir))) {
char *entryName = taosGetDirEntryName(pDirEntry);
if (!entryName) {
continue;
......@@ -140,6 +187,7 @@ static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **outpu
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
} else {
}
} else if (code == REG_NOMATCH) {
// not match
......@@ -154,14 +202,13 @@ static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **outpu
goto _end;
}
}
_end:
taosCloseDir(&pDir);
regfree(&regex);
return terrno == 0 ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED;
}
static int32_t tdQTaskFileCmprFn(const void *p1, const void *p2) {
static int32_t tdQTaskFileCmprFn2(const void *p1, const void *p2) {
if (((SQTaskFile *)p1)->version < ((SQTaskFile *)p2)->version) {
return -1;
} else if (((SQTaskFile *)p1)->version > ((SQTaskFile *)p2)->version) {
......@@ -171,15 +218,15 @@ static int32_t tdQTaskFileCmprFn(const void *p1, const void *p2) {
return 0;
}
int32_t tdRSmaFSUpsertQFile(SRSmaFS *pFS, SQTaskFile *qTaskFile) {
int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile) {
int32_t code = 0;
int32_t idx = taosArraySearchIdx(pFS->aQTaskInf, qTaskFile, tdQTaskFileCmprFn, TD_GE);
int32_t idx = taosArraySearchIdx(pFS->aQTaskInf, qTaskFile, tdQTaskFileCmprFn2, TD_GE);
if (idx < 0) {
idx = taosArrayGetSize(pFS->aQTaskInf);
} else {
SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, idx);
int32_t c = tdQTaskFileCmprFn(pTaskF, qTaskFile);
int32_t c = tdQTaskFileCmprFn2(pTaskF, qTaskFile);
if (c == 0) {
pTaskF->nRef = qTaskFile->nRef;
pTaskF->version = qTaskFile->version;
......
......@@ -20,7 +20,7 @@
#define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt
#define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt
#define RSMA_FETCH_DELAY_MAX (900000) // ms
#define RSMA_FETCH_ACTIVE_MAX (1800) // ms
#define RSMA_FETCH_ACTIVE_MAX (1000) // ms
#define RSMA_FETCH_INTERVAL (5000) // ms
SSmaMgmt smaMgmt = {
......@@ -42,7 +42,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSiz
static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid);
static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
static void tdFreeRSmaSubmitItems(SArray *pItems);
static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubmitArr);
static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo);
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
int64_t suid);
static void tdRSmaFetchTrigger(void *param, void *tmrId);
......@@ -121,27 +121,27 @@ static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t l
*/
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
if (pInfo) {
int32_t vid = pSma ? SMA_VID(pSma) : -1;
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = &pInfo->items[i];
if (isDeepFree && pItem->tmrId) {
smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", SMA_VID(pSma), pInfo->suid,
pItem->tmrId, i + 1);
smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", vid, pItem->tmrId, pInfo->suid, i + 1);
taosTmrStopA(&pItem->tmrId);
}
if (isDeepFree && pInfo->taskInfo[i]) {
tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1);
tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], vid, i + 1);
} else {
smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", SMA_VID(pSma),
smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", vid,
pInfo->suid, i + 1);
}
if (pInfo->iTaskInfo[i]) {
tdRSmaQTaskInfoFree(&pInfo->iTaskInfo[i], SMA_VID(pSma), i + 1);
tdRSmaQTaskInfoFree(&pInfo->iTaskInfo[i], vid, i + 1);
} else {
smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty iTaskInfo",
SMA_VID(pSma), pInfo->suid, i + 1);
smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty iTaskInfo", vid,
pInfo->suid, i + 1);
}
}
if (isDeepFree) {
......@@ -315,10 +315,17 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
}
pItem->level = idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2;
ASSERT(pItem->level > 0);
SRSmaRef rsmaRef = {.refId = pStat->refId, .suid = pRSmaInfo->suid};
taosHashPut(smaMgmt.refHash, &pItem, POINTER_BYTES, &rsmaRef, sizeof(rsmaRef));
taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
smaInfo("vgId:%d, table:%" PRIi64 " level:%" PRIi8 " maxdelay:%" PRIi64 " watermark:%" PRIi64
smaInfo("vgId:%d, item:%p table:%" PRIi64 " level:%" PRIi8 " maxdelay:%" PRIi64 " watermark:%" PRIi64
", finally maxdelay:%" PRIi32,
TD_VID(pVnode), pRSmaInfo->suid, idx + 1, param->maxdelay[idx], param->watermark[idx], pItem->maxDelay);
TD_VID(pVnode), pItem, pRSmaInfo->suid, idx + 1, param->maxdelay[idx], param->watermark[idx],
pItem->maxDelay);
}
return TSDB_CODE_SUCCESS;
}
......@@ -385,7 +392,6 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
goto _err;
}
pRSmaInfo->suid = suid;
pRSmaInfo->refId = RSMA_REF_ID(pStat);
T_REF_INIT_VAL(pRSmaInfo, 1);
if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0) < 0) {
......@@ -656,15 +662,15 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
if (taosArrayGetSize(pResList) == 0) {
if (terrno == 0) {
// smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched yet", SMA_VID(pSma), pItem->level);
// smaDebug("vgId:%d, no rsma level %" PRIi8 " data fetched yet", SMA_VID(pSma), pItem->level);
} else {
smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched since %s", SMA_VID(pSma), pItem->level, terrstr());
smaDebug("vgId:%d, no rsma level %" PRIi8 " data fetched since %s", SMA_VID(pSma), pItem->level, terrstr());
goto _err;
}
break;
} else {
smaDebug("vgId:%d, rsma %" PRIi8 " data fetched", SMA_VID(pSma), pItem->level);
smaDebug("vgId:%d, rsma level %" PRIi8 " data fetched", SMA_VID(pSma), pItem->level);
}
#if 0
char flag[10] = {0};
......@@ -678,21 +684,22 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
// TODO: the schema update should be handled later(TD-17965)
if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, SMA_VID(pSma), suid) < 0) {
smaError("vgId:%d, build submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s",
SMA_VID(pSma), suid, pItem->level, terrstr());
smaError("vgId:%d, build submit req for rsma table %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma),
suid, pItem->level, terrstr());
goto _err;
}
if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) {
taosMemoryFreeClear(pReq);
smaError("vgId:%d, process submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s",
smaError("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " failed since %s",
SMA_VID(pSma), suid, pItem->level, terrstr());
goto _err;
}
taosMemoryFreeClear(pReq);
smaDebug("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " version:%" PRIi64,
SMA_VID(pSma), suid, pItem->level, output->info.version);
smaDebug("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " ver %" PRIi64 " len %" PRIu32,
SMA_VID(pSma), suid, pItem->level, output->info.version, htonl(pReq->header.contLen));
taosMemoryFreeClear(pReq);
}
}
......@@ -1565,26 +1572,46 @@ _err:
* @param tmrId
*/
static void tdRSmaFetchTrigger(void *param, void *tmrId) {
SRSmaInfoItem *pItem = param;
SRSmaRef *pRSmaRef = NULL;
SSma *pSma = NULL;
SRSmaInfo *pRSmaInfo = tdGetRSmaInfoByItem(pItem);
SRSmaStat *pStat = NULL;
SRSmaInfo *pRSmaInfo = NULL;
SRSmaInfoItem *pItem = NULL;
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
smaDebug("rsma fetch task not start since rsma info already deleted, rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId,
pRSmaInfo->refId);
if (!(pRSmaRef = taosHashGet(smaMgmt.refHash, &param, POINTER_BYTES))) {
smaDebug("rsma fetch task not start since rsma info item:%p not exist in refHash:%p, rsetId:%" PRIi64, param,
*(int64_t *)&param, smaMgmt.refHash, smaMgmt.rsetId);
return;
}
SRSmaStat *pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaInfo->refId);
if (!pStat) {
if (!(pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaRef->refId))) {
smaDebug("rsma fetch task not start since rsma stat already destroyed, rsetId:%" PRIi64 " refId:%d)",
smaMgmt.rsetId, pRSmaInfo->refId);
smaMgmt.rsetId, pRSmaRef->refId); // pRSmaRef freed in taosHashRemove
taosHashRemove(smaMgmt.refHash, &param, POINTER_BYTES);
return;
}
pSma = pStat->pSma;
if (!(pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, pRSmaRef->suid))) {
smaDebug("rsma fetch task not start since rsma info not exist, rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId,
pRSmaRef->refId); // pRSmaRef freed in taosHashRemove
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId);
taosHashRemove(smaMgmt.refHash, &param, POINTER_BYTES);
return;
}
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
smaDebug("rsma fetch task not start since rsma info already deleted, rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId,
pRSmaRef->refId); // pRSmaRef freed in taosHashRemove
tdReleaseRSmaInfo(pSma, pRSmaInfo);
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId);
taosHashRemove(smaMgmt.refHash, &param, POINTER_BYTES);
return;
}
pItem = *(SRSmaInfoItem **)&param;
// if rsma trigger stat in paused, cancelled or finished, not start fetch task
int8_t rsmaTriggerStat = atomic_load_8(RSMA_TRIGGER_STAT(pStat));
switch (rsmaTriggerStat) {
......@@ -1592,11 +1619,12 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
case TASK_TRIGGER_STAT_CANCELLED: {
smaDebug("vgId:%d, rsma fetch task not start for level %" PRIi8 " since stat is %" PRIi8
", rsetId rsetId:%" PRIi64 " refId:%d",
SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pRSmaInfo->refId);
SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pRSmaRef->refId);
if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) {
taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
}
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId);
tdReleaseRSmaInfo(pSma, pRSmaInfo);
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId);
return;
}
default:
......@@ -1607,7 +1635,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE);
switch (fetchTriggerStat) {
case TASK_TRIGGER_STAT_ACTIVE: {
smaDebug("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64 " since stat is active",
smaInfo("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64 " since stat is active",
SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
// async process
pItem->fetchLevel = pItem->level;
......@@ -1641,11 +1669,11 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
_end:
taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId);
tdReleaseRSmaInfo(pSma, pRSmaInfo);
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId);
}
static void tdFreeRSmaSubmitItems(SArray *pItems) {
ASSERT(taosArrayGetSize(pItems) > 0);
for (int32_t i = 0; i < taosArrayGetSize(pItems); ++i) {
taosFreeQitem(*(void **)taosArrayGet(pItems, i));
}
......@@ -1657,10 +1685,9 @@ static void tdFreeRSmaSubmitItems(SArray *pItems) {
*
* @param pSma
* @param pInfo
* @param pSubmitArr
* @return int32_t
*/
static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubmitArr) {
static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) {
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
for (int8_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1);
......@@ -1677,7 +1704,7 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubmi
SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay);
} else if (((curMs - pInfo->lastRecv) < RSMA_FETCH_ACTIVE_MAX)) {
++pItem->nSkipped;
smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch skipped ",
smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch skipped ",
SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv);
continue;
} else {
......@@ -1706,10 +1733,8 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubmi
}
_end:
tdReleaseRSmaInfo(pSma, pInfo);
return TSDB_CODE_SUCCESS;
_err:
tdReleaseRSmaInfo(pSma, pInfo);
return TSDB_CODE_FAILED;
}
......@@ -1806,7 +1831,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
if (oldStat == 0 ||
((oldStat == 2) && atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)) < TASK_TRIGGER_STAT_PAUSED)) {
atomic_fetch_add_32(&pRSmaStat->nFetchAll, 1);
tdRSmaFetchAllResult(pSma, pInfo, pSubmitArr);
tdRSmaFetchAllResult(pSma, pInfo);
if (0 == atomic_sub_fetch_32(&pRSmaStat->nFetchAll, 1)) {
atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0);
}
......
......@@ -15,11 +15,13 @@
#include "sma.h"
static int32_t rsmaSnapReadQTaskInfo(SRsmaSnapReader* pReader, uint8_t** ppData);
static int32_t rsmaSnapWriteQTaskInfo(SRsmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
static int32_t rsmaSnapReadQTaskInfo(SRSmaSnapReader* pReader, uint8_t** ppData);
static int32_t rsmaSnapWriteQTaskInfo(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
static int32_t rsmaQTaskInfSnapReaderOpen(SRSmaSnapReader* pReader, int64_t version);
static int32_t rsmaQTaskInfSnapReaderClose(SQTaskFReader** ppReader);
// SRsmaSnapReader ========================================
struct SRsmaSnapReader {
// SRSmaSnapReader ========================================
struct SRSmaSnapReader {
SSma* pSma;
int64_t sver;
int64_t ever;
......@@ -33,13 +35,13 @@ struct SRsmaSnapReader {
SQTaskFReader* pQTaskFReader;
};
int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapReader** ppReader) {
int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapReader** ppReader) {
int32_t code = 0;
SVnode* pVnode = pSma->pVnode;
SRsmaSnapReader* pReader = NULL;
SRSmaSnapReader* pReader = NULL;
// alloc
pReader = (SRsmaSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
pReader = (SRSmaSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
if (pReader == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
......@@ -48,7 +50,7 @@ int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapRead
pReader->sver = sver;
pReader->ever = ever;
// rsma1/rsma2
// open rsma1/rsma2
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pSma->pRSmaTsdb[i]) {
code = tsdbSnapReaderOpen(pSma->pRSmaTsdb[i], sver, ever, i == 0 ? SNAP_DATA_RSMA1 : SNAP_DATA_RSMA2,
......@@ -59,51 +61,112 @@ int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapRead
}
}
// qtaskinfo
// 1. add ref to qtaskinfo.v${ever} if exists and then start to replicate
// open qtaskinfo
if ((code = rsmaQTaskInfSnapReaderOpen(pReader, ever)) < 0) {
goto _err;
}
*ppReader = pReader;
return TSDB_CODE_SUCCESS;
_err:
smaError("vgId:%d, vnode snapshot rsma reader open failed since %s", TD_VID(pVnode), tstrerror(code));
return TSDB_CODE_FAILED;
}
static int32_t rsmaQTaskInfSnapReaderOpen(SRSmaSnapReader* pReader, int64_t version) {
int32_t code = 0;
SSma* pSma = pReader->pSma;
SVnode* pVnode = pSma->pVnode;
SSmaEnv* pEnv = NULL;
SRSmaStat* pStat = NULL;
if (!(pEnv = SMA_RSMA_ENV(pSma))) {
smaInfo("vgId:%d, vnode snapshot rsma reader for qtaskinfo version %" PRIi64 " not need as env is NULL",
TD_VID(pVnode), version);
return TSDB_CODE_SUCCESS;
}
pStat = (SRSmaStat*)SMA_ENV_STAT(pEnv);
int32_t ref = tdRSmaFSRef(pReader->pSma, pStat, version);
if (ref < 1) {
smaInfo("vgId:%d, vnode snapshot rsma reader for qtaskinfo version %" PRIi64 " not need as ref is %d",
TD_VID(pVnode), version, ref);
return TSDB_CODE_SUCCESS;
}
char qTaskInfoFullName[TSDB_FILENAME_LEN];
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), ever, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName);
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), version, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName);
if (!taosCheckExistFile(qTaskInfoFullName)) {
smaInfo("vgId:%d, vnode snapshot rsma reader for qtaskinfo not need as %s not exists", TD_VID(pVnode),
qTaskInfoFullName);
} else {
tdRSmaFSUnRef(pSma, pStat, version);
smaInfo("vgId:%d, vnode snapshot rsma reader for qtaskinfo version %" PRIi64 " not need as %s not exists",
TD_VID(pVnode), qTaskInfoFullName);
return TSDB_CODE_SUCCESS;
}
pReader->pQTaskFReader = taosMemoryCalloc(1, sizeof(SQTaskFReader));
if (!pReader->pQTaskFReader) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
goto _end;
}
TdFilePtr qTaskF = taosOpenFile(qTaskInfoFullName, TD_FILE_READ);
if (!qTaskF) {
TdFilePtr fp = taosOpenFile(qTaskInfoFullName, TD_FILE_READ);
if (!fp) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
taosMemoryFreeClear(pReader->pQTaskFReader);
goto _end;
}
pReader->pQTaskFReader->pReadH = qTaskF;
#if 0
SQTaskFile* pQTaskF = &pReader->pQTaskFReader->fTask;
pQTaskF->nRef = 1;
#endif
pReader->pQTaskFReader->pReadH = fp;
pReader->pQTaskFReader->pSma = pSma;
pReader->pQTaskFReader->version = pReader->ever;
_end:
if (code < 0) {
tdRSmaFSUnRef(pSma, pStat, version);
smaError("vgId:%d, vnode snapshot rsma reader open %s succeed", TD_VID(pVnode), qTaskInfoFullName);
return TSDB_CODE_FAILED;
}
*ppReader = pReader;
smaInfo("vgId:%d, vnode snapshot rsma reader opened %s succeed", TD_VID(pVnode), qTaskInfoFullName);
smaInfo("vgId:%d, vnode snapshot rsma reader open %s succeed", TD_VID(pVnode), qTaskInfoFullName);
return TSDB_CODE_SUCCESS;
}
static int32_t rsmaQTaskInfSnapReaderClose(SQTaskFReader** ppReader) {
if (!(*ppReader)) {
return TSDB_CODE_SUCCESS;
}
SSma* pSma = (*ppReader)->pSma;
SRSmaStat* pStat = SMA_RSMA_STAT(pSma);
int64_t version = (*ppReader)->version;
taosCloseFile(&(*ppReader)->pReadH);
tdRSmaFSUnRef(pSma, pStat, version);
taosMemoryFreeClear(*ppReader);
smaInfo("vgId:%d, vnode snapshot rsma reader closed for qTaskInfo version %" PRIi64, SMA_VID(pSma), version);
return TSDB_CODE_SUCCESS;
_err:
smaError("vgId:%d, vnode snapshot rsma reader opened failed since %s", TD_VID(pVnode), tstrerror(code));
return TSDB_CODE_FAILED;
}
static int32_t rsmaSnapReadQTaskInfo(SRsmaSnapReader* pReader, uint8_t** ppBuf) {
static int32_t rsmaSnapReadQTaskInfo(SRSmaSnapReader* pReader, uint8_t** ppBuf) {
int32_t code = 0;
SSma* pSma = pReader->pSma;
int64_t n = 0;
uint8_t* pBuf = NULL;
SQTaskFReader* qReader = pReader->pQTaskFReader;
if (!qReader) {
*ppBuf = NULL;
smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, qTaskReader is NULL", SMA_VID(pSma));
return 0;
}
if (!qReader->pReadH) {
*ppBuf = NULL;
smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, readh is empty", SMA_VID(pSma));
smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, readh is NULL", SMA_VID(pSma));
return 0;
}
......@@ -153,7 +216,7 @@ _err:
return code;
}
int32_t rsmaSnapRead(SRsmaSnapReader* pReader, uint8_t** ppData) {
int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData) {
int32_t code = 0;
*ppData = NULL;
......@@ -205,9 +268,9 @@ _err:
return code;
}
int32_t rsmaSnapReaderClose(SRsmaSnapReader** ppReader) {
int32_t rsmaSnapReaderClose(SRSmaSnapReader** ppReader) {
int32_t code = 0;
SRsmaSnapReader* pReader = *ppReader;
SRSmaSnapReader* pReader = *ppReader;
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pReader->pDataReader[i]) {
......@@ -215,11 +278,7 @@ int32_t rsmaSnapReaderClose(SRsmaSnapReader** ppReader) {
}
}
if (pReader->pQTaskFReader) {
taosCloseFile(&pReader->pQTaskFReader->pReadH);
taosMemoryFreeClear(pReader->pQTaskFReader);
smaInfo("vgId:%d, vnode snapshot rsma reader closed for qTaskInfo", SMA_VID(pReader->pSma));
}
rsmaQTaskInfSnapReaderClose(&pReader->pQTaskFReader);
smaInfo("vgId:%d, vnode snapshot rsma reader closed", SMA_VID(pReader->pSma));
......@@ -227,8 +286,8 @@ int32_t rsmaSnapReaderClose(SRsmaSnapReader** ppReader) {
return code;
}
// SRsmaSnapWriter ========================================
struct SRsmaSnapWriter {
// SRSmaSnapWriter ========================================
struct SRSmaSnapWriter {
SSma* pSma;
int64_t sver;
int64_t ever;
......@@ -244,13 +303,13 @@ struct SRsmaSnapWriter {
SQTaskFWriter* pQTaskFWriter;
};
int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapWriter** ppWriter) {
int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWriter** ppWriter) {
int32_t code = 0;
SRsmaSnapWriter* pWriter = NULL;
SRSmaSnapWriter* pWriter = NULL;
SVnode* pVnode = pSma->pVnode;
// alloc
pWriter = (SRsmaSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
pWriter = (SRSmaSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
if (pWriter == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
......@@ -301,9 +360,9 @@ _err:
return code;
}
int32_t rsmaSnapWriterClose(SRsmaSnapWriter** ppWriter, int8_t rollback) {
int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) {
int32_t code = 0;
SRsmaSnapWriter* pWriter = *ppWriter;
SRSmaSnapWriter* pWriter = *ppWriter;
SVnode* pVnode = pWriter->pSma->pVnode;
if (rollback) {
......@@ -349,7 +408,7 @@ _err:
return code;
}
int32_t rsmaSnapWrite(SRsmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
......@@ -377,7 +436,7 @@ _err:
return code;
}
static int32_t rsmaSnapWriteQTaskInfo(SRsmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
static int32_t rsmaSnapWriteQTaskInfo(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
SQTaskFWriter* qWriter = pWriter->pQTaskFWriter;
......
......@@ -39,7 +39,7 @@ struct SVSnapReader {
SStreamStateReader *pStreamStateReader;
// rsma
int8_t rsmaDone;
SRsmaSnapReader *pRsmaReader;
SRSmaSnapReader *pRsmaReader;
};
int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapReader **ppReader) {
......@@ -241,7 +241,7 @@ struct SVSnapWriter {
SStreamTaskWriter *pStreamTaskWriter;
SStreamStateWriter *pStreamStateWriter;
// rsma
SRsmaSnapWriter *pRsmaSnapWriter;
SRSmaSnapWriter *pRsmaSnapWriter;
};
int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWriter **ppWriter) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册