diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 994dace1e3f5c129970efefbceee806d0e803c82..a9f64e07643b774f183a8985f1c551aa559caadc 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -214,7 +214,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { STscObj* pObj = pSql->pTscObj; SSqlCmd* pCmd = &pSql->cmd; - char *pMsg = rpcMallocCont(pCmd->payloadLen); + char *pMsg = rpcMallocCont(sizeof(SMsgVersion) + pCmd->payloadLen); if (NULL == pMsg) { tscError("%p msg:%s malloc failed", pSql, taosMsg[pSql->cmd.msgType]); return TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -225,12 +225,13 @@ int tscSendMsgToServer(SSqlObj *pSql) { tscDumpMgmtEpSet(pSql); } - memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); + tstrncpy(pMsg, version, sizeof(SMsgVersion)); + memcpy(pMsg + sizeof(SMsgVersion), pSql->cmd.payload, pSql->cmd.payloadLen); SRpcMsg rpcMsg = { .msgType = pSql->cmd.msgType, .pCont = pMsg, - .contLen = pSql->cmd.payloadLen, + .contLen = pSql->cmd.payloadLen + sizeof(SMsgVersion), .ahandle = (void*)pSql->self, .handle = NULL, .code = 0 diff --git a/src/dnode/src/dnodeMRead.c b/src/dnode/src/dnodeMRead.c index 0fc6400d99f8dbcbc57caa2ed1265e36e18f1a93..b32326f4c2362e7a30489277b62e3e33738c2f3a 100644 --- a/src/dnode/src/dnodeMRead.c +++ b/src/dnode/src/dnodeMRead.c @@ -124,8 +124,6 @@ void dnodeDispatchToMReadQueue(SRpcMsg *pMsg) { SMnodeMsg *pRead = mnodeCreateMsg(pMsg); taosWriteQitem(tsMReadQueue, TAOS_QTYPE_RPC, pRead); } - - rpcFreeCont(pMsg->pCont); } static void dnodeFreeMReadMsg(SMnodeMsg *pRead) { diff --git a/src/dnode/src/dnodeMWrite.c b/src/dnode/src/dnodeMWrite.c index 414b66653d123b785643cfdc96b429edbe8d58ad..9007b54d47172da500f5af373daebed9ae3e66b8 100644 --- a/src/dnode/src/dnodeMWrite.c +++ b/src/dnode/src/dnodeMWrite.c @@ -125,8 +125,6 @@ void dnodeDispatchToMWriteQueue(SRpcMsg *pMsg) { taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue); taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite); } - - rpcFreeCont(pMsg->pCont); } static void dnodeFreeMWriteMsg(SMnodeMsg *pWrite) { diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index 79cc70005b6e83bc1c455abb7cd3709ae29a8643..221e13d109576f9c4a46ae689ebddaf96e11d28d 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -127,7 +127,20 @@ static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { } else {} if ( dnodeProcessShellMsgFp[pMsg->msgType] ) { + SMsgVersion *pMsgVersion = pMsg->pCont; + if (taosCheckVersion(pMsgVersion->clientVersion, version, 3) != TSDB_CODE_SUCCESS) { + rpcMsg.code = TSDB_CODE_TSC_INVALID_VERSION; + rpcSendResponse(&rpcMsg); + rpcFreeCont(pMsg->pCont); + return; // todo change the error code + } + pMsg->pCont += sizeof(*pMsgVersion); + pMsg->contLen -= sizeof(*pMsgVersion); + (*dnodeProcessShellMsgFp[pMsg->msgType])(pMsg); + + //pMsg->contLen += sizeof(*pMsgVersion); + rpcFreeCont(pMsg->pCont - sizeof(*pMsgVersion)); } else { dError("RPC %p, shell msg:%s is not processed", pMsg->handle, taosMsg[pMsg->msgType]); rpcMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED; @@ -231,4 +244,4 @@ SStatisInfo dnodeGetStatisInfo() { } return info; -} \ No newline at end of file +} diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index 3f31e4937052d7a505031b4c8083ae1acd00bf6c..2995116ef56bfa9b6f70642944b13796ff4adc92 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -77,8 +77,6 @@ void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) { SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID}; rpcSendResponse(&rpcRsp); } - - rpcFreeCont(pMsg->pCont); } void *dnodeAllocVQueryQueue(void *pVnode) { diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index a5ae8ac83063c599ad4c215bd0a7fa4468810580..959789a6d24c507782527eea8b305d36445f55a9 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -102,7 +102,6 @@ void dnodeDispatchToVWriteQueue(SRpcMsg *pRpcMsg) { } vnodeRelease(pVnode); - rpcFreeCont(pRpcMsg->pCont); } void *dnodeAllocVWriteQueue(void *pVnode) { diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 27d857ce1401ff74ecff72e92354f1ee19d568eb..a429ba3271e25b0742a1d560f293ed909faee536 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -198,6 +198,10 @@ typedef struct { int32_t numOfVnodes; } SMsgDesc; +typedef struct SMsgVersion { + char clientVersion[TSDB_VERSION_LEN]; +} SMsgVersion; + typedef struct SMsgHead { int32_t contLen; int32_t vgId;