diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 614ce14a6ba185f20b7aa968a39fd7ddd3086fc4..a80a77f0c4c05ab8617447ef2c07842750493924 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1518,15 +1518,17 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) { } typedef struct SMqSetCVgReq { - int32_t vgId; - int64_t oldConsumerId; - int64_t newConsumerId; - char topicName[TSDB_TOPIC_FNAME_LEN]; - char cgroup[TSDB_CONSUMER_GROUP_LEN]; - char* sql; - char* logicalPlan; - char* physicalPlan; - SSubQueryMsg msg; + int32_t vgId; + int64_t oldConsumerId; + int64_t newConsumerId; + char topicName[TSDB_TOPIC_FNAME_LEN]; + char cgroup[TSDB_CONSUMER_GROUP_LEN]; + char* sql; + char* logicalPlan; + char* physicalPlan; + uint32_t qmsgLen; + void* qmsg; + //SSubQueryMsg msg; } SMqSetCVgReq; static FORCE_INLINE int32_t tEncodeSSubQueryMsg(void** buf, const SSubQueryMsg* pMsg) { @@ -1558,7 +1560,9 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* tlen += taosEncodeString(buf, pReq->sql); tlen += taosEncodeString(buf, pReq->logicalPlan); tlen += taosEncodeString(buf, pReq->physicalPlan); - tlen += tEncodeSSubQueryMsg(buf, &pReq->msg); + tlen += taosEncodeFixedU32(buf, pReq->qmsgLen); + tlen += taosEncodeBinary(buf, pReq->qmsg, pReq->qmsgLen); + //tlen += tEncodeSSubQueryMsg(buf, &pReq->msg); return tlen; } @@ -1571,15 +1575,18 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) { buf = taosDecodeString(buf, &pReq->sql); buf = taosDecodeString(buf, &pReq->logicalPlan); buf = taosDecodeString(buf, &pReq->physicalPlan); - buf = tDecodeSSubQueryMsg(buf, &pReq->msg); + buf = taosDecodeFixedU32(buf, &pReq->qmsgLen); + buf = taosDecodeBinary(buf, &pReq->qmsg, pReq->qmsgLen); + //buf = tDecodeSSubQueryMsg(buf, &pReq->msg); return buf; } typedef struct SMqSetCVgRsp { - int32_t vgId; - int64_t consumerId; - char topicName[TSDB_TOPIC_FNAME_LEN]; - char cGroup[TSDB_CONSUMER_GROUP_LEN]; + SMsgHead header; + int32_t vgId; + int64_t consumerId; + char topicName[TSDB_TOPIC_FNAME_LEN]; + char cGroup[TSDB_CONSUMER_GROUP_LEN]; } SMqSetCVgRsp; typedef struct SMqColData { diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 457245e9a34ca763a1a129e260da90c2c73e12e6..020f9f7ccdceef9c8950c96a8078c64c377e5d5f 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -32,7 +32,7 @@ struct SSubplan; * @param streamReadHandle * @return */ -qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg *pMsg, void* streamReadHandle); +qTaskInfo_t qCreateStreamExecTaskInfo(void *msg, void* streamReadHandle); int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input); diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 84f1574b24193ef52933d8d4fe9fc47c50ff5d4a..a2ca86438bfc8818a26155c0ff63aa8d6be332e5 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -113,7 +113,8 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_TOPIC)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_TOPIC)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SUBSCRIBE)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBSCRIBE_RSP)] = dndProcessMnodeWriteMsg; + /*pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBSCRIBE_RSP)] = dndProcessMnodeWriteMsg;*/ + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN_RSP)] = dndProcessMnodeWriteMsg; // Requests handled by VNODE pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBMIT)] = dndProcessVnodeWriteMsg; @@ -143,6 +144,8 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_TABLE)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = dndProcessVnodeFetchMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN)] = dndProcessVnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CUR)] = dndProcessVnodeFetchMsg; } static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 518757aa19f44ebd772bc5f23ebed069ca36a00a..78e9a7c17c24ac8610944c6a9db41f54a4e3878b 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -55,7 +55,7 @@ int32_t mndInitSubscribe(SMnode *pMnode) { .deleteFp = (SdbDeleteFp)mndSubActionDelete}; mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq); - mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE_RSP, mndProcessSubscribeInternalRsp); + mndSetMsgHandle(pMnode, TDMT_VND_MQ_SET_CONN_RSP, mndProcessSubscribeInternalRsp); mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg); return sdbSetTable(pMnode->pSdb, table); } @@ -107,8 +107,8 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { pReq->sql = strdup(pTopic->sql); pReq->logicalPlan = strdup(pTopic->logicalPlan); pReq->physicalPlan = strdup(pTopic->physicalPlan); - pReq->msg.contentLen = pCEp->qmsgLen; - memcpy(pReq->msg.msg, pCEp->qmsg, pCEp->qmsgLen); + pReq->qmsgLen = pCEp->qmsgLen; + memcpy(pReq->qmsg, pCEp->qmsg, pCEp->qmsgLen); int32_t tlen = tEncodeSMqSetCVgReq(NULL, pReq); void *reqStr = malloc(tlen); if (reqStr == NULL) { @@ -168,9 +168,10 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1; STaskInfo* pTaskInfo = taosArrayGet(pArray, i); tConvertQueryAddrToEpSet(&CEp.epSet, &pTaskInfo->addr); - mDebug("subscribe convert ep %d %s %s %s %s %s\n", CEp.epSet.numOfEps, CEp.epSet.fqdn[0], CEp.epSet.fqdn[1], CEp.epSet.fqdn[2], CEp.epSet.fqdn[3], CEp.epSet.fqdn[4]); + /*mDebug("subscribe convert ep %d %s %s %s %s %s\n", CEp.epSet.numOfEps, CEp.epSet.fqdn[0], CEp.epSet.fqdn[1], CEp.epSet.fqdn[2], CEp.epSet.fqdn[3], CEp.epSet.fqdn[4]);*/ CEp.vgId = pTaskInfo->addr.nodeId; - CEp.qmsg = malloc(sizeof(pTaskInfo->msg->contentLen)); + CEp.qmsgLen = pTaskInfo->msg->contentLen; + CEp.qmsg = malloc(CEp.qmsgLen); if (CEp.qmsg == NULL) { return -1; } @@ -195,27 +196,33 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume }; strcpy(req.cgroup, pConsumer->cgroup); strcpy(req.topicName, pTopic->name); - req.sql = strdup(pTopic->sql); - req.logicalPlan = strdup(pTopic->logicalPlan); - req.physicalPlan = strdup(pTopic->physicalPlan); + req.sql = pTopic->sql; + req.logicalPlan = pTopic->logicalPlan; + req.physicalPlan = pTopic->physicalPlan; int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); - void *reqStr = malloc(tlen); - if (reqStr == NULL) { + void *buf = malloc(sizeof(SMsgHead) + tlen); + if (buf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - void *abuf = reqStr; + + SMsgHead* pMsgHead = (SMsgHead*)buf; + + pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen); + pMsgHead->vgId = htonl(vgId); + + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); tEncodeSMqSetCVgReq(&abuf, &req); STransAction action = {0}; action.epSet = mndGetVgroupEpset(pMnode, pVgObj); - action.pCont = reqStr; + action.pCont = buf; action.contLen = tlen; action.msgType = TDMT_VND_MQ_SET_CONN; mndReleaseVgroup(pMnode, pVgObj); if (mndTransAppendRedoAction(pTrans, &action) != 0) { - free(reqStr); + free(buf); return -1; } } diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h index 0f318dea0b409d79548dbca2ddb5721fccd4be4d..9e33c0402291e525119e9a484f4160ffd786d385 100644 --- a/source/dnode/vnode/inc/tq.h +++ b/source/dnode/vnode/inc/tq.h @@ -319,7 +319,7 @@ int tqSendLaunchQuery(STqMsgItem*, int64_t offset); #endif int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp); -int32_t tqProcessSetConnReq(STQ* pTq, char* msg); +int32_t tqProcessSetConnReq(STQ* pTq, char* msg, SRpcMsg** ppRsp); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 49bbb77797a4e01f81ee41356a77b1a3f3f39029..1034a05443df6e8380e1bec91ec02dcdad51d440 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -779,7 +779,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { return 0; } -int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { +int32_t tqProcessSetConnReq(STQ* pTq, char* msg, SRpcMsg** ppRsp) { SMqSetCVgReq req; tDecodeSMqSetCVgReq(msg, &req); STqConsumerHandle* pConsumer = calloc(sizeof(STqConsumerHandle), 1); @@ -795,9 +795,9 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { return -1; } strcpy(pTopic->topicName, req.topicName); - strcpy(pTopic->sql, req.sql); - strcpy(pTopic->logicalPlan, req.logicalPlan); - strcpy(pTopic->physicalPlan, req.physicalPlan); + pTopic->sql = strdup(req.sql); + pTopic->logicalPlan = strdup(req.logicalPlan); + pTopic->physicalPlan = strdup(req.physicalPlan); pTopic->buffer.firstOffset = -1; pTopic->buffer.lastOffset = -1; @@ -807,9 +807,10 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { for (int i = 0; i < TQ_BUFFER_SIZE; i++) { pTopic->buffer.output[i].status = 0; STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta); - pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(&req.msg, pReadHandle); + pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(&req.qmsg, pReadHandle); } taosArrayPush(pConsumer->topics, pTopic); + terrno = TSDB_CODE_SUCCESS; return 0; } @@ -822,7 +823,7 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) { pReadHandle->pMsg = NULL; pReadHandle->ver = -1; pReadHandle->pColIdList = NULL; - return NULL; + return pReadHandle; } void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitMsg* pMsg, int64_t ver) { diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index ccddfd56d8f89832790b364643981a62dba4cee7..211bb9cc4b87edefeb1bea754da7478b53d96945 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -112,7 +112,8 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } break; case TDMT_VND_MQ_SET_CONN: { - if (tqProcessSetConnReq(pVnode->pTq, ptr) < 0) { + if (tqProcessSetConnReq(pVnode->pTq, POINTER_SHIFT(ptr, sizeof(SMsgHead)), NULL) < 0) { + // TODO: handle error } } break; default: diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 2f1f40813c2b4f400e1e5a33b2ff3b9421f49d5b..b7be85dc347c06ea9eb566e61bf7a1acba5b764a 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -60,8 +60,8 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input) { return code; } -qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg* pMsg, void* streamReadHandle) { - if (pMsg == NULL || streamReadHandle == NULL) { +qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) { + if (msg == NULL || streamReadHandle == NULL) { return NULL; } @@ -74,7 +74,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg* pMsg, void* streamReadHandle #endif struct SSubplan* plan = NULL; - int32_t code = qStringToSubplan(pMsg->msg, &plan); + int32_t code = qStringToSubplan(msg, &plan); if (code != TSDB_CODE_SUCCESS) { terrno = code; return NULL;