提交 fd90848e 编写于 作者: L Liu Jicong

add mq msg type

上级 d410ef1c
...@@ -1518,15 +1518,17 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) { ...@@ -1518,15 +1518,17 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) {
} }
typedef struct SMqSetCVgReq { typedef struct SMqSetCVgReq {
int32_t vgId; int32_t vgId;
int64_t oldConsumerId; int64_t oldConsumerId;
int64_t newConsumerId; int64_t newConsumerId;
char topicName[TSDB_TOPIC_FNAME_LEN]; char topicName[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CONSUMER_GROUP_LEN]; char cgroup[TSDB_CONSUMER_GROUP_LEN];
char* sql; char* sql;
char* logicalPlan; char* logicalPlan;
char* physicalPlan; char* physicalPlan;
SSubQueryMsg msg; uint32_t qmsgLen;
void* qmsg;
//SSubQueryMsg msg;
} SMqSetCVgReq; } SMqSetCVgReq;
static FORCE_INLINE int32_t tEncodeSSubQueryMsg(void** buf, const SSubQueryMsg* pMsg) { static FORCE_INLINE int32_t tEncodeSSubQueryMsg(void** buf, const SSubQueryMsg* pMsg) {
...@@ -1558,7 +1560,9 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* ...@@ -1558,7 +1560,9 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq*
tlen += taosEncodeString(buf, pReq->sql); tlen += taosEncodeString(buf, pReq->sql);
tlen += taosEncodeString(buf, pReq->logicalPlan); tlen += taosEncodeString(buf, pReq->logicalPlan);
tlen += taosEncodeString(buf, pReq->physicalPlan); tlen += taosEncodeString(buf, pReq->physicalPlan);
tlen += tEncodeSSubQueryMsg(buf, &pReq->msg); tlen += taosEncodeFixedU32(buf, pReq->qmsgLen);
tlen += taosEncodeBinary(buf, pReq->qmsg, pReq->qmsgLen);
//tlen += tEncodeSSubQueryMsg(buf, &pReq->msg);
return tlen; return tlen;
} }
...@@ -1571,15 +1575,18 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) { ...@@ -1571,15 +1575,18 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
buf = taosDecodeString(buf, &pReq->sql); buf = taosDecodeString(buf, &pReq->sql);
buf = taosDecodeString(buf, &pReq->logicalPlan); buf = taosDecodeString(buf, &pReq->logicalPlan);
buf = taosDecodeString(buf, &pReq->physicalPlan); buf = taosDecodeString(buf, &pReq->physicalPlan);
buf = tDecodeSSubQueryMsg(buf, &pReq->msg); buf = taosDecodeFixedU32(buf, &pReq->qmsgLen);
buf = taosDecodeBinary(buf, &pReq->qmsg, pReq->qmsgLen);
//buf = tDecodeSSubQueryMsg(buf, &pReq->msg);
return buf; return buf;
} }
typedef struct SMqSetCVgRsp { typedef struct SMqSetCVgRsp {
int32_t vgId; SMsgHead header;
int64_t consumerId; int32_t vgId;
char topicName[TSDB_TOPIC_FNAME_LEN]; int64_t consumerId;
char cGroup[TSDB_CONSUMER_GROUP_LEN]; char topicName[TSDB_TOPIC_FNAME_LEN];
char cGroup[TSDB_CONSUMER_GROUP_LEN];
} SMqSetCVgRsp; } SMqSetCVgRsp;
typedef struct SMqColData { typedef struct SMqColData {
......
...@@ -32,7 +32,7 @@ struct SSubplan; ...@@ -32,7 +32,7 @@ struct SSubplan;
* @param streamReadHandle * @param streamReadHandle
* @return * @return
*/ */
qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg *pMsg, void* streamReadHandle); qTaskInfo_t qCreateStreamExecTaskInfo(void *msg, void* streamReadHandle);
int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input); int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input);
......
...@@ -113,7 +113,8 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { ...@@ -113,7 +113,8 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_TOPIC)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_TOPIC)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_TOPIC)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_TOPIC)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SUBSCRIBE)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SUBSCRIBE)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBSCRIBE_RSP)] = dndProcessMnodeWriteMsg; /*pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBSCRIBE_RSP)] = dndProcessMnodeWriteMsg;*/
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN_RSP)] = dndProcessMnodeWriteMsg;
// Requests handled by VNODE // Requests handled by VNODE
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBMIT)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBMIT)] = dndProcessVnodeWriteMsg;
...@@ -143,6 +144,8 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { ...@@ -143,6 +144,8 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_TABLE)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_TABLE)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CUR)] = dndProcessVnodeFetchMsg;
} }
static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
......
...@@ -55,7 +55,7 @@ int32_t mndInitSubscribe(SMnode *pMnode) { ...@@ -55,7 +55,7 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
.deleteFp = (SdbDeleteFp)mndSubActionDelete}; .deleteFp = (SdbDeleteFp)mndSubActionDelete};
mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq); mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq);
mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE_RSP, mndProcessSubscribeInternalRsp); mndSetMsgHandle(pMnode, TDMT_VND_MQ_SET_CONN_RSP, mndProcessSubscribeInternalRsp);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg); mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg);
return sdbSetTable(pMnode->pSdb, table); return sdbSetTable(pMnode->pSdb, table);
} }
...@@ -107,8 +107,8 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { ...@@ -107,8 +107,8 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
pReq->sql = strdup(pTopic->sql); pReq->sql = strdup(pTopic->sql);
pReq->logicalPlan = strdup(pTopic->logicalPlan); pReq->logicalPlan = strdup(pTopic->logicalPlan);
pReq->physicalPlan = strdup(pTopic->physicalPlan); pReq->physicalPlan = strdup(pTopic->physicalPlan);
pReq->msg.contentLen = pCEp->qmsgLen; pReq->qmsgLen = pCEp->qmsgLen;
memcpy(pReq->msg.msg, pCEp->qmsg, pCEp->qmsgLen); memcpy(pReq->qmsg, pCEp->qmsg, pCEp->qmsgLen);
int32_t tlen = tEncodeSMqSetCVgReq(NULL, pReq); int32_t tlen = tEncodeSMqSetCVgReq(NULL, pReq);
void *reqStr = malloc(tlen); void *reqStr = malloc(tlen);
if (reqStr == NULL) { if (reqStr == NULL) {
...@@ -168,9 +168,10 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas ...@@ -168,9 +168,10 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1; CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1;
STaskInfo* pTaskInfo = taosArrayGet(pArray, i); STaskInfo* pTaskInfo = taosArrayGet(pArray, i);
tConvertQueryAddrToEpSet(&CEp.epSet, &pTaskInfo->addr); tConvertQueryAddrToEpSet(&CEp.epSet, &pTaskInfo->addr);
mDebug("subscribe convert ep %d %s %s %s %s %s\n", CEp.epSet.numOfEps, CEp.epSet.fqdn[0], CEp.epSet.fqdn[1], CEp.epSet.fqdn[2], CEp.epSet.fqdn[3], CEp.epSet.fqdn[4]); /*mDebug("subscribe convert ep %d %s %s %s %s %s\n", CEp.epSet.numOfEps, CEp.epSet.fqdn[0], CEp.epSet.fqdn[1], CEp.epSet.fqdn[2], CEp.epSet.fqdn[3], CEp.epSet.fqdn[4]);*/
CEp.vgId = pTaskInfo->addr.nodeId; CEp.vgId = pTaskInfo->addr.nodeId;
CEp.qmsg = malloc(sizeof(pTaskInfo->msg->contentLen)); CEp.qmsgLen = pTaskInfo->msg->contentLen;
CEp.qmsg = malloc(CEp.qmsgLen);
if (CEp.qmsg == NULL) { if (CEp.qmsg == NULL) {
return -1; return -1;
} }
...@@ -195,27 +196,33 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume ...@@ -195,27 +196,33 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume
}; };
strcpy(req.cgroup, pConsumer->cgroup); strcpy(req.cgroup, pConsumer->cgroup);
strcpy(req.topicName, pTopic->name); strcpy(req.topicName, pTopic->name);
req.sql = strdup(pTopic->sql); req.sql = pTopic->sql;
req.logicalPlan = strdup(pTopic->logicalPlan); req.logicalPlan = pTopic->logicalPlan;
req.physicalPlan = strdup(pTopic->physicalPlan); req.physicalPlan = pTopic->physicalPlan;
int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
void *reqStr = malloc(tlen); void *buf = malloc(sizeof(SMsgHead) + tlen);
if (reqStr == NULL) { if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
void *abuf = reqStr;
SMsgHead* pMsgHead = (SMsgHead*)buf;
pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen);
pMsgHead->vgId = htonl(vgId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncodeSMqSetCVgReq(&abuf, &req); tEncodeSMqSetCVgReq(&abuf, &req);
STransAction action = {0}; STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgObj); action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
action.pCont = reqStr; action.pCont = buf;
action.contLen = tlen; action.contLen = tlen;
action.msgType = TDMT_VND_MQ_SET_CONN; action.msgType = TDMT_VND_MQ_SET_CONN;
mndReleaseVgroup(pMnode, pVgObj); mndReleaseVgroup(pMnode, pVgObj);
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(reqStr); free(buf);
return -1; return -1;
} }
} }
......
...@@ -319,7 +319,7 @@ int tqSendLaunchQuery(STqMsgItem*, int64_t offset); ...@@ -319,7 +319,7 @@ int tqSendLaunchQuery(STqMsgItem*, int64_t offset);
#endif #endif
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp); int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp);
int32_t tqProcessSetConnReq(STQ* pTq, char* msg); int32_t tqProcessSetConnReq(STQ* pTq, char* msg, SRpcMsg** ppRsp);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -779,7 +779,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { ...@@ -779,7 +779,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
return 0; return 0;
} }
int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { int32_t tqProcessSetConnReq(STQ* pTq, char* msg, SRpcMsg** ppRsp) {
SMqSetCVgReq req; SMqSetCVgReq req;
tDecodeSMqSetCVgReq(msg, &req); tDecodeSMqSetCVgReq(msg, &req);
STqConsumerHandle* pConsumer = calloc(sizeof(STqConsumerHandle), 1); STqConsumerHandle* pConsumer = calloc(sizeof(STqConsumerHandle), 1);
...@@ -795,9 +795,9 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { ...@@ -795,9 +795,9 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
return -1; return -1;
} }
strcpy(pTopic->topicName, req.topicName); strcpy(pTopic->topicName, req.topicName);
strcpy(pTopic->sql, req.sql); pTopic->sql = strdup(req.sql);
strcpy(pTopic->logicalPlan, req.logicalPlan); pTopic->logicalPlan = strdup(req.logicalPlan);
strcpy(pTopic->physicalPlan, req.physicalPlan); pTopic->physicalPlan = strdup(req.physicalPlan);
pTopic->buffer.firstOffset = -1; pTopic->buffer.firstOffset = -1;
pTopic->buffer.lastOffset = -1; pTopic->buffer.lastOffset = -1;
...@@ -807,9 +807,10 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { ...@@ -807,9 +807,10 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
for (int i = 0; i < TQ_BUFFER_SIZE; i++) { for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
pTopic->buffer.output[i].status = 0; pTopic->buffer.output[i].status = 0;
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta); STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta);
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(&req.msg, pReadHandle); pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(&req.qmsg, pReadHandle);
} }
taosArrayPush(pConsumer->topics, pTopic); taosArrayPush(pConsumer->topics, pTopic);
terrno = TSDB_CODE_SUCCESS;
return 0; return 0;
} }
...@@ -822,7 +823,7 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) { ...@@ -822,7 +823,7 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
pReadHandle->pMsg = NULL; pReadHandle->pMsg = NULL;
pReadHandle->ver = -1; pReadHandle->ver = -1;
pReadHandle->pColIdList = NULL; pReadHandle->pColIdList = NULL;
return NULL; return pReadHandle;
} }
void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitMsg* pMsg, int64_t ver) { void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitMsg* pMsg, int64_t ver) {
......
...@@ -112,7 +112,8 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -112,7 +112,8 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
} }
break; break;
case TDMT_VND_MQ_SET_CONN: { case TDMT_VND_MQ_SET_CONN: {
if (tqProcessSetConnReq(pVnode->pTq, ptr) < 0) { if (tqProcessSetConnReq(pVnode->pTq, POINTER_SHIFT(ptr, sizeof(SMsgHead)), NULL) < 0) {
// TODO: handle error
} }
} break; } break;
default: default:
......
...@@ -60,8 +60,8 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input) { ...@@ -60,8 +60,8 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input) {
return code; return code;
} }
qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg* pMsg, void* streamReadHandle) { qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
if (pMsg == NULL || streamReadHandle == NULL) { if (msg == NULL || streamReadHandle == NULL) {
return NULL; return NULL;
} }
...@@ -74,7 +74,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg* pMsg, void* streamReadHandle ...@@ -74,7 +74,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg* pMsg, void* streamReadHandle
#endif #endif
struct SSubplan* plan = NULL; struct SSubplan* plan = NULL;
int32_t code = qStringToSubplan(pMsg->msg, &plan); int32_t code = qStringToSubplan(msg, &plan);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code; terrno = code;
return NULL; return NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册