未验证 提交 2eb8cc45 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2274 from taosdata/feature/rpcCancel

add rpcCancelRequest API
...@@ -78,12 +78,13 @@ void rpcClose(void *); ...@@ -78,12 +78,13 @@ void rpcClose(void *);
void *rpcMallocCont(int contLen); void *rpcMallocCont(int contLen);
void rpcFreeCont(void *pCont); void rpcFreeCont(void *pCont);
void *rpcReallocCont(void *ptr, int contLen); void *rpcReallocCont(void *ptr, int contLen);
void rpcSendRequest(void *thandle, const SRpcIpSet *pIpSet, const SRpcMsg *pMsg); void *rpcSendRequest(void *thandle, const SRpcIpSet *pIpSet, const SRpcMsg *pMsg);
void rpcSendResponse(const SRpcMsg *pMsg); void rpcSendResponse(const SRpcMsg *pMsg);
void rpcSendRedirectRsp(void *pConn, const SRpcIpSet *pIpSet); void rpcSendRedirectRsp(void *pConn, const SRpcIpSet *pIpSet);
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, const SRpcMsg *pReq, SRpcMsg *pRsp); void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, const SRpcMsg *pReq, SRpcMsg *pRsp);
int rpcReportProgress(void *pConn, char *pCont, int contLen); int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCanelRequest(void *pContext);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -73,6 +73,7 @@ typedef struct { ...@@ -73,6 +73,7 @@ typedef struct {
SRpcInfo *pRpc; // associated SRpcInfo SRpcInfo *pRpc; // associated SRpcInfo
SRpcIpSet ipSet; // ip list provided by app SRpcIpSet ipSet; // ip list provided by app
void *ahandle; // handle provided by app void *ahandle; // handle provided by app
struct SRpcConn *pConn; // pConn allocated
char msgType; // message type char msgType; // message type
uint8_t *pCont; // content provided by app uint8_t *pCont; // content provided by app
int32_t contLen; // content length int32_t contLen; // content length
...@@ -339,7 +340,7 @@ void *rpcReallocCont(void *ptr, int contLen) { ...@@ -339,7 +340,7 @@ void *rpcReallocCont(void *ptr, int contLen) {
return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
} }
void rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, const SRpcMsg *pMsg) { void *rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, const SRpcMsg *pMsg) {
SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcInfo *pRpc = (SRpcInfo *)shandle;
SRpcReqContext *pContext; SRpcReqContext *pContext;
...@@ -367,7 +368,7 @@ void rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, const SRpcMsg *pMsg) ...@@ -367,7 +368,7 @@ void rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, const SRpcMsg *pMsg)
rpcSendReqToServer(pRpc, pContext); rpcSendReqToServer(pRpc, pContext);
return; return pContext;
} }
void rpcSendResponse(const SRpcMsg *pRsp) { void rpcSendResponse(const SRpcMsg *pRsp) {
...@@ -501,6 +502,19 @@ int rpcReportProgress(void *handle, char *pCont, int contLen) { ...@@ -501,6 +502,19 @@ int rpcReportProgress(void *handle, char *pCont, int contLen) {
return -1; return -1;
} }
/* todo: cancel process may have race condition, pContext may have been released
just before app calls the rpcCancelRequest */
void rpcCancelRequest(void *handle) {
SRpcReqContext *pContext = handle;
if (pContext->pConn) {
tTrace("%s, app trys to cancel request", pContext->pConn->info);
rpcCloseConn(pContext->pConn);
pContext->pConn = NULL;
rpcFreeCont(pContext->pCont);
}
}
static void rpcFreeMsg(void *msg) { static void rpcFreeMsg(void *msg) {
if ( msg ) { if ( msg ) {
char *temp = (char *)msg - sizeof(SRpcReqContext); char *temp = (char *)msg - sizeof(SRpcReqContext);
...@@ -874,6 +888,7 @@ static void rpcReportBrokenLinkToServer(SRpcConn *pConn) { ...@@ -874,6 +888,7 @@ static void rpcReportBrokenLinkToServer(SRpcConn *pConn) {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
rpcMsg.pCont = pConn->pReqMsg; // pReqMsg is re-used to store the APP context from server rpcMsg.pCont = pConn->pReqMsg; // pReqMsg is re-used to store the APP context from server
rpcMsg.contLen = pConn->reqMsgLen; // reqMsgLen is re-used to store the APP context length rpcMsg.contLen = pConn->reqMsgLen; // reqMsgLen is re-used to store the APP context length
rpcMsg.ahandle = pConn->ahandle;
rpcMsg.handle = pConn; rpcMsg.handle = pConn;
rpcMsg.msgType = pConn->inType; rpcMsg.msgType = pConn->inType;
rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
...@@ -942,6 +957,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { ...@@ -942,6 +957,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
SRpcInfo *pRpc = pContext->pRpc; SRpcInfo *pRpc = pContext->pRpc;
pContext->pConn = NULL;
if (pContext->pRsp) { if (pContext->pRsp) {
// for synchronous API // for synchronous API
memcpy(pContext->pSet, &pContext->ipSet, sizeof(SRpcIpSet)); memcpy(pContext->pSet, &pContext->ipSet, sizeof(SRpcIpSet));
...@@ -1110,6 +1126,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { ...@@ -1110,6 +1126,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
return; return;
} }
pContext->pConn = pConn;
pConn->ahandle = pContext->ahandle; pConn->ahandle = pContext->ahandle;
rpcLockConn(pConn); rpcLockConn(pConn);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册