提交 6d774dee 编写于 作者: D dapan1121

enh: refact tmq pull request message

上级 a6c3bc20
...@@ -4723,6 +4723,84 @@ void tFreeSSubQueryMsg(SSubQueryMsg *pReq) { ...@@ -4723,6 +4723,84 @@ void tFreeSSubQueryMsg(SSubQueryMsg *pReq) {
taosMemoryFreeClear(pReq->msg); taosMemoryFreeClear(pReq->msg);
} }
int32_t tSerializeSTqOffsetVal(SEncoder *pEncoder, STqOffsetVal *pOffset) {
if (tEncodeI8(pEncoder, pOffset->type) < 0) return -1;
if (tEncodeI64(pEncoder, pOffset->uid) < 0) return -1;
if (tEncodeI64(pEncoder, pOffset->ts) < 0) return -1;
return 0;
}
int32_t tDerializeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffset) {
if (tDecodeI8(pDecoder, &pOffset->type) < 0) return -1;
if (tDecodeI64(pDecoder, &pOffset->uid) < 0) return -1;
if (tDecodeI64(pDecoder, &pOffset->ts) < 0) return -1;
return 0;
}
int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
if (buf != NULL) {
buf = (char *)buf + headLen;
bufLen -= headLen;
}
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->subKey) < 0) return -1;
if (tEncodeI8(&encoder, pReq->withTbName) < 0) return -1;
if (tEncodeI8(&encoder, pReq->useSnapshot) < 0) return -1;
if (tEncodeI32(&encoder, pReq->epoch) < 0) return -1;
if (tEncodeU64(&encoder, pReq->reqId) < 0) return -1;
if (tEncodeI64(&encoder, pReq->consumerId) < 0) return -1;
if (tEncodeI64(&encoder, pReq->timeout) < 0) return -1;
if (tSerializeSTqOffsetVal(&encoder, &pReq->reqOffset) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
if (buf != NULL) {
SMsgHead *pHead = (SMsgHead *)((char *)buf - headLen);
pHead->vgId = htonl(pReq->header.vgId);
pHead->contLen = htonl(tlen + headLen);
}
return tlen + headLen;
}
int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
SMsgHead *pHead = buf;
pHead->vgId = pReq->header.vgId;
pHead->contLen = pReq->header.contLen;
SDecoder decoder = {0};
tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->subKey) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->withTbName) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->useSnapshot) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->epoch) < 0) return -1;
if (tDecodeU64(&decoder, &pReq->reqId) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->consumerId) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->timeout) < 0) return -1;
if (tDerializeSTqOffsetVal(&decoder, &pReq->reqOffset) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq) { int32_t tSerializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq) {
int32_t headLen = sizeof(SMsgHead); int32_t headLen = sizeof(SMsgHead);
if (buf != NULL) { if (buf != NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册