diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index ffac17cd70ce4a5e97df1f20e9ff897d85e00700..8ce625f6fda441f5e362df0646e4861a0c73989f 100755 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -116,6 +116,7 @@ typedef struct _RpcConn { int reqMsgLen; // request message length SRpcInfo *pRpc; // the associated SRpcInfo int connType; // connection type + int64_t lockedBy; // lock for connection SRpcReqContext *pContext; // request context } SRpcConn; @@ -191,6 +192,8 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen); static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead); static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen); static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen); +static void rpcLockConn(SRpcConn *pConn); +static void rpcUnlockConn(SRpcConn *pConn); void *rpcOpen(SRpcInit *pInit) { SRpcInfo *pRpc; @@ -361,11 +364,11 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) { contLen = rpcCompressRpcMsg(pCont, contLen); msgLen = rpcMsgLenFromCont(contLen); - pthread_mutex_lock(&pRpc->mutex); + rpcLockConn(pConn); if ( pConn->inType == 0 || pConn->user[0] == 0 ) { tTrace("%s %p, connection is already released, rsp wont be sent", pRpc->label, pConn); - pthread_mutex_lock(&pRpc->mutex); + rpcUnlockConn(pConn); return; } @@ -390,7 +393,7 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) { pConn->rspMsgLen = msgLen; if (pHead->content[0] == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--; - pthread_mutex_unlock(&pRpc->mutex); + rpcUnlockConn(pConn); taosTmrStopA(&pConn->pTimer); rpcSendMsgToPeer(pConn, msg, msgLen); @@ -456,7 +459,7 @@ static void rpcCloseConn(void *thandle) { SRpcConn *pConn = (SRpcConn *)thandle; SRpcInfo *pRpc = pConn->pRpc; - pthread_mutex_lock(&pRpc->mutex); + rpcLockConn(pConn); if (pConn->user[0]) { pConn->user[0] = 0; @@ -485,7 +488,7 @@ static void rpcCloseConn(void *thandle) { tTrace("%s %p, rpc connection is closed", pRpc->label, pConn); } - pthread_mutex_unlock(&pRpc->mutex); + rpcUnlockConn(pConn); } static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) { @@ -699,14 +702,15 @@ static SRpcConn *rpcProcessHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { pConn = rpcGetConnObj(pRpc, sid, pHead->user, hashstr); if (pConn == NULL) return NULL; + rpcLockConn(pConn); sid = pConn->sid; pConn->chandle = pRecv->chandle; if (pConn->peerIp != pRecv->ip) { - pConn->peerIp = pRecv->ip; - char ipstr[20] = {0}; - tinet_ntoa(ipstr, pRecv->ip); - strcpy(pConn->peerIpstr, ipstr); + pConn->peerIp = pRecv->ip; + char ipstr[20] = {0}; + tinet_ntoa(ipstr, pRecv->ip); + strcpy(pConn->peerIpstr, ipstr); } if (pRecv->port) pConn->peerPort = pRecv->port; @@ -714,18 +718,20 @@ static SRpcConn *rpcProcessHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { if (pHead->uid) pConn->peerUid = pHead->uid; terrno = rpcCheckAuthentication(pConn, (char *)pHead, pRecv->msgLen); - if (terrno != 0) return pConn; + if (terrno == 0) { + if (pHead->msgType != TSDB_MSG_TYPE_REG && pHead->encrypt) { + // decrypt here + } - if (pHead->msgType != TSDB_MSG_TYPE_REG && pHead->encrypt) { - // decrypt here + if ( rpcIsReq(pHead->msgType) ) { + terrno = rpcProcessReqHead(pConn, pHead); + pConn->connType = pRecv->connType; + } else { + terrno = rpcProcessRspHead(pConn, pHead); + } } - if ( rpcIsReq(pHead->msgType) ) { - terrno = rpcProcessReqHead(pConn, pHead); - pConn->connType = pRecv->connType; - } else { - terrno = rpcProcessRspHead(pConn, pHead); - } + rpcUnlockConn(pConn); return pConn; } @@ -762,9 +768,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { return NULL; } - pthread_mutex_lock(&pRpc->mutex); pConn = rpcProcessHead(pRpc, pRecv); - pthread_mutex_unlock(&pRpc->mutex); if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) { tTrace("%s %p, %s received from 0x%x:%hu, parse code:%x len:%d source:0x%08x dest:0x%08x tranId:%d", @@ -895,7 +899,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { return; } - pthread_mutex_lock(&pRpc->mutex); + rpcLockConn(pConn); // set the message header pHead->version = 1; @@ -918,7 +922,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { pConn->reqMsgLen = msgLen; pConn->pContext = pContext; - pthread_mutex_unlock(&pRpc->mutex); + rpcUnlockConn(pConn); rpcSendMsgToPeer(pConn, msg, msgLen); taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); @@ -975,7 +979,7 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { SRpcInfo *pRpc = pConn->pRpc; int reportDisc = 0; - pthread_mutex_lock(&pRpc->mutex); + rpcLockConn(pConn); if (pConn->outType && pConn->user[0]) { tTrace("%s %p, expected %s is not received", pRpc->label, pConn, taosMsg[(int)pConn->outType + 1]); @@ -997,7 +1001,7 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { tTrace("%s %p, retry timer not processed", pRpc->label, pConn); } - pthread_mutex_unlock(&pRpc->mutex); + rpcUnlockConn(pConn); if (reportDisc && pConn->pContext) { pConn->pContext->code = TSDB_CODE_NETWORK_UNAVAIL; @@ -1022,7 +1026,7 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) { SRpcConn *pConn = (SRpcConn *)param; SRpcInfo *pRpc = pConn->pRpc; - pthread_mutex_lock(&pRpc->mutex); + rpcLockConn(pConn); if (pConn->inType && pConn->user[0]) { tTrace("%s %p, progress timer expired, send progress", pRpc->label, pConn); @@ -1032,7 +1036,7 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) { tTrace("%s %p, progress timer:%p not processed", pRpc->label, pConn, tmrId); } - pthread_mutex_unlock(&pRpc->mutex); + rpcUnlockConn(pConn); } static void rpcFreeOutMsg(void *msg) { @@ -1195,4 +1199,20 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { return code; } +static void rpcLockConn(SRpcConn *pConn) { + int64_t tid = taosGetPthreadId(); + int i = 0; + while (atomic_val_compare_exchange_64(&(pConn->lockedBy), 0, tid) != 0) { + if (++i % 1000 == 0) { + sched_yield(); + } + } +} + +static void rpcUnlockConn(SRpcConn *pConn) { + int64_t tid = taosGetPthreadId(); + if (atomic_val_compare_exchange_64(&(pConn->lockedBy), tid, 0) != tid) { + assert(false); + } +}