未验证 提交 77f34c8f 编写于 作者: wmmhello's avatar wmmhello 提交者: GitHub

Merge pull request #14094 from taosdata/feature/TD-16524

fix: error in windows
......@@ -137,6 +137,8 @@ extern bool tsSmlDataFormat;
// internal
extern int32_t tsTransPullupInterval;
extern int32_t tsMqRebalanceInterval;
extern int32_t tsTtlUnit;
extern int32_t tsTtlPushInterval;
#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
......
......@@ -145,6 +145,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mq-tmr", SMTimerReq, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq)
TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TTL_TIMER, "ttl-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_KILL_TRANS, "kill-trans", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_KILL_QUERY, "kill-query", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_KILL_CONN, "kill-conn", NULL, NULL)
......
......@@ -187,6 +187,9 @@ bool tsStartUdfd = true;
// internal
int32_t tsTransPullupInterval = 2;
int32_t tsMqRebalanceInterval = 2;
int32_t tsTtlUnit = 86400;
int32_t tsTtlPushInterval = 60;
void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary) {
tstrncpy(tsDiskCfg[index].dir, v1, TSDB_FILENAME_LEN);
......@@ -467,6 +470,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "ttlUnit", tsTtlUnit, 1, 86400*365, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "ttlPushInterval", tsTtlPushInterval, 1, 10000, 1) != 0) return -1;
if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1;
return 0;
......@@ -619,6 +624,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32;
tsMqRebalanceInterval = cfgGetItem(pCfg, "mqRebalanceInterval")->i32;
tsTtlUnit = cfgGetItem(pCfg, "ttlUnit")->i32;
tsTtlPushInterval = cfgGetItem(pCfg, "ttlPushInterval")->i32;
tsStartUdfd = cfgGetItem(pCfg, "udf")->bval;
......
......@@ -65,6 +65,13 @@ static void mndPullupTrans(SMnode *pMnode) {
}
}
static void mndTtlTimer(SMnode *pMnode) {
int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
static void mndCalMqRebalance(SMnode *pMnode) {
int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen);
......@@ -83,41 +90,6 @@ static void mndPullupTelem(SMnode *pMnode) {
}
}
static void mndPushTtlTime(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL;
void *pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
int32_t contLen = sizeof(SMsgHead) + sizeof(int32_t);
SMsgHead *pHead = rpcMallocCont(contLen);
if (pHead == NULL) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
continue;
}
pHead->contLen = htonl(contLen);
pHead->vgId = htonl(pVgroup->vgId);
int32_t t = taosGetTimestampSec();
*(int32_t *)(POINTER_SHIFT(pHead, sizeof(SMsgHead))) = htonl(t);
SRpcMsg rpcMsg = {.msgType = TDMT_VND_DROP_TTL_TABLE, .pCont = pHead, .contLen = contLen};
SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
int32_t code = tmsgSendReq(&epSet, &rpcMsg);
if (code != 0) {
mError("failed to send ttl time seed msg, code:0x%x", code);
} else {
mInfo("send ttl time seed msg, time:%d", t);
}
sdbRelease(pSdb, pVgroup);
}
}
static void *mndThreadFp(void *param) {
SMnode *pMnode = param;
int64_t lastTime = 0;
......@@ -125,14 +97,13 @@ static void *mndThreadFp(void *param) {
while (1) {
lastTime++;
if (lastTime % (864000) == 0) { // sleep 1 day for ttl
mndPushTtlTime(pMnode);
}
taosMsleep(100);
if (mndGetStop(pMnode)) break;
if (lastTime % (tsTransPullupInterval * 10) == 1) {
mndTtlTimer(pMnode);
}
if (lastTime % (tsTransPullupInterval * 10) == 0) {
mndPullupTrans(pMnode);
}
......@@ -558,12 +529,12 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
if (!IsReq(pMsg)) return 0;
if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0;
if (pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER ||
pMsg->msgType == TDMT_MND_TRANS_TIMER) {
pMsg->msgType == TDMT_MND_TRANS_TIMER || TDMT_MND_TTL_TIMER) {
return -1;
}
const STraceId *trace = &pMsg->info.traceId;
mGError("msg:%p, failed to check mnode state since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType));
mError("msg:%p, failed to check mnode state since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType));
SEpSet epSet = {0};
mndGetMnodeEpSet(pMsg->info.node, &epSet);
......
......@@ -37,6 +37,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw);
static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb);
static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb);
static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew);
static int32_t mndProcessTtlTimer(SRpcMsg *pReq);
static int32_t mndProcessCreateStbReq(SRpcMsg *pReq);
static int32_t mndProcessAlterStbReq(SRpcMsg *pReq);
static int32_t mndProcessDropStbReq(SRpcMsg *pReq);
......@@ -63,6 +64,7 @@ int32_t mndInitStb(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_TABLE_META, mndProcessTableMetaReq);
mndSetMsgHandle(pMnode, TDMT_MND_TTL_TIMER, mndProcessTtlTimer);
mndSetMsgHandle(pMnode, TDMT_MND_TABLE_CFG, mndProcessTableCfgReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STB, mndRetrieveStb);
......@@ -799,6 +801,43 @@ int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *p
return 0;
}
static int32_t mndProcessTtlTimer(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL;
void *pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
int32_t contLen = sizeof(SMsgHead) + sizeof(int32_t);
SMsgHead *pHead = rpcMallocCont(contLen);
if (pHead == NULL) {
sdbCancelFetch(pSdb, pVgroup);
sdbRelease(pSdb, pVgroup);
continue;
}
pHead->contLen = htonl(contLen);
pHead->vgId = htonl(pVgroup->vgId);
int32_t t = taosGetTimestampSec();
*(int32_t *)((char *)pHead + sizeof(SMsgHead)) = htonl(t);
SRpcMsg rpcMsg = {.msgType = TDMT_VND_DROP_TTL_TABLE, .pCont = pHead, .contLen = contLen};
SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
int32_t code = tmsgSendReq(&epSet, &rpcMsg);
if (code != 0) {
mError("failed to send ttl time seed, code:0x%x", code);
} else {
mDebug("send ttl time seed success, time:%d", t);
}
sdbRelease(pSdb, pVgroup);
}
return 0;
}
static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
int32_t code = -1;
......
......@@ -56,6 +56,7 @@ static bool mndCannotExecuteTransAction(SMnode *pMnode) { return !pMnode->dep
static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans);
static int32_t mndProcessTransReq(SRpcMsg *pReq);
static int32_t mndProcessTtl(SRpcMsg *pReq);
static int32_t mndProcessKillTransReq(SRpcMsg *pReq);
static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
......
......@@ -400,8 +400,7 @@ static void metaBuildTtlIdxKey(STtlIdxKey *ttlKey, const SMetaEntry *pME){
if (ttlDays <= 0) return;
ttlKey->dtime = ctime / 1000 + ttlDays * 24 * 60 * 60;
// ttlKey->dtime = ctime / 1000 + ttlDays;
ttlKey->dtime = ctime / 1000 + ttlDays * tsTtlUnit;
ttlKey->uid = pME->uid;
}
......
......@@ -315,7 +315,7 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *p
if (tbUids == NULL) return TSDB_CODE_OUT_OF_MEMORY;
int32_t t = ntohl(*(int32_t *)pReq);
vError("rec ttl time:%d", t);
vDebug("vgId:%d, recv ttl msg, time:%d", pVnode->config.vgId, t);
int32_t ret = metaTtlDropTable(pVnode->pMeta, t, tbUids);
if (ret != 0) {
goto end;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册