diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b4674aba54d6e45f2b99e9724db042a2283a1fe1..d78e771fcf08af0f864ce42a3b944ab93b16b833 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2807,39 +2807,49 @@ typedef struct { int64_t suid; } SMqRebVgReq; -static FORCE_INLINE int32_t tEncodeSMqRebVgReq(void** buf, const SMqRebVgReq* pReq) { - int32_t tlen = 0; - tlen += taosEncodeFixedI64(buf, pReq->leftForVer); - tlen += taosEncodeFixedI32(buf, pReq->vgId); - tlen += taosEncodeFixedI64(buf, pReq->oldConsumerId); - tlen += taosEncodeFixedI64(buf, pReq->newConsumerId); - tlen += taosEncodeString(buf, pReq->subKey); - tlen += taosEncodeFixedI8(buf, pReq->subType); - tlen += taosEncodeFixedI8(buf, pReq->withMeta); +static FORCE_INLINE int tEncodeSMqRebVgReq(SEncoder *pCoder, const SMqRebVgReq* pReq) { + if (tStartEncode(pCoder) < 0) return -1; + if (tEncodeI64(pCoder, pReq->leftForVer) < 0) return -1; + if (tEncodeI32(pCoder, pReq->vgId) < 0) return -1; + if (tEncodeI64(pCoder, pReq->oldConsumerId) < 0) return -1; + if (tEncodeI64(pCoder, pReq->newConsumerId) < 0) return -1; + if (tEncodeCStr(pCoder, pReq->subKey) < 0) return -1; + if (tEncodeI8(pCoder, pReq->subType) < 0) return -1; + if (tEncodeI8(pCoder, pReq->withMeta) < 0) return -1; + if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) { - tlen += taosEncodeString(buf, pReq->qmsg); + if (tEncodeCStr(pCoder, pReq->qmsg) < 0) return -1; } else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) { - tlen += taosEncodeFixedI64(buf, pReq->suid); - tlen += taosEncodeString(buf, pReq->qmsg); + if (tEncodeI64(pCoder, pReq->suid) < 0) return -1; + if (tEncodeCStr(pCoder, pReq->qmsg) < 0) return -1; } - return tlen; + tEndEncode(pCoder); + return 0; } -static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq) { - buf = taosDecodeFixedI64(buf, &pReq->leftForVer); - buf = taosDecodeFixedI32(buf, &pReq->vgId); - buf = taosDecodeFixedI64(buf, &pReq->oldConsumerId); - buf = taosDecodeFixedI64(buf, &pReq->newConsumerId); - buf = taosDecodeStringTo(buf, pReq->subKey); - buf = taosDecodeFixedI8(buf, &pReq->subType); - buf = taosDecodeFixedI8(buf, &pReq->withMeta); +static FORCE_INLINE int tDecodeSMqRebVgReq(SDecoder *pCoder, SMqRebVgReq* pReq) { + if (tStartDecode(pCoder) < 0) return -1; + + if (tDecodeI64(pCoder, &pReq->leftForVer) < 0) return -1; + + if (tDecodeI32(pCoder, &pReq->vgId) < 0) return -1; + if (tDecodeI64(pCoder, &pReq->oldConsumerId) < 0) return -1; + if (tDecodeI64(pCoder, &pReq->newConsumerId) < 0) return -1; + if (tDecodeCStrTo(pCoder, pReq->subKey) < 0) return -1; + if (tDecodeI8(pCoder, &pReq->subType) < 0) return -1; + if (tDecodeI8(pCoder, &pReq->withMeta) < 0) return -1; + if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) { - buf = taosDecodeString(buf, &pReq->qmsg); + if (tDecodeCStr(pCoder, &pReq->qmsg) < 0) return -1; } else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) { - buf = taosDecodeFixedI64(buf, &pReq->suid); - buf = taosDecodeString(buf, &pReq->qmsg); + if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1; + if (!tDecodeIsEnd(pCoder)){ + if (tDecodeCStr(pCoder, &pReq->qmsg) < 0) return -1; + } } - return (void*)buf; + + tEndDecode(pCoder); + return 0; } typedef struct { diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index e62102fa77254c37e0d8244ae437a9570f870dfb..74421afa338bdb3f403d78fd9634ebf912d2e501 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -111,7 +111,14 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri req.suid = pSub->stbUid; tstrncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN); - int32_t tlen = sizeof(SMsgHead) + tEncodeSMqRebVgReq(NULL, &req); + int32_t tlen = 0; + int32_t ret = 0; + tEncodeSize(tEncodeSMqRebVgReq, &req, tlen, ret); + if (ret < 0) { + return -1; + } + + tlen += sizeof(SMsgHead); void *buf = taosMemoryMalloc(tlen); if (buf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -123,8 +130,14 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri pMsgHead->contLen = htonl(tlen); pMsgHead->vgId = htonl(pRebVg->pVgEp->vgId); - void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - tEncodeSMqRebVgReq(&abuf, &req); + SEncoder encoder = {0}; + tEncoderInit(&encoder, POINTER_SHIFT(buf, sizeof(SMsgHead)), tlen); + if (tEncodeSMqRebVgReq(&encoder, &req) < 0) { + taosMemoryFreeClear(buf); + tEncoderClear(&encoder); + return -1; + } + tEncoderClear(&encoder); *pBuf = buf; *pLen = tlen; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 11760b5fd0e0831e7ff293b600272b19f7d4e087..35055a00e3045e7989b02795e14e18adc4df08d8 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -633,7 +633,16 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { int ret = 0; SMqRebVgReq req = {0}; - tDecodeSMqRebVgReq(msg, &req); + SDecoder dc = {0}; + + tDecoderInit(&dc, msg, msgLen); + + // decode req + if (tDecodeSMqRebVgReq(&dc, &req) < 0) { + terrno = TSDB_CODE_INVALID_MSG; + tDecoderClear(&dc); + return -1; + } SVnode* pVnode = pTq->pVnode; int32_t vgId = TD_VID(pVnode); @@ -680,8 +689,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg pHandle->snapshotVer = ver; if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - pHandle->execHandle.execCol.qmsg = req.qmsg; - req.qmsg = NULL; + pHandle->execHandle.execCol.qmsg = taosStrdup(req.qmsg);; pHandle->execHandle.task = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId, &pHandle->execHandle.numOfCols, req.newConsumerId); @@ -701,8 +709,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL); pHandle->execHandle.execTb.suid = req.suid; - pHandle->execHandle.execTb.qmsg = req.qmsg; - req.qmsg = NULL; + pHandle->execHandle.execTb.qmsg = taosStrdup(req.qmsg); if(strcmp(pHandle->execHandle.execTb.qmsg, "") != 0) { if (nodesStringToNode(pHandle->execHandle.execTb.qmsg, &pHandle->execHandle.execTb.node) != 0) { @@ -766,7 +773,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg end: taosWUnLockLatch(&pTq->lock); - taosMemoryFree(req.qmsg); + tDecoderClear(&dc); return ret; } diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 6a9a2b609a9add705d532fec589ce668ba977045..0b235d0dc8b6ac9792c123b71357f3af35b03a40 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -564,7 +564,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/user_privilege.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/fsync.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/multilevel.py -#,,n,system-test,python3 ./test.py -f 0-others/compatibility.py +,,n,system-test,python3 ./test.py -f 0-others/compatibility.py ,,n,system-test,python3 ./test.py -f 0-others/tag_index_basic.py ,,n,system-test,python3 ./test.py -f 0-others/udfpy_main.py ,,n,system-test,python3 ./test.py -N 3 -f 0-others/walRetention.py