提交 16750e14 编写于 作者: C Cary Xu

enh: rsma async commit support

上级 5e2776ac
......@@ -709,7 +709,7 @@ static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) {
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("sma:%s, failed to create since %s", createReq.name, terrstr(terrno));
mError("sma:%s, failed to create since %s", createReq.name, terrstr());
}
mndReleaseStb(pMnode, pStb);
......
......@@ -32,6 +32,8 @@ extern "C" {
#define smaTrace(...) do { if (smaDebugFlag & DEBUG_TRACE) { taosPrintLog("SMA ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
#define RSMA_TASK_INFO_HASH_SLOT 8
typedef struct SSmaEnv SSmaEnv;
typedef struct SSmaStat SSmaStat;
typedef struct STSmaStat STSmaStat;
......@@ -41,7 +43,7 @@ typedef struct SRSmaInfo SRSmaInfo;
typedef struct SRSmaInfoItem SRSmaInfoItem;
struct SSmaEnv {
TdThreadRwlock lock;
SRWLatch lock;
int8_t type;
SSmaStat *pStat;
};
......@@ -52,7 +54,7 @@ typedef struct {
void *tmrHandle; // shared by all fetch tasks
} SSmaMgmt;
#define SMA_ENV_LOCK(env) ((env)->lock)
#define SMA_ENV_LOCK(env) (&(env)->lock)
#define SMA_ENV_TYPE(env) ((env)->type)
#define SMA_ENV_STAT(env) ((env)->pStat)
......@@ -64,12 +66,14 @@ struct STSmaStat {
struct SRSmaStat {
SSma *pSma;
int64_t submitVer;
int64_t refId; // shared by fetch tasks
int8_t triggerStat; // shared by fetch tasks
int8_t commitStat; // 0 not in committing, 1 in committing
SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo;
SHashObj *iRsmaInfoHash; // key: stbUid, value: SRSmaInfo; immutable rsmaInfoHash
int64_t commitAppliedVer; // vnode applied version for async commit
int64_t commitSubmitVer; // rsma submit version for async commit
int64_t submitVer; // latest submit version
int64_t refId; // shared by fetch tasks
int8_t triggerStat; // shared by fetch tasks
int8_t commitStat; // 0 not in committing, 1 in committing
SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo;
SHashObj *iRsmaInfoHash; // key: stbUid, value: SRSmaInfo; immutable rsmaInfoHash
};
struct SSmaStat {
......@@ -113,7 +117,13 @@ enum {
TASK_TRIGGER_STAT_DROPPED = 5,
};
#define RSMA_TASK_INFO_HASH_SLOT 8
enum {
RSMA_ROLE_CREATE = 0,
RSMA_ROLE_DROP = 1,
RSMA_ROLE_FETCH = 2,
RSMA_ROLE_SUBMIT = 3,
RSMA_ROLE_ITERATE = 4,
};
void tdDestroySmaEnv(SSmaEnv *pSmaEnv);
void *tdFreeSmaEnv(SSmaEnv *pSmaEnv);
......@@ -133,33 +143,6 @@ int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType);
int32_t tdLockSma(SSma *pSma);
int32_t tdUnLockSma(SSma *pSma);
static FORCE_INLINE int32_t tdRLockSmaEnv(SSmaEnv *pEnv) {
int code = taosThreadRwlockRdlock(&(pEnv->lock));
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}
static FORCE_INLINE int32_t tdWLockSmaEnv(SSmaEnv *pEnv) {
int code = taosThreadRwlockWrlock(&(pEnv->lock));
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}
static FORCE_INLINE int32_t tdUnLockSmaEnv(SSmaEnv *pEnv) {
int code = taosThreadRwlockUnlock(&(pEnv->lock));
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}
static FORCE_INLINE int8_t tdSmaStat(STSmaStat *pTStat) {
if (pTStat) {
return atomic_load_8(&pTStat->state);
......@@ -204,11 +187,13 @@ static FORCE_INLINE void tdSmaStatSetDropped(STSmaStat *pTStat) {
atomic_or_fetch_8(&pTStat->state, TSDB_SMA_STAT_DROPPED);
}
}
int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pDest, SRSmaInfo *pSrc);
void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level);
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType);
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat);
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree);
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
int32_t tdProcessRSmaRestoreImpl(SSma *pSma);
......
......@@ -168,6 +168,7 @@ int32_t smaSyncPreCommit(SSma* pSma);
int32_t smaSyncCommit(SSma* pSma);
int32_t smaSyncPostCommit(SSma* pSma);
int32_t smaAsyncPreCommit(SSma* pSma);
int32_t smaAsyncCommit(SSma* pSma);
int32_t smaAsyncPostCommit(SSma* pSma);
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
......
......@@ -15,11 +15,13 @@
#include "sma.h"
static int32_t tdProcessRSmaPreCommitImpl(SSma *pSma);
static int32_t tdProcessRSmaCommitImpl(SSma *pSma);
static int32_t tdProcessRSmaPostCommitImpl(SSma *pSma);
static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma);
static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma);
static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma);
static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma);
static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma);
static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma);
static int32_t tdClearupQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat);
/**
* @brief Only applicable to Rollup SMA
......@@ -27,7 +29,7 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma);
* @param pSma
* @return int32_t
*/
int32_t smaSyncPreCommit(SSma *pSma) { return tdProcessRSmaPreCommitImpl(pSma); }
int32_t smaSyncPreCommit(SSma *pSma) { return tdProcessRSmaSyncPreCommitImpl(pSma); }
/**
* @brief Only applicable to Rollup SMA
......@@ -35,7 +37,7 @@ int32_t smaSyncPreCommit(SSma *pSma) { return tdProcessRSmaPreCommitImpl(pSma);
* @param pSma
* @return int32_t
*/
int32_t smaSyncCommit(SSma *pSma) { return tdProcessRSmaCommitImpl(pSma); }
int32_t smaSyncCommit(SSma *pSma) { return tdProcessRSmaSyncCommitImpl(pSma); }
/**
* @brief Only applicable to Rollup SMA
......@@ -43,7 +45,7 @@ int32_t smaSyncCommit(SSma *pSma) { return tdProcessRSmaCommitImpl(pSma); }
* @param pSma
* @return int32_t
*/
int32_t smaSyncPostCommit(SSma *pSma) { return tdProcessRSmaPostCommitImpl(pSma); }
int32_t smaSyncPostCommit(SSma *pSma) { return tdProcessRSmaSyncPostCommitImpl(pSma); }
/**
* @brief Only applicable to Rollup SMA
......@@ -53,6 +55,14 @@ int32_t smaSyncPostCommit(SSma *pSma) { return tdProcessRSmaPostCommitImpl(pSma)
*/
int32_t smaAsyncPreCommit(SSma *pSma) { return tdProcessRSmaAsyncPreCommitImpl(pSma); }
/**
* @brief Only applicable to Rollup SMA
*
* @param pSma
* @return int32_t
*/
int32_t smaAsyncCommit(SSma *pSma) { return tdProcessRSmaAsyncCommitImpl(pSma); }
/**
* @brief Only applicable to Rollup SMA
*
......@@ -106,7 +116,7 @@ int32_t smaBegin(SSma *pSma) {
* @param pSma
* @return int32_t
*/
static int32_t tdProcessRSmaPreCommitImpl(SSma *pSma) {
static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma) {
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
if (!pSmaEnv) {
return TSDB_CODE_SUCCESS;
......@@ -135,7 +145,9 @@ static int32_t tdProcessRSmaPreCommitImpl(SSma *pSma) {
}
// step 3: perform persist task for qTaskInfo
tdRSmaPersistExecImpl(pRSmaStat);
pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
pRSmaStat->commitSubmitVer = pRSmaStat->submitVer;
tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat));
smaDebug("vgId:%d, rsma pre commit success", SMA_VID(pSma));
......@@ -148,7 +160,7 @@ static int32_t tdProcessRSmaPreCommitImpl(SSma *pSma) {
* @param pSma
* @return int32_t
*/
static int32_t tdProcessRSmaCommitImpl(SSma *pSma) {
static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma) {
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
if (!pSmaEnv) {
return TSDB_CODE_SUCCESS;
......@@ -156,21 +168,9 @@ static int32_t tdProcessRSmaCommitImpl(SSma *pSma) {
return TSDB_CODE_SUCCESS;
}
/**
* @brief post-commit for rollup sma
* 1) clean up the outdated qtaskinfo files
*
* @param pSma
* @return int32_t
*/
static int32_t tdProcessRSmaPostCommitImpl(SSma *pSma) {
SVnode *pVnode = pSma->pVnode;
if (!VND_IS_RSMA(pVnode)) {
return TSDB_CODE_SUCCESS;
}
int64_t committed = pVnode->state.committed;
static int32_t tdClearupQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat) {
SVnode *pVnode = pSma->pVnode;
int64_t committed = pRSmaStat->commitAppliedVer;
TdDirPtr pDir = NULL;
TdDirEntryPtr pDirEntry = NULL;
char dir[TSDB_FILENAME_LEN];
......@@ -238,6 +238,29 @@ static int32_t tdProcessRSmaPostCommitImpl(SSma *pSma) {
taosCloseDir(&pDir);
regfree(&regex);
return TSDB_CODE_SUCCESS;
}
/**
* @brief post-commit for rollup sma
* 1) clean up the outdated qtaskinfo files
*
* @param pSma
* @return int32_t
*/
static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) {
SVnode *pVnode = pSma->pVnode;
if (!VND_IS_RSMA(pVnode)) {
return TSDB_CODE_SUCCESS;
}
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(SMA_ENV_STAT(pSmaEnv));
// cleanup outdated qtaskinfo files
tdClearupQTaskInfoFiles(pSma, pRSmaStat);
return TSDB_CODE_SUCCESS;
}
......@@ -259,8 +282,9 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv);
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
// step 1: set rsma stat paused
// step 1: set rsma stat
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);
atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 1);
// step 2: wait all triggered fetch tasks finished
int32_t nLoops = 0;
......@@ -285,19 +309,45 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
RSMA_IMU_INFO_HASH(pRSmaStat) = RSMA_INFO_HASH(pRSmaStat);
RSMA_INFO_HASH(pRSmaStat) =
taosHashInit(RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
if (!RSMA_INFO_HASH(pRSmaStat)) {
smaError("vgId:%d, rsma async commit failed since %s", SMA_VID(pSma), terrstr());
return TSDB_CODE_FAILED;
}
// step 4: others
pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
pRSmaStat->commitSubmitVer = pRSmaStat->submitVer;
return TSDB_CODE_SUCCESS;
}
/**
* @brief commit for rollup sma
*
* @param pSma
* @return int32_t
*/
static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) {
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
if (!pSmaEnv) {
return TSDB_CODE_SUCCESS;
}
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv);
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
// perform persist task for qTaskInfo
tdRSmaPersistExecImpl(pRSmaStat, RSMA_IMU_INFO_HASH(pRSmaStat));
return TSDB_CODE_SUCCESS;
}
/**
* @brief Migrate rsmaInfo from iRsmaInfo to rsmaInfo if rsmaInfoHash not empty.
*
* @param pSma
* @return int32_t
*
* @param pSma
* @return int32_t
*/
static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
......@@ -309,9 +359,10 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
// step 1: merge rsmaInfoHash and iRsmaInfoHash
taosWLockLatch(SMA_ENV_LOCK(pSmaEnv));
if (taosHashGetSize(RSMA_INFO_HASH(pRSmaStat)) <= 0) {
// TODO:
// TODO: optimization - just switch the hash pointer if rsmaInfoHash is empty
}
void *pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), NULL);
......@@ -320,15 +371,12 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
if (!taosHashGet(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t))) {
taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter));
smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma), *pSuid);
smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma),
*pSuid);
} else {
// free the resources
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter;
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = &pRSmaInfo->items[i];
if (pItem->taskInfo) {
tdFreeQTaskInfo(pItem->taskInfo, SMA_VID(pSma), i + 1);
}
}
tdFreeRSmaInfo(pSma, pRSmaInfo, false);
smaDebug("vgId:%d, rsma async post commit, free rsma info since already COW for table:%" PRIi64, SMA_VID(pSma),
*pSuid);
}
......@@ -339,5 +387,10 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
taosHashCleanup(RSMA_IMU_INFO_HASH(pRSmaStat));
RSMA_IMU_INFO_HASH(pRSmaStat) = NULL;
taosWUnLockLatch(SMA_ENV_LOCK(pSmaEnv));
// step 2: cleanup outdated qtaskinfo files
tdClearupQTaskInfoFiles(pSma, pRSmaStat);
return TSDB_CODE_SUCCESS;
}
......@@ -108,12 +108,7 @@ static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path)
SMA_ENV_TYPE(pEnv) = smaType;
int code = taosThreadRwlockInit(&(pEnv->lock), NULL);
if (code) {
terrno = TAOS_SYSTEM_ERROR(code);
taosMemoryFree(pEnv);
return NULL;
}
taosInitRWLatch(&(pEnv->lock));
if (tdInitSmaStat(&SMA_ENV_STAT(pEnv), smaType, pSma) != TSDB_CODE_SUCCESS) {
tdFreeSmaEnv(pEnv);
......@@ -147,7 +142,6 @@ static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SSmaEn
void tdDestroySmaEnv(SSmaEnv *pSmaEnv) {
if (pSmaEnv) {
pSmaEnv->pStat = tdFreeSmaState(pSmaEnv->pStat, SMA_ENV_TYPE(pSmaEnv));
taosThreadRwlockDestroy(&(pSmaEnv->lock));
}
}
......@@ -259,7 +253,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
void *infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), NULL);
while (infoHash) {
SRSmaInfo *pSmaInfo = *(SRSmaInfo **)infoHash;
tdFreeRSmaInfo(pSma, pSmaInfo);
tdFreeRSmaInfo(pSma, pSmaInfo, true);
infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), infoHash);
}
}
......@@ -359,7 +353,7 @@ int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) {
}
break;
default:
smaError("vgId:%d, undefined smaType:%", SMA_VID(pSma), smaType);
smaError("vgId:%d undefined smaType:%", SMA_VID(pSma), smaType);
return TSDB_CODE_FAILED;
}
......
......@@ -91,6 +91,7 @@ static FORCE_INLINE void tdRSmaQTaskInfoIterDestroy(SRSmaQTaskInfoIter *pIter) {
void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) {
// Note: free/kill may in RC
if (!taskHandle) return;
qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle);
if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) {
smaDebug("vgId:%d, free qTaskInfo_t %p of level %d", vgId, otaskHandle, level);
......@@ -98,14 +99,23 @@ void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) {
} else {
smaDebug("vgId:%d, not free qTaskInfo_t %p of level %d", vgId, otaskHandle, level);
}
// TODO: clear files related to qTaskInfo?
}
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
/**
* @brief general function to free rsmaInfo
*
* @param pSma
* @param pInfo
* @param isDeepFree Only stop tmrId and free pTSchema for deep free
* @return void*
*/
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
if (pInfo) {
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = &pInfo->items[i];
if (pItem->taskInfo) {
if (pItem->tmrId) {
if (isDeepFree && pItem->tmrId) {
smaDebug("vgId:%d, table %" PRIi64 " stop fetch timer %p level %d", SMA_VID(pSma), pInfo->suid, pItem->tmrId,
i + 1);
taosTmrStopA(&pItem->tmrId);
......@@ -116,7 +126,9 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
pInfo->suid, i + 1);
}
}
taosMemoryFree(pInfo->pTSchema);
if (isDeepFree) {
taosMemoryFree(pInfo->pTSchema);
}
taosMemoryFree(pInfo);
}
......@@ -138,7 +150,7 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids)
if (!suid || !tbUids) {
terrno = TSDB_CODE_INVALID_PTR;
smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64 " since %s", SMA_VID(pSma), *suid, terrstr(terrno));
smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64 " since %s", SMA_VID(pSma), *suid, terrstr());
return TSDB_CODE_FAILED;
}
......@@ -152,7 +164,7 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids)
if (pRSmaInfo->items[0].taskInfo) {
if ((qUpdateQualifiedTableId(pRSmaInfo->items[0].taskInfo, tbUids, true) < 0)) {
smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " since %s", SMA_VID(pSma), *suid, terrstr(terrno));
smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " since %s", SMA_VID(pSma), *suid, terrstr());
return TSDB_CODE_FAILED;
} else {
smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p with suid:%" PRIi64 ", uid:%" PRIi64, SMA_VID(pSma),
......@@ -162,7 +174,7 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids)
if (pRSmaInfo->items[1].taskInfo) {
if ((qUpdateQualifiedTableId(pRSmaInfo->items[1].taskInfo, tbUids, true) < 0)) {
smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " since %s", SMA_VID(pSma), *suid, terrstr(terrno));
smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " since %s", SMA_VID(pSma), *suid, terrstr());
return TSDB_CODE_FAILED;
} else {
smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p with suid:%" PRIi64 ", uid:%" PRIi64, SMA_VID(pSma),
......@@ -247,9 +259,10 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
if ((param->qmsgLen > 0) && param->qmsg[idx]) {
SRetention *pRetention = SMA_RETENTION(pSma);
STsdbCfg *pTsdbCfg = SMA_TSDB_CFG(pSma);
SVnode *pVnode = pSma->pVnode;
SReadHandle handle = {
.meta = pSma->pVnode->pMeta,
.vnode = pSma->pVnode,
.meta = pVnode->pMeta,
.vnode = pVnode,
.initTqReader = 1,
};
......@@ -274,7 +287,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
pItem->level = idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2;
smaInfo("vgId:%d, table:%" PRIi64 " level:%" PRIi8 " maxdelay:%" PRIi64 " watermark:%" PRIi64
", finally maxdelay:%" PRIi32,
SMA_VID(pSma), pRSmaInfo->suid, idx + 1, param->maxdelay[idx], param->watermark[idx], pItem->maxDelay);
TD_VID(pVnode), pRSmaInfo->suid, idx + 1, param->maxdelay[idx], param->watermark[idx], pItem->maxDelay);
}
return TSDB_CODE_SUCCESS;
}
......@@ -341,7 +354,7 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
return TSDB_CODE_SUCCESS;
_err:
tdFreeRSmaInfo(pSma, pRSmaInfo);
tdFreeRSmaInfo(pSma, pRSmaInfo, true);
return TSDB_CODE_FAILED;
}
......@@ -635,9 +648,18 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType
return TSDB_CODE_SUCCESS;
}
/**
* @brief During async commit, the SRSmaInfo object would be COW from iRSmaInfoHash and write lock should be applied.
*
* @param pSma
* @param suid
* @return SRSmaInfo*
*/
static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid) {
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = NULL;
SRSmaInfo *pRSmaInfo = NULL;
if (!pEnv) {
// only applicable when rsma env exists
return NULL;
......@@ -648,11 +670,37 @@ static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid) {
return NULL;
}
SRSmaInfo *pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
if (pRSmaInfo && (pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
return pRSmaInfo;
}
if (RSMA_COMMIT_STAT(pStat) == 0) {
return NULL;
}
return pRSmaInfo;
// clone the SRSmaInfo from iRsmaInfoHash to rsmaInfoHash if in committing stat
SRSmaInfo *pCowRSmaInfo = NULL;
// lock
taosWLockLatch(SMA_ENV_LOCK(pEnv));
void *iRSmaInfo = taosHashGet(RSMA_IMU_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
if (iRSmaInfo) {
SRSmaInfo *pIRSmaInfo = *(SRSmaInfo **)iRSmaInfo;
if (pIRSmaInfo) {
if (tdCloneRSmaInfo(pSma, pCowRSmaInfo, pIRSmaInfo) < 0) {
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
smaError("vgId:%d, clone rsma info failed for suid:%" PRIu64 " since %s", SMA_VID(pSma), suid, terrstr());
return NULL;
}
if (taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pCowRSmaInfo, sizeof(pCowRSmaInfo)) < 0) {
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
return NULL;
}
}
}
// unlock
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
return pCowRSmaInfo;
}
static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid) {
......@@ -881,6 +929,13 @@ _err:
return TSDB_CODE_FAILED;
}
/**
* @brief Restore from SRSmaQTaskInfoItem
*
* @param pSma
* @param pItem
* @return int32_t
*/
static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *pItem) {
SRSmaInfo *pRSmaInfo = NULL;
void *qTaskInfo = NULL;
......@@ -906,7 +961,7 @@ static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *
if (qDeserializeTaskStatus(qTaskInfo, pItem->qTaskInfo, pItem->len) < 0) {
smaError("vgId:%d, restore rsma task failed for table:%" PRIi64 " level %d since %s", SMA_VID(pSma), pItem->suid,
pItem->type, terrstr(terrno));
pItem->type, terrstr());
return TSDB_CODE_FAILED;
}
smaDebug("vgId:%d, restore rsma task success for table:%" PRIi64 " level %d", SMA_VID(pSma), pItem->suid,
......@@ -1053,26 +1108,27 @@ static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter) {
return TSDB_CODE_SUCCESS;
}
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) {
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
SSma *pSma = pRSmaStat->pSma;
SVnode *pVnode = pSma->pVnode;
int32_t vid = SMA_VID(pSma);
int64_t toffset = 0;
bool isFileCreated = false;
if (taosHashGetSize(RSMA_INFO_HASH(pRSmaStat)) <= 0) {
if (taosHashGetSize(pInfoHash) <= 0) {
return TSDB_CODE_SUCCESS;
}
void *infoHash = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL);
void *infoHash = taosHashIterate(pInfoHash, NULL);
if (!infoHash) {
return TSDB_CODE_SUCCESS;
}
STFile tFile = {0};
if (RSMA_SUBMIT_VER(pRSmaStat) > 0) {
#if 0
if (pRSmaStat->commitAppliedVer > 0) {
char qTaskInfoFName[TSDB_FILENAME_LEN];
tdRSmaQTaskInfoGetFName(vid, pSma->pVnode->state.applied, qTaskInfoFName);
tdRSmaQTaskInfoGetFName(vid, pRSmaStat->commitAppliedVer, qTaskInfoFName);
if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
smaError("vgId:%d, rsma persit, init %s failed since %s", vid, qTaskInfoFName, terrstr());
goto _err;
......@@ -1085,6 +1141,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) {
isFileCreated = true;
}
#endif
while (infoHash) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
......@@ -1100,7 +1157,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) {
int8_t type = (int8_t)(i + 1);
if (qSerializeTaskStatus(taskInfo, &pOutput, &len) < 0) {
smaError("vgId:%d, rsma, table %" PRIi64 " level %d serialize qTaskInfo failed since %s", vid, pRSmaInfo->suid,
i + 1, terrstr(terrno));
i + 1, terrstr());
goto _err;
}
if (!pOutput || len <= 0) {
......@@ -1116,7 +1173,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) {
if (!isFileCreated) {
char qTaskInfoFName[TSDB_FILENAME_LEN];
tdRSmaQTaskInfoGetFName(vid, pSma->pVnode->state.applied, qTaskInfoFName);
tdRSmaQTaskInfoGetFName(vid, pRSmaStat->commitAppliedVer, qTaskInfoFName);
if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
smaError("vgId:%d, rsma persit, init %s failed since %s", vid, qTaskInfoFName, terrstr());
goto _err;
......@@ -1149,11 +1206,11 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) {
taosMemoryFree(pOutput);
}
infoHash = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), infoHash);
infoHash = taosHashIterate(pInfoHash, infoHash);
}
if (isFileCreated) {
tFile.info.qTaskInfo.submitVer = atomic_load_64(&pRSmaStat->submitVer);
tFile.info.qTaskInfo.submitVer = atomic_load_64(&pRSmaStat->commitSubmitVer);
if (tdUpdateTFileHeader(&tFile) < 0) {
smaError("vgId:%d, rsma, failed to update tfile %s header since %s", vid, TD_TFILE_FULL_NAME(&tFile),
tstrerror(terrno));
......
......@@ -313,4 +313,99 @@ int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t
return TSDB_CODE_SUCCESS;
}
static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t srcTaskInfo, SRSmaParam *param,
tb_uid_t suid, int8_t idx) {
SVnode *pVnode = pSma->pVnode;
char *pOutput = NULL;
int32_t len = 0;
if (qSerializeTaskStatus(srcTaskInfo, &pOutput, &len) < 0) {
smaError("vgId:%d, rsma clone, table %" PRIi64 " serialize qTaskInfo failed since %s", TD_VID(pVnode), suid,
terrstr());
goto _err;
}
SReadHandle handle = {
.meta = pVnode->pMeta,
.vnode = pVnode,
.initTqReader = 1,
};
ASSERT(!dstTaskInfo);
dstTaskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle);
if (!dstTaskInfo) {
terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE;
goto _err;
}
if (qDeserializeTaskStatus(dstTaskInfo, pOutput, len) < 0) {
smaError("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " failed since %s", TD_VID(pVnode), suid,
terrstr());
goto _err;
}
smaError("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " succeed", TD_VID(pVnode), suid);
taosMemoryFreeClear(pOutput);
return TSDB_CODE_SUCCESS;
_err:
taosMemoryFreeClear(pOutput);
tdFreeQTaskInfo(dstTaskInfo, TD_VID(pVnode), idx + 1);
return TSDB_CODE_FAILED;
}
/**
* @brief pTSchema is shared
*
* @param pSma
* @param pDest
* @param pSrc
* @return int32_t
*/
int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pDest, SRSmaInfo *pSrc) {
SVnode *pVnode = pSma->pVnode;
SRSmaParam *param = NULL;
if (!pSrc) {
return TSDB_CODE_SUCCESS;
}
if (!pDest) {
pDest = taosMemoryCalloc(1, sizeof(SRSmaInfo));
if (!pDest) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
}
memcpy(pDest, pSrc, sizeof(SRSmaInfo));
SMetaReader mr = {0};
metaReaderInit(&mr, SMA_META(pSma), 0);
smaDebug("vgId:%d, rsma clone, suid is %" PRIi64, TD_VID(pVnode), pSrc->suid);
if (metaGetTableEntryByUid(&mr, pSrc->suid) < 0) {
smaError("vgId:%d, rsma clone, failed to get table meta for %" PRIi64 " since %s", TD_VID(pVnode), pSrc->suid,
terrstr());
goto _err;
}
ASSERT(mr.me.type == TSDB_SUPER_TABLE);
ASSERT(mr.me.uid == pSrc->suid);
if (TABLE_IS_ROLLUP(mr.me.flags)) {
param = &mr.me.stbEntry.rsmaParam;
for (int i = 0; i < TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = &pSrc->items[i];
if (pItem->taskInfo) {
tdCloneQTaskInfo(pSma, pDest->items[i].taskInfo, pItem->taskInfo, param, pSrc->suid, i);
}
}
smaDebug("vgId:%d, rsma clone env success for %" PRIi64, TD_VID(pVnode), pSrc->suid);
}
metaReaderClear(&mr);
return TSDB_CODE_SUCCESS;
_err:
metaReaderClear(&mr);
tdFreeRSmaInfo(pSma, pDest, false);
return TSDB_CODE_FAILED;
}
// ...
\ No newline at end of file
......@@ -233,7 +233,8 @@ int vnodeCommit(SVnode *pVnode) {
walBeginSnapshot(pVnode->pWal, pVnode->state.applied);
// preCommit
smaSyncPreCommit(pVnode->pSma);
// smaSyncPreCommit(pVnode->pSma);
smaAsyncPreCommit(pVnode->pSma);
// commit each sub-system
if (metaCommit(pVnode->pMeta) < 0) {
......@@ -242,6 +243,8 @@ int vnodeCommit(SVnode *pVnode) {
}
if (VND_IS_RSMA(pVnode)) {
smaAsyncCommit(pVnode->pSma);
if (tsdbCommit(VND_RSMA0(pVnode)) < 0) {
ASSERT(0);
return -1;
......@@ -276,7 +279,8 @@ int vnodeCommit(SVnode *pVnode) {
pVnode->state.committed = info.state.committed;
// postCommit
smaSyncPostCommit(pVnode->pSma);
// smaSyncPostCommit(pVnode->pSma);
smaAsyncPostCommit(pVnode->pSma);
// apply the commit (TODO)
walEndSnapshot(pVnode->pWal);
......
......@@ -900,7 +900,7 @@ static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *
_err:
tDecoderClear(&coder);
vError("vgId:%d, failed to create tsma %s:%" PRIi64 " version %" PRIi64 "for table %" PRIi64 " since %s",
TD_VID(pVnode), req.indexName, req.indexUid, version, req.tableUid, terrstr(terrno));
TD_VID(pVnode), req.indexName, req.indexUid, version, req.tableUid, terrstr());
return -1;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册