未验证 提交 eeb5e918 编写于 作者: C Cary Xu 提交者: GitHub

Merge pull request #14268 from taosdata/feature/TD-11274-3.0

refactor: rsma resource release
......@@ -3696,7 +3696,7 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
if (tDecodeI8(&decoder, &pReq->isTsma) < 0) return -1;
if (pReq->isTsma) {
if (tDecodeBinaryAlloc(&decoder, &pReq->pTsma, NULL) < 0) return -1;
if (tDecodeBinary(&decoder, (uint8_t **)&pReq->pTsma, NULL) < 0) return -1;
}
tEndDecode(&decoder);
......@@ -3707,9 +3707,6 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
int32_t tFreeSCreateVnodeReq(SCreateVnodeReq *pReq) {
taosArrayDestroy(pReq->pRetensions);
pReq->pRetensions = NULL;
if (pReq->isTsma) {
taosMemoryFreeClear(pReq->pTsma);
}
return 0;
}
......@@ -4747,9 +4744,8 @@ int32_t tDecodeSRSmaParam(SDecoder *pCoder, SRSmaParam *pRSmaParam) {
if (tDecodeI64v(pCoder, &pRSmaParam->watermark[i]) < 0) return -1;
if (tDecodeI32v(pCoder, &pRSmaParam->qmsgLen[i]) < 0) return -1;
if (pRSmaParam->qmsgLen[i] > 0) {
uint64_t len;
if (tDecodeBinaryAlloc(pCoder, (void **)&pRSmaParam->qmsg[i], &len) < 0)
return -1; // qmsgLen contains len of '\0'
tDecoderMalloc(pCoder, pRSmaParam->qmsgLen[i]);
if (tDecodeBinary(pCoder, (uint8_t **)&pRSmaParam->qmsg[i], NULL) < 0) return -1; // qmsgLen contains len of '\0'
} else {
pRSmaParam->qmsg[i] = NULL;
}
......
......@@ -125,6 +125,7 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT *pReader, STableBlockDistInfo
bool isTsdbCacheLastRow(tsdbReaderT *pReader);
int32_t tsdbGetAllTableList(SMeta *pMeta, uint64_t uid, SArray *list);
int32_t tsdbGetCtbIdList(SMeta *pMeta, int64_t suid, SArray *list);
int32_t tsdbGetStbIdList(SMeta *pMeta, int64_t suid, SArray *list);
void *tsdbGetIdx(SMeta *pMeta);
void *tsdbGetIvtIdx(SMeta *pMeta);
int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT *pHandle);
......
......@@ -69,6 +69,7 @@ struct SMeta {
TTB* pUidIdx;
TTB* pNameIdx;
TTB* pCtbIdx;
TTB* pSuidIdx;
// ivt idx and idx
void* pTagIvtIdx;
TTB* pTagIdx;
......
......@@ -60,8 +60,9 @@ struct SRSmaStat {
SSma *pSma;
void *tmrHandle;
tmr_h tmrId;
int8_t tmrStat;
int32_t tmrSeconds;
int8_t triggerStat;
int8_t runningStat;
SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo;
};
......@@ -74,11 +75,19 @@ struct SSmaStat {
};
#define SMA_TSMA_STAT(s) (&(s)->tsmaStat)
#define SMA_RSMA_STAT(s) (&(s)->rsmaStat)
#define SMA_RSMA_INFO_HASH(s) ((s)->rsmaStat.rsmaInfoHash)
#define SMA_RSMA_TMR_HANDLE(s) ((s)->rsmaStat.tmrHandle)
#define SMA_RSMA_TMR_STAT(s) ((s)->rsmaStat.tmrStat)
#define RSMA_INFO_HASH(r) ((r)->rsmaInfoHash)
#define RSMA_TMR_ID(r) ((r)->tmrId)
#define RSMA_TMR_HANDLE(r) ((r)->tmrHandle)
#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat)
#define RSMA_RUNNING_STAT(r) (&(r)->runningStat)
enum {
TASK_TRIGGER_STAT_INIT = 0,
TASK_TRIGGER_STAT_ACTIVE = 1,
TASK_TRIGGER_STAT_INACTIVE = 2,
TASK_TRIGGER_STAT_CANCELLED = 3,
TASK_TRIGGER_STAT_FINISHED = 4,
};
void tdDestroySmaEnv(SSmaEnv *pSmaEnv);
void *tdFreeSmaEnv(SSmaEnv *pSmaEnv);
......@@ -174,6 +183,8 @@ int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg);
int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg);
int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
// smaFileUtil ================
typedef struct STFInfo STFInfo;
typedef struct STFile STFile;
......@@ -181,7 +192,7 @@ struct STFInfo {
uint32_t magic;
uint32_t ftype;
uint32_t fver;
uint64_t fsize;
int64_t fsize;
};
struct STFile {
......@@ -199,11 +210,8 @@ struct STFile {
#define TD_FILE_OPENED(tf) (TD_FILE_PFILE(tf) != NULL)
#define TD_FILE_CLOSED(tf) (!TD_FILE_OPENED(tf))
#define TD_FILE_SET_CLOSED(f) (TD_FILE_PFILE(f) = NULL)
#define TD_FILE_STATE(tf) ((tf)->state)
#define TD_FILE_SET_STATE(tf, s) ((tf)->state = (s))
#define TD_FILE_DID(tf) (TD_FILE_F(tf)->did)
#define TD_FILE_IS_OK(tf) (TD_FILE_STATE(tf) == TD_FILE_STATE_OK)
#define TD_FILE_IS_BAD(tf) (TD_FILE_STATE(tf) == TD_FILE_STATE_BAD)
int32_t tdInitTFile(STFile *pTFile, STfs *pTfs, const char *fname);
int32_t tdCreateTFile(STFile *pTFile, STfs *pTfs, bool updateHeader, int8_t fType);
......
......@@ -76,6 +76,7 @@ void vnodeFree(void* p);
// meta
typedef struct SMCtbCursor SMCtbCursor;
typedef struct SMStbCursor SMStbCursor;
typedef struct STbUidStore STbUidStore;
int metaOpen(SVnode* pVnode, SMeta** ppMeta);
......@@ -97,6 +98,9 @@ int metaGetTbNum(SMeta* pMeta);
SMCtbCursor* metaOpenCtbCursor(SMeta* pMeta, tb_uid_t uid);
void metaCloseCtbCursor(SMCtbCursor* pCtbCur);
tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur);
SMStbCursor* metaOpenStbCursor(SMeta* pMeta, tb_uid_t uid);
void metaCloseStbCursor(SMStbCursor* pStbCur);
tb_uid_t metaStbCursorNext(SMStbCursor* pStbCur);
STSma* metaGetSmaInfoByIndex(SMeta* pMeta, int64_t indexUid);
STSmaWrapper* metaGetSmaInfoByTable(SMeta* pMeta, tb_uid_t uid, bool deepCopy);
SArray* metaGetSmaIdsByTable(SMeta* pMeta, tb_uid_t uid);
......@@ -158,6 +162,8 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool
// sma
int32_t smaOpen(SVnode* pVnode);
int32_t smaClose(SSma* pSma);
int32_t smaCloseEnv(SSma* pSma);
int32_t smaCloseEx(SSma* pSma);
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
......
......@@ -92,6 +92,13 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
goto _err;
}
// open pSuidIdx
ret = tdbTbOpen("suid.idx", sizeof(tb_uid_t), 0, uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pSuidIdx);
if (ret < 0) {
metaError("vgId:%d, failed to open meta super table index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open pTagIdx
// TODO(yihaoDeng), refactor later
char indexFullPath[128] = {0};
......@@ -141,6 +148,7 @@ _err:
if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx);
if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx);
if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx);
if (pMeta->pSuidIdx) tdbTbClose(pMeta->pSuidIdx);
if (pMeta->pNameIdx) tdbTbClose(pMeta->pNameIdx);
if (pMeta->pUidIdx) tdbTbClose(pMeta->pUidIdx);
if (pMeta->pSkmDb) tdbTbClose(pMeta->pSkmDb);
......@@ -162,6 +170,7 @@ int metaClose(SMeta *pMeta) {
if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx);
#endif
if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx);
if (pMeta->pSuidIdx) tdbTbClose(pMeta->pSuidIdx);
if (pMeta->pNameIdx) tdbTbClose(pMeta->pNameIdx);
if (pMeta->pUidIdx) tdbTbClose(pMeta->pUidIdx);
if (pMeta->pSkmDb) tdbTbClose(pMeta->pSkmDb);
......
......@@ -325,6 +325,70 @@ tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) {
return pCtbIdxKey->uid;
}
struct SMStbCursor {
SMeta *pMeta;
TBC *pCur;
tb_uid_t suid;
void *pKey;
void *pVal;
int kLen;
int vLen;
};
SMStbCursor *metaOpenStbCursor(SMeta *pMeta, tb_uid_t suid) {
SMStbCursor *pStbCur = NULL;
int ret = 0;
int c = 0;
pStbCur = (SMStbCursor *)taosMemoryCalloc(1, sizeof(*pStbCur));
if (pStbCur == NULL) {
return NULL;
}
pStbCur->pMeta = pMeta;
pStbCur->suid = suid;
metaRLock(pMeta);
ret = tdbTbcOpen(pMeta->pSuidIdx, &pStbCur->pCur, NULL);
if (ret < 0) {
metaULock(pMeta);
taosMemoryFree(pStbCur);
return NULL;
}
// move to the suid
tdbTbcMoveTo(pStbCur->pCur, &suid, sizeof(suid), &c);
if (c > 0) {
tdbTbcMoveToNext(pStbCur->pCur);
}
return pStbCur;
}
void metaCloseStbCursor(SMStbCursor *pStbCur) {
if (pStbCur) {
if (pStbCur->pMeta) metaULock(pStbCur->pMeta);
if (pStbCur->pCur) {
tdbTbcClose(pStbCur->pCur);
tdbFree(pStbCur->pKey);
tdbFree(pStbCur->pVal);
}
taosMemoryFree(pStbCur);
}
}
tb_uid_t metaStbCursorNext(SMStbCursor *pStbCur) {
int ret;
ret = tdbTbcNext(pStbCur->pCur, &pStbCur->pKey, &pStbCur->kLen, &pStbCur->pVal, &pStbCur->vLen);
if (ret < 0) {
return 0;
}
return *(tb_uid_t*)pStbCur->pKey;
}
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
// SMetaReader mr = {0};
STSchema * pTSchema = NULL;
......
......@@ -23,6 +23,7 @@ static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateSuidIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry);
static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type);
......@@ -209,6 +210,7 @@ _drop_super_table:
&pMeta->txn);
tdbTbDelete(pMeta->pNameIdx, pReq->name, strlen(pReq->name) + 1, &pMeta->txn);
tdbTbDelete(pMeta->pUidIdx, &pReq->suid, sizeof(tb_uid_t), &pMeta->txn);
tdbTbDelete(pMeta->pSuidIdx, &pReq->suid, sizeof(tb_uid_t), &pMeta->txn);
metaULock(pMeta);
......@@ -436,11 +438,13 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
tdbTbDelete(pMeta->pUidIdx, &uid, sizeof(uid), &pMeta->txn);
if(e.type != TSDB_SUPER_TABLE) metaDeleteTtlIdx(pMeta, &e);
if (e.type == TSDB_CHILD_TABLE) {
tdbTbDelete(pMeta->pCtbIdx, &(SCtbIdxKey){.suid = e.ctbEntry.suid, .uid = uid}, sizeof(SCtbIdxKey), &pMeta->txn);
} else if (e.type == TSDB_NORMAL_TABLE) {
// drop schema.db (todo)
} else if (e.type == TSDB_SUPER_TABLE) {
tdbTbDelete(pMeta->pSuidIdx, &e.uid, sizeof(tb_uid_t), &pMeta->txn);
// drop schema.db (todo)
}
......@@ -911,6 +915,10 @@ static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) {
return tdbTbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &pME->version, sizeof(int64_t), &pMeta->txn);
}
static int metaUpdateSuidIdx(SMeta *pMeta, const SMetaEntry *pME) {
return tdbTbInsert(pMeta->pSuidIdx, &pME->uid, sizeof(tb_uid_t), NULL, 0, &pMeta->txn);
}
static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME) {
return tdbTbInsert(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), &pMeta->txn);
}
......@@ -1081,6 +1089,10 @@ static int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME) {
} else {
// update schema.db
if (metaSaveToSkmDb(pMeta, pME) < 0) goto _err;
if (pME->type == TSDB_SUPER_TABLE) {
if (metaUpdateSuidIdx(pMeta, pME) < 0) goto _err;
}
}
if (pME->type != TSDB_SUPER_TABLE) {
......
......@@ -126,22 +126,21 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
}
if (smaType == TSDB_SMA_TYPE_ROLLUP) {
SMA_RSMA_STAT(*pSmaStat)->pSma = (SSma*)pSma;
SRSmaStat *pRSmaStat = (SRSmaStat *)(*pSmaStat);
pRSmaStat->pSma = (SSma *)pSma;
// init timer
SMA_RSMA_TMR_HANDLE(*pSmaStat) = taosTmrInit(10000, 100, 10000, "RSMA_G");
if (!SMA_RSMA_TMR_HANDLE(*pSmaStat)) {
RSMA_TMR_HANDLE(pRSmaStat) = taosTmrInit(10000, 100, 10000, "RSMA");
if (!RSMA_TMR_HANDLE(pRSmaStat)) {
taosMemoryFreeClear(*pSmaStat);
return TSDB_CODE_FAILED;
}
atomic_store_8(&SMA_RSMA_TMR_STAT(*pSmaStat), TASK_TRIGGER_STATUS__ACTIVE);
// init hash
SMA_RSMA_INFO_HASH(*pSmaStat) = taosHashInit(
RSMA_INFO_HASH(pRSmaStat) = taosHashInit(
RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
if (!SMA_RSMA_INFO_HASH(*pSmaStat)) {
if (SMA_RSMA_TMR_HANDLE(*pSmaStat)) {
taosTmrCleanUp(SMA_RSMA_TMR_HANDLE(*pSmaStat));
if (!RSMA_INFO_HASH(pRSmaStat)) {
if (RSMA_TMR_HANDLE(pRSmaStat)) {
taosTmrCleanUp(RSMA_TMR_HANDLE(pRSmaStat));
}
taosMemoryFreeClear(*pSmaStat);
return TSDB_CODE_FAILED;
......@@ -155,12 +154,79 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
return TSDB_CODE_SUCCESS;
}
static void *tdFreeTSmaStat(STSmaStat *pStat) {
static void tdDestroyTSmaStat(STSmaStat *pStat) {
if (pStat) {
tDestroyTSma(pStat->pTSma);
taosMemoryFreeClear(pStat->pTSma);
taosMemoryFreeClear(pStat->pTSchema);
}
}
static void *tdFreeTSmaStat(STSmaStat *pStat) {
tdDestroyTSmaStat(pStat);
taosMemoryFreeClear(pStat);
return NULL;
}
static void tdDestroyRSmaStat(SRSmaStat *pStat) {
if (pStat) {
smaDebug("vgId:%d, %s:%d free rsma stat", SMA_VID(pStat->pSma), __func__, __LINE__);
// step 1: set persistence task cancelled
atomic_store_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_CANCELLED);
// step 2: clean timer
taosTmrStopA(&RSMA_TMR_ID(pStat));
if (RSMA_TMR_HANDLE(pStat)) {
taosTmrCleanUp(RSMA_TMR_HANDLE(pStat));
}
// step 3: wait the persistence thread to finish
int32_t nLoops = 0;
if (atomic_load_8(RSMA_RUNNING_STAT(pStat)) == 1) {
while (1) {
if (atomic_load_8(RSMA_TRIGGER_STAT(pStat)) == TASK_TRIGGER_STAT_FINISHED) {
break;
} else {
smaDebug("not destroyed since rsma stat in %" PRIi8, atomic_load_8(RSMA_TRIGGER_STAT(pStat)));
}
++nLoops;
if (nLoops > 1000) {
sched_yield();
nLoops = 0;
}
taosMsleep(1000); // TODO: remove this line when release
}
}
// step 4: destroy the rsma info and associated fetch tasks
// TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
void *infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), NULL);
while (infoHash) {
SRSmaInfo *pSmaInfo = *(SRSmaInfo **)infoHash;
tdFreeRSmaInfo(pSmaInfo);
infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), infoHash);
}
taosHashCleanup(RSMA_INFO_HASH(pStat));
// step 5: wait all triggered fetch tasks finished
nLoops = 0;
while (1) {
if (T_REF_VAL_GET((SSmaStat *)pStat) == 0) {
break;
}
++nLoops;
if (nLoops > 1000) {
sched_yield();
nLoops = 0;
}
taosMsleep(1000); // TODO: remove this line when release
}
}
}
static void *tdFreeRSmaStat(SRSmaStat *pStat) {
tdDestroyRSmaStat(pStat);
taosMemoryFreeClear(pStat);
return NULL;
}
......@@ -179,22 +245,16 @@ void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) {
int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) {
if (pSmaStat) {
if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
tdFreeTSmaStat(&pSmaStat->tsmaStat);
smaDebug("%s:%d destroy tsma stat", __func__, __LINE__);
tdDestroyTSmaStat(SMA_TSMA_STAT(pSmaStat));
} else if (smaType == TSDB_SMA_TYPE_ROLLUP) {
if (SMA_RSMA_TMR_HANDLE(pSmaStat)) {
taosTmrCleanUp(SMA_RSMA_TMR_HANDLE(pSmaStat));
}
// TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
void *infoHash = taosHashIterate(SMA_RSMA_INFO_HASH(pSmaStat), NULL);
while (infoHash) {
SRSmaInfo *pInfoHash = *(SRSmaInfo **)infoHash;
tdFreeRSmaInfo(pInfoHash);
infoHash = taosHashIterate(SMA_RSMA_INFO_HASH(pSmaStat), infoHash);
}
taosHashCleanup(SMA_RSMA_INFO_HASH(pSmaStat));
smaDebug("%s:%d destroy rsma stat", __func__, __LINE__);
tdDestroyRSmaStat(SMA_RSMA_STAT(pSmaStat));
} else {
ASSERT(0);
}
} else {
smaDebug("%s:%d no need to destroy rsma stat", __func__, __LINE__);
}
return TSDB_CODE_SUCCESS;
}
......@@ -260,34 +320,3 @@ int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) {
return TSDB_CODE_SUCCESS;
};
\ No newline at end of file
int32_t smaTimerInit(void **timer, int8_t *initFlag, const char *label) {
int8_t old;
while (1) {
old = atomic_val_compare_exchange_8(initFlag, 0, 2);
if (old != 2) break;
}
if (old == 0) {
*timer = taosTmrInit(10000, 100, 10000, label);
if (!(*timer)) {
atomic_store_8(initFlag, 0);
return -1;
}
atomic_store_8(initFlag, 1);
}
return 0;
}
void smaTimerCleanUp(void *timer, int8_t *initFlag) {
int8_t old;
while (1) {
old = atomic_val_compare_exchange_8(initFlag, 1, 2);
if (old != 2) break;
}
if (old == 1) {
taosTmrCleanUp(timer);
atomic_store_8(initFlag, 0);
}
}
\ No newline at end of file
......@@ -126,19 +126,31 @@ _err:
return -1;
}
int32_t smaClose(SSma *pSma) {
int32_t smaCloseEnv(SSma *pSma) {
if(pSma) {
SMA_TSMA_ENV(pSma) = tdFreeSmaEnv(SMA_TSMA_ENV(pSma));
SMA_RSMA_ENV(pSma) = tdFreeSmaEnv(SMA_RSMA_ENV(pSma));
}
return 0;
}
int32_t smaCloseEx(SSma *pSma) {
if (pSma) {
taosThreadMutexDestroy(&pSma->mutex);
if SMA_RSMA_TSDB0 (pSma) tsdbClose(&SMA_RSMA_TSDB0(pSma));
if SMA_RSMA_TSDB1 (pSma) tsdbClose(&SMA_RSMA_TSDB1(pSma));
if SMA_RSMA_TSDB2 (pSma) tsdbClose(&SMA_RSMA_TSDB2(pSma));
// SMA_TSMA_ENV(pSma) = tdFreeSmaEnv(SMA_TSMA_ENV(pSma));
// SMA_RSMA_ENV(pSma) = tdFreeSmaEnv(SMA_RSMA_ENV(pSma));
taosMemoryFreeClear(pSma);
}
return 0;
}
int32_t smaClose(SSma *pSma) {
smaCloseEnv(pSma);
smaCloseEx(pSma);
return 0;
}
/**
* @brief rsma env restore
*
......
......@@ -14,8 +14,8 @@
*/
#include "sma.h"
#include "tstream.h"
#define RSMA_QTASK_PERSIST_MS 7200000
typedef enum { TD_QTASK_TMP_FILE = 0, TD_QTASK_CUR_FILE } TD_QTASK_FILE_T;
static const char *tdQTaskInfoFname[] = {"qtaskinfo.t", "qtaskinfo"};
......@@ -31,13 +31,13 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId);
struct SRSmaInfoItem {
SRSmaInfo *pRsmaInfo;
void *taskInfo; // qTaskInfo_t
void *tmrHandle;
tmr_h tmrId;
int8_t level;
int8_t tmrInitFlag;
int8_t triggerStatus; // TASK_TRIGGER_STATUS__IN_ACTIVE/TASK_TRIGGER_STATUS__ACTIVE
int8_t triggerStat;
int32_t maxDelay;
};
struct SRSmaInfo {
STSchema *pTSchema;
SSma *pSma;
......@@ -45,11 +45,14 @@ struct SRSmaInfo {
SRSmaInfoItem items[TSDB_RETENTION_L2];
};
static FORCE_INLINE void tdFreeTaskHandle(qTaskInfo_t *taskHandle) {
static FORCE_INLINE void tdFreeTaskHandle(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) {
// Note: free/kill may in RC
qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle);
if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) {
smaDebug("vgId:%d, %s:%d free qTaskInfo_t %p of level %d", vgId, __func__, __LINE__, otaskHandle, level);
qDestroyTask(otaskHandle);
} else {
smaDebug("vgId:%d, %s:%d not free qTaskInfo_t %p of level %d", vgId, __func__, __LINE__, otaskHandle, level);
}
}
......@@ -58,14 +61,19 @@ void *tdFreeRSmaInfo(SRSmaInfo *pInfo) {
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = &pInfo->items[i];
if (pItem->taskInfo) {
tdFreeTaskHandle(pItem->taskInfo);
}
if (pItem->tmrHandle) {
taosTmrCleanUp(pItem->tmrHandle);
smaDebug("vgId:%d, stb %" PRIi64 " stop fetch-timer %p level %d", SMA_VID(pInfo->pSma), pInfo->suid,
pItem->tmrId, i + 1);
taosTmrStopA(&pItem->tmrId);
tdFreeTaskHandle(&pItem->taskInfo, SMA_VID(pInfo->pSma), i + 1);
} else {
smaDebug("vgId:%d, stb %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo",
SMA_VID(pInfo->pSma), pInfo->suid, i + 1);
}
}
taosMemoryFree(pInfo->pTSchema);
taosMemoryFree(pInfo);
} else {
smaDebug("vgId:%d, stb %" PRIi64 " no need to destroy rsma info since empty", SMA_VID(pInfo->pSma), pInfo->suid);
}
return NULL;
......@@ -83,7 +91,7 @@ static FORCE_INLINE int32_t tdUidStoreInit(STbUidStore **pStore) {
static FORCE_INLINE int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids) {
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
SRSmaInfo *pRSmaInfo = NULL;
if (!suid || !tbUids) {
......@@ -92,28 +100,32 @@ static FORCE_INLINE int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SA
return TSDB_CODE_FAILED;
}
pRSmaInfo = taosHashGet(SMA_RSMA_INFO_HASH(pStat), suid, sizeof(tb_uid_t));
pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), suid, sizeof(tb_uid_t));
if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64, SMA_VID(pSma), *suid);
terrno = TSDB_CODE_RSMA_INVALID_STAT;
return TSDB_CODE_FAILED;
}
if (pRSmaInfo->items[0].taskInfo && (qUpdateQualifiedTableId(pRSmaInfo->items[0].taskInfo, tbUids, true) < 0)) {
if (pRSmaInfo->items[0].taskInfo) {
if ((qUpdateQualifiedTableId(pRSmaInfo->items[0].taskInfo, tbUids, true) < 0)) {
smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " since %s", SMA_VID(pSma), *suid, terrstr(terrno));
return TSDB_CODE_FAILED;
} else {
smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p with suid:%" PRIi64 ", uid:%" PRIi64, SMA_VID(pSma),
pRSmaInfo->items[0].taskInfo, *suid, *(int64_t *)taosArrayGet(tbUids, 0));
}
}
if (pRSmaInfo->items[1].taskInfo && (qUpdateQualifiedTableId(pRSmaInfo->items[1].taskInfo, tbUids, true) < 0)) {
if (pRSmaInfo->items[1].taskInfo) {
if ((qUpdateQualifiedTableId(pRSmaInfo->items[1].taskInfo, tbUids, true) < 0)) {
smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " since %s", SMA_VID(pSma), *suid, terrstr(terrno));
return TSDB_CODE_FAILED;
} else {
smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p with suid:%" PRIi64 ", uid:%" PRIi64, SMA_VID(pSma),
pRSmaInfo->items[1].taskInfo, *suid, *(int64_t *)taosArrayGet(tbUids, 0));
}
}
return TSDB_CODE_SUCCESS;
}
......@@ -159,9 +171,9 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui
return TSDB_CODE_SUCCESS;
}
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
SHashObj *infoHash = NULL;
if (!pStat || !(infoHash = SMA_RSMA_INFO_HASH(pStat))) {
if (!pStat || !(infoHash = RSMA_INFO_HASH(pStat))) {
terrno = TSDB_CODE_RSMA_INVALID_STAT;
return TSDB_CODE_FAILED;
}
......@@ -195,11 +207,11 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaInfo
if (param->qmsg[idx]) {
SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]);
pItem->pRsmaInfo = pRSmaInfo;
pItem->taskInfo = qCreateStreamExecTaskInfo(param->qmsg[0], pReadHandle);
pItem->taskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], pReadHandle);
if (!pItem->taskInfo) {
goto _err;
}
pItem->triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE;
pItem->triggerStat = TASK_TRIGGER_STAT_INACTIVE;
if (param->maxdelay[idx] < TSDB_MIN_ROLLUP_MAX_DELAY) {
int64_t msInterval =
convertTimeFromPrecisionToUnit(pRetention[idx + 1].freq, pTsdbCfg->precision, TIME_UNIT_MILLISECOND);
......@@ -211,10 +223,6 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaInfo
pItem->maxDelay = TSDB_MAX_ROLLUP_MAX_DELAY;
}
pItem->level = (idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2);
pItem->tmrHandle = taosTmrInit(10000, 100, 10000, "RSMA");
if (!pItem->tmrHandle) {
goto _err;
}
}
return TSDB_CODE_SUCCESS;
_err:
......@@ -251,10 +259,10 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) {
}
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
SRSmaInfo *pRSmaInfo = NULL;
pRSmaInfo = taosHashGet(SMA_RSMA_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t));
pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t));
if (pRSmaInfo) {
ASSERT(0); // TODO: free original pRSmaInfo is exists abnormally
smaWarn("vgId:%d, rsma info already exists for stb: %s, %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid);
......@@ -293,16 +301,23 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) {
if (tdSetRSmaInfoItemParams(pSma, param, pRSmaInfo, &handle, 0) < 0) {
goto _err;
}
if (tdSetRSmaInfoItemParams(pSma, param, pRSmaInfo, &handle, 1) < 0) {
goto _err;
}
if (taosHashPut(SMA_RSMA_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) < 0) {
if (taosHashPut(RSMA_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) < 0) {
goto _err;
} else {
smaDebug("vgId:%d, register rsma info succeed for suid:%" PRIi64, SMA_VID(pSma), pReq->suid);
}
// start the persist timer
if (TASK_TRIGGER_STAT_INIT ==
atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_INIT, TASK_TRIGGER_STAT_ACTIVE)) {
taosTmrStart(tdRSmaPersistTrigger, RSMA_QTASK_PERSIST_MS, pStat, RSMA_TMR_HANDLE(pStat));
}
return TSDB_CODE_SUCCESS;
_err:
tdFreeRSmaInfo(pRSmaInfo);
......@@ -432,6 +447,16 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
return 0;
}
static void tdDestroySDataBlockArray(SArray *pArray) {
#if 0
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
SSDataBlock *pDataBlock = taosArrayGet(pArray, i);
blockDestroyInner(pDataBlock);
}
#endif
taosArrayDestroy(pArray);
}
static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType) {
SArray *pResult = NULL;
SRSmaInfo *pRSmaInfo = pItem->pRsmaInfo;
......@@ -466,6 +491,7 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType)
#endif
STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb1 : pSma->pRSmaTsdb2);
SSubmitReq *pReq = NULL;
// TODO: the schema update should be handled
if (buildSubmitReqFromDataBlock(&pReq, pResult, pRSmaInfo->pTSchema, SMA_VID(pSma), pRSmaInfo->suid) < 0) {
goto _err;
}
......@@ -477,17 +503,13 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType)
taosMemoryFreeClear(pReq);
} else {
smaDebug("vgId:%d, no rsma % " PRIi8 " data generated since %s", SMA_VID(pSma), pItem->level, tstrerror(terrno));
smaDebug("vgId:%d, no rsma %" PRIi8 " data generated since %s", SMA_VID(pSma), pItem->level, tstrerror(terrno));
}
if (blkType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
atomic_store_8(&pItem->triggerStatus, TASK_TRIGGER_STATUS__ACTIVE);
}
taosArrayDestroy(pResult);
tdDestroySDataBlockArray(pResult);
return TSDB_CODE_SUCCESS;
_err:
taosArrayDestroy(pResult);
tdDestroySDataBlockArray(pResult);
return TSDB_CODE_FAILED;
}
......@@ -499,22 +521,34 @@ _err:
*/
static void tdRSmaFetchTrigger(void *param, void *tmrId) {
SRSmaInfoItem *pItem = param;
SSma *pSma = pItem->pRsmaInfo->pSma;
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT((SSmaEnv *)pSma->pRSmaEnv);
if (atomic_load_8(&pItem->triggerStatus) == TASK_TRIGGER_STATUS__ACTIVE) {
smaWarn("%s:%d THREAD:%" PRIi64 " level %" PRIi8 " status is active for tb suid:%" PRIi64, __func__, __LINE__,
taosGetSelfPthreadId(), pItem->level, pItem->pRsmaInfo->suid);
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
int8_t rsmaTriggerStat = atomic_load_8(RSMA_TRIGGER_STAT(pStat));
if (rsmaTriggerStat == TASK_TRIGGER_STAT_CANCELLED || rsmaTriggerStat == TASK_TRIGGER_STAT_FINISHED) {
smaDebug("vgId:%d, %s:%d level %" PRIi8 " not fetch since stat is cancelled for table suid:%" PRIi64, SMA_VID(pSma),
__func__, __LINE__, pItem->level, pItem->pRsmaInfo->suid);
return;
}
atomic_store_8(&pItem->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE);
qSetStreamInput(pItem->taskInfo, &dataBlock, STREAM_DATA_TYPE_SSDATA_BLOCK, false);
int8_t fetchTriggerStat =
atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE);
if (fetchTriggerStat == TASK_TRIGGER_STAT_ACTIVE) {
smaDebug("vgId:%d, %s:%d level %" PRIi8 " stat is active for table suid:%" PRIi64, SMA_VID(pSma), __func__,
__LINE__, pItem->level, pItem->pRsmaInfo->suid);
tdRefSmaStat(pSma, (SSmaStat *)pStat);
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
qSetStreamInput(pItem->taskInfo, &dataBlock, STREAM_DATA_TYPE_SSDATA_BLOCK, false);
tdFetchAndSubmitRSmaResult(pItem, STREAM_DATA_TYPE_SSDATA_BLOCK);
tdUnRefSmaStat(pSma, (SSmaStat *)pStat);
} else {
smaWarn("%s:%d THREAD:%" PRIi64 " level %" PRIi8 " status is inactive for tb suid:%" PRIi64, __func__, __LINE__,
taosGetSelfPthreadId(), pItem->level, pItem->pRsmaInfo->suid);
smaDebug("vgId:%d, %s:%d level %" PRIi8 " stat is inactive for table suid:%" PRIi64, SMA_VID(pSma), __func__,
__LINE__, pItem->level, pItem->pRsmaInfo->suid);
}
// taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, pItem->tmrHandle, &pItem->tmrId);
}
static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *pItem,
......@@ -533,14 +567,17 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3
}
tdFetchAndSubmitRSmaResult(pItem, STREAM_DATA_TYPE_SUBMIT_BLOCK);
atomic_store_8(&pItem->triggerStatus, TASK_TRIGGER_STATUS__ACTIVE);
smaWarn("%s:%d THREAD:%" PRIi64 " process rsma insert", __func__, __LINE__, taosGetSelfPthreadId());
atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
smaDebug("vgId:%d, %s:%d process rsma insert", SMA_VID(pSma), __func__, __LINE__);
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = SMA_RSMA_STAT(pEnv->pStat);
taosTmrStart(tdRSmaPersistTrigger, 5000, pStat, pStat->tmrHandle);
taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, pItem->tmrHandle, &pItem->tmrId);
if (pStat->tmrHandle) {
taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, pStat->tmrHandle, &pItem->tmrId);
} else {
ASSERT(0);
}
return TSDB_CODE_SUCCESS;
}
......@@ -552,10 +589,10 @@ static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb
return TSDB_CODE_SUCCESS;
}
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
SRSmaInfo *pRSmaInfo = NULL;
pRSmaInfo = taosHashGet(SMA_RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
smaDebug("vgId:%d, return as no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid);
......@@ -608,7 +645,7 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
return TSDB_CODE_SUCCESS;
}
void tdRSmaQTaskGetFName(int32_t vid, int8_t ftype, char* outputName) {
void tdRSmaQTaskGetFName(int32_t vid, int8_t ftype, char *outputName) {
tdGetVndFileName(vid, "rsma", tdQTaskInfoFname[ftype], outputName);
}
......@@ -618,6 +655,23 @@ static void *tdRSmaPersistExec(void *param) {
SSma *pSma = pRSmaStat->pSma;
STfs *pTfs = pSma->pVnode->pTfs;
int64_t toffset = 0;
bool isFileCreated = false;
if (TASK_TRIGGER_STAT_CANCELLED == atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat))) {
goto _end;
}
#if 0
SArray *suidList = taosArrayInit(1, sizeof(tb_uid_t));
if (tsdbGetStbIdList(SMA_META(pSma), 0, suidList) < 0) {
ASSERT(0);
} else {
for (int32_t i = 0; i < taosArrayGetSize(suidList); ++i) {
tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i);
smaDebug("suid [%d] is %" PRIi64, i, suid);
}
}
#endif
void *infoHash = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL);
if (!infoHash) {
......@@ -625,36 +679,81 @@ static void *tdRSmaPersistExec(void *param) {
}
STFile tFile = {0};
int32_t vid = 2;
char qTaskInfoFName[TSDB_FILENAME_LEN];
tdRSmaQTaskGetFName(vid, TD_QTASK_TMP_FILE, qTaskInfoFName);
tdInitTFile(&tFile, pTfs, qTaskInfoFName);
tdCreateTFile(&tFile, pTfs, true, -1);
int32_t vid = SMA_VID(pSma);
while (infoHash) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
#if 0
smaDebug("table %" PRIi64 " sleep 15s start ...", pRSmaInfo->items[0].pRsmaInfo->suid);
for (int32_t i = 15; i > 0; --i) {
taosSsleep(1);
smaDebug("table %" PRIi64 " countdown %d", pRSmaInfo->items[0].pRsmaInfo->suid, i);
}
smaDebug("table %" PRIi64 " sleep 15s end ...", pRSmaInfo->items[0].pRsmaInfo->suid);
#endif
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
qTaskInfo_t taskInfo = pRSmaInfo->items[i].taskInfo;
if (!taskInfo) {
smaDebug("vgId:%d, table %" PRIi64 " level %d qTaskInfo is NULL", vid, pRSmaInfo->suid, i + 1);
continue;
}
char *pOutput = NULL;
int32_t len = 0;
if (qSerializeTaskStatus(pRSmaInfo->items[0].taskInfo, &pOutput, &len) < 0) {
smaError("serialize rsma task for table %" PRIi64 " failed since %s", pRSmaInfo->items[0].pRsmaInfo->suid,
int8_t type = 0;
if (qSerializeTaskStatus(taskInfo, &pOutput, &len) < 0) {
smaError("vgId:%d, table %" PRIi64 " level %d serialize rsma task failed since %s", vid, pRSmaInfo->suid, i + 1,
terrstr(terrno));
goto _err;
} else {
smaWarn("serialize rsma task for table %" PRIi64 " success and len is %d", pRSmaInfo->items[0].pRsmaInfo->suid,
len);
if (!pOutput) {
smaDebug("vgId:%d, table %" PRIi64
" level %d serialize rsma task success but no output(len %d) and no need to persist",
vid, pRSmaInfo->suid, i + 1, len);
continue;
} else if (len <= 0) {
smaDebug("vgId:%d, table %" PRIi64 " level %d serialize rsma task success with len %d and no need to persist",
vid, pRSmaInfo->suid, i + 1, len);
taosMemoryFree(pOutput);
}
smaDebug("vgId:%d, table %" PRIi64 " level %d serialize rsma task success with len %d and need persist", vid,
pRSmaInfo->suid, i + 1, len);
#if 1
if (qDeserializeTaskStatus(taskInfo, pOutput, len) < 0) {
smaError("vgId:%d, table %" PRIi64 "level %d deserialize rsma task failed since %s", vid, pRSmaInfo->suid,
i + 1, terrstr(terrno));
} else {
smaDebug("vgId:%d, table %" PRIi64 " level %d deserialize rsma task success", vid, pRSmaInfo->suid, i + 1);
}
#endif
}
if (!isFileCreated) {
char qTaskInfoFName[TSDB_FILENAME_LEN];
tdRSmaQTaskGetFName(vid, TD_QTASK_TMP_FILE, qTaskInfoFName);
tdInitTFile(&tFile, pTfs, qTaskInfoFName);
tdCreateTFile(&tFile, pTfs, true, -1);
isFileCreated = true;
}
len += (sizeof(len) + sizeof(pRSmaInfo->suid));
tdAppendTFile(&tFile, &len, sizeof(len), &toffset);
tdAppendTFile(&tFile, &pRSmaInfo->suid, sizeof(pRSmaInfo->suid), &toffset);
tdAppendTFile(&tFile, pOutput, len, &toffset);
taosMemoryFree(pOutput);
}
infoHash = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), infoHash);
}
_end:
_normal:
if (isFileCreated) {
if (tdUpdateTFileHeader(&tFile) < 0) {
smaError("vgId:%d, failed to update tfile %s header since %s", vid, TD_FILE_FULL_NAME(&tFile), tstrerror(terrno));
tdCloseTFile(&tFile);
tdRemoveTFile(&tFile);
return NULL;
goto _err;
} else {
smaDebug("vgId:%d, succeed to update tfile %s header", vid, TD_FILE_FULL_NAME(&tFile));
}
tdCloseTFile(&tFile);
......@@ -663,29 +762,59 @@ _end:
strncpy(newFName, TD_FILE_FULL_NAME(&tFile), TSDB_FILENAME_LEN);
char *pos = strstr(newFName, tdQTaskInfoFname[TD_QTASK_TMP_FILE]);
strncpy(pos, tdQTaskInfoFname[TD_QTASK_CUR_FILE], TSDB_FILENAME_LEN - POINTER_DISTANCE(pos, newFName));
taosRenameFile(TD_FILE_FULL_NAME(&tFile), newFName);
atomic_store_8(&pRSmaStat->tmrStat, TASK_TRIGGER_STATUS__ACTIVE);
return NULL;
if (taosRenameFile(TD_FILE_FULL_NAME(&tFile), newFName) != 0) {
smaError("vgId:%d, failed to rename %s to %s", vid, TD_FILE_FULL_NAME(&tFile), newFName);
goto _err;
} else {
smaDebug("vgId:%d, succeed to rename %s to %s", vid, TD_FILE_FULL_NAME(&tFile), newFName);
}
}
goto _end;
_err:
atomic_store_8(&pRSmaStat->tmrStat, TASK_TRIGGER_STATUS__ACTIVE);
// remove the .tmp file
if (isFileCreated) {
tdRemoveTFile(&tFile);
}
_end:
if (TASK_TRIGGER_STAT_INACTIVE == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat),
TASK_TRIGGER_STAT_INACTIVE,
TASK_TRIGGER_STAT_ACTIVE)) {
smaDebug("vgId:%d, persist task is active again", vid);
} else if (TASK_TRIGGER_STAT_CANCELLED == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat),
TASK_TRIGGER_STAT_CANCELLED,
TASK_TRIGGER_STAT_FINISHED)) {
smaDebug("vgId:%d, persist task is cancelled", vid);
} else {
smaWarn("vgId:%d, persist task in abnormal stat %" PRIi8, vid, atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)));
ASSERT(0);
}
atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0);
taosThreadExit(NULL);
return NULL;
}
static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) {
smaWarn("%s:%d entry ", __func__, __LINE__);
TdThread threadId;
TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_DETACHED);
if (taosThreadCreate(&threadId, &thAttr, tdRSmaPersistExec, pRSmaStat) != 0) {
smaError("failed to create thread to persist rsma qTaskInfo since %s", strerror(errno));
TdThread tid;
if (taosThreadCreate(&tid, &thAttr, tdRSmaPersistExec, pRSmaStat) != 0) {
if (TASK_TRIGGER_STAT_INACTIVE == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat),
TASK_TRIGGER_STAT_INACTIVE,
TASK_TRIGGER_STAT_ACTIVE)) {
smaDebug("persist task is active again");
} else if (TASK_TRIGGER_STAT_CANCELLED == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat),
TASK_TRIGGER_STAT_CANCELLED,
TASK_TRIGGER_STAT_FINISHED)) {
smaDebug(" persist task is cancelled and set finished");
} else {
smaWarn("persist task in abnormal stat %" PRIi8, atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)));
ASSERT(0);
}
atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0);
}
taosThreadAttrDestroy(&thAttr);
smaWarn("%s:%d end ", __func__, __LINE__);
}
/**
......@@ -696,17 +825,33 @@ static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) {
*/
static void tdRSmaPersistTrigger(void *param, void *tmrId) {
SRSmaStat *pRSmaStat = param;
if (atomic_load_8(&pRSmaStat->tmrStat) == TASK_TRIGGER_STATUS__ACTIVE) {
smaWarn("%s:%d THREAD:%" PRIi64 " rsma persistence start since active", __func__, __LINE__, taosGetSelfPthreadId());
atomic_store_8(&pRSmaStat->tmrStat, TASK_TRIGGER_STATUS__IN_ACTIVE);
// execution
int8_t tmrStat =
atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE);
switch (tmrStat) {
case TASK_TRIGGER_STAT_ACTIVE: {
atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 1);
if (TASK_TRIGGER_STAT_CANCELLED != atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat),
TASK_TRIGGER_STAT_CANCELLED,
TASK_TRIGGER_STAT_FINISHED)) {
smaDebug("%s:%d rsma persistence start since active", __func__, __LINE__);
tdRSmaPersistTask(pRSmaStat);
taosTmrReset(tdRSmaPersistTrigger, RSMA_QTASK_PERSIST_MS, pRSmaStat, pRSmaStat->tmrHandle, &pRSmaStat->tmrId);
} else {
smaWarn("%s:%d THREAD:%" PRIi64 " rsma persistence not start since inactive", __func__, __LINE__,
taosGetSelfPthreadId());
atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0);
}
} break;
case TASK_TRIGGER_STAT_CANCELLED: {
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_FINISHED);
smaDebug("%s:%d rsma persistence not start since cancelled and finished", __func__, __LINE__);
} break;
case TASK_TRIGGER_STAT_INACTIVE: {
smaDebug("%s:%d rsma persistence not start since inactive", __func__, __LINE__);
} break;
case TASK_TRIGGER_STAT_INIT: {
smaDebug("%s:%d rsma persistence not start since init", __func__, __LINE__);
} break;
default: {
smaWarn("%s:%d rsma persistence not start since unknown stat %" PRIi8, __func__, __LINE__, tmrStat);
} break;
}
taosTmrReset(tdRSmaPersistTrigger, 3600000, pRSmaStat, pRSmaStat->tmrHandle, &pRSmaStat->tmrId);
}
\ No newline at end of file
......@@ -15,6 +15,8 @@
#include "sma.h"
// smaFileUtil ================
#define TD_FILE_HEAD_SIZE 512
#define TD_FILE_STATE_OK 0
......@@ -32,7 +34,7 @@ static int32_t tdEncodeTFInfo(void **buf, STFInfo *pInfo) {
tlen += taosEncodeFixedU32(buf, pInfo->magic);
tlen += taosEncodeFixedU32(buf, pInfo->ftype);
tlen += taosEncodeFixedU32(buf, pInfo->fver);
tlen += taosEncodeFixedU64(buf, pInfo->fsize);
tlen += taosEncodeFixedI64(buf, pInfo->fsize);
return tlen;
}
......@@ -41,7 +43,7 @@ static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo) {
buf = taosDecodeFixedU32(buf, &(pInfo->magic));
buf = taosDecodeFixedU32(buf, &(pInfo->ftype));
buf = taosDecodeFixedU32(buf, &(pInfo->fver));
buf = taosDecodeFixedU64(buf, &(pInfo->fsize));
buf = taosDecodeFixedI64(buf, &(pInfo->fsize));
return buf;
}
......@@ -236,3 +238,6 @@ int32_t tdCreateTFile(STFile *pTFile, STfs *pTfs, bool updateHeader, int8_t fTyp
}
int32_t tdRemoveTFile(STFile *pTFile) { return tfsRemoveFile(TD_FILE_F(pTFile)); }
// smaXXXUtil ================
// ...
\ No newline at end of file
......@@ -2876,6 +2876,30 @@ int32_t tsdbGetCtbIdList(SMeta* pMeta, int64_t suid, SArray* list) {
return TSDB_CODE_SUCCESS;
}
/**
* @brief Get all suids since suid
*
* @param pMeta
* @param suid return all suids in one vnode if suid is 0
* @param list
* @return int32_t
*/
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list) {
SMStbCursor* pCur = metaOpenStbCursor(pMeta, suid);
while (1) {
tb_uid_t id = metaStbCursorNext(pCur);
if (id == 0) {
break;
}
taosArrayPush(list, &id);
}
metaCloseStbCursor(pCur);
return TSDB_CODE_SUCCESS;
}
static void destroyHelper(void* param) {
if (param == NULL) {
return;
......
......@@ -72,7 +72,12 @@ int vnodeBegin(SVnode *pVnode) {
return 0;
}
int vnodeShouldCommit(SVnode *pVnode) { return pVnode->inUse->size > pVnode->config.szBuf / 3; }
int vnodeShouldCommit(SVnode *pVnode) {
if (pVnode->inUse) {
return pVnode->inUse->size > pVnode->config.szBuf / 3;
}
return false;
}
int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
char fname[TSDB_FILENAME_LEN];
......
......@@ -152,12 +152,13 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
return pVnode;
_err:
if (pVnode->pSma) smaClose(pVnode->pSma);
if (pVnode->pQuery) vnodeQueryClose(pVnode);
if (pVnode->pTq) tqClose(pVnode->pTq);
if (pVnode->pWal) walClose(pVnode->pWal);
if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
if (pVnode->pMeta) metaClose(pVnode->pMeta);
if (pVnode->pSma) smaClose(pVnode->pSma);
tsem_destroy(&(pVnode->canCommit));
taosMemoryFree(pVnode);
......@@ -166,13 +167,14 @@ _err:
void vnodeClose(SVnode *pVnode) {
if (pVnode) {
smaCloseEnv(pVnode->pSma);
vnodeCommit(pVnode);
vnodeSyncClose(pVnode);
vnodeQueryClose(pVnode);
walClose(pVnode->pWal);
smaCloseEx(pVnode->pSma);
tqClose(pVnode->pTq);
if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
smaClose(pVnode->pSma);
metaClose(pVnode->pMeta);
vnodeCloseBufPool(pVnode);
// destroy handle
......
......@@ -808,6 +808,7 @@ int32_t getMaximumIdleDurationSec();
* ops: root operator
* data: *data save the result of encode, need to be freed by caller
* length: *length save the length of *data
* nOptrWithVal: *nOptrWithVal save the number of optr with value
* return: result code, 0 means success
*/
int32_t encodeOperator(SOperatorInfo* ops, char** data, int32_t *length);
......
......@@ -2929,6 +2929,13 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len
int32_t totalSize =
sizeof(int32_t) + sizeof(int32_t) + size * (sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize);
// no result
if (getTotalBufSize(pSup->pResultBuf) == 0) {
*result = NULL;
*length = 0;
return TSDB_CODE_SUCCESS;
}
*result = (char*)taosMemoryCalloc(1, totalSize);
if (*result == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -4483,6 +4490,8 @@ int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length) {
return code;
}
ASSERT(currLength >= 0);
if (*result == NULL) {
*result = (char*)taosMemoryCalloc(1, currLength + sizeof(int32_t));
if (*result == NULL) {
......@@ -4507,7 +4516,6 @@ int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length) {
taosMemoryFree(pCurrent);
*length = *(int32_t*)(*result);
}
for (int32_t i = 0; i < ops->numOfDownstream; ++i) {
code = encodeOperator(ops->pDownstream[i], result, length);
if (code != TDB_CODE_SUCCESS) {
......
......@@ -157,14 +157,14 @@ int32_t taosRenameFile(const char *oldName, const char *newName) {
#ifdef WINDOWS
bool code = MoveFileEx(oldName, newName, MOVEFILE_REPLACE_EXISTING | MOVEFILE_COPY_ALLOWED);
if (!code) {
printf("failed to rename file %s to %s, reason:%s", oldName, newName, strerror(errno));
printf("failed to rename file %s to %s, reason:%s\n", oldName, newName, strerror(errno));
}
return !code;
#else
int32_t code = rename(oldName, newName);
if (code < 0) {
printf("failed to rename file %s to %s, reason:%s", oldName, newName, strerror(errno));
printf("failed to rename file %s to %s, reason:%s\n", oldName, newName, strerror(errno));
}
return code;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册