提交 fb861a39 编写于 作者: D dapan1121

enh: refactor tmq messages

上级 3a642f06
...@@ -3198,6 +3198,12 @@ static FORCE_INLINE void tFreeSBatchRspMsg(void* p) { ...@@ -3198,6 +3198,12 @@ static FORCE_INLINE void tFreeSBatchRspMsg(void* p) {
taosMemoryFree(pRsp->msg); taosMemoryFree(pRsp->msg);
} }
int32_t tSerializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq);
int32_t tDeserializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq);
int32_t tSerializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq);
int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq);
#pragma pack(pop) #pragma pack(pop)
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -728,12 +728,26 @@ void tmqSendHbReq(void* param, void* tmrId) { ...@@ -728,12 +728,26 @@ void tmqSendHbReq(void* param, void* tmrId) {
taosMemoryFree(param); taosMemoryFree(param);
return; return;
} }
int64_t consumerId = tmq->consumerId;
int32_t epoch = tmq->epoch; SMqHbReq req = {0};
SMqHbReq* pReq = taosMemoryMalloc(sizeof(SMqHbReq)); req.consumerId = tmq->consumerId;
if (pReq == NULL) goto OVER; req.epoch = tmq->epoch;
pReq->consumerId = htobe64(consumerId);
pReq->epoch = epoch; int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
if (tlen < 0) {
tscError("tSerializeSMqHbReq failed");
return;
}
void *pReq = taosMemoryCalloc(1, tlen);
if (tlen < 0) {
tscError("failed to malloc MqHbReq msg, size:%d", tlen);
return;
}
if (tSerializeSMqHbReq(pReq, tlen, &req) < 0) {
tscError("tSerializeSMqHbReq %d failed", tlen);
taosMemoryFree(pReq);
return;
}
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) { if (sendInfo == NULL) {
...@@ -1378,21 +1392,31 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { ...@@ -1378,21 +1392,31 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
} }
atomic_store_32(&tmq->epSkipCnt, 0); atomic_store_32(&tmq->epSkipCnt, 0);
#endif #endif
int32_t tlen = sizeof(SMqAskEpReq); SMqAskEpReq req = {0};
SMqAskEpReq* req = taosMemoryCalloc(1, tlen); req.consumerId = tmq->consumerId;
if (req == NULL) { req.epoch = tmq->epoch;
tscError("failed to malloc get subscribe ep buf"); strcpy(req.cgroup, tmq->groupId);
/*atomic_store_8(&tmq->epStatus, 0);*/
int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
if (tlen < 0) {
tscError("tSerializeSMqAskEpReq failed");
return -1;
}
void *pReq = taosMemoryCalloc(1, tlen);
if (tlen < 0) {
tscError("failed to malloc askEpReq msg, size:%d", tlen);
return -1;
}
if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) {
tscError("tSerializeSMqAskEpReq %d failed", tlen);
taosMemoryFree(pReq);
return -1; return -1;
} }
req->consumerId = htobe64(tmq->consumerId);
req->epoch = htonl(tmq->epoch);
strcpy(req->cgroup, tmq->groupId);
SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam)); SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
if (pParam == NULL) { if (pParam == NULL) {
tscError("failed to malloc subscribe param"); tscError("failed to malloc subscribe param");
taosMemoryFree(req); taosMemoryFree(pReq);
/*atomic_store_8(&tmq->epStatus, 0);*/ /*atomic_store_8(&tmq->epStatus, 0);*/
return -1; return -1;
} }
...@@ -1405,13 +1429,13 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { ...@@ -1405,13 +1429,13 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
if (sendInfo == NULL) { if (sendInfo == NULL) {
tsem_destroy(&pParam->rspSem); tsem_destroy(&pParam->rspSem);
taosMemoryFree(pParam); taosMemoryFree(pParam);
taosMemoryFree(req); taosMemoryFree(pReq);
/*atomic_store_8(&tmq->epStatus, 0);*/ /*atomic_store_8(&tmq->epStatus, 0);*/
return -1; return -1;
} }
sendInfo->msgInfo = (SDataBuf){ sendInfo->msgInfo = (SDataBuf){
.pData = req, .pData = pReq,
.len = tlen, .len = tlen,
.handle = NULL, .handle = NULL,
}; };
......
...@@ -4579,6 +4579,71 @@ int32_t tDeserializeSBatchRsp(void *buf, int32_t bufLen, SBatchRsp *pRsp) { ...@@ -4579,6 +4579,71 @@ int32_t tDeserializeSBatchRsp(void *buf, int32_t bufLen, SBatchRsp *pRsp) {
} }
int32_t tSerializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI64(&encoder, pReq->consumerId) < 0) return -1;
if (tEncodeI32(&encoder, pReq->epoch) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->cgroup) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, (char *)buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->consumerId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->epoch) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->cgroup) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI64(&encoder, pReq->consumerId) < 0) return -1;
if (tEncodeI32(&encoder, pReq->epoch) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, (char *)buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->consumerId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->epoch) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq) { int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq) {
int32_t headLen = sizeof(SMsgHead); int32_t headLen = sizeof(SMsgHead);
if (buf != NULL) { if (buf != NULL) {
......
...@@ -325,9 +325,14 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { ...@@ -325,9 +325,14 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
SMqHbReq *pReq = (SMqHbReq *)pMsg->pCont; SMqHbReq req = {0};
int64_t consumerId = be64toh(pReq->consumerId);
if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
int64_t consumerId = req.consumerId;
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
if (pConsumer == NULL) { if (pConsumer == NULL) {
mError("consumer %" PRId64 " not exist", consumerId); mError("consumer %" PRId64 " not exist", consumerId);
...@@ -359,10 +364,16 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { ...@@ -359,10 +364,16 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
SMqAskEpReq *pReq = (SMqAskEpReq *)pMsg->pCont; SMqAskEpReq req = {0};
SMqAskEpRsp rsp = {0}; SMqAskEpRsp rsp = {0};
int64_t consumerId = be64toh(pReq->consumerId);
int32_t epoch = ntohl(pReq->epoch); if (tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
int64_t consumerId = req.consumerId;
int32_t epoch = req.epoch;
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
if (pConsumer == NULL) { if (pConsumer == NULL) {
...@@ -370,7 +381,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { ...@@ -370,7 +381,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
return -1; return -1;
} }
ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0); ASSERT(strcmp(req.cgroup, pConsumer->cgroup) == 0);
atomic_store_32(&pConsumer->hbStatus, 0); atomic_store_32(&pConsumer->hbStatus, 0);
......
...@@ -301,6 +301,7 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -301,6 +301,7 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
batchRsp.pRsps = taosArrayInit(msgNum, sizeof(SBatchRspMsg)); batchRsp.pRsps = taosArrayInit(msgNum, sizeof(SBatchRspMsg));
if (NULL == batchRsp.pRsps) { if (NULL == batchRsp.pRsps) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
qError("taosArrayInit %d SBatchRspMsg failed", msgNum);
goto _exit; goto _exit;
} }
...@@ -337,15 +338,18 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -337,15 +338,18 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
rspSize = tSerializeSBatchRsp(NULL, 0, &batchRsp); rspSize = tSerializeSBatchRsp(NULL, 0, &batchRsp);
if (rspSize < 0) { if (rspSize < 0) {
qError("tSerializeSBatchRsp failed");
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
} }
pRsp = rpcMallocCont(rspSize); pRsp = rpcMallocCont(rspSize);
if (pRsp == NULL) { if (pRsp == NULL) {
qError("rpcMallocCont %d failed", rspSize);
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
} }
if (tSerializeSBatchRsp(pRsp, rspSize, &batchRsp)) { if (tSerializeSBatchRsp(pRsp, rspSize, &batchRsp) < 0) {
qError("tSerializeSBatchRsp %d failed", rspSize);
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
} }
...@@ -362,7 +366,7 @@ _exit: ...@@ -362,7 +366,7 @@ _exit:
qError("vnd get batch meta failed cause of %s", tstrerror(code)); qError("vnd get batch meta failed cause of %s", tstrerror(code));
} }
taosArrayDestroyEx(batchRsp.pRsps, vnodeFreeSBatchRspMsg); taosArrayDestroyEx(batchRsp.pRsps, tFreeSBatchRspMsg);
tmsgSendRsp(&rspMsg); tmsgSendRsp(&rspMsg);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册