提交 ad96d32b 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into feature/dnode3

import hudson.model.Result import hudson.model.Result
import hudson.model.*;
import jenkins.model.CauseOfInterruption import jenkins.model.CauseOfInterruption
properties([pipelineTriggers([githubPush()])])
node { node {
git url: 'https://github.com/taosdata/TDengine.git'
} }
def skipbuild=0 def skipbuild=0
def win_stop=0
def abortPreviousBuilds() { def abortPreviousBuilds() {
def currentJobName = env.JOB_NAME def currentJobName = env.JOB_NAME
...@@ -121,6 +121,7 @@ def pre_test(){ ...@@ -121,6 +121,7 @@ def pre_test(){
pipeline { pipeline {
agent none agent none
options { skipDefaultCheckout() }
environment{ environment{
WK = '/var/lib/jenkins/workspace/TDinternal' WK = '/var/lib/jenkins/workspace/TDinternal'
WKC= '/var/lib/jenkins/workspace/TDinternal/community' WKC= '/var/lib/jenkins/workspace/TDinternal/community'
...@@ -128,6 +129,7 @@ pipeline { ...@@ -128,6 +129,7 @@ pipeline {
stages { stages {
stage('pre_build'){ stage('pre_build'){
agent{label 'master'} agent{label 'master'}
options { skipDefaultCheckout() }
when { when {
changeRequest() changeRequest()
} }
...@@ -181,6 +183,7 @@ pipeline { ...@@ -181,6 +183,7 @@ pipeline {
} }
stage('Parallel test stage') { stage('Parallel test stage') {
//only build pr //only build pr
options { skipDefaultCheckout() }
when { when {
allOf{ allOf{
changeRequest() changeRequest()
......
...@@ -42,7 +42,9 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" ) ...@@ -42,7 +42,9 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONSUME, "mq-consume" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONSUME, "mq-consume" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_QUERY, "mq-query" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_ACK, "mq-ack" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_ACK, "mq-ack" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_RESET, "mq-reset" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_RESET, "mq-reset" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" )
...@@ -121,7 +123,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TP, "drop-tp" ) ...@@ -121,7 +123,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TP, "drop-tp" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TP, "alter-tp" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TP, "alter-tp" )
#ifndef TAOS_MESSAGE_C #ifndef TAOS_MESSAGE_C
TSDB_MSG_TYPE_MAX // 105 TSDB_MSG_TYPE_MAX // 147
#endif #endif
}; };
...@@ -954,6 +956,40 @@ typedef struct { ...@@ -954,6 +956,40 @@ typedef struct {
char reserved2[64]; char reserved2[64];
} SStartupStep; } SStartupStep;
// mq related
typedef struct {
} SMqConnectReq;
typedef struct {
} SMqConnectRsp;
typedef struct {
} SMqDisconnectReq;
typedef struct {
} SMqDisconnectRsp;
typedef struct {
} SMqAckReq;
typedef struct {
} SMqAckRsp;
typedef struct {
} SMqResetReq;
typedef struct {
} SMqResetRsp;
//mq related end
typedef struct { typedef struct {
/* data */ /* data */
} SSubmitReq; } SSubmitReq;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TD_RAFT_H
#define TD_RAFT_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdio.h>
#include "taosdef.h"
typedef unsigned int RaftId;
typedef unsigned int RaftGroupId;
// buffer holding data
typedef struct RaftBuffer {
void* data;
size_t len;
} RaftBuffer;
// a single server information in a cluster
typedef struct RaftServer {
RaftId id;
char fqdn[TSDB_FQDN_LEN];
uint16_t port;
} RaftServer;
// all servers in a cluster
typedef struct RaftConfiguration {
RaftServer *servers;
int nServer;
} RaftConfiguration;
// raft lib module
struct Raft;
typedef struct Raft Raft;
struct RaftNode;
typedef struct RaftNode RaftNode;
// raft state machine
struct RaftFSM;
typedef struct RaftFSM {
// statemachine user data
void *data;
// apply buffer data, bufs will be free by raft module
int (*apply)(struct RaftFSM *fsm, const RaftBuffer *bufs[], int nBufs);
// configuration commit callback
int (*onConfigurationCommit)(const RaftConfiguration* cluster);
// fsm return snapshot in ppBuf, bufs will be free by raft module
// TODO: getSnapshot SHOULD be async?
int (*getSnapshot)(struct RaftFSM *fsm, RaftBuffer **ppBuf);
// fsm restore with pBuf data
int (*restore)(struct RaftFSM *fsm, RaftBuffer *pBuf);
// fsm send data in buf to server,buf will be free by raft module
int (*send)(struct RaftFSM* fsm, const RaftServer* server, const RaftBuffer *buf);
} RaftFSM;
typedef struct RaftNodeOptions {
// user define state machine
RaftFSM* pFSM;
// election timeout(in ms)
// by default: 1000
int electionTimeoutMS;
// heart timeout(in ms)
// by default: 100
int heartbeatTimeoutMS;
// install snapshot timeout(in ms)
int installSnapshotTimeoutMS;
/**
* number of log entries before starting a new snapshot.
* by default: 1024
*/
int snapshotThreshold;
/**
* Number of log entries to keep in the log after a snapshot has
* been taken.
* by default: 128.
*/
int snapshotTrailing;
/**
* Enable or disable pre-vote support.
* by default: false
*/
bool preVote;
} RaftNodeOptions;
// create raft lib
int RaftCreate(Raft** ppRaft);
int RaftDestroy(Raft* pRaft);
// start a raft node with options,node id,group id
int RaftStart(Raft* pRaft,
RaftId selfId,
RaftGroupId selfGroupId,
const RaftConfiguration* cluster,
const RaftNodeOptions* options,
RaftNode **ppNode);
// stop a raft node
int RaftStop(RaftNode* pNode);
// client apply a cmd in buf
typedef void (*RaftApplyFp)(const RaftBuffer *pBuf, int result);
int RaftApply(RaftNode *pNode,
const RaftBuffer *pBuf,
RaftApplyFp applyCb);
// recv data from other servers in cluster,buf will be free in raft
int RaftRecv(RaftNode *pNode, const RaftBuffer* pBuf);
// change cluster servers API
typedef void (*RaftChangeFp)(const RaftServer* pServer, int result);
int RaftAddServer(RaftNode *pNode,
const RaftServer* pServer,
RaftChangeFp changeCb);
int RaftRemoveServer(RaftNode *pNode,
const RaftServer* pServer,
RaftChangeFp changeCb);
// transfer leader to id
typedef void (*RaftTransferFp)(RaftId id, int result);
int RaftTransfer(RaftNode *pNode,
RaftId id,
RaftTransferFp transferCb);
#ifdef __cplusplus
}
#endif
#endif /* TD_RAFT_H */
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_RAFT_SYNC_H
#define TDENGINE_RAFT_SYNC_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
#include "taosdef.h"
#include "wal.h"
typedef uint32_t SyncNodeId;
typedef int32_t SyncGroupId;
typedef int64_t SyncIndex;
typedef uint64_t SSyncTerm;
typedef enum {
TAOS_SYNC_ROLE_FOLLOWER = 0,
TAOS_SYNC_ROLE_CANDIDATE = 1,
TAOS_SYNC_ROLE_LEADER = 2,
} ESyncRole;
typedef struct {
void* data;
size_t len;
} SSyncBuffer;
typedef struct {
SyncNodeId nodeId; // node ID assigned by TDengine
uint16_t nodePort; // node sync Port
char nodeFqdn[TSDB_FQDN_LEN]; // node FQDN
} SNodeInfo;
typedef struct {
int selfIndex;
int nNode;
SNodeInfo* nodeInfo;
} SSyncCluster;
typedef struct {
int32_t selfIndex;
int nNode;
SyncNodeId* nodeId;
ESyncRole* role;
} SNodesRole;
struct SSyncFSM;
typedef struct SSyncFSM {
void* pData;
// apply committed log, bufs will be free by raft module
int (*applyLog)(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf, void *pData);
// cluster commit callback
int (*onClusterChanged)(struct SSyncFSM *fsm, const SSyncCluster* cluster, void *pData);
// fsm return snapshot in ppBuf, bufs will be free by raft module
// TODO: getSnapshot SHOULD be async?
int (*getSnapshot)(struct SSyncFSM *fsm, SSyncBuffer **ppBuf, int* objId, bool *isLast);
// fsm apply snapshot with pBuf data
int (*applySnapshot)(struct SSyncFSM *fsm, SSyncBuffer *pBuf, int objId, bool isLast);
// call when restore snapshot and log done
int (*onRestoreDone)(struct SSyncFSM *fsm);
void (*onRollback)(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf);
void (*onRoleChanged)(struct SSyncFSM *fsm, const SNodesRole* pRole);
} SSyncFSM;
typedef struct SSyncServerState {
SyncNodeId voteFor;
SSyncTerm term;
} SSyncServerState;
typedef struct SStateManager {
void* pData;
void (*saveServerState)(struct SStateManager* stateMng, const SSyncServerState* state);
const SSyncServerState* (*readServerState)(struct SStateManager* stateMng);
void (*saveCluster)(struct SStateManager* stateMng, const SSyncCluster* cluster);
const SSyncCluster* (*readCluster)(struct SStateManager* stateMng);
} SStateManager;
typedef struct {
SyncGroupId vgId;
twalh walHandle;
SyncIndex snapshotIndex; // initial version
SSyncCluster syncCfg; // configuration from mgmt
SSyncFSM fsm;
SStateManager stateManager;
} SSyncInfo;
int32_t syncInit();
void syncCleanUp();
SyncNodeId syncStart(const SSyncInfo *);
void syncStop(SyncNodeId);
int32_t syncPropose(SyncNodeId nodeId, SSyncBuffer buffer, void *pData, bool isWeak);
extern int32_t raftDebugFlag;
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_RAFT_SYNC_H
...@@ -26,8 +26,9 @@ typedef struct tmqMsgHead { ...@@ -26,8 +26,9 @@ typedef struct tmqMsgHead {
int32_t headLen; int32_t headLen;
int32_t msgVer; int32_t msgVer;
int64_t cgId; int64_t cgId;
int32_t topicLen; int64_t topicId;
char topic[]; int32_t checksum;
int32_t msgType;
} tmqMsgHead; } tmqMsgHead;
//TODO: put msgs into common //TODO: put msgs into common
......
add_subdirectory(transport) add_subdirectory(transport)
add_subdirectory(raft) add_subdirectory(sync)
add_subdirectory(tkv) add_subdirectory(tkv)
add_subdirectory(index) add_subdirectory(index)
add_subdirectory(wal) add_subdirectory(wal)
......
aux_source_directory(src RAFT_SRC)
add_library(raft ${RAFT_SRC})
target_include_directories(
raft
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/raft"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
\ No newline at end of file
aux_source_directory(src SYNC_SRC)
add_library(sync ${SYNC_SRC})
target_link_libraries(
sync
PUBLIC common
PUBLIC util
PUBLIC wal
)
target_include_directories(
sync
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/sync"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
\ No newline at end of file
...@@ -11,4 +11,6 @@ ...@@ -11,4 +11,6 @@
* *
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
\ No newline at end of file
#include "sync.h"
\ No newline at end of file
...@@ -97,6 +97,9 @@ static int32_t dnodeInitServer() { ...@@ -97,6 +97,9 @@ static int32_t dnodeInitServer() {
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessMsg;
/*tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessRead;*/
SRpcInit rpcInit; SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = tsDnodeDnodePort; rpcInit.localPort = tsDnodeDnodePort;
...@@ -297,6 +300,8 @@ static int32_t dnodeInitShell() { ...@@ -297,6 +300,8 @@ static int32_t dnodeInitShell() {
tsTrans.shellMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessMsg;
// the following message shall be treated as mnode write // the following message shall be treated as mnode write
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = mnodeProcessMsg;
......
...@@ -36,6 +36,8 @@ typedef struct SReadMsg { ...@@ -36,6 +36,8 @@ typedef struct SReadMsg {
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SReadMsg *pRead); int32_t vnodeProcessQueryMsg(SVnode *pVnode, SReadMsg *pRead);
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SReadMsg *pRead); int32_t vnodeProcessFetchMsg(SVnode *pVnode, SReadMsg *pRead);
int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead);
int32_t vnodeProcessTqQueryMsg(SVnode *pVnode, SReadMsg *pRead);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -27,9 +27,15 @@ int32_t vnodeProcessDropTableReq(SVnode *pVnode, SDropTableReq *pReq, SDropTable ...@@ -27,9 +27,15 @@ int32_t vnodeProcessDropTableReq(SVnode *pVnode, SDropTableReq *pReq, SDropTable
int32_t vnodeProcessAlterTableReq(SVnode *pVnode, SAlterTableReq *pReq, SAlterTableRsp *pRsp); int32_t vnodeProcessAlterTableReq(SVnode *pVnode, SAlterTableReq *pReq, SAlterTableRsp *pRsp);
int32_t vnodeProcessDropStableReq(SVnode *pVnode, SDropStableReq *pReq, SDropStableRsp *pRsp); int32_t vnodeProcessDropStableReq(SVnode *pVnode, SDropStableReq *pReq, SDropStableRsp *pRsp);
int32_t vnodeProcessUpdateTagValReq(SVnode *pVnode, SUpdateTagValReq *pReq, SUpdateTagValRsp *pRsp); int32_t vnodeProcessUpdateTagValReq(SVnode *pVnode, SUpdateTagValReq *pReq, SUpdateTagValRsp *pRsp);
//mq related
int32_t vnodeProcessMqConnectReq(SVnode* pVnode, SMqConnectReq *pReq, SMqConnectRsp *pRsp);
int32_t vnodeProcessMqDisconnectReq(SVnode* pVnode, SMqConnectReq *pReq, SMqConnectRsp *pRsp);
int32_t vnodeProcessMqAckReq(SVnode* pVnode, SMqAckReq *pReq, SMqAckRsp *pRsp);
int32_t vnodeProcessMqResetReq(SVnode* pVnode, SMqResetReq *pReq, SMqResetRsp *pRsp);
//mq related end
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
#endif /*_TD_VNODE_WRITE_MSG_H_*/ #endif /*_TD_VNODE_WRITE_MSG_H_*/
\ No newline at end of file
...@@ -780,20 +780,30 @@ static void vnodeCleanupVnodes() { ...@@ -780,20 +780,30 @@ static void vnodeCleanupVnodes() {
} }
static void vnodeInitMsgFp() { static void vnodeInitMsgFp() {
tsVmain.msgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = vnodeProcessMgmtMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = vnodeProcessMgmtMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = vnodeProcessMgmtMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = vnodeProcessMgmtMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = vnodeProcessMgmtMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = vnodeProcessMgmtMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = vnodeProcessMgmtMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = vnodeProcessMgmtMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessMgmtMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessMgmtMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessMgmtMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessMgmtMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessWriteMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessWriteMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessWriteMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessWriteMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessWriteMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessWriteMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessWriteMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessWriteMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessWriteMsg; tsVmain.msgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessWriteMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessWriteMsg; tsVmain.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessWriteMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessReadMsg; //mq related
tsVmain.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessReadMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessWriteMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessWriteMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessWriteMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessWriteMsg;
//mq related end
tsVmain.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessReadMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessReadMsg;
//mq related
tsVmain.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessReadMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessReadMsg;
//mq related end
} }
void vnodeProcessMsg(SRpcMsg *pMsg) { void vnodeProcessMsg(SRpcMsg *pMsg) {
......
...@@ -141,6 +141,9 @@ void vnodeProcessReadMsg(SRpcMsg *pMsg) { ...@@ -141,6 +141,9 @@ void vnodeProcessReadMsg(SRpcMsg *pMsg) {
static void vnodeInitReadMsgFp() { static void vnodeInitReadMsgFp() {
tsVread.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg; tsVread.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg;
tsVread.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg; tsVread.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg;
tsVread.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessTqQueryMsg;
tsVread.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessConsumeMsg;
} }
static int32_t vnodeProcessReadStart(SVnode *pVnode, SReadMsg *pRead, int32_t qtype) { static int32_t vnodeProcessReadStart(SVnode *pVnode, SReadMsg *pRead, int32_t qtype) {
......
...@@ -217,6 +217,15 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SReadMsg *pRead) { ...@@ -217,6 +217,15 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SReadMsg *pRead) {
return 0; return 0;
} }
//mq related
int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead){
return 0;
}
int32_t vnodeProcessTqQueryMsg(SVnode *pVnode, SReadMsg *pRead) {
return 0;
}
//mq related end
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SReadMsg *pRead) { int32_t vnodeProcessFetchMsg(SVnode *pVnode, SReadMsg *pRead) {
#if 0 #if 0
void * pCont = pRead->pCont; void * pCont = pRead->pCont;
......
...@@ -179,6 +179,20 @@ static bool vnodeProcessWriteStart(SVnode *pVnode, SVnWriteMsg *pWrite, int32_t ...@@ -179,6 +179,20 @@ static bool vnodeProcessWriteStart(SVnode *pVnode, SVnWriteMsg *pWrite, int32_t
case TSDB_MSG_TYPE_UPDATE_TAG_VAL: case TSDB_MSG_TYPE_UPDATE_TAG_VAL:
pWrite->code = vnodeProcessUpdateTagValReq(pVnode, (void*)pHead->cont, NULL); pWrite->code = vnodeProcessUpdateTagValReq(pVnode, (void*)pHead->cont, NULL);
break; break;
//mq related
case TSDB_MSG_TYPE_MQ_CONNECT:
pWrite->code = vnodeProcessMqConnectReq(pVnode, (void*)pHead->cont, NULL);
break;
case TSDB_MSG_TYPE_MQ_DISCONNECT:
pWrite->code = vnodeProcessMqDisconnectReq(pVnode, (void*)pHead->cont, NULL);
break;
case TSDB_MSG_TYPE_MQ_ACK:
pWrite->code = vnodeProcessMqAckReq(pVnode, (void*)pHead->cont, NULL);
break;
case TSDB_MSG_TYPE_MQ_RESET:
pWrite->code = vnodeProcessMqResetReq(pVnode, (void*)pHead->cont, NULL);
break;
//mq related end
default: default:
pWrite->code = TSDB_CODE_VND_MSG_NOT_PROCESSED; pWrite->code = TSDB_CODE_VND_MSG_NOT_PROCESSED;
break; break;
...@@ -186,7 +200,7 @@ static bool vnodeProcessWriteStart(SVnode *pVnode, SVnWriteMsg *pWrite, int32_t ...@@ -186,7 +200,7 @@ static bool vnodeProcessWriteStart(SVnode *pVnode, SVnWriteMsg *pWrite, int32_t
if (pWrite->code < 0) return false; if (pWrite->code < 0) return false;
// update fync // update fsync
return (pWrite->code == 0 && msgType != TSDB_MSG_TYPE_SUBMIT); return (pWrite->code == 0 && msgType != TSDB_MSG_TYPE_SUBMIT);
} }
...@@ -233,4 +247,4 @@ void vnodeCleanupWrite() { ...@@ -233,4 +247,4 @@ void vnodeCleanupWrite() {
taos_queue vnodeAllocWriteQueue(SVnode *pVnode) { return tWriteWorkerAllocQueue(&tsVwrite.pool, pVnode); } taos_queue vnodeAllocWriteQueue(SVnode *pVnode) { return tWriteWorkerAllocQueue(&tsVwrite.pool, pVnode); }
void vnodeFreeWriteQueue(taos_queue pQueue) { tWriteWorkerFreeQueue(&tsVwrite.pool, pQueue); } void vnodeFreeWriteQueue(taos_queue pQueue) { tWriteWorkerFreeQueue(&tsVwrite.pool, pQueue); }
\ No newline at end of file
...@@ -77,3 +77,18 @@ int32_t vnodeProcessUpdateTagValReq(SVnode *pVnode, SUpdateTagValReq *pReq, SUpd ...@@ -77,3 +77,18 @@ int32_t vnodeProcessUpdateTagValReq(SVnode *pVnode, SUpdateTagValReq *pReq, SUpd
// TODO // TODO
return 0; return 0;
} }
//mq related
int32_t vnodeProcessMqConnectReq(SVnode* pVnode, SMqConnectReq *pReq, SMqConnectRsp *pRsp){
return 0;
}
int32_t vnodeProcessMqDisconnectReq(SVnode* pVnode, SMqConnectReq *pReq, SMqConnectRsp *pRsp) {
return 0;
}
int32_t vnodeProcessMqAckReq(SVnode* pVnode, SMqAckReq *pReq, SMqAckRsp *pRsp) {
return 0;
}
int32_t vnodeProcessMqResetReq(SVnode* pVnode, SMqResetReq *pReq, SMqResetRsp *pRsp) {
return 0;
}
//mq related end
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册