未验证 提交 9bf71800 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #21969 from taosdata/enh/TD-23769-3.0

enh: rsma stream state adaption
......@@ -757,9 +757,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RSMA_INVALID_SCHEMA TAOS_DEF_ERROR_CODE(0, 0x3153)
#define TSDB_CODE_RSMA_STREAM_STATE_OPEN TAOS_DEF_ERROR_CODE(0, 0x3154)
#define TSDB_CODE_RSMA_STREAM_STATE_COMMIT TAOS_DEF_ERROR_CODE(0, 0x3155)
#define TSDB_CODE_RSMA_FS_REF TAOS_DEF_ERROR_CODE(0, 0x3156)
#define TSDB_CODE_RSMA_FS_SYNC TAOS_DEF_ERROR_CODE(0, 0x3157)
#define TSDB_CODE_RSMA_FS_UPDATE TAOS_DEF_ERROR_CODE(0, 0x3158)
#define TSDB_CODE_RSMA_FS_SYNC TAOS_DEF_ERROR_CODE(0, 0x3156)
#define TSDB_CODE_RSMA_RESULT TAOS_DEF_ERROR_CODE(0, 0x3157)
//index
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)
......
......@@ -32,7 +32,6 @@ target_sources(
# sma
"src/sma/smaEnv.c"
"src/sma/smaUtil.c"
"src/sma/smaFS.c"
"src/sma/smaOpen.c"
"src/sma/smaCommit.c"
"src/sma/smaRollup.c"
......
......@@ -105,17 +105,16 @@ struct SRSmaFS {
struct SRSmaStat {
SSma *pSma;
int64_t commitAppliedVer; // vnode applied version for async commit
int64_t refId; // shared by fetch tasks
volatile int64_t nBufItems; // number of items in queue buffer
SRWLatch lock; // r/w lock for rsma fs(e.g. qtaskinfo)
volatile int32_t nFetchAll; // active number of fetch all
volatile int8_t triggerStat; // shared by fetch tasks
volatile int8_t commitStat; // 0 not in committing, 1 in committing
volatile int8_t delFlag; // 0 no deleted SRSmaInfo, 1 has deleted SRSmaInfo
SRSmaFS fs; // for recovery/snapshot r/w
SHashObj *infoHash; // key: suid, value: SRSmaInfo
tsem_t notEmpty; // has items in queue buffer
int64_t refId; // shared by fetch tasks
volatile int64_t nBufItems; // number of items in queue buffer
SRWLatch lock; // r/w lock for rsma fs(e.g. qtaskinfo)
volatile int32_t nFetchAll; // active number of fetch all
volatile int8_t triggerStat; // shared by fetch tasks
volatile int8_t commitStat; // 0 not in committing, 1 in committing
volatile int8_t delFlag; // 0 no deleted SRSmaInfo, 1 has deleted SRSmaInfo
SRSmaFS fs; // for recovery/snapshot r/w
SHashObj *infoHash; // key: suid, value: SRSmaInfo
tsem_t notEmpty; // has items in queue buffer
};
struct SSmaStat {
......@@ -156,12 +155,9 @@ struct SRSmaInfo {
int16_t padding;
T_REF_DECLARE()
SRSmaInfoItem items[TSDB_RETENTION_L2];
void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t
STaosQueue *queue; // buffer queue of SubmitReq
STaosQall *qall; // buffer qall of SubmitReq
void *iTaskInfo[TSDB_RETENTION_L2]; // immutable qTaskInfo_t
STaosQueue *iQueue; // immutable buffer queue of SubmitReq
STaosQall *iQall; // immutable buffer qall of SubmitReq
void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t
STaosQueue *queue; // buffer queue of SubmitReq
STaosQall *qall; // buffer qall of SubmitReq
};
#define RSMA_INFO_HEAD_LEN offsetof(SRSmaInfo, items)
......@@ -191,6 +187,12 @@ typedef enum {
RSMA_EXEC_COMMIT = 3, // triggered by commit
} ERsmaExecType;
#define TD_SMA_LOOPS_CHECK(n, limit) \
if (++(n) > limit) { \
sched_yield(); \
(n) = 0; \
}
// sma
int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType);
void tdDestroySmaEnv(SSmaEnv *pSmaEnv);
......@@ -213,27 +215,12 @@ int32_t smaPreClose(SSma *pSma);
// rsma
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree);
int32_t tdRSmaFSOpen(SSma *pSma, int64_t version, int8_t rollback);
void tdRSmaFSClose(SRSmaFS *fs);
int32_t tdRSmaFSPrepareCommit(SSma *pSma, SRSmaFS *pFSNew);
int32_t tdRSmaFSCommit(SSma *pSma);
int32_t tdRSmaFSFinishCommit(SSma *pSma);
int32_t tdRSmaFSCopy(SSma *pSma, SRSmaFS *pFS);
int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFS);
int32_t tdRSmaFSRef(SSma *pSma, SRSmaFS *pFS);
void tdRSmaFSUnRef(SSma *pSma, SRSmaFS *pFS);
int32_t tdRSmaFSUpsertQTaskFile(SSma *pSma, SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t nSize);
int32_t tdRSmaFSRollback(SSma *pSma);
int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback);
int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type);
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
// int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback);
void tdRSmaQTaskInfoGetFileName(SVnode *pVnode, int64_t suid, int8_t level, int64_t version, char *outputName);
void tdRSmaQTaskInfoGetFullName(SVnode *pVnode, int64_t suid, int8_t level, int64_t version, STfs *pTfs,
char *outputName);
void tdRSmaQTaskInfoGetFullPath(SVnode *pVnode, int8_t level, STfs *pTfs, char *outputName);
void tdRSmaQTaskInfoGetFullPathEx(SVnode *pVnode, tb_uid_t suid, int8_t level, STfs *pTfs, char *outputName);
void tdRSmaQTaskInfoGetFullPath(SVnode *pVnode, tb_uid_t suid, int8_t level, STfs *pTfs, char *outputName);
static FORCE_INLINE void tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
int32_t ref = T_REF_INC(pRSmaInfo);
......@@ -244,8 +231,6 @@ static FORCE_INLINE void tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
smaTrace("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref);
}
void tdRSmaGetFileName(SVnode *pVnode, STfs *pTfs, const char *fname, int64_t suid, int8_t level, int64_t version,
char *outputName);
void tdRSmaGetDirName(SVnode *pVnode, STfs *pTfs, bool endWithSep, char *outputName);
#ifdef __cplusplus
......
......@@ -319,7 +319,6 @@ int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData);
// SRSmaSnapWriter ========================================
int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWriter** ppWriter);
int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
int32_t rsmaSnapWriterPrepareClose(SRSmaSnapWriter* pWriter);
int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback);
typedef struct {
......
......@@ -108,9 +108,6 @@ int32_t smaFinishCommit(SSma *pSma) {
int32_t lino = 0;
SVnode *pVnode = pSma->pVnode;
code = tdRSmaFSFinishCommit(pSma);
TSDB_CHECK_CODE(code, lino, _exit);
if (VND_RSMA1(pVnode) && (code = tsdbFinishCommit(VND_RSMA1(pVnode))) < 0) {
TSDB_CHECK_CODE(code, lino, _exit);
}
......@@ -150,18 +147,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) {
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);
if (isCommit) {
while (atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 1) != 0) {
++nLoops;
if (nLoops > 1000) {
sched_yield();
nLoops = 0;
}
}
pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
if (ASSERTS(pRSmaStat->commitAppliedVer >= -1, "commit applied version %" PRIi64 " < -1",
pRSmaStat->commitAppliedVer)) {
code = TSDB_CODE_APP_ERROR;
TSDB_CHECK_CODE(code, lino, _exit);
TD_SMA_LOOPS_CHECK(nLoops, 1000)
}
}
// step 2: wait for all triggered fetch tasks to finish
......@@ -173,11 +159,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) {
} else {
smaDebug("vgId:%d, rsma commit%d, fetch tasks are not all finished yet", SMA_VID(pSma), isCommit);
}
++nLoops;
if (nLoops > 1000) {
sched_yield();
nLoops = 0;
}
TD_SMA_LOOPS_CHECK(nLoops, 1000);
}
/**
......@@ -189,40 +171,17 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) {
(void *)taosGetSelfPthreadId());
nLoops = 0;
while (atomic_load_64(&pRSmaStat->nBufItems) > 0) {
++nLoops;
if (nLoops > 1000) {
sched_yield();
nLoops = 0;
}
TD_SMA_LOOPS_CHECK(nLoops, 1000);
}
if (!isCommit) goto _exit;
smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat));
// code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat));
TSDB_CHECK_CODE(code, lino, _exit);
smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
#if 0 // consuming task of qTaskInfo clone
// step 4: swap queue/qall and iQueue/iQall
// lock
taosWLockLatch(SMA_ENV_LOCK(pEnv));
void *pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL);
while (pIter) {
SRSmaInfo *pInfo = *(SRSmaInfo **)pIter;
TSWAP(pInfo->iQall, pInfo->qall);
TSWAP(pInfo->iQueue, pInfo->queue);
TSWAP(pInfo->iTaskInfo[0], pInfo->taskInfo[0]);
TSWAP(pInfo->iTaskInfo[1], pInfo->taskInfo[1]);
pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter);
}
// unlock
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
#endif
smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
// all rsma results are written completely
STsdb *pTsdb = NULL;
......@@ -258,9 +217,6 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma, SCommitInfo *pInfo) {
goto _exit;
}
code = tdRSmaFSCommit(pSma);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbCommit(VND_RSMA1(pVnode), pInfo);
TSDB_CHECK_CODE(code, lino, _exit);
......@@ -310,20 +266,6 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
continue;
}
#if 0
if (pRSmaInfo->taskInfo[0]) {
if (pRSmaInfo->iTaskInfo[0]) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pRSmaInfo->iTaskInfo[0];
tdFreeRSmaInfo(pSma, pRSmaInfo, false);
pRSmaInfo->iTaskInfo[0] = NULL;
}
} else {
TSWAP(pRSmaInfo->taskInfo[0], pRSmaInfo->iTaskInfo[0]);
}
taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter));
smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma), *pSuid);
#endif
}
// unlock
......
......@@ -30,7 +30,6 @@ static int32_t tdRsmaStartExecutor(const SSma *pSma);
static int32_t tdRsmaStopExecutor(const SSma *pSma);
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
static void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType);
static void *tdFreeTSmaStat(STSmaStat *pStat);
static void tdDestroyRSmaStat(void *pRSmaStat);
/**
......@@ -63,19 +62,15 @@ int32_t smaInit() {
int32_t type = (8 == POINTER_BYTES) ? TSDB_DATA_TYPE_UBIGINT : TSDB_DATA_TYPE_UINT;
smaMgmt.refHash = taosHashInit(64, taosGetDefaultHashFunction(type), true, HASH_ENTRY_LOCK);
if (!smaMgmt.refHash) {
taosCloseRef(smaMgmt.rsetId);
atomic_store_8(&smaMgmt.inited, 0);
smaError("failed to init sma tmr hanle since %s", terrstr());
return TSDB_CODE_FAILED;
}
// init fetch timer handle
smaMgmt.tmrHandle = taosTmrInit(10000, 100, 10000, "RSMA");
if (!smaMgmt.tmrHandle) {
if (!smaMgmt.refHash || !smaMgmt.tmrHandle) {
taosCloseRef(smaMgmt.rsetId);
taosHashCleanup(smaMgmt.refHash);
smaMgmt.refHash = NULL;
if (smaMgmt.refHash) {
taosHashCleanup(smaMgmt.refHash);
smaMgmt.refHash = NULL;
}
atomic_store_8(&smaMgmt.inited, 0);
smaError("failed to init sma tmr handle since %s", terrstr());
return TSDB_CODE_FAILED;
......@@ -143,10 +138,6 @@ static int32_t tdNewSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv) {
}
static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv) {
if (!ppEnv) {
terrno = TSDB_CODE_INVALID_PTR;
return TSDB_CODE_FAILED;
}
if (!(*ppEnv)) {
if (tdNewSmaEnv(pSma, smaType, ppEnv) != TSDB_CODE_SUCCESS) {
......@@ -196,10 +187,6 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
int32_t code = 0;
int32_t lino = 0;
if (ASSERTS(pSmaStat != NULL, "pSmaStat is NULL")) {
terrno = TSDB_CODE_RSMA_INVALID_ENV;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (*pSmaStat) { // no lock
return code; // success, return directly
......@@ -255,15 +242,13 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
taosInitRWLatch(RSMA_FS_LOCK(pRSmaStat));
} else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
// TODO
} else {
ASSERTS(0, "unknown smaType:%" PRIi8, smaType);
code = TSDB_CODE_APP_ERROR;
TSDB_CHECK_CODE(code, lino, _exit);
}
}
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code));
} else {
smaDebug("vgId:%d, %s succeed, type:%" PRIi8, SMA_VID(pSma), __func__, smaType);
}
return code;
}
......@@ -277,12 +262,6 @@ static void tdDestroyTSmaStat(STSmaStat *pStat) {
}
}
static void *tdFreeTSmaStat(STSmaStat *pStat) {
tdDestroyTSmaStat(pStat);
taosMemoryFreeClear(pStat);
return NULL;
}
static void tdDestroyRSmaStat(void *pRSmaStat) {
if (pRSmaStat) {
SRSmaStat *pStat = (SRSmaStat *)pRSmaStat;
......@@ -300,11 +279,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
} else {
smaDebug("vgId:%d, rsma fetch tasks are not all finished yet", SMA_VID(pSma));
}
++nLoops;
if (nLoops > 1000) {
sched_yield();
nLoops = 0;
}
TD_SMA_LOOPS_CHECK(nLoops, 1000);
}
// step 3:
......@@ -313,10 +288,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
// step 4: destroy the rsma info and associated fetch tasks
taosHashCleanup(RSMA_INFO_HASH(pStat));
// step 5:
tdRSmaFSClose(RSMA_FS(pStat));
// step 6: free pStat
// step 5: free pStat
tsem_destroy(&(pStat->notEmpty));
taosMemoryFreeClear(pStat);
}
......@@ -354,10 +326,7 @@ static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) {
smaDebug("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " succeed", vid, refId, smaMgmt.rsetId);
}
} else {
ASSERTS(0, "unknown smaType:%" PRIi8, smaType);
terrno = TSDB_CODE_APP_ERROR;
smaError("%s failed at line %d since %s", __func__, __LINE__, terrstr());
return -1;
smaError("%s failed at line %d since Unknown type", __func__, __LINE__);
}
}
return 0;
......@@ -375,11 +344,6 @@ int32_t tdLockSma(SSma *pSma) {
}
int32_t tdUnLockSma(SSma *pSma) {
if (ASSERTS(SMA_LOCKED(pSma), "pSma %p is not locked:%d", pSma, pSma->locked)) {
terrno = TSDB_CODE_APP_ERROR;
smaError("vgId:%d, failed to unlock since %s", SMA_VID(pSma), tstrerror(terrno));
return -1;
}
pSma->locked = false;
int code = taosThreadMutexUnlock(&pSma->mutex);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sma.h"
#include "vnd.h"
// =================================================================================================
// static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **output);
static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2);
static FORCE_INLINE int32_t tPutQTaskF(uint8_t *p, SQTaskFile *pFile) {
int32_t n = 0;
n += tPutI8(p ? p + n : p, pFile->level);
n += tPutI64v(p ? p + n : p, pFile->size);
n += tPutI64v(p ? p + n : p, pFile->suid);
n += tPutI64v(p ? p + n : p, pFile->version);
n += tPutI64v(p ? p + n : p, pFile->mtime);
return n;
}
static int32_t tdRSmaFSToBinary(uint8_t *p, SRSmaFS *pFS) {
int32_t n = 0;
uint32_t size = taosArrayGetSize(pFS->aQTaskInf);
// version
n += tPutI8(p ? p + n : p, 0);
// SArray<SQTaskFile>
n += tPutU32v(p ? p + n : p, size);
for (uint32_t i = 0; i < size; ++i) {
n += tPutQTaskF(p ? p + n : p, taosArrayGet(pFS->aQTaskInf, i));
}
return n;
}
int32_t tdRSmaGetQTaskF(uint8_t *p, SQTaskFile *pFile) {
int32_t n = 0;
n += tGetI8(p + n, &pFile->level);
n += tGetI64v(p + n, &pFile->size);
n += tGetI64v(p + n, &pFile->suid);
n += tGetI64v(p + n, &pFile->version);
n += tGetI64v(p + n, &pFile->mtime);
return n;
}
static int32_t tsdbBinaryToFS(uint8_t *pData, int64_t nData, SRSmaFS *pFS) {
int32_t code = 0;
int32_t n = 0;
int8_t version = 0;
// version
n += tGetI8(pData + n, &version);
// SArray<SQTaskFile>
taosArrayClear(pFS->aQTaskInf);
uint32_t size = 0;
n += tGetU32v(pData + n, &size);
for (uint32_t i = 0; i < size; ++i) {
SQTaskFile qTaskF = {0};
int32_t nt = tdRSmaGetQTaskF(pData + n, &qTaskF);
if (nt < 0) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _exit;
}
n += nt;
if (taosArrayPush(pFS->aQTaskInf, &qTaskF) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
}
if (ASSERTS(n + sizeof(TSCKSUM) == nData, "n:%d + sizeof(TSCKSUM):%d != nData:%d", n, (int32_t)sizeof(TSCKSUM),
nData)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _exit;
}
_exit:
return code;
}
static int32_t tdRSmaSaveFSToFile(SRSmaFS *pFS, const char *fname) {
int32_t code = 0;
int32_t lino = 0;
// encode to binary
int32_t size = tdRSmaFSToBinary(NULL, pFS) + sizeof(TSCKSUM);
uint8_t *pData = taosMemoryMalloc(size);
if (pData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
tdRSmaFSToBinary(pData, pFS);
taosCalcChecksumAppend(0, pData, size);
// save to file
TdFilePtr pFD = taosCreateFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
if (pFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
int64_t n = taosWriteFile(pFD, pData, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (taosFsyncFile(pFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
}
taosCloseFile(&pFD);
_exit:
if (pData) taosMemoryFree(pData);
if (code) {
smaError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname);
}
return code;
}
static int32_t tdRSmaFSCreate(SRSmaFS *pFS, int32_t size) {
int32_t code = 0;
pFS->aQTaskInf = taosArrayInit(size, sizeof(SQTaskFile));
if (pFS->aQTaskInf == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
_exit:
return code;
}
static void tdRSmaGetCurrentFName(SSma *pSma, char *current, char *current_t) {
SVnode *pVnode = pSma->pVnode;
int32_t offset = 0;
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, current, TSDB_FILENAME_LEN);
offset = strlen(current);
snprintf(current + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s%sPRESENT", TD_DIRSEP, VNODE_RSMA_DIR, TD_DIRSEP);
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, current_t, TSDB_FILENAME_LEN);
offset = strlen(current_t);
snprintf(current_t + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s%sPRESENT.t", TD_DIRSEP, VNODE_RSMA_DIR, TD_DIRSEP);
}
static int32_t tdRSmaLoadFSFromFile(const char *fname, SRSmaFS *pFS) {
int32_t code = 0;
int32_t lino = 0;
uint8_t *pData = NULL;
// load binary
TdFilePtr pFD = taosOpenFile(fname, TD_FILE_READ);
if (pFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
int64_t size;
if (taosFStatFile(pFD, &size, NULL) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
}
pData = taosMemoryMalloc(size);
if (pData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (taosReadFile(pFD, pData, size) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (!taosCheckChecksumWhole(pData, size)) {
code = TSDB_CODE_FILE_CORRUPTED;
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
}
taosCloseFile(&pFD);
// decode binary
code = tsdbBinaryToFS(pData, size, pFS);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (pData) taosMemoryFree(pData);
if (code) {
smaError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname);
}
return code;
}
static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2) {
const SQTaskFile *q1 = (const SQTaskFile *)p1;
const SQTaskFile *q2 = (const SQTaskFile *)p2;
if (q1->suid < q2->suid) {
return -1;
} else if (q1->suid > q2->suid) {
return 1;
}
if (q1->level < q2->level) {
return -1;
} else if (q1->level > q2->level) {
return 1;
}
if (q1->version < q2->version) {
return -2;
} else if (q1->version > q2->version) {
return 1;
}
return 0;
}
static int32_t tdRSmaFSApplyChange(SSma *pSma, SRSmaFS *pFSNew) {
int32_t code = 0;
int32_t lino = 0;
int32_t nRef = 0;
SVnode *pVnode = pSma->pVnode;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
SRSmaFS *pFSOld = RSMA_FS(pStat);
int64_t version = pStat->commitAppliedVer;
char fname[TSDB_FILENAME_LEN] = {0};
// SQTaskFile
int32_t nNew = taosArrayGetSize(pFSNew->aQTaskInf);
int32_t iNew = 0;
while (iNew < nNew) {
SQTaskFile *pQTaskFNew = TARRAY_GET_ELEM(pFSNew->aQTaskInf, iNew++);
int32_t idx = taosArraySearchIdx(pFSOld->aQTaskInf, pQTaskFNew, tdQTaskInfCmprFn1, TD_GE);
if (idx < 0) {
idx = taosArrayGetSize(pFSOld->aQTaskInf);
pQTaskFNew->nRef = 1;
} else {
SQTaskFile *pTaskF = TARRAY_GET_ELEM(pFSOld->aQTaskInf, idx);
int32_t c1 = tdQTaskInfCmprFn1(pQTaskFNew, pTaskF);
if (c1 == 0) {
// utilize the item in pFSOld->qQTaskInf, instead of pFSNew
continue;
} else if (c1 < 0) {
// NOTHING TODO
} else {
code = TSDB_CODE_RSMA_FS_UPDATE;
TSDB_CHECK_CODE(code, lino, _exit);
}
}
if (taosArrayInsert(pFSOld->aQTaskInf, idx, pQTaskFNew) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
// remove previous version
while (--idx >= 0) {
SQTaskFile *preTaskF = TARRAY_GET_ELEM(pFSOld->aQTaskInf, idx);
int32_t c2 = tdQTaskInfCmprFn1(preTaskF, pQTaskFNew);
if (c2 == 0) {
code = TSDB_CODE_RSMA_FS_UPDATE;
TSDB_CHECK_CODE(code, lino, _exit);
} else if (c2 != -2) {
break;
}
nRef = atomic_sub_fetch_32(&preTaskF->nRef, 1);
if (nRef <= 0) {
tdRSmaQTaskInfoGetFullName(pVnode, preTaskF->suid, preTaskF->level, preTaskF->version, pVnode->pTfs, fname);
(void)taosRemoveFile(fname);
taosArrayRemove(pFSOld->aQTaskInf, idx);
}
}
}
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
}
return code;
}
static int32_t tdRSmaFSScanAndTryFix(SSma *pSma) {
int32_t code = 0;
#if 0
int32_t lino = 0;
SVnode *pVnode = pSma->pVnode;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
SRSmaFS *pFS = RSMA_FS(pStat);
char fname[TSDB_FILENAME_LEN] = {0};
char fnameVer[TSDB_FILENAME_LEN] = {0};
// SArray<SQTaskFile>
int32_t size = taosArrayGetSize(pFS->aQTaskInf);
for (int32_t i = 0; i < size; ++i) {
SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, i);
// main.tdb =========
tdRSmaQTaskInfoGetFullName(pVnode, pTaskF->suid, pTaskF->level, pTaskF->version,
pVnode->pTfs, fnameVer);
tdRSmaQTaskInfoGetFullName(pVnode, pTaskF->suid, pTaskF->level, -1, pVnode->pTfs, fname);
if (taosCheckExistFile(fnameVer)) {
if (taosRenameFile(fnameVer, fname) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
smaDebug("vgId:%d, %s:%d succeed to to rename %s to %s", TD_VID(pVnode), __func__, lino, fnameVer, fname);
} else if (taosCheckExistFile(fname)) {
if (taosRemoveFile(fname) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
smaDebug("vgId:%d, %s:%d succeed to to remove %s", TD_VID(pVnode), __func__, lino, fname);
}
}
{
// remove those invalid files (todo)
// main.tdb-journal.5 // TDB should handle its clear for kill -9
}
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
}
#endif
return code;
}
// EXPOSED APIS ====================================================================================
int32_t tdRSmaFSOpen(SSma *pSma, int64_t version, int8_t rollback) {
int32_t code = 0;
int32_t lino = 0;
SVnode *pVnode = pSma->pVnode;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
// open handle
code = tdRSmaFSCreate(RSMA_FS(pStat), 0);
TSDB_CHECK_CODE(code, lino, _exit);
// open impl
char current[TSDB_FILENAME_LEN] = {0};
char current_t[TSDB_FILENAME_LEN] = {0};
tdRSmaGetCurrentFName(pSma, current, current_t);
if (taosCheckExistFile(current)) {
code = tdRSmaLoadFSFromFile(current, RSMA_FS(pStat));
TSDB_CHECK_CODE(code, lino, _exit);
if (taosCheckExistFile(current_t)) {
if (rollback) {
code = tdRSmaFSRollback(pSma);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
code = tdRSmaFSCommit(pSma);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
} else {
// 1st time open with empty current/qTaskInfoFile
code = tdRSmaSaveFSToFile(RSMA_FS(pStat), current);
TSDB_CHECK_CODE(code, lino, _exit);
}
// scan and try fix(remove main.db/main.db.xxx and use the one with version)
code = tdRSmaFSScanAndTryFix(pSma);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
}
return code;
}
void tdRSmaFSClose(SRSmaFS *pFS) { pFS->aQTaskInf = taosArrayDestroy(pFS->aQTaskInf); }
int32_t tdRSmaFSPrepareCommit(SSma *pSma, SRSmaFS *pFSNew) {
int32_t code = 0;
int32_t lino = 0;
char tfname[TSDB_FILENAME_LEN];
tdRSmaGetCurrentFName(pSma, NULL, tfname);
// generate PRESENT.t
code = tdRSmaSaveFSToFile(pFSNew, tfname);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
int32_t tdRSmaFSCommit(SSma *pSma) {
int32_t code = 0;
int32_t lino = 0;
SRSmaFS fs = {0};
char current[TSDB_FILENAME_LEN] = {0};
char current_t[TSDB_FILENAME_LEN] = {0};
tdRSmaGetCurrentFName(pSma, current, current_t);
if (!taosCheckExistFile(current_t)) {
goto _exit;
}
// rename the file
if (taosRenameFile(current_t, current) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
// load the new FS
code = tdRSmaFSCreate(&fs, 1);
TSDB_CHECK_CODE(code, lino, _exit);
code = tdRSmaLoadFSFromFile(current, &fs);
TSDB_CHECK_CODE(code, lino, _exit);
// apply file change
code = tdRSmaFSApplyChange(pSma, &fs);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
tdRSmaFSClose(&fs);
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code));
}
return code;
}
int32_t tdRSmaFSFinishCommit(SSma *pSma) {
int32_t code = 0;
int32_t lino = 0;
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
taosWLockLatch(RSMA_FS_LOCK(pStat));
code = tdRSmaFSCommit(pSma);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code));
} else {
smaInfo("vgId:%d, rsmaFS finish commit", SMA_VID(pSma));
}
return code;
}
int32_t tdRSmaFSRollback(SSma *pSma) {
int32_t code = 0;
int32_t lino = 0;
char current_t[TSDB_FILENAME_LEN] = {0};
tdRSmaGetCurrentFName(pSma, NULL, current_t);
(void)taosRemoveFile(current_t);
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(errno));
}
return code;
}
int32_t tdRSmaFSUpsertQTaskFile(SSma *pSma, SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t nSize) {
int32_t code = 0;
for (int32_t i = 0; i < nSize; ++i) {
SQTaskFile *qTaskF = qTaskFile + i;
int32_t idx = taosArraySearchIdx(pFS->aQTaskInf, qTaskF, tdQTaskInfCmprFn1, TD_GE);
if (idx < 0) {
idx = taosArrayGetSize(pFS->aQTaskInf);
} else {
SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, idx);
int32_t c = tdQTaskInfCmprFn1(pTaskF, qTaskF);
if (c == 0) {
if (pTaskF->size != qTaskF->size) {
code = TSDB_CODE_RSMA_FS_UPDATE;
smaError("vgId:%d, %s failed at line %d since %s, level:%" PRIi8 ", suid:%" PRIi64 ", version:%" PRIi64
", size:%" PRIi64 " != %" PRIi64,
SMA_VID(pSma), __func__, __LINE__, tstrerror(code), pTaskF->level, pTaskF->suid, pTaskF->version,
pTaskF->size, qTaskF->size);
goto _exit;
}
continue;
}
}
if (!taosArrayInsert(pFS->aQTaskInf, idx, qTaskF)) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
}
_exit:
return code;
}
int32_t tdRSmaFSRef(SSma *pSma, SRSmaFS *pFS) {
int32_t code = 0;
int32_t lino = 0;
int32_t nRef = 0;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
SRSmaFS *qFS = RSMA_FS(pStat);
int32_t size = taosArrayGetSize(qFS->aQTaskInf);
pFS->aQTaskInf = taosArrayInit_s(sizeof(SQTaskFile), size);
if (pFS->aQTaskInf == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
for (int32_t i = 0; i < size; ++i) {
SQTaskFile *qTaskF = (SQTaskFile *)taosArrayGet(qFS->aQTaskInf, i);
nRef = atomic_fetch_add_32(&qTaskF->nRef, 1);
if (nRef <= 0) {
code = TSDB_CODE_RSMA_FS_REF;
TSDB_CHECK_CODE(code, lino, _exit);
}
}
memcpy(pFS->aQTaskInf->pData, qFS->aQTaskInf->pData, size * sizeof(SQTaskFile));
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s, nRef %d", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code),
nRef);
}
return code;
}
void tdRSmaFSUnRef(SSma *pSma, SRSmaFS *pFS) {
int32_t nRef = 0;
char fname[TSDB_FILENAME_LEN];
SVnode *pVnode = pSma->pVnode;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
int32_t size = taosArrayGetSize(pFS->aQTaskInf);
for (int32_t i = 0; i < size; ++i) {
SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, i);
nRef = atomic_sub_fetch_32(&pTaskF->nRef, 1);
if (nRef == 0) {
tdRSmaQTaskInfoGetFullName(pVnode, pTaskF->suid, pTaskF->level, pTaskF->version, pVnode->pTfs, fname);
if (taosRemoveFile(fname) < 0) {
smaWarn("vgId:%d, failed to remove %s since %s", TD_VID(pVnode), fname, tstrerror(TAOS_SYSTEM_ERROR(errno)));
} else {
smaDebug("vgId:%d, success to remove %s", TD_VID(pVnode), fname);
}
} else if (nRef < 0) {
smaWarn("vgId:%d, abnormal unref %s since %s", TD_VID(pVnode), fname, tstrerror(TSDB_CODE_RSMA_FS_REF));
}
}
taosArrayDestroy(pFS->aQTaskInf);
}
int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFS) {
int32_t code = 0;
int32_t lino = 0;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
taosRLockLatch(RSMA_FS_LOCK(pStat));
code = tdRSmaFSRef(pSma, pFS);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
taosRUnLockLatch(RSMA_FS_LOCK(pStat));
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
int32_t tdRSmaFSCopy(SSma *pSma, SRSmaFS *pFS) {
int32_t code = 0;
int32_t lino = 0;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
SRSmaFS *qFS = RSMA_FS(pStat);
int32_t size = taosArrayGetSize(qFS->aQTaskInf);
code = tdRSmaFSCreate(pFS, size);
TSDB_CHECK_CODE(code, lino, _exit);
taosArrayAddBatch(pFS->aQTaskInf, qFS->aQTaskInf->pData, size);
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
......@@ -101,10 +101,6 @@ int smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int ty
terrno = 0;
pKeepCfg->precision = pCfg->precision;
switch (type) {
case TSDB_TYPE_TSMA:
ASSERTS(0, "undefined smaType:%d", (int32_t)type);
terrno = TSDB_CODE_APP_ERROR;
break;
case TSDB_TYPE_RSMA_L0:
SMA_SET_KEEP_CFG(pVnode, 0);
break;
......@@ -115,7 +111,6 @@ int smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int ty
SMA_SET_KEEP_CFG(pVnode, 2);
break;
default:
ASSERTS(0, "unknown smaType:%d", (int32_t)type);
terrno = TSDB_CODE_APP_ERROR;
break;
}
......@@ -189,8 +184,7 @@ int32_t smaClose(SSma *pSma) {
*/
int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback) {
if (!VND_IS_RSMA(pSma->pVnode)) {
terrno = TSDB_CODE_RSMA_INVALID_ENV;
return TSDB_CODE_FAILED;
return TSDB_CODE_RSMA_INVALID_ENV;
}
return tdRSmaProcessRestoreImpl(pSma, type, committedVer, rollback);
......
......@@ -15,9 +15,6 @@
#include "sma.h"
static int32_t rsmaSnapReadQTaskInfo(SRSmaSnapReader* pReader, uint8_t** ppData);
static int32_t rsmaSnapWriteQTaskInfo(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
// SRSmaSnapReader ========================================
struct SRSmaSnapReader {
SSma* pSma;
......@@ -28,11 +25,6 @@ struct SRSmaSnapReader {
// for data file
int8_t rsmaDataDone[TSDB_RETENTION_L2];
STsdbSnapReader* pDataReader[TSDB_RETENTION_L2];
// for qtaskinfo file
int8_t qTaskDone;
int32_t fsIter;
SQTaskFReader* pQTaskFReader;
};
int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapReader** ppReader) {
......@@ -62,22 +54,6 @@ int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapRead
}
}
// open qtaskinfo
taosRLockLatch(RSMA_FS_LOCK(pStat));
code = tdRSmaFSRef(pSma, &pReader->fs);
taosRUnLockLatch(RSMA_FS_LOCK(pStat));
TSDB_CHECK_CODE(code, lino, _exit);
if (taosArrayGetSize(pReader->fs.aQTaskInf) > 0) {
pReader->pQTaskFReader = taosMemoryCalloc(1, sizeof(SQTaskFReader));
if (!pReader->pQTaskFReader) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
pReader->pQTaskFReader->pSma = pSma;
pReader->pQTaskFReader->version = pReader->ever;
}
*ppReader = pReader;
_exit:
if (code) {
......@@ -88,113 +64,6 @@ _exit:
return code;
}
static int32_t rsmaSnapReadQTaskInfo(SRSmaSnapReader* pReader, uint8_t** ppBuf) {
int32_t code = 0;
int32_t lino = 0;
SVnode* pVnode = pReader->pSma->pVnode;
SQTaskFReader* qReader = pReader->pQTaskFReader;
SRSmaFS* pFS = &pReader->fs;
int64_t n = 0;
uint8_t* pBuf = NULL;
int64_t version = pReader->ever;
char fname[TSDB_FILENAME_LEN];
if (!qReader) {
*ppBuf = NULL;
smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, not needed since qTaskReader is NULL", TD_VID(pVnode));
goto _exit;
}
if (pReader->fsIter >= taosArrayGetSize(pFS->aQTaskInf)) {
*ppBuf = NULL;
smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, fsIter reach end", TD_VID(pVnode));
goto _exit;
}
while (pReader->fsIter < taosArrayGetSize(pFS->aQTaskInf)) {
SQTaskFile* qTaskF = taosArrayGet(pFS->aQTaskInf, pReader->fsIter++);
if (qTaskF->version != version) {
continue;
}
tdRSmaQTaskInfoGetFullName(pVnode, qTaskF->suid, qTaskF->level, version, pVnode->pTfs, fname);
if (!taosCheckExistFile(fname)) {
smaError("vgId:%d, vnode snapshot rsma reader for qtaskinfo, table %" PRIi64 ", level %" PRIi8
", version %" PRIi64 " failed since %s not exist",
TD_VID(pVnode), qTaskF->suid, qTaskF->level, version, fname);
code = TSDB_CODE_RSMA_FS_SYNC;
TSDB_CHECK_CODE(code, lino, _exit);
}
TdFilePtr fp = taosOpenFile(fname, TD_FILE_READ);
if (!fp) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
qReader->pReadH = fp;
qReader->level = qTaskF->level;
qReader->suid = qTaskF->suid;
}
if (!qReader->pReadH) {
*ppBuf = NULL;
smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, not needed since readh is NULL", TD_VID(pVnode));
goto _exit;
}
int64_t size = 0;
if (taosFStatFile(qReader->pReadH, &size, NULL) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
// seek
if (taosLSeekFile(qReader->pReadH, 0, SEEK_SET) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (*ppBuf) {
*ppBuf = taosMemoryRealloc(*ppBuf, sizeof(SSnapDataHdr) + size);
} else {
*ppBuf = taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
}
if (!(*ppBuf)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
// read
n = taosReadFile(qReader->pReadH, POINTER_SHIFT(*ppBuf, sizeof(SSnapDataHdr)), size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
} else if (n != size) {
code = TSDB_CODE_FILE_CORRUPTED;
TSDB_CHECK_CODE(code, lino, _exit);
}
smaInfo("vgId:%d, vnode snapshot rsma read qtaskinfo, version:%" PRIi64 ", size:%" PRIi64, TD_VID(pVnode), version,
size);
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppBuf);
pHdr->type = SNAP_DATA_QTASK;
pHdr->flag = qReader->level;
pHdr->index = qReader->suid;
pHdr->size = size;
_exit:
if (qReader) taosCloseFile(&qReader->pReadH);
if (code) {
*ppBuf = NULL;
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
} else {
smaInfo("vgId:%d, vnode snapshot rsma read qtaskinfo succeed", TD_VID(pVnode));
}
return code;
}
int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData) {
int32_t code = 0;
int32_t lino = 0;
......@@ -222,18 +91,6 @@ int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData) {
}
}
// read qtaskinfo file
if (!pReader->qTaskDone) {
smaInfo("vgId:%d, vnode snapshot rsma qtaskinfo not done", SMA_VID(pReader->pSma));
code = rsmaSnapReadQTaskInfo(pReader, ppData);
TSDB_CHECK_CODE(code, lino, _exit);
if (*ppData) {
goto _exit;
} else {
pReader->qTaskDone = 1;
}
}
_exit:
if (code) {
smaError("vgId:%d, vnode snapshot rsma read failed since %s", SMA_VID(pReader->pSma), tstrerror(code));
......@@ -248,9 +105,6 @@ int32_t rsmaSnapReaderClose(SRSmaSnapReader** ppReader) {
int32_t code = 0;
SRSmaSnapReader* pReader = *ppReader;
tdRSmaFSUnRef(pReader->pSma, &pReader->fs);
taosMemoryFreeClear(pReader->pQTaskFReader);
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pReader->pDataReader[i]) {
tsdbSnapReaderClose(&pReader->pDataReader[i]);
......@@ -298,10 +152,6 @@ int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWrit
}
}
// qtaskinfo
code = tdRSmaFSCopy(pSma, &pWriter->fs);
TSDB_CHECK_CODE(code, lino, _exit);
// snapWriter
*ppWriter = pWriter;
_exit:
......@@ -315,22 +165,6 @@ _exit:
return code;
}
int32_t rsmaSnapWriterPrepareClose(SRSmaSnapWriter* pWriter) {
int32_t code = 0;
int32_t lino = 0;
if (pWriter) {
code = tdRSmaFSPrepareCommit(pWriter->pSma, &pWriter->fs);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pWriter->pSma), __func__, lino, tstrerror(code));
}
return code;
}
int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) {
int32_t code = 0;
int32_t lino = 0;
......@@ -361,61 +195,6 @@ int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) {
}
}
// qtaskinfo
if (rollback) {
tdRSmaFSRollback(pSma);
// remove qTaskFiles
} else {
// sendFile from fname.Ver to fname
SRSmaFS* pFS = &pWriter->fs;
int32_t size = taosArrayGetSize(pFS->aQTaskInf);
for (int32_t i = 0; i < size; ++i) {
SQTaskFile* pTaskF = TARRAY_GET_ELEM(pFS->aQTaskInf, i);
if (pTaskF->version == pWriter->ever) {
tdRSmaQTaskInfoGetFullName(pVnode, pTaskF->suid, pTaskF->level, pTaskF->version, pVnode->pTfs, fnameVer);
tdRSmaQTaskInfoGetFullName(pVnode, pTaskF->suid, pTaskF->level, -1, pVnode->pTfs, fname);
pInFD = taosOpenFile(fnameVer, TD_FILE_READ);
if (pInFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
pOutFD = taosCreateFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
if (pOutFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
int64_t size = 0;
if (taosFStatFile(pInFD, &size, NULL) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
int64_t offset = 0;
if (taosFSendFile(pOutFD, pInFD, &offset, size) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
smaError("vgId:%d, vnode snapshot rsma writer, send qtaskinfo file %s to %s failed since %s", TD_VID(pVnode),
fnameVer, fname, tstrerror(code));
TSDB_CHECK_CODE(code, lino, _exit);
}
taosCloseFile(&pOutFD);
taosCloseFile(&pInFD);
}
}
// lock
taosWLockLatch(RSMA_FS_LOCK(pStat));
code = tdRSmaFSCommit(pSma);
if (code) {
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
goto _exit;
}
// unlock
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
}
// rsma restore
code = tdRSmaRestore(pWriter->pSma, RSMA_RESTORE_SYNC, pWriter->ever, rollback);
TSDB_CHECK_CODE(code, lino, _exit);
......@@ -447,8 +226,6 @@ int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData)
} else if (pHdr->type == SNAP_DATA_RSMA2) {
pHdr->type = SNAP_DATA_TSDB;
code = tsdbSnapWrite(pWriter->pDataWriter[1], pHdr);
} else if (pHdr->type == SNAP_DATA_QTASK) {
code = rsmaSnapWriteQTaskInfo(pWriter, pData, nData);
} else {
code = TSDB_CODE_RSMA_FS_SYNC;
}
......@@ -463,67 +240,3 @@ _exit:
}
return code;
}
static int32_t rsmaSnapWriteQTaskInfo(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
int32_t lino = 0;
SSma* pSma = pWriter->pSma;
SVnode* pVnode = pSma->pVnode;
char fname[TSDB_FILENAME_LEN];
TdFilePtr fp = NULL;
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
fname[0] = '\0';
if (pHdr->size != (nData - sizeof(SSnapDataHdr))) {
code = TSDB_CODE_RSMA_FS_SYNC;
TSDB_CHECK_CODE(code, lino, _exit);
}
SQTaskFile qTaskFile = {
.nRef = 1, .level = pHdr->flag, .suid = pHdr->index, .version = pWriter->ever, .size = pHdr->size};
tdRSmaQTaskInfoGetFullName(pVnode, pHdr->index, pHdr->flag, qTaskFile.version, pVnode->pTfs, fname);
fp = taosCreateFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (!fp) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
int64_t contLen = taosWriteFile(fp, pHdr->data, pHdr->size);
if (contLen != pHdr->size) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
uint32_t mtime = 0;
if (taosFStatFile(fp, NULL, &mtime) != 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
qTaskFile.mtime = mtime;
}
if (taosFsyncFile(fp) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
taosCloseFile(&fp);
code = tdRSmaFSUpsertQTaskFile(pSma, &pWriter->fs, &qTaskFile, 1);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
if (fp) {
(void)taosRemoveFile(fname);
}
smaError("vgId:%d, %s failed at line %d since %s, file:%s", TD_VID(pVnode), __func__, lino, tstrerror(code), fname);
} else {
smaInfo("vgId:%d, vnode snapshot rsma write qtaskinfo %s succeed", TD_VID(pVnode), fname);
}
return code;
}
......@@ -29,27 +29,21 @@ int32_t tdProcessTSmaInsert(SSma *pSma, int64_t indexUid, const char *msg) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tdProcessTSmaInsertImpl(pSma, indexUid, msg)) < 0) {
smaError("vgId:%d, insert tsma data failed since %s", SMA_VID(pSma), tstrerror(terrno));
smaError("vgId:%d, insert tsma data failed since %s", SMA_VID(pSma), tstrerror(code));
}
return code;
}
int32_t tdProcessTSmaCreate(SSma *pSma, int64_t version, const char *msg) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t code = tdProcessTSmaCreateImpl(pSma, version, msg);
if ((code = tdProcessTSmaCreateImpl(pSma, version, msg)) < 0) {
smaWarn("vgId:%d, create tsma failed since %s", SMA_VID(pSma), tstrerror(terrno));
}
return code;
}
int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tdProcessTSmaGetDaysImpl(pCfg, pCont, contLen, days)) < 0) {
smaWarn("vgId:%d, get tsma days failed since %s", pCfg->vgId, tstrerror(terrno));
}
smaDebug("vgId:%d, get tsma days %d", pCfg->vgId, *days);
int32_t code = tdProcessTSmaGetDaysImpl(pCfg, pCont, contLen, days);
return code;
}
......@@ -63,19 +57,22 @@ int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *
* @return int32_t
*/
static int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days) {
int32_t code = 0;
int32_t lino = 0;
SDecoder coder = {0};
tDecoderInit(&coder, pCont, contLen);
STSma tsma = {0};
if (tDecodeSVCreateTSmaReq(&coder, &tsma) < 0) {
terrno = TSDB_CODE_MSG_DECODE_ERROR;
goto _err;
code = TSDB_CODE_MSG_DECODE_ERROR;
TSDB_CHECK_CODE(code, lino, _exit);
}
STsdbCfg *pTsdbCfg = &pCfg->tsdbCfg;
int64_t sInterval = convertTimeFromPrecisionToUnit(tsma.interval, pTsdbCfg->precision, TIME_UNIT_SECOND);
if (sInterval <= 0) {
*days = pTsdbCfg->days;
return 0;
goto _exit;
}
int64_t records = pTsdbCfg->days * 60 / sInterval;
if (records >= SMA_STORAGE_SPLIT_FACTOR) {
......@@ -94,11 +91,14 @@ static int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t c
*days = pTsdbCfg->days;
}
}
_exit:
if (code) {
smaWarn("vgId:%d, failed at line %d to get tsma days %d since %s", pCfg->vgId, lino, *days, tstrerror(code));
} else {
smaDebug("vgId:%d, succeed to get tsma days %d", pCfg->vgId, *days);
}
tDecoderClear(&coder);
return 0;
_err:
tDecoderClear(&coder);
return -1;
return code;
}
/**
......@@ -157,6 +157,8 @@ _exit:
int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *pTSchema,
SSchemaWrapper *pTagSchemaWrapper, bool createTb, int64_t suid, const char *stbFullName,
SBatchDeleteReq *pDeleteReq, void **ppData, int32_t *pLen) {
int32_t code = 0;
int32_t lino = 0;
void *pBuf = NULL;
int32_t len = 0;
SSubmitReq2 *pReq = NULL;
......@@ -166,21 +168,14 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
int32_t sz = taosArrayGetSize(pBlocks);
if (!(tagArray = taosArrayInit(1, sizeof(STagVal)))) {
goto _end;
}
if (!(createTbArray = taosArrayInit(sz, POINTER_BYTES))) {
goto _end;
}
if (!(pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)))) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
tagArray = taosArrayInit(1, sizeof(STagVal));
createTbArray = taosArrayInit(sz, POINTER_BYTES);
pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2));
pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData));
if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
goto _end;
if(!tagArray || !createTbArray || !pReq || !pReq->aSubmitTbData) {
code = terrno == TSDB_CODE_SUCCESS ? TSDB_CODE_OUT_OF_MEMORY : terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
// create table req
......@@ -194,7 +189,8 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
}
if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) {
goto _end;
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
};
// don't move to the end of loop as to destroy in the end of func when error occur
......@@ -223,8 +219,8 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
STag *pTag = NULL;
tTagNew(tagArray, 1, false, &pTag);
if (pTag == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
pCreateTbReq->ctb.pTag = (uint8_t *)pTag;
......@@ -259,7 +255,8 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
SSubmitTbData tbData = {0};
if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow *)))) {
goto _end;
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
tbData.suid = suid;
tbData.uid = 0; // uid is assigned by vnode
......@@ -272,7 +269,8 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
if (!pVals && !(pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)))) {
taosArrayDestroy(tbData.aRowP);
goto _end;
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
for (int32_t j = 0; j < rows; ++j) {
......@@ -298,9 +296,9 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
}
}
SRow *pRow = NULL;
if ((terrno = tRowBuild(pVals, (STSchema *)pTSchema, &pRow)) < 0) {
if ((code = tRowBuild(pVals, (STSchema *)pTSchema, &pRow)) < 0) {
tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
goto _end;
TSDB_CHECK_CODE(code, lino, _exit);
}
taosArrayPush(tbData.aRowP, &pRow);
}
......@@ -309,25 +307,27 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
}
// encode
tEncodeSize(tEncodeSubmitReq, pReq, len, terrno);
if (TSDB_CODE_SUCCESS == terrno) {
tEncodeSize(tEncodeSubmitReq, pReq, len, code);
if (TSDB_CODE_SUCCESS == code) {
SEncoder encoder;
len += sizeof(SSubmitReq2Msg);
pBuf = rpcMallocCont(len);
if (NULL == pBuf) {
goto _end;
if (!(pBuf = rpcMallocCont(len))) {
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
((SSubmitReq2Msg *)pBuf)->header.vgId = TD_VID(pVnode);
((SSubmitReq2Msg *)pBuf)->header.contLen = htonl(len);
((SSubmitReq2Msg *)pBuf)->version = htobe64(1);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
if (tEncodeSubmitReq(&encoder, pReq) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
/*vError("failed to encode submit req since %s", terrstr());*/
tEncoderClear(&encoder);
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
tEncoderClear(&encoder);
}
_end:
_exit:
taosArrayDestroy(createTbArray);
taosArrayDestroy(tagArray);
taosArrayDestroy(pVals);
......@@ -336,14 +336,15 @@ _end:
taosMemoryFree(pReq);
}
if (terrno != 0) {
if (code) {
rpcFreeCont(pBuf);
taosArrayDestroy(pDeleteReq->deleteReqs);
return TSDB_CODE_FAILED;
smaWarn("vgId:%d, failed at line %d since %s", TD_VID(pVnode), lino, tstrerror(code));
} else {
if (ppData) *ppData = pBuf;
if (pLen) *pLen = len;
}
if (ppData) *ppData = pBuf;
if (pLen) *pLen = len;
return TSDB_CODE_SUCCESS;
return code;
}
static int32_t tsmaProcessDelReq(SSma *pSma, int64_t indexUid, SBatchDeleteReq *pDelReq) {
......@@ -391,22 +392,18 @@ _exit:
* @return int32_t
*/
static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
int32_t code = 0;
int32_t lino = 0;
const SArray *pDataBlocks = (const SArray *)msg;
if (!pDataBlocks) {
terrno = TSDB_CODE_TSMA_INVALID_PTR;
smaWarn("vgId:%d, insert tsma data failed since pDataBlocks is NULL", SMA_VID(pSma));
return TSDB_CODE_FAILED;
}
if (taosArrayGetSize(pDataBlocks) <= 0) {
terrno = TSDB_CODE_TSMA_INVALID_PARA;
smaWarn("vgId:%d, insert tsma data failed since pDataBlocks is empty", SMA_VID(pSma));
return TSDB_CODE_FAILED;
code = TSDB_CODE_TSMA_INVALID_PARA;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_TIME_RANGE) != 0) {
terrno = TSDB_CODE_TSMA_INIT_FAILED;
return TSDB_CODE_FAILED;
code = TSDB_CODE_TSMA_INIT_FAILED;
TSDB_CHECK_CODE(code, lino, _exit);
}
SSmaEnv *pEnv = SMA_TSMA_ENV(pSma);
......@@ -414,49 +411,43 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char
STSmaStat *pTsmaStat = NULL;
if (!pEnv || !(pStat = SMA_ENV_STAT(pEnv))) {
terrno = TSDB_CODE_TSMA_INVALID_ENV;
return TSDB_CODE_FAILED;
code = TSDB_CODE_TSMA_INVALID_ENV;
TSDB_CHECK_CODE(code, lino, _exit);
}
pTsmaStat = SMA_STAT_TSMA(pStat);
if (!pTsmaStat->pTSma) {
terrno = 0;
STSma *pTSma = metaGetSmaInfoByIndex(SMA_META(pSma), indexUid);
if (!pTSma) {
smaError("vgId:%d, failed to get STSma while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma),
indexUid, tstrerror(terrno));
goto _err;
code = terrno ? terrno : TSDB_CODE_TSMA_INVALID_PTR;
TSDB_CHECK_CODE(code, lino, _exit);
}
pTsmaStat->pTSma = pTSma;
pTsmaStat->pTSchema = metaGetTbTSchema(SMA_META(pSma), pTSma->dstTbUid, -1, 1);
if (!pTsmaStat->pTSchema) {
smaError("vgId:%d, failed to get STSchema while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma),
indexUid, tstrerror(terrno));
goto _err;
code = terrno ? terrno : TSDB_CODE_TSMA_INVALID_PTR;
TSDB_CHECK_CODE(code, lino, _exit);
}
}
if (pTsmaStat->pTSma->indexUid != indexUid) {
terrno = TSDB_CODE_APP_ERROR;
smaError("vgId:%d, tsma insert for smaIndex %" PRIi64 "(!=%" PRIi64 ") failed since %s", SMA_VID(pSma), indexUid,
pTsmaStat->pTSma->indexUid, tstrerror(terrno));
goto _err;
if (ASSERTS(pTsmaStat->pTSma->indexUid == indexUid, "indexUid:%" PRIi64 " != %" PRIi64, pTsmaStat->pTSma->indexUid,
indexUid)) {
code = TSDB_CODE_APP_ERROR;
TSDB_CHECK_CODE(code, lino, _exit);
}
SBatchDeleteReq deleteReq = {0};
void *pSubmitReq = NULL;
int32_t contLen = 0;
if (smaBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, &pTsmaStat->pTSma->schemaTag, true,
pTsmaStat->pTSma->dstTbUid, pTsmaStat->pTSma->dstTbName, &deleteReq, &pSubmitReq,
&contLen) < 0) {
smaError("vgId:%d, failed to gen submit msg while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma),
indexUid, tstrerror(terrno));
goto _err;
}
code = smaBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, &pTsmaStat->pTSma->schemaTag, true,
pTsmaStat->pTSma->dstTbUid, pTsmaStat->pTSma->dstTbName, &deleteReq, &pSubmitReq, &contLen);
TSDB_CHECK_CODE(code, lino, _exit);
if ((terrno = tsmaProcessDelReq(pSma, indexUid, &deleteReq)) != 0) {
goto _err;
goto _exit;
}
#if 0
......@@ -474,13 +465,13 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char
.contLen = contLen,
};
if (tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &submitReqMsg) < 0) {
smaError("vgId:%d, failed to put SubmitReq msg while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma),
indexUid, tstrerror(terrno));
goto _err;
}
code = tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &submitReqMsg);
TSDB_CHECK_CODE(code, lino, _exit);
return TSDB_CODE_SUCCESS;
_err:
return TSDB_CODE_FAILED;
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s, smaIndex:%" PRIi64, SMA_VID(pSma), __func__, lino,
tstrerror(code), indexUid);
}
return code;
}
......@@ -16,60 +16,12 @@
#include "sma.h"
#include "vnd.h"
#define TD_QTASKINFO_FNAME_PREFIX "main.tdb"
void tdRSmaQTaskInfoGetFileName(SVnode *pVnode, int64_t suid, int8_t level, int64_t version, char *outputName) {
tdRSmaGetFileName(pVnode, NULL, TD_QTASKINFO_FNAME_PREFIX, suid, level, version, outputName);
}
void tdRSmaQTaskInfoGetFullName(SVnode *pVnode, int64_t suid, int8_t level, int64_t version, STfs *pTfs,
char *outputName) {
tdRSmaGetFileName(pVnode, pTfs, TD_QTASKINFO_FNAME_PREFIX, suid, level, version, outputName);
}
void tdRSmaQTaskInfoGetFullPath(SVnode *pVnode, int8_t level, STfs *pTfs, char *outputName) {
tdRSmaGetDirName(pVnode, pTfs, true, outputName);
int32_t rsmaLen = strlen(outputName);
snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi8, level);
}
void tdRSmaQTaskInfoGetFullPathEx(SVnode *pVnode, tb_uid_t suid, int8_t level, STfs *pTfs, char *outputName) {
void tdRSmaQTaskInfoGetFullPath(SVnode *pVnode, tb_uid_t suid, int8_t level, STfs *pTfs, char *outputName) {
tdRSmaGetDirName(pVnode, pTfs, true, outputName);
int32_t rsmaLen = strlen(outputName);
snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi8 "%s%" PRIi64, level, TD_DIRSEP, suid);
}
void tdRSmaGetFileName(SVnode *pVnode, STfs *pTfs, const char *fname, int64_t suid, int8_t level, int64_t version,
char *outputName) {
int32_t offset = 0;
// vnode
vnodeGetPrimaryDir(pVnode->path, pTfs, outputName, TSDB_FILENAME_LEN);
offset = strlen(outputName);
// rsma
snprintf(outputName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VNODE_RSMA_DIR);
offset = strlen(outputName);
// level & suid || vgid
if (level >= 0 && suid > 0) {
snprintf(outputName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%" PRIi8 "%s%" PRIi64 "%s", TD_DIRSEP, level,
TD_DIRSEP, suid, TD_DIRSEP);
} else {
snprintf(outputName + offset, TSDB_FILENAME_LEN - offset - 1, "%sv%d", TD_DIRSEP, TD_VID(pVnode));
}
offset = strlen(outputName);
// fname
snprintf(outputName + offset, TSDB_FILENAME_LEN - offset - 1, "%s", fname);
offset = strlen(outputName);
// version
if (version >= 0) {
snprintf(outputName + offset, TSDB_FILENAME_LEN - offset - 1, ".%" PRIi64, version);
}
}
void tdRSmaGetDirName(SVnode *pVnode, STfs *pTfs, bool endWithSep, char *outputName) {
int32_t offset = 0;
......@@ -83,22 +35,13 @@ void tdRSmaGetDirName(SVnode *pVnode, STfs *pTfs, bool endWithSep, char *outputN
}
// smaXXXUtil ================
void *tdAcquireSmaRef(int32_t rsetId, int64_t refId) {
void *pResult = taosAcquireRef(rsetId, refId);
if (!pResult) {
smaWarn("rsma acquire ref for rsetId:%d refId:%" PRIi64 " failed since %s", rsetId, refId, terrstr());
} else {
smaTrace("rsma acquire ref for rsetId:%d refId:%" PRIi64 " success", rsetId, refId);
}
return pResult;
}
void *tdAcquireSmaRef(int32_t rsetId, int64_t refId) { return taosAcquireRef(rsetId, refId); }
int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId) {
if (taosReleaseRef(rsetId, refId) < 0) {
smaWarn("rsma release ref for rsetId:%d refId:%" PRIi64 " failed since %s", rsetId, refId, terrstr());
return TSDB_CODE_FAILED;
}
smaTrace("rsma release ref for rsetId:%d refId:%" PRIi64 " success", rsetId, refId);
return TSDB_CODE_SUCCESS;
}
......@@ -367,12 +367,6 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
goto _err;
}
// open sma
if (smaOpen(pVnode, rollback)) {
vError("vgId:%d, failed to open vnode sma since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open wal
sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_WAL_DIR);
taosRealPath(tdir, NULL, sizeof(tdir));
......@@ -392,6 +386,12 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
goto _err;
}
// open sma
if (smaOpen(pVnode, rollback)) {
vError("vgId:%d, failed to open vnode sma since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open query
if (vnodeQueryOpen(pVnode)) {
vError("vgId:%d, failed to open vnode query since %s", TD_VID(pVnode), tstrerror(terrno));
......
......@@ -74,6 +74,9 @@ static int32_t vnodeRetentionTask(void *param) {
code = tsdbDoRetention(pInfo->pVnode->pTsdb, pInfo->now);
TSDB_CHECK_CODE(code, lino, _exit);
code = smaDoRetention(pInfo->pVnode->pSma, pInfo->now);
TSDB_CHECK_CODE(code, lino, _exit);
// commit info
vnodeCommitInfo(dir);
......
......@@ -619,9 +619,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_QTASKINFO_CREATE, "Rsma qtaskinfo crea
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_SCHEMA, "Rsma invalid schema")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_STREAM_STATE_OPEN, "Rsma stream state open")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_STREAM_STATE_COMMIT, "Rsma stream state commit")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FS_REF, "Rsma fs ref error")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FS_SYNC, "Rsma fs sync error")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FS_UPDATE, "Rsma fs update error")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_RESULT, "Rsma result error")
//index
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")
......
......@@ -4,9 +4,6 @@ system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
#todo xukaili sma should use rocksdb.
return 1
print =============== create database with retentions
sql create database d0 retentions 5s:7d,10s:21d,15s:365d;
sql use d0
......
......@@ -4,7 +4,7 @@ system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
#todo xukaili sma should use rocksdb.
#todo wait for streamState checkpoint
return 1
print =============== create database with retentions
......@@ -13,17 +13,18 @@ sql use d0
print =============== create super table and register rsma
sql create table if not exists stb (ts timestamp, c1 int, c2 float) tags (city binary(20),district binary(20)) rollup(max) max_delay 5s,5s watermark 2s,3s;
sql create table if not exists stb1 (ts timestamp, c1 int, c2 float) tags (city binary(20),district binary(20)) rollup(max) max_delay 5s,5s watermark 2s,3s;
sql show stables
if $rows != 1 then
if $rows != 2 then
return -1
endi
print =============== create child table
sql create table ct1 using stb tags("BeiJing", "ChaoYang");
sql create table ct1 using stb tags("BeiJing", "ChaoYang") ct_1 using stb1 tags("BeiJing", "ChaoYang");
sql show tables
if $rows != 1 then
if $rows != 2 then
return -1
endi
......@@ -31,6 +32,9 @@ print =============== insert data and trigger rollup
sql insert into ct1 values(now, 10, 10.0);
sql insert into ct1 values(now+1s, 1, 1.0);
sql insert into ct1 values(now+2s, 100, 100.0);
sql insert into ct_1 values(now, 10, 10.0);
sql insert into ct_1 values(now+1s, 1, 1.0);
sql insert into ct_1 values(now+2s, 100, 100.0);
print =============== wait maxdelay 5+2 seconds for results
sleep 7000
......@@ -44,6 +48,20 @@ if $rows > 2 then
return -1
endi
if $data01 != 100 then
if $data01 != 10 then
print retention level 2 file result $data01 != 100 or 10
return -1
endi
endi
sql select * from ct_1;
print $data00 $data01 $data02
print $data10 $data11 $data12
if $rows > 2 then
print retention level 2 file rows $rows > 2
return -1
endi
if $data01 != 100 then
if $data01 != 10 then
......@@ -68,6 +86,21 @@ if $data01 != 100 then
endi
endi
sql select * from ct_1 where ts > now-8d;
print $data00 $data01 $data02
print $data10 $data11 $data12
if $rows > 2 then
print retention level 1 file rows $rows > 2
return -1
endi
if $data01 != 100 then
if $data01 != 10 then
print retention level 1 file result $data01 != 100 or 10
return -1
endi
endi
print =============== select * from retention level 0 from memory
sql select * from ct1 where ts > now-3d;
print $data00 $data01 $data02
......@@ -84,6 +117,21 @@ if $data01 != 10 then
return -1
endi
sql select * from ct_1 where ts > now-3d;
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
if $rows < 1 then
print retention level 0 file rows $rows < 1
return -1
endi
if $data01 != 10 then
print retention level 0 file result $data01 != 10
return -1
endi
#===================================================================
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start
......@@ -100,6 +148,22 @@ if $rows > 2 then
endi
if $data01 != 100 then
if $data01 != 10 then
print retention level 2 file result $data01 != 100 or 10
return -1
endi
endi
sql select * from ct_1;
print $data00 $data01 $data02
print $data10 $data11 $data12
if $rows > 2 then
print retention level 2 file rows $rows > 2
return -1
endi
if $data01 != 100 then
if $data01 != 10 then
print retention level 2 file result $data01 != 100 or 10
......@@ -123,6 +187,21 @@ if $data01 != 100 then
endi
endi
sql select * from ct_1 where ts > now-8d;
print $data00 $data01 $data02
print $data10 $data11 $data12
if $rows > 2 then
print retention level 1 file rows $rows > 2
return -1
endi
if $data01 != 100 then
if $data01 != 10 then
print retention level 1 file result $data01 != 100 or 10
return -1
endi
endi
print =============== select * from retention level 0 from memory after reboot
sql select * from ct1 where ts > now-3d;
print $data00 $data01 $data02
......@@ -139,6 +218,21 @@ if $data01 != 10 then
return -1
endi
sql select * from ct_1 where ts > now-3d;
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
if $rows < 1 then
print retention level 0 file rows $rows < 1
return -1
endi
if $data01 != 10 then
print retention level 0 file result $data01 != 10
return -1
endi
#==================== flush database to trigger commit data to file
sql flush database d0;
......@@ -161,6 +255,21 @@ if $data01 != 100 then
endi
endi
sql select * from ct_1;
print $data00 $data01 $data02
print $data10 $data11 $data12
if $rows > 2 then
print retention level 2 file rows $rows > 2
return -1
endi
if $data01 != 100 then
if $data01 != 10 then
print retention level 2 file result $data01 != 100 or 10
return -1
endi
endi
print =============== select * from retention level 1 from file
sql select * from ct1 where ts > now-8d;
print $data00 $data01 $data02
......@@ -177,6 +286,21 @@ if $data01 != 100 then
endi
endi
sql select * from ct_1 where ts > now-8d;
print $data00 $data01 $data02
print $data10 $data11 $data12
if $rows > 2 then
print retention level 1 file rows $rows > 2
return -1
endi
if $data01 != 100 then
if $data01 != 10 then
print retention level 1 file result $data01 != 100 or 10
return -1
endi
endi
print =============== select * from retention level 0 from file
sql select * from ct1 where ts > now-3d;
print $data00 $data01 $data02
......@@ -192,9 +316,25 @@ if $data01 != 10 then
return -1
endi
sql select * from ct_1 where ts > now-3d;
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
if $rows < 1 then
print retention level 0 file rows $rows < 1
return -1
endi
if $data01 != 10 then
print retention level 0 file result $data01 != 10
return -1
endi
print =============== insert after rsma qtaskinfo recovery
sql insert into ct1 values(now, 50, 500.0);
sql insert into ct1 values(now+1s, 40, 40.0);
sql insert into ct_1 values(now, 50, 500.0);
sql insert into ct_1 values(now+1s, 40, 40.0);
print =============== wait maxdelay 5+2 seconds for results
sleep 7000
......@@ -217,11 +357,37 @@ endi
if $data02 != 500.00000 then
if $data02 != 100.00000 then
print retention level 1 file/mem result $data02 != 500.00000 or 100.00000
if $data02 != 10.00000 then
print retention level 1 file/mem result $data02 != 500.00000 or 100.00000 or 10.00000
return -1
endi
endi
endi
sql select * from ct_1;
print $data00 $data01 $data02
print $data10 $data11 $data12
if $rows > 2 then
print retention level 2 file/mem rows $rows > 2
return -1
endi
if $data01 != 100 then
if $data01 != 10 then
print retention level 2 file/mem result $data01 != 100 or 10
return -1
endi
endi
if $data02 != 500.00000 then
if $data02 != 100.00000 then
if $data02 != 10.00000 then
print retention level 1 file/mem result $data02 != 500.00000 or 100.00000 or 10.00000
return -1
endi
endi
endi
print =============== select * from retention level 1 from file and memory after rsma qtaskinfo recovery
sql select * from ct1 where ts > now-8d;
print $data00 $data01 $data02
......@@ -240,11 +406,37 @@ endi
if $data02 != 500.00000 then
if $data02 != 100.00000 then
print retention level 1 file/mem result $data02 != 500.00000 or 100.00000
if $data02 != 10.00000 then
print retention level 1 file/mem result $data02 != 500.00000 or 100.00000 or 10.00000
return -1
endi
endi
endi
sql select * from ct_1 where ts > now-8d;
print $data00 $data01 $data02
print $data10 $data11 $data12
if $rows > 2 then
print retention level 1 file/mem rows $rows > 2
return -1
endi
if $data01 != 100 then
if $data01 != 10 then
print retention level 1 file/mem result $data01 != 100 or 10
return -1
endi
endi
if $data02 != 500.00000 then
if $data02 != 100.00000 then
if $data02 != 10.00000 then
print retention level 1 file/mem result $data02 != 500.00000 or 100.00000 or 10.00000
return -1
endi
endi
endi
print =============== select * from retention level 0 from file and memory after rsma qtaskinfo recovery
sql select * from ct1 where ts > now-3d;
......@@ -295,6 +487,61 @@ if $data42 != 40.00000 then
return -1
endi
sql select * from ct_1 where ts > now-3d;
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
print $data30 $data31 $data32
print $data40 $data41 $data42
if $rows < 1 then
print retention level 0 file/mem rows $rows < 1
return -1
endi
if $data01 != 10 then
print retention level 0 file/mem result $data01 != 10
return -1
endi
if $data11 != 1 then
print retention level 0 file/mem result $data11 != 1
return -1
endi
if $data21 != 100 then
print retention level 0 file/mem result $data21 != 100
return -1
endi
if $data31 != 50 then
print retention level 0 file/mem result $data31 != 50
return -1
endi
if $data32 != 500.00000 then
print retention level 0 file/mem result $data32 != 500.00000
return -1
endi
if $data41 != 40 then
print retention level 0 file/mem result $data41 != 40
return -1
endi
if $data42 != 40.00000 then
print retention level 0 file/mem result $data42 != 40.00000
return -1
endi
print =============== drop stb1
sql drop table stb1;
sql flush database d0;
print =============== select * from retention level 0 from file and memory after rsma qtaskinfo recovery
sql_error select * from ct_1 where ts > now-3d;
sql_error select * from ct_1 where ts > now-8d;
sql_error select * from ct_1;
system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册