未验证 提交 73fa7a7f 编写于 作者: L Liu Jicong 提交者: GitHub

vnode process mq msg (#8287)

add vnodeprocess msg function
上级 3639999e
...@@ -42,7 +42,9 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" ) ...@@ -42,7 +42,9 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONSUME, "mq-consume" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONSUME, "mq-consume" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_QUERY, "mq-query" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_ACK, "mq-ack" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_ACK, "mq-ack" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_RESET, "mq-reset" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_RESET, "mq-reset" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" )
...@@ -121,7 +123,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TP, "drop-tp" ) ...@@ -121,7 +123,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TP, "drop-tp" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TP, "alter-tp" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TP, "alter-tp" )
#ifndef TAOS_MESSAGE_C #ifndef TAOS_MESSAGE_C
TSDB_MSG_TYPE_MAX // 105 TSDB_MSG_TYPE_MAX // 147
#endif #endif
}; };
...@@ -958,6 +960,40 @@ typedef struct { ...@@ -958,6 +960,40 @@ typedef struct {
char reserved2[64]; char reserved2[64];
} SStartupStep; } SStartupStep;
// mq related
typedef struct {
} SMqConnectReq;
typedef struct {
} SMqConnectRsp;
typedef struct {
} SMqDisconnectReq;
typedef struct {
} SMqDisconnectRsp;
typedef struct {
} SMqAckReq;
typedef struct {
} SMqAckRsp;
typedef struct {
} SMqResetReq;
typedef struct {
} SMqResetRsp;
//mq related end
typedef struct { typedef struct {
/* data */ /* data */
} SSubmitReq; } SSubmitReq;
......
...@@ -26,8 +26,9 @@ typedef struct tmqMsgHead { ...@@ -26,8 +26,9 @@ typedef struct tmqMsgHead {
int32_t headLen; int32_t headLen;
int32_t msgVer; int32_t msgVer;
int64_t cgId; int64_t cgId;
int32_t topicLen; int64_t topicId;
char topic[]; int32_t checksum;
int32_t msgType;
} tmqMsgHead; } tmqMsgHead;
//TODO: put msgs into common //TODO: put msgs into common
......
...@@ -97,6 +97,9 @@ int32_t dnodeInitServer() { ...@@ -97,6 +97,9 @@ int32_t dnodeInitServer() {
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessMsg;
/*tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessRead;*/
SRpcInit rpcInit; SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = tsDnodeDnodePort; rpcInit.localPort = tsDnodeDnodePort;
...@@ -308,10 +311,12 @@ static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, c ...@@ -308,10 +311,12 @@ static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, c
} }
int32_t dnodeInitShell() { int32_t dnodeInitShell() {
tsTrans.shellMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessMsg;
// the following message shall be treated as mnode write // the following message shall be treated as mnode write
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = mnodeProcessMsg;
......
...@@ -36,6 +36,8 @@ typedef struct SReadMsg { ...@@ -36,6 +36,8 @@ typedef struct SReadMsg {
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SReadMsg *pRead); int32_t vnodeProcessQueryMsg(SVnode *pVnode, SReadMsg *pRead);
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SReadMsg *pRead); int32_t vnodeProcessFetchMsg(SVnode *pVnode, SReadMsg *pRead);
int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead);
int32_t vnodeProcessTqQueryMsg(SVnode *pVnode, SReadMsg *pRead);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -27,9 +27,15 @@ int32_t vnodeProcessDropTableReq(SVnode *pVnode, SDropTableReq *pReq, SDropTable ...@@ -27,9 +27,15 @@ int32_t vnodeProcessDropTableReq(SVnode *pVnode, SDropTableReq *pReq, SDropTable
int32_t vnodeProcessAlterTableReq(SVnode *pVnode, SAlterTableReq *pReq, SAlterTableRsp *pRsp); int32_t vnodeProcessAlterTableReq(SVnode *pVnode, SAlterTableReq *pReq, SAlterTableRsp *pRsp);
int32_t vnodeProcessDropStableReq(SVnode *pVnode, SDropStableReq *pReq, SDropStableRsp *pRsp); int32_t vnodeProcessDropStableReq(SVnode *pVnode, SDropStableReq *pReq, SDropStableRsp *pRsp);
int32_t vnodeProcessUpdateTagValReq(SVnode *pVnode, SUpdateTagValReq *pReq, SUpdateTagValRsp *pRsp); int32_t vnodeProcessUpdateTagValReq(SVnode *pVnode, SUpdateTagValReq *pReq, SUpdateTagValRsp *pRsp);
//mq related
int32_t vnodeProcessMqConnectReq(SVnode* pVnode, SMqConnectReq *pReq, SMqConnectRsp *pRsp);
int32_t vnodeProcessMqDisconnectReq(SVnode* pVnode, SMqConnectReq *pReq, SMqConnectRsp *pRsp);
int32_t vnodeProcessMqAckReq(SVnode* pVnode, SMqAckReq *pReq, SMqAckRsp *pRsp);
int32_t vnodeProcessMqResetReq(SVnode* pVnode, SMqResetReq *pReq, SMqResetRsp *pRsp);
//mq related end
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
#endif /*_TD_VNODE_WRITE_MSG_H_*/ #endif /*_TD_VNODE_WRITE_MSG_H_*/
\ No newline at end of file
...@@ -780,20 +780,30 @@ static void vnodeCleanupVnodes() { ...@@ -780,20 +780,30 @@ static void vnodeCleanupVnodes() {
} }
static void vnodeInitMsgFp() { static void vnodeInitMsgFp() {
tsVmain.msgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = vnodeProcessMgmtMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = vnodeProcessMgmtMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = vnodeProcessMgmtMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = vnodeProcessMgmtMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = vnodeProcessMgmtMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = vnodeProcessMgmtMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = vnodeProcessMgmtMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = vnodeProcessMgmtMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessMgmtMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessMgmtMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessMgmtMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessMgmtMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessWriteMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessWriteMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessWriteMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessWriteMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessWriteMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessWriteMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessWriteMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessWriteMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessWriteMsg; tsVmain.msgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessWriteMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessWriteMsg; tsVmain.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessWriteMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessReadMsg; //mq related
tsVmain.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessReadMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessWriteMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessWriteMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessWriteMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessWriteMsg;
//mq related end
tsVmain.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessReadMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessReadMsg;
//mq related
tsVmain.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessReadMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessReadMsg;
//mq related end
} }
void vnodeProcessMsg(SRpcMsg *pMsg) { void vnodeProcessMsg(SRpcMsg *pMsg) {
......
...@@ -141,6 +141,9 @@ void vnodeProcessReadMsg(SRpcMsg *pMsg) { ...@@ -141,6 +141,9 @@ void vnodeProcessReadMsg(SRpcMsg *pMsg) {
static void vnodeInitReadMsgFp() { static void vnodeInitReadMsgFp() {
tsVread.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg; tsVread.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg;
tsVread.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg; tsVread.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg;
tsVread.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessTqQueryMsg;
tsVread.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessConsumeMsg;
} }
static int32_t vnodeProcessReadStart(SVnode *pVnode, SReadMsg *pRead, int32_t qtype) { static int32_t vnodeProcessReadStart(SVnode *pVnode, SReadMsg *pRead, int32_t qtype) {
......
...@@ -179,6 +179,20 @@ static bool vnodeProcessWriteStart(SVnode *pVnode, SVnWriteMsg *pWrite, int32_t ...@@ -179,6 +179,20 @@ static bool vnodeProcessWriteStart(SVnode *pVnode, SVnWriteMsg *pWrite, int32_t
case TSDB_MSG_TYPE_UPDATE_TAG_VAL: case TSDB_MSG_TYPE_UPDATE_TAG_VAL:
pWrite->code = vnodeProcessUpdateTagValReq(pVnode, (void*)pHead->cont, NULL); pWrite->code = vnodeProcessUpdateTagValReq(pVnode, (void*)pHead->cont, NULL);
break; break;
//mq related
case TSDB_MSG_TYPE_MQ_CONNECT:
pWrite->code = vnodeProcessMqConnectReq(pVnode, (void*)pHead->cont, NULL);
break;
case TSDB_MSG_TYPE_MQ_DISCONNECT:
pWrite->code = vnodeProcessMqDisconnectReq(pVnode, (void*)pHead->cont, NULL);
break;
case TSDB_MSG_TYPE_MQ_ACK:
pWrite->code = vnodeProcessMqAckReq(pVnode, (void*)pHead->cont, NULL);
break;
case TSDB_MSG_TYPE_MQ_RESET:
pWrite->code = vnodeProcessMqResetReq(pVnode, (void*)pHead->cont, NULL);
break;
//mq related end
default: default:
pWrite->code = TSDB_CODE_VND_MSG_NOT_PROCESSED; pWrite->code = TSDB_CODE_VND_MSG_NOT_PROCESSED;
break; break;
...@@ -186,7 +200,7 @@ static bool vnodeProcessWriteStart(SVnode *pVnode, SVnWriteMsg *pWrite, int32_t ...@@ -186,7 +200,7 @@ static bool vnodeProcessWriteStart(SVnode *pVnode, SVnWriteMsg *pWrite, int32_t
if (pWrite->code < 0) return false; if (pWrite->code < 0) return false;
// update fync // update fsync
return (pWrite->code == 0 && msgType != TSDB_MSG_TYPE_SUBMIT); return (pWrite->code == 0 && msgType != TSDB_MSG_TYPE_SUBMIT);
} }
...@@ -233,4 +247,4 @@ void vnodeCleanupWrite() { ...@@ -233,4 +247,4 @@ void vnodeCleanupWrite() {
taos_queue vnodeAllocWriteQueue(SVnode *pVnode) { return tWriteWorkerAllocQueue(&tsVwrite.pool, pVnode); } taos_queue vnodeAllocWriteQueue(SVnode *pVnode) { return tWriteWorkerAllocQueue(&tsVwrite.pool, pVnode); }
void vnodeFreeWriteQueue(taos_queue pQueue) { tWriteWorkerFreeQueue(&tsVwrite.pool, pQueue); } void vnodeFreeWriteQueue(taos_queue pQueue) { tWriteWorkerFreeQueue(&tsVwrite.pool, pQueue); }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册