diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index f007a82f8451e536fe991036fab8387e122bfaaf..9700201e3c42177ec41bebca6b1d0b914fec85f1 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -25,6 +25,7 @@ #include "tutil.h" #include "tlocale.h" #include "ttimezone.h" +#include "taosmsg.h" // cluster char tsFirst[TSDB_EP_LEN] = {0}; @@ -1461,6 +1462,24 @@ int32_t taosCheckGlobalCfg() { tsVersion = 10 * tsVersion; + taosMsgVer = 0; + for (int ver = 0, dotCount = 0, i = 0; i < TSDB_VERSION_LEN; i++) { + if (version[i] >= '0' && version[i] <= '9') { + ver = ver * 10 + (version[i] - '0'); + } else if (version[i] == '.') { + taosMsgVer |= ver & 0xFF; + taosMsgVer <<= 8; + + if (++dotCount >= 3) { + break; + } + + ver = 0; + } else if (version[i] == 0) { + break; + } + } + tsDnodeShellPort = tsServerPort + TSDB_PORT_DNODESHELL; // udp[6035-6039] tcp[6035] tsDnodeDnodePort = tsServerPort + TSDB_PORT_DNODEDNODE; // udp/tcp tsSyncPort = tsServerPort + TSDB_PORT_SYNC; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 2df243eb3ebe942f9a489c167b164082a941d8a4..6aa7aabbfcb2b5708f157e6629b200b3c417b862 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -31,10 +31,14 @@ extern "C" { // message type #ifdef TAOS_MESSAGE_C +uint32_t taosMsgVer = 0; + #define TAOS_DEFINE_MESSAGE_TYPE( name, msg ) msg, msg "-rsp", char *taosMsg[] = { "null", #else +extern uint32_t taosMsgVer; + #define TAOS_DEFINE_MESSAGE_TYPE( name, msg ) name, name##_RSP, enum { TSDB_MESSAGE_NULL = 0, diff --git a/src/rpc/inc/rpcHead.h b/src/rpc/inc/rpcHead.h index 520edadc7dd072849720cad53c7f6f4ba605a06c..acea98105cff04b44235b0c6db3e9591a4495ca1 100644 --- a/src/rpc/inc/rpcHead.h +++ b/src/rpc/inc/rpcHead.h @@ -20,6 +20,8 @@ extern "C" { #endif +#define rpcIsReq(type) (type & 1U) + #define RPC_CONN_UDPS 0 #define RPC_CONN_UDPC 1 #define RPC_CONN_TCPS 2 @@ -58,6 +60,7 @@ typedef struct { char empty[1]; // reserved uint8_t msgType; // message type int32_t msgLen; // message length including the header iteslf + uint32_t msgVer; int32_t code; // code in response message uint8_t content[0]; // message body starts from here } SRpcHead; diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 1d394d879543cb45ce3d2591d9fb415aa3aa6f03..3a23c6021ab687908179ecba2a029ec5bf93e066 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -38,7 +38,6 @@ #define rpcContFromHead(msg) (msg + sizeof(SRpcHead)) #define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead)) #define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead)) -#define rpcIsReq(type) (type & 1U) typedef struct { int sessions; // number of sessions allowed @@ -1282,6 +1281,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { // set the message header pHead->version = 1; + pHead->msgVer = htonl(taosMsgVer); pHead->msgType = msgType; pHead->encrypt = 0; pConn->tranId++; diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 178b96c423641ccc6be84c57ca71fe7b85fd7a33..0689a63aaacee7ae0889ba212a0fa301739a95df 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -18,6 +18,7 @@ #include "tutil.h" #include "taosdef.h" #include "taoserror.h" +#include "taosmsg.h" #include "rpcLog.h" #include "rpcHead.h" #include "rpcTcp.h" @@ -423,6 +424,11 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) { return -1; } + if (rpcIsReq(rpcHead.msgType) && htonl(rpcHead.msgVer) != taosMsgVer) { + tError("%s %p FD:%p, client version:%08x mismatched with server version:%08x", pThreadObj->label, pFdObj->thandle, pFdObj, htonl(rpcHead.msgVer), taosMsgVer); + return -1; + } + msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen); int32_t size = msgLen + tsRpcOverhead; buffer = malloc(size); diff --git a/src/rpc/src/rpcUdp.c b/src/rpc/src/rpcUdp.c index 22301fcecc83fb1f4c9a29132b0311e05b2382e6..d4b0a5f56a06c928eb893feda42b164eb80efb4a 100644 --- a/src/rpc/src/rpcUdp.c +++ b/src/rpc/src/rpcUdp.c @@ -20,6 +20,7 @@ #include "tutil.h" #include "taosdef.h" #include "taoserror.h" +#include "taosmsg.h" #include "rpcLog.h" #include "rpcUdp.h" #include "rpcHead.h" @@ -209,6 +210,12 @@ static void *taosRecvUdpData(void *param) { continue; } + SRpcHead *pRpcHead = (SRpcHead *)msg; + if (rpcIsReq(pRpcHead->msgType) && htonl(pRpcHead->msgVer) != taosMsgVer) { + tError("%s client version:%08x mismatched with server version:%08x", pConn->label, htonl(pRpcHead->msgVer), taosMsgVer); + continue; + } + int32_t size = dataLen + tsRpcOverhead; char *tmsg = malloc(size); if (NULL == tmsg) {