提交 2f19c2d0 编写于 作者: C Cary Xu

other: rsma replica restore and memory leak

上级 99cb9bb6
...@@ -337,8 +337,10 @@ static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* p ...@@ -337,8 +337,10 @@ static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* p
} }
static FORCE_INLINE void tDeleteSSchemaWrapper(SSchemaWrapper* pSchemaWrapper) { static FORCE_INLINE void tDeleteSSchemaWrapper(SSchemaWrapper* pSchemaWrapper) {
taosMemoryFree(pSchemaWrapper->pSchema); if (pSchemaWrapper) {
taosMemoryFree(pSchemaWrapper); taosMemoryFree(pSchemaWrapper->pSchema);
taosMemoryFree(pSchemaWrapper);
}
} }
static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema) { static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema) {
......
...@@ -608,6 +608,7 @@ int32_t* taosGetErrno(); ...@@ -608,6 +608,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RSMA_INVALID_STAT TAOS_DEF_ERROR_CODE(0, 0x3151) #define TSDB_CODE_RSMA_INVALID_STAT TAOS_DEF_ERROR_CODE(0, 0x3151)
#define TSDB_CODE_RSMA_QTASKINFO_CREATE TAOS_DEF_ERROR_CODE(0, 0x3152) #define TSDB_CODE_RSMA_QTASKINFO_CREATE TAOS_DEF_ERROR_CODE(0, 0x3152)
#define TSDB_CODE_RSMA_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x3153) #define TSDB_CODE_RSMA_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x3153)
#define TSDB_CODE_RSMA_REMOVE_EXISTS TAOS_DEF_ERROR_CODE(0, 0x3154)
//index //index
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200) #define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)
......
...@@ -151,6 +151,11 @@ enum { ...@@ -151,6 +151,11 @@ enum {
RSMA_ROLE_ITERATE = 4, RSMA_ROLE_ITERATE = 4,
}; };
enum {
RSMA_RESTORE_REBOOT = 1,
RSMA_RESTORE_SYNC = 2,
};
void tdDestroySmaEnv(SSmaEnv *pSmaEnv); void tdDestroySmaEnv(SSmaEnv *pSmaEnv);
void *tdFreeSmaEnv(SSmaEnv *pSmaEnv); void *tdFreeSmaEnv(SSmaEnv *pSmaEnv);
...@@ -227,7 +232,8 @@ void tdRemoveRSmaInfoBySuid(SSma *pSma, int64_t suid); ...@@ -227,7 +232,8 @@ void tdRemoveRSmaInfoBySuid(SSma *pSma, int64_t suid);
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash); int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName); int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
int32_t tdProcessRSmaRestoreImpl(SSma *pSma); int32_t tdProcessRSmaRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer);
int32_t tdRsmaRestore(SSma *pSma, int8_t type, int64_t committedVer);
int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg); int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg);
int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg); int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg);
......
...@@ -123,7 +123,7 @@ int32_t smaOpen(SVnode *pVnode) { ...@@ -123,7 +123,7 @@ int32_t smaOpen(SVnode *pVnode) {
} }
// restore the rsma // restore the rsma
if (rsmaRestore(pSma) < 0) { if (tdRsmaRestore(pSma, RSMA_RESTORE_REBOOT, pVnode->state.committed) < 0) {
goto _err; goto _err;
} }
} }
...@@ -148,12 +148,14 @@ int32_t smaClose(SSma *pSma) { ...@@ -148,12 +148,14 @@ int32_t smaClose(SSma *pSma) {
/** /**
* @brief rsma env restore * @brief rsma env restore
* *
* @param pSma * @param pSma
* @return int32_t * @param type
* @param committedVer
* @return int32_t
*/ */
static int32_t rsmaRestore(SSma *pSma) { int32_t tdRsmaRestore(SSma *pSma, int8_t type, int64_t committedVer) {
ASSERT(VND_IS_RSMA(pSma->pVnode)); ASSERT(VND_IS_RSMA(pSma->pVnode));
return tdProcessRSmaRestoreImpl(pSma); return tdProcessRSmaRestoreImpl(pSma, type, committedVer);
} }
\ No newline at end of file
...@@ -41,12 +41,12 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId); ...@@ -41,12 +41,12 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId);
static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile); static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile);
static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish); static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish);
static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter); static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIter *pIter);
static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *infoItem); static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *infoItem);
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables); static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables);
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed); static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer);
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma, int64_t committed); static int32_t tdRSmaRestoreTSDataReload(SSma *pSma);
static SRSmaInfo *tdGetRSmaInfoByItem(SRSmaInfoItem *pItem) { static SRSmaInfo *tdGetRSmaInfoByItem(SRSmaInfoItem *pItem) {
// adapt accordingly if definition of SRSmaInfo update // adapt accordingly if definition of SRSmaInfo update
...@@ -80,7 +80,7 @@ void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t version, char *outputName) ...@@ -80,7 +80,7 @@ void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t version, char *outputName)
tdGetVndFileName(vgId, NULL, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName); tdGetVndFileName(vgId, NULL, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName);
} }
void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t version, const char* path, char *outputName) { void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t version, const char *path, char *outputName) {
tdGetVndFileName(vgId, path, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName); tdGetVndFileName(vgId, path, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName);
} }
...@@ -319,9 +319,13 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con ...@@ -319,9 +319,13 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
if (pRSmaInfo) { if (pRSmaInfo) {
ASSERT(0); // TODO: free original pRSmaInfo if exists abnormally // TODO: free original pRSmaInfo if exists abnormally
smaDebug("vgId:%d, rsma info already exists for table %s, %" PRIi64, SMA_VID(pSma), tbName, suid); tdFreeRSmaInfo(pSma, *(SRSmaInfo **)pRSmaInfo, true);
return TSDB_CODE_SUCCESS; if (taosHashRemove(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)) < 0) {
terrno = TSDB_CODE_RSMA_REMOVE_EXISTS;
goto _err;
}
smaWarn("vgId:%d, remove the rsma info already exists for table %s, %" PRIi64, SMA_VID(pSma), tbName, suid);
} }
// from write queue: single thead // from write queue: single thead
...@@ -648,7 +652,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType ...@@ -648,7 +652,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType
pItem->taskInfo, suid); pItem->taskInfo, suid);
if (qSetMultiStreamInput(pItem->taskInfo, pMsg, 1, inputType) < 0) { // INPUT__DATA_SUBMIT if (qSetMultiStreamInput(pItem->taskInfo, pMsg, 1, inputType) < 0) { // INPUT__DATA_SUBMIT
smaError("vgId:%d, rsma % " PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno)); smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno));
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
...@@ -872,24 +876,23 @@ _err: ...@@ -872,24 +876,23 @@ _err:
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed) { static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer) {
SVnode *pVnode = pSma->pVnode; SVnode *pVnode = pSma->pVnode;
STFile tFile = {0}; STFile tFile = {0};
char qTaskInfoFName[TSDB_FILENAME_LEN] = {0}; char qTaskInfoFName[TSDB_FILENAME_LEN] = {0};
tdRSmaQTaskInfoGetFileName(TD_VID(pVnode), pVnode->state.committed, qTaskInfoFName); tdRSmaQTaskInfoGetFileName(TD_VID(pVnode), qTaskFileVer, qTaskInfoFName);
if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) { if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
goto _err; goto _err;
} }
if (!taosCheckExistFile(TD_TFILE_FULL_NAME(&tFile))) { if (!taosCheckExistFile(TD_TFILE_FULL_NAME(&tFile))) {
*committed = 0; if (qTaskFileVer > 0) {
if (pVnode->state.committed > 0) { smaWarn("vgId:%d, restore rsma task %" PRIi8 " for version %" PRIi64 ", not start as %s not exist",
smaWarn("vgId:%d, rsma restore for version %" PRIi64 ", not start as %s not exist", TD_VID(pVnode), TD_VID(pVnode), type, qTaskFileVer, TD_TFILE_FULL_NAME(&tFile));
pVnode->state.committed, TD_TFILE_FULL_NAME(&tFile));
} else { } else {
smaDebug("vgId:%d, rsma restore for version %" PRIi64 ", no need as %s not exist", TD_VID(pVnode), smaDebug("vgId:%d, restore rsma task %" PRIi8 " for version %" PRIi64 ", no need as %s not exist", TD_VID(pVnode),
pVnode->state.committed, TD_TFILE_FULL_NAME(&tFile)); type, qTaskFileVer, TD_TFILE_FULL_NAME(&tFile));
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -914,7 +917,7 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed) { ...@@ -914,7 +917,7 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed) {
goto _err; goto _err;
} }
if (tdRSmaQTaskInfoRestore(pSma, &fIter) < 0) { if (tdRSmaQTaskInfoRestore(pSma, type, &fIter) < 0) {
tdRSmaQTaskInfoIterDestroy(&fIter); tdRSmaQTaskInfoIterDestroy(&fIter);
tdCloseTFile(&tFile); tdCloseTFile(&tFile);
tdDestroyTFile(&tFile); tdDestroyTFile(&tFile);
...@@ -925,13 +928,13 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed) { ...@@ -925,13 +928,13 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed) {
tdCloseTFile(&tFile); tdCloseTFile(&tFile);
tdDestroyTFile(&tFile); tdDestroyTFile(&tFile);
// restored successfully from committed // restored successfully from committed or sync
*committed = pVnode->state.committed; smaInfo("vgId:%d, restore rsma task %" PRIi8 " for version %" PRIi64 ", qtaskinfo reload succeed", TD_VID(pVnode),
type, qTaskFileVer);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_err: _err:
smaError("vgId:%d, rsma restore for version %" PRIi64 ", qtaskinfo reload failed since %s", TD_VID(pVnode), smaError("vgId:%d, restore rsma task %" PRIi8 " for version %" PRIi64 ", qtaskinfo reload failed since %s",
pVnode->state.committed, terrstr()); TD_VID(pVnode), type, qTaskFileVer, terrstr());
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
...@@ -939,15 +942,14 @@ _err: ...@@ -939,15 +942,14 @@ _err:
* @brief reload ts data from checkpoint * @brief reload ts data from checkpoint
* *
* @param pSma * @param pSma
* @param committed restore from committed version
* @return int32_t * @return int32_t
*/ */
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma, int64_t committed) { static int32_t tdRSmaRestoreTSDataReload(SSma *pSma) {
// NOTHING TODO: the data would be restored from the unified WAL replay procedure // NOTHING TODO: the data would be restored from the unified WAL replay procedure
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tdProcessRSmaRestoreImpl(SSma *pSma) { int32_t tdProcessRSmaRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer) {
// step 1: iterate all stables to restore the rsma env // step 1: iterate all stables to restore the rsma env
int64_t nTables = 0; int64_t nTables = 0;
if (tdRSmaRestoreQTaskInfoInit(pSma, &nTables) < 0) { if (tdRSmaRestoreQTaskInfoInit(pSma, &nTables) < 0) {
...@@ -955,24 +957,24 @@ int32_t tdProcessRSmaRestoreImpl(SSma *pSma) { ...@@ -955,24 +957,24 @@ int32_t tdProcessRSmaRestoreImpl(SSma *pSma) {
} }
if (nTables <= 0) { if (nTables <= 0) {
smaDebug("vgId:%d, no need to restore rsma task since no tables", SMA_VID(pSma)); smaDebug("vgId:%d, no need to restore rsma task %" PRIi8 " since no tables", SMA_VID(pSma), type);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// step 2: retrieve qtaskinfo items from the persistence file(rsma/qtaskinfo) and restore // step 2: retrieve qtaskinfo items from the persistence file(rsma/qtaskinfo) and restore
int64_t committed = -1; if (tdRSmaRestoreQTaskInfoReload(pSma, type, qtaskFileVer) < 0) {
if (tdRSmaRestoreQTaskInfoReload(pSma, &committed) < 0) {
goto _err; goto _err;
} }
// step 3: reload ts data from checkpoint // step 3: reload ts data from checkpoint
if (tdRSmaRestoreTSDataReload(pSma, committed) < 0) { if (tdRSmaRestoreTSDataReload(pSma) < 0) {
goto _err; goto _err;
} }
smaInfo("vgId:%d, restore rsma task %" PRIi8 " from qtaskf %" PRIi64 " succeed", SMA_VID(pSma), type, qtaskFileVer);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_err: _err:
smaError("vgId:%d, failed to restore rsma task since %s", SMA_VID(pSma), terrstr()); smaError("vgId:%d, restore rsma task %" PRIi8 "from qtaskf %" PRIi64 " failed since %s", SMA_VID(pSma), type,
qtaskFileVer, terrstr());
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
...@@ -1101,7 +1103,7 @@ static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isF ...@@ -1101,7 +1103,7 @@ static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isF
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter) { static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIter *pIter) {
while (1) { while (1) {
// block iter // block iter
bool isFinish = false; bool isFinish = false;
...@@ -1117,7 +1119,7 @@ static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter) { ...@@ -1117,7 +1119,7 @@ static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter) {
pIter->qBuf = taosDecodeFixedI32(pIter->qBuf, &qTaskInfoLenWithHead); pIter->qBuf = taosDecodeFixedI32(pIter->qBuf, &qTaskInfoLenWithHead);
if (qTaskInfoLenWithHead < RSMA_QTASKINFO_HEAD_LEN) { if (qTaskInfoLenWithHead < RSMA_QTASKINFO_HEAD_LEN) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED; terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
smaError("vgId:%d, restore rsma qtaskinfo file %s failed since %s", SMA_VID(pSma), smaError("vgId:%d, restore rsma task %" PRIi8 " from qtaskinfo file %s failed since %s", SMA_VID(pSma), type,
TD_TFILE_FULL_NAME(pIter->pTFile), terrstr()); TD_TFILE_FULL_NAME(pIter->pTFile), terrstr());
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
...@@ -1130,8 +1132,8 @@ static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter) { ...@@ -1130,8 +1132,8 @@ static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter) {
infoItem.qTaskInfo = pIter->qBuf; infoItem.qTaskInfo = pIter->qBuf;
infoItem.len = tdRSmaQTaskInfoContLen(qTaskInfoLenWithHead); infoItem.len = tdRSmaQTaskInfoContLen(qTaskInfoLenWithHead);
// do the restore job // do the restore job
smaDebug("vgId:%d, restore the qtask info %s offset:%" PRIi64 "\n", SMA_VID(pSma), smaDebug("vgId:%d, restore rsma task %" PRIi8 " from qtaskinfo file %s offset:%" PRIi64 "\n", SMA_VID(pSma),
TD_TFILE_FULL_NAME(pIter->pTFile), pIter->offset - pIter->nBytes + pIter->nBufPos); type, TD_TFILE_FULL_NAME(pIter->pTFile), pIter->offset - pIter->nBytes + pIter->nBufPos);
tdRSmaQTaskInfoItemRestore(pSma, &infoItem); tdRSmaQTaskInfoItemRestore(pSma, &infoItem);
pIter->qBuf = POINTER_SHIFT(pIter->qBuf, infoItem.len); pIter->qBuf = POINTER_SHIFT(pIter->qBuf, infoItem.len);
......
...@@ -275,7 +275,7 @@ int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapWrit ...@@ -275,7 +275,7 @@ int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapWrit
qWriter->pSma = pSma; qWriter->pSma = pSma;
char qTaskInfoFullName[TSDB_FILENAME_LEN]; char qTaskInfoFullName[TSDB_FILENAME_LEN];
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), 1, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName); tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), 0, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName);
TdFilePtr qTaskF = taosCreateFile(qTaskInfoFullName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); TdFilePtr qTaskF = taosCreateFile(qTaskInfoFullName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (!qTaskF) { if (!qTaskF) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
...@@ -323,10 +323,19 @@ int32_t rsmaSnapWriterClose(SRsmaSnapWriter** ppWriter, int8_t rollback) { ...@@ -323,10 +323,19 @@ int32_t rsmaSnapWriterClose(SRsmaSnapWriter** ppWriter, int8_t rollback) {
// qtaskinfo // qtaskinfo
if (pWriter->pQTaskFWriter) { if (pWriter->pQTaskFWriter) {
char qTaskInfoFullName[TSDB_FILENAME_LEN]; char qTaskInfoFullName[TSDB_FILENAME_LEN];
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), 0, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName); tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pWriter->ever, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName);
taosRenameFile(pWriter->pQTaskFWriter->fname, qTaskInfoFullName); if (taosRenameFile(pWriter->pQTaskFWriter->fname, qTaskInfoFullName) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
smaInfo("vgId:%d, vnode snapshot rsma writer rename %s to %s", SMA_VID(pWriter->pSma), smaInfo("vgId:%d, vnode snapshot rsma writer rename %s to %s", SMA_VID(pWriter->pSma),
pWriter->pQTaskFWriter->fname, qTaskInfoFullName); pWriter->pQTaskFWriter->fname, qTaskInfoFullName);
// rsma restore
if ((code = tdRsmaRestore(pWriter->pSma, RSMA_RESTORE_SYNC, pWriter->ever)) < 0) {
goto _err;
}
smaInfo("vgId:%d, vnode snapshot rsma writer restore from %s succeed", SMA_VID(pWriter->pSma), qTaskInfoFullName);
} }
} }
......
...@@ -844,6 +844,7 @@ void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock); ...@@ -844,6 +844,7 @@ void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock);
void cleanupBasicInfo(SOptrBasicInfo* pInfo); void cleanupBasicInfo(SOptrBasicInfo* pInfo);
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr); int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr);
void cleanupExprSupp(SExprSupp* pSup); void cleanupExprSupp(SExprSupp* pSup);
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs);
int32_t initAggInfo(SExprSupp *pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, int32_t initAggInfo(SExprSupp *pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
const char* pkey); const char* pkey);
void initResultSizeInfo(SResultInfo * pResultInfo, int32_t numOfRows); void initResultSizeInfo(SResultInfo * pResultInfo, int32_t numOfRows);
......
...@@ -3328,17 +3328,19 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) { ...@@ -3328,17 +3328,19 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
return fillResult; return fillResult;
} }
static void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) { void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
for (int32_t i = 0; i < numOfExprs; ++i) { if (pExpr) {
SExprInfo* pExprInfo = &pExpr[i]; for (int32_t i = 0; i < numOfExprs; ++i) {
for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) { SExprInfo* pExprInfo = &pExpr[i];
if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_COLUMN) { for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
taosMemoryFreeClear(pExprInfo->base.pParam[j].pCol); if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_COLUMN) {
taosMemoryFreeClear(pExprInfo->base.pParam[j].pCol);
}
} }
}
taosMemoryFree(pExprInfo->base.pParam); taosMemoryFree(pExprInfo->base.pParam);
taosMemoryFree(pExprInfo->pExpr); taosMemoryFree(pExprInfo->pExpr);
}
} }
} }
...@@ -3768,11 +3770,7 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) { ...@@ -3768,11 +3770,7 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) { static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) {
taosMemoryFreeClear(pSchemaInfo->dbname); taosMemoryFreeClear(pSchemaInfo->dbname);
if (pSchemaInfo->sw == NULL) { taosMemoryFreeClear(pSchemaInfo->tablename);
return;
}
taosMemoryFree(pSchemaInfo->tablename);
tDeleteSSchemaWrapper(pSchemaInfo->sw); tDeleteSSchemaWrapper(pSchemaInfo->sw);
tDeleteSSchemaWrapper(pSchemaInfo->qsw); tDeleteSSchemaWrapper(pSchemaInfo->qsw);
} }
......
...@@ -610,6 +610,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_ENV, "Invalid rsma env") ...@@ -610,6 +610,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_ENV, "Invalid rsma env")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_STAT, "Invalid rsma state") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_STAT, "Invalid rsma state")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_QTASKINFO_CREATE, "Rsma qtaskinfo creation error") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_QTASKINFO_CREATE, "Rsma qtaskinfo creation error")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FILE_CORRUPTED, "Rsma file corrupted") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FILE_CORRUPTED, "Rsma file corrupted")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_REMOVE_EXISTS, "Rsma remove exists")
//index //index
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding") TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册