提交 43f7e18c 编写于 作者: S Shengliang Guan

serialize timer msg

上级 03c026a5
...@@ -720,8 +720,11 @@ int32_t tSerializeSStatusRsp(void* buf, int32_t bufLen, SStatusRsp* pRsp); ...@@ -720,8 +720,11 @@ int32_t tSerializeSStatusRsp(void* buf, int32_t bufLen, SStatusRsp* pRsp);
int32_t tDeserializeSStatusRsp(void* buf, int32_t bufLen, SStatusRsp* pRsp); int32_t tDeserializeSStatusRsp(void* buf, int32_t bufLen, SStatusRsp* pRsp);
typedef struct { typedef struct {
int32_t reserve; int32_t reserved;
} STransReq; } SMTimerReq;
int32_t tSerializeSMTimerMsg(void* buf, int32_t bufLen, SMTimerReq* pReq);
int32_t tDeserializeSMTimerMsg(void* buf, int32_t bufLen, SMTimerReq* pReq);
typedef struct { typedef struct {
int32_t id; int32_t id;
...@@ -1195,10 +1198,6 @@ static FORCE_INLINE void* tDeserializeSMVSubscribeReq(void* buf, SMVSubscribeReq ...@@ -1195,10 +1198,6 @@ static FORCE_INLINE void* tDeserializeSMVSubscribeReq(void* buf, SMVSubscribeReq
return buf; return buf;
} }
typedef struct {
int32_t reserved;
} SMqTmrMsg;
typedef struct { typedef struct {
const char* key; const char* key;
SArray* lostConsumers; // SArray<int64_t> SArray* lostConsumers; // SArray<int64_t>
......
...@@ -142,7 +142,7 @@ enum { ...@@ -142,7 +142,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_MND_GET_SUB_EP, "mnode-get-sub-ep", SMqCMGetSubEpReq, SMqCMGetSubEpRsp) TD_DEF_MSG_TYPE(TDMT_MND_GET_SUB_EP, "mnode-get-sub-ep", SMqCMGetSubEpReq, SMqCMGetSubEpRsp)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-timer", SMqTmrMsg, SMqTmrMsg) TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-timer", SMTimerReq, SMTimerReq)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, SMqDoRebalanceMsg) TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, SMqDoRebalanceMsg)
// Requests handled by VNODE // Requests handled by VNODE
......
...@@ -1984,7 +1984,7 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { ...@@ -1984,7 +1984,7 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->acctId) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->acctId) < 0) return -1;
if (tDecodeI64(&decoder, &pRsp->clusterId) < 0) return -1; if (tDecodeI64(&decoder, &pRsp->clusterId) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->connId) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->connId) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->superUser) < 0) return -1; if (tDecodeI8(&decoder, &pRsp->superUser) < 0) return -1;
if (tDecodeSEpSet(&decoder, &pRsp->epSet) < 0) return -1; if (tDecodeSEpSet(&decoder, &pRsp->epSet) < 0) return -1;
...@@ -1994,3 +1994,28 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { ...@@ -1994,3 +1994,28 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
tCoderClear(&decoder); tCoderClear(&decoder);
return 0; return 0;
} }
int32_t tSerializeSMTimerMsg(void *buf, int32_t bufLen, SMTimerReq *pReq) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->reserved) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSMTimerMsg(void *buf, int32_t bufLen, SMTimerReq *pReq) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->reserved) < 0) return -1;
tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
}
...@@ -394,7 +394,7 @@ void dndSendStatusReq(SDnode *pDnode) { ...@@ -394,7 +394,7 @@ void dndSendStatusReq(SDnode *pDnode) {
static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) { static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
if (pMgmt->dnodeId == 0) { if (pMgmt->dnodeId == 0) {
dInfo("set dnodeId:%d clusterId:0x%" PRId64, pCfg->dnodeId, pCfg->clusterId); dInfo("set dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);
taosWLockLatch(&pMgmt->latch); taosWLockLatch(&pMgmt->latch);
pMgmt->dnodeId = pCfg->dnodeId; pMgmt->dnodeId = pCfg->dnodeId;
pMgmt->clusterId = pCfg->clusterId; pMgmt->clusterId = pCfg->clusterId;
......
...@@ -60,11 +60,25 @@ void mndSendRedirectRsp(SMnode *pMnode, SRpcMsg *pMsg) { ...@@ -60,11 +60,25 @@ void mndSendRedirectRsp(SMnode *pMnode, SRpcMsg *pMsg) {
} }
} }
static void *mndBuildTimerMsg(int32_t *pContLen) {
SMTimerReq timerReq = {0};
int32_t contLen = tSerializeSMTimerMsg(NULL, 0, &timerReq);
if (contLen <= 0) return NULL;
void *pReq = rpcMallocCont(contLen);
if (pReq == NULL) return NULL;
tSerializeSMTimerMsg(pReq, contLen, &timerReq);
*pContLen = contLen;
return pReq;
}
static void mndTransReExecute(void *param, void *tmrId) { static void mndTransReExecute(void *param, void *tmrId) {
SMnode *pMnode = param; SMnode *pMnode = param;
if (mndIsMaster(pMnode)) { if (mndIsMaster(pMnode)) {
STransReq *pMsg = rpcMallocCont(sizeof(STransReq)); int32_t contLen = 0;
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS, .pCont = pMsg, .contLen = sizeof(STransReq)}; void *pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS, .pCont = pReq, .contLen = contLen};
pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg); pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg);
} }
...@@ -74,8 +88,9 @@ static void mndTransReExecute(void *param, void *tmrId) { ...@@ -74,8 +88,9 @@ static void mndTransReExecute(void *param, void *tmrId) {
static void mndCalMqRebalance(void *param, void *tmrId) { static void mndCalMqRebalance(void *param, void *tmrId) {
SMnode *pMnode = param; SMnode *pMnode = param;
if (mndIsMaster(pMnode)) { if (mndIsMaster(pMnode)) {
SMqTmrMsg *pMsg = rpcMallocCont(sizeof(SMqTmrMsg)); int32_t contLen = 0;
SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pMsg, .contLen = sizeof(SMqTmrMsg)}; void *pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen};
pMnode->putReqToMReadQFp(pMnode->pDnode, &rpcMsg); pMnode->putReqToMReadQFp(pMnode->pDnode, &rpcMsg);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册