diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 8dd5ef69f7a6eb9811779debf99d77240567becd..a3d3b035e2c4564acd34a71fe1c1490ddc25ec75 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -191,7 +191,8 @@ int tscSendMsgToServer(SSqlObj *pSql) { .msgType = pSql->cmd.msgType, .pCont = pMsg, .contLen = pSql->cmd.payloadLen, - .handle = pSql, + .ahandle = pSql, + .handle = &pSql->pRpcCtx, .code = 0 }; @@ -199,12 +200,12 @@ int tscSendMsgToServer(SSqlObj *pSql) { // Otherwise, the pSql object may have been released already during the response function, which is // processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely // cause crash. - /*pSql->pRpcCtx = */rpcSendRequest(pObj->pDnodeConn, &pSql->ipList, &rpcMsg); + rpcSendRequest(pObj->pDnodeConn, &pSql->ipList, &rpcMsg); return TSDB_CODE_SUCCESS; } void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { - SSqlObj *pSql = (SSqlObj *)rpcMsg->handle; + SSqlObj *pSql = (SSqlObj *)rpcMsg->ahandle; if (pSql == NULL || pSql->signature != pSql) { tscError("%p sql is already released", pSql); return; diff --git a/src/inc/trpc.h b/src/inc/trpc.h index 30e0f9eee1889e8b832812fcc3f17f81c157724e..d1adfb7494aafb89912c508c65ab9a774fae4f09 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -47,8 +47,8 @@ typedef struct SRpcMsg { void *pCont; int contLen; int32_t code; - void *handle; - void *ahandle; //app handle set by client, for debug purpose + void *handle; // rpc handle returned to app + void *ahandle; // app handle set by client } SRpcMsg; typedef struct SRpcInit { @@ -78,11 +78,11 @@ void rpcClose(void *); void *rpcMallocCont(int contLen); void rpcFreeCont(void *pCont); void *rpcReallocCont(void *ptr, int contLen); -void *rpcSendRequest(void *thandle, const SRpcIpSet *pIpSet, const SRpcMsg *pMsg); +void rpcSendRequest(void *thandle, const SRpcIpSet *pIpSet, SRpcMsg *pMsg); void rpcSendResponse(const SRpcMsg *pMsg); void rpcSendRedirectRsp(void *pConn, const SRpcIpSet *pIpSet); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); -void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, const SRpcMsg *pReq, SRpcMsg *pRsp); +void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pReq, SRpcMsg *pRsp); int rpcReportProgress(void *pConn, char *pCont, int contLen); void rpcCancelRequest(void *pContext); diff --git a/src/mnode/src/mnodeDnode.c b/src/mnode/src/mnodeDnode.c index dbe9185d8e97ba33a07e1f98d865bd156ea4dd53..7e7c12cf088c823137350eb476af79df128139e2 100644 --- a/src/mnode/src/mnodeDnode.c +++ b/src/mnode/src/mnodeDnode.c @@ -264,7 +264,7 @@ static int32_t mnodeProcessCfgDnodeMsg(SMnodeMsg *pMsg) { strcpy(pMdCfgDnode->config, pCmCfgDnode->config); SRpcMsg rpcMdCfgDnodeMsg = { - .handle = 0, + .ahandle = 0, .code = 0, .msgType = TSDB_MSG_TYPE_MD_CONFIG_DNODE, .pCont = pMdCfgDnode, diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index c7f8791d6de658fc21187de849945182f44840be..47add8f7a3184d517ce3b77317324ebe916b7185 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -1574,7 +1574,7 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) { SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pMsg->pVgroup); SRpcMsg rpcMsg = { - .handle = pMsg, + .ahandle = pMsg, .pCont = pMDCreate, .contLen = htonl(pMDCreate->contLen), .code = 0, @@ -1751,7 +1751,7 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) { mInfo("app:%p:%p, table:%s, send drop ctable msg", pMsg->rpcMsg.ahandle, pMsg, pDrop->tableId); SRpcMsg rpcMsg = { - .handle = pMsg, + .ahandle = pMsg, .pCont = pDrop, .contLen = sizeof(SMDDropTableMsg), .code = 0, @@ -1799,7 +1799,7 @@ static int32_t mnodeAlterNormalTableColumnCb(SMnodeMsg *pMsg, int32_t code) { SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pMsg->pVgroup); SRpcMsg rpcMsg = { - .handle = pMsg, + .ahandle = pMsg, .pCont = pMDCreate, .contLen = htonl(pMDCreate->contLen), .code = 0, @@ -2144,9 +2144,9 @@ static int32_t mnodeProcessTableCfgMsg(SMnodeMsg *pMsg) { // handle drop child response static void mnodeProcessDropChildTableRsp(SRpcMsg *rpcMsg) { - if (rpcMsg->handle == NULL) return; + if (rpcMsg->ahandle == NULL) return; - SMnodeMsg *mnodeMsg = rpcMsg->handle; + SMnodeMsg *mnodeMsg = rpcMsg->ahandle; mnodeMsg->received++; SChildTableObj *pTable = (SChildTableObj *)mnodeMsg->pTable; @@ -2195,9 +2195,9 @@ static void mnodeProcessDropChildTableRsp(SRpcMsg *rpcMsg) { * if failed, drop the table cached */ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { - if (rpcMsg->handle == NULL) return; + if (rpcMsg->ahandle == NULL) return; - SMnodeMsg *mnodeMsg = rpcMsg->handle; + SMnodeMsg *mnodeMsg = rpcMsg->ahandle; mnodeMsg->received++; SChildTableObj *pTable = (SChildTableObj *)mnodeMsg->pTable; @@ -2238,9 +2238,9 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { } static void mnodeProcessAlterTableRsp(SRpcMsg *rpcMsg) { - if (rpcMsg->handle == NULL) return; + if (rpcMsg->ahandle == NULL) return; - SMnodeMsg *mnodeMsg = rpcMsg->handle; + SMnodeMsg *mnodeMsg = rpcMsg->ahandle; mnodeMsg->received++; SChildTableObj *pTable = (SChildTableObj *)mnodeMsg->pTable; diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index b314bfc8d03689842e33bdbbe9f5197a3bd9f6ca..3855de41014deb0fe33ba09913cf13a62212f9ec 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -652,7 +652,7 @@ SRpcIpSet mnodeGetIpSetFromIp(char *ep) { void mnodeSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) { SMDCreateVnodeMsg *pCreate = mnodeBuildCreateVnodeMsg(pVgroup); SRpcMsg rpcMsg = { - .handle = ahandle, + .ahandle = ahandle, .pCont = pCreate, .contLen = pCreate ? sizeof(SMDCreateVnodeMsg) : 0, .code = 0, @@ -673,9 +673,9 @@ void mnodeSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) { } static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { - if (rpcMsg->handle == NULL) return; + if (rpcMsg->ahandle == NULL) return; - SMnodeMsg *mnodeMsg = rpcMsg->handle; + SMnodeMsg *mnodeMsg = rpcMsg->ahandle; mnodeMsg->received++; if (rpcMsg->code == TSDB_CODE_SUCCESS) { mnodeMsg->successed++; @@ -686,7 +686,7 @@ static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { SVgObj *pVgroup = mnodeMsg->pVgroup; mDebug("vgId:%d, create vnode rsp received, result:%s received:%d successed:%d expected:%d, thandle:%p ahandle:%p", pVgroup->vgId, tstrerror(rpcMsg->code), mnodeMsg->received, mnodeMsg->successed, mnodeMsg->expected, - mnodeMsg->rpcMsg.handle, rpcMsg->handle); + mnodeMsg->rpcMsg.handle, rpcMsg->ahandle); if (mnodeMsg->received != mnodeMsg->expected) return; @@ -718,7 +718,7 @@ static SMDDropVnodeMsg *mnodeBuildDropVnodeMsg(int32_t vgId) { void mnodeSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle) { SMDDropVnodeMsg *pDrop = mnodeBuildDropVnodeMsg(vgId); SRpcMsg rpcMsg = { - .handle = ahandle, + .ahandle = ahandle, .pCont = pDrop, .contLen = pDrop ? sizeof(SMDDropVnodeMsg) : 0, .code = 0, @@ -737,10 +737,10 @@ static void mnodeSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) { } static void mnodeProcessDropVnodeRsp(SRpcMsg *rpcMsg) { - mDebug("drop vnode rsp is received, handle:%p", rpcMsg->handle); - if (rpcMsg->handle == NULL) return; + mDebug("drop vnode rsp is received, handle:%p", rpcMsg->ahandle); + if (rpcMsg->ahandle == NULL) return; - SMnodeMsg *mnodeMsg = rpcMsg->handle; + SMnodeMsg *mnodeMsg = rpcMsg->ahandle; mnodeMsg->received++; if (rpcMsg->code == TSDB_CODE_SUCCESS) { mnodeMsg->code = rpcMsg->code; @@ -750,7 +750,7 @@ static void mnodeProcessDropVnodeRsp(SRpcMsg *rpcMsg) { SVgObj *pVgroup = mnodeMsg->pVgroup; mDebug("vgId:%d, drop vnode rsp received, result:%s received:%d successed:%d expected:%d, thandle:%p ahandle:%p", pVgroup->vgId, tstrerror(rpcMsg->code), mnodeMsg->received, mnodeMsg->successed, mnodeMsg->expected, - mnodeMsg->rpcMsg.handle, rpcMsg->handle); + mnodeMsg->rpcMsg.handle, rpcMsg->ahandle); if (mnodeMsg->received != mnodeMsg->expected) return; diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index a59024f8db6fc541e1af8a85bd26f6305d024f40..3a9397502251e308069ecb65699fb5383c550432 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -354,13 +354,13 @@ void *rpcReallocCont(void *ptr, int contLen) { return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); } -void *rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, const SRpcMsg *pMsg) { +void rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, SRpcMsg *pMsg) { SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcReqContext *pContext; int contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen); pContext = (SRpcReqContext *) (pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext)); - pContext->ahandle = pMsg->handle; + pContext->ahandle = pMsg->ahandle; pContext->pRpc = (SRpcInfo *)shandle; pContext->ipSet = *pIpSet; pContext->contLen = contLen; @@ -380,9 +380,12 @@ void *rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, const SRpcMsg *pMsg || type == TSDB_MSG_TYPE_CM_SHOW ) pContext->connType = RPC_CONN_TCPC; + // set the handle to pContext, so app can cancel the request + if (pMsg->handle) *((void **)pMsg->handle) = pContext; + rpcSendReqToServer(pRpc, pContext); - return pContext; + return; } void rpcSendResponse(const SRpcMsg *pRsp) { @@ -483,7 +486,7 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { return 0; } -void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, const SRpcMsg *pMsg, SRpcMsg *pRsp) { +void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { SRpcReqContext *pContext; pContext = (SRpcReqContext *) (pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext)); @@ -1051,7 +1054,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { } else { // it's a response SRpcReqContext *pContext = pConn->pContext; - rpcMsg.handle = pContext->ahandle; + rpcMsg.handle = pContext; pConn->pContext = NULL; // for UDP, port may be changed by server, the port in ipSet shall be used for cache @@ -1255,7 +1258,7 @@ static void rpcProcessConnError(void *param, void *id) { if (pContext->numOfTry >= pContext->ipSet.numOfIps) { rpcMsg.msgType = pContext->msgType+1; - rpcMsg.handle = pContext->ahandle; + rpcMsg.ahandle = pContext->ahandle; rpcMsg.code = pContext->code; rpcMsg.pCont = NULL; rpcMsg.contLen = 0; diff --git a/src/rpc/test/rclient.c b/src/rpc/test/rclient.c index 1bc64b0825fa2afa8752251b015b6819ecdb52b1..e51b54e299e517914adc2cb879b701ac2a0f71f2 100644 --- a/src/rpc/test/rclient.c +++ b/src/rpc/test/rclient.c @@ -33,7 +33,7 @@ typedef struct { } SInfo; static void processResponse(SRpcMsg *pMsg, SRpcIpSet *pIpSet) { - SInfo *pInfo = (SInfo *)pMsg->handle; + SInfo *pInfo = (SInfo *)pMsg->ahandle; tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, pMsg->code); if (pIpSet) pInfo->ipSet = *pIpSet; @@ -46,7 +46,7 @@ static int tcount = 0; static void *sendRequest(void *param) { SInfo *pInfo = (SInfo *)param; - SRpcMsg rpcMsg; + SRpcMsg rpcMsg = {0}; tDebug("thread:%d, start to send request", pInfo->index); @@ -54,7 +54,7 @@ static void *sendRequest(void *param) { pInfo->num++; rpcMsg.pCont = rpcMallocCont(pInfo->msgSize); rpcMsg.contLen = pInfo->msgSize; - rpcMsg.handle = pInfo; + rpcMsg.ahandle = pInfo; rpcMsg.msgType = 1; tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); rpcSendRequest(pInfo->pRpc, &pInfo->ipSet, &rpcMsg);