From 029949978535aa81d545455d82573d8bc8505383 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Tue, 18 Feb 2020 22:14:32 +0800 Subject: [PATCH] put all the protections all multi-threads --- src/rpc/src/trpc.c | 322 ++++++++++++++++++++++++++------------------- 1 file changed, 189 insertions(+), 133 deletions(-) diff --git a/src/rpc/src/trpc.c b/src/rpc/src/trpc.c index ffa308c9b1..dd881f0692 100755 --- a/src/rpc/src/trpc.c +++ b/src/rpc/src/trpc.c @@ -80,53 +80,51 @@ typedef struct _RpcConn { int sid; // session ID uint32_t ownId; // own link ID uint32_t peerId; // peer link ID - char meterId[TSDB_UNI_LEN]; - char spi; - char encrypt; - uint8_t secret[TSDB_KEY_LEN]; - uint8_t ckey[TSDB_KEY_LEN]; + char meterId[TSDB_UNI_LEN]; // user ID for the link + char spi; // security parameter index + char encrypt; // encryption, 0:1 + uint8_t secret[TSDB_KEY_LEN]; // secret for the link + uint8_t ckey[TSDB_KEY_LEN]; // ciphering key uint16_t localPort; // for UDP only - uint32_t peerUid; + uint32_t peerUid; // peer UID uint32_t peerIp; // peer IP uint16_t peerPort; // peer port char peerIpstr[20]; // peer IP string uint16_t tranId; // outgoing transcation ID, for build message uint16_t outTranId; // outgoing transcation ID - uint16_t inTranId; - uint8_t outType; - char inType; + uint16_t inTranId; // transcation ID for incoming msg + uint8_t outType; // message type for outgoing request + char inType; // message type for incoming request void *chandle; // handle passed by TCP/UDP connection layer void *ahandle; // handle provided by upper app layter - int retry; + int retry; // number of retry for sending request int tretry; // total retry - void *pTimer; - void *pIdleTimer; - char *pRspMsg; // including header - int rspMsgLen; - char *pReqMsg; // including header - int reqMsgLen; - SRpcInfo *pRpc; - SRpcReqContext *pContext; + void *pTimer; // retry timer to monitor the response + void *pIdleTimer; // idle timer + char *pRspMsg; // response message including header + int rspMsgLen; // response messag length + char *pReqMsg; // request message including header + int reqMsgLen; // request message length + SRpcInfo *pRpc; // the associated SRpcInfo + SRpcReqContext *pContext; // request context } SRpcConn; typedef struct { - char version : 4; - char comp : 4; - char tcp : 2; - char spi : 3; - char encrypt : 3; - uint16_t tranId; - uint32_t uid; // for unique ID inside a client - uint32_t sourceId; - - uint32_t destId; - uint32_t destIp; - char meterId[TSDB_UNI_LEN]; - uint16_t port; // for UDP only - char empty[1]; - uint8_t msgType; - int32_t msgLen; - uint8_t content[0]; + char version:4; // RPC version + char comp:4; // compression algorithm, 0:no compression 1:lz4 + char tcp:2; // tcp flag + char spi:3; // security parameter index + char encrypt:3; // encrypt algorithm, 0: no encryption + uint16_t tranId; // transcation ID + uint32_t uid; // for unique ID inside a client + uint32_t sourceId; // source ID, an index for connection list + uint32_t destId; // destination ID, an index for connection list + char meterId[TSDB_UNI_LEN]; + uint16_t port; // for UDP only, port may be changed + char empty[1]; // reserved + uint8_t msgType; // message type + int32_t msgLen; // message length including the header iteslf + uint8_t content[0]; // message body starts from here } SRpcHeader; typedef struct { @@ -178,7 +176,9 @@ void (*taosCloseConn[])(void *chandle) = { static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort); static void rpcCloseConn(void *thandle); static SRpcConn *rpcSetConnToServer(void *shandle, SRpcIpSet ipSet); -static int rpcGetConnObj(int sid, char *meterId, SRpcInfo *pRpc, SRpcConn **ppConn, char req, char *hashstr); +static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc); +static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *meterId, char *hashstr); +static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *meterId, char *hashstr); static void rpcSendReqToServer(SRpcConn *pConn, SRpcReqContext *pContext); static void rpcSendQuickRsp(SRpcConn *pConn, char code); @@ -190,6 +190,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader); static void rpcProcessConnError(void *param, void *id); static void rpcProcessRetryTimer(void *, void *); static void rpcProcessIdleTimer(void *param, void *tmrId); +static void rpcProcessProgressTimer(void *param, void *tmrId); static void rpcFreeMsg(void *msg); static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen); @@ -350,6 +351,12 @@ void rpcSendResponse(void *handle, void *pCont, int contLen) { pthread_mutex_lock(&pRpc->mutex); + if ( pConn->inType == 0 || pConn->meterId[0] == 0 ) { + tTrace("%s pConn:%p, connection is already released, rsp wont be sent", pRpc->label, pConn); + pthread_mutex_lock(&pRpc->mutex); + return; + } + // set msg header pHeader->version = 1; pHeader->msgType = pConn->inType+1; @@ -364,14 +371,16 @@ void rpcSendResponse(void *handle, void *pCont, int contLen) { // set pConn parameters pConn->inType = 0; - rpcFreeMsg(pConn->pRspMsg); + + // response message is released until new response is sent + rpcFreeMsg(pConn->pRspMsg); pConn->pRspMsg = msg; pConn->rspMsgLen = msgLen; - if (pHeader->content[0] == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--; pthread_mutex_unlock(&pRpc->mutex); + taosTmrStopA(&pConn->pTimer); rpcSendDataToPeer(pConn, msg, msgLen); return; @@ -401,29 +410,26 @@ void rpcSendSimpleRsp(void *thandle, int32_t code) { static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort) { SRpcConn *pConn; - if ( (uint8_t)(rpcGetConnObj(0, pRpc->meterId, pRpc, &pConn, 1, NULL)) != 0 ) - return NULL; + pConn = rpcAllocateClientConn(pRpc); - strcpy(pConn->peerIpstr, peerIpStr); - pConn->peerIp = inet_addr(peerIpStr); - pConn->peerPort = peerPort; - pConn->spi = pRpc->spi; - pConn->encrypt = pRpc->encrypt; - if (pConn->spi) memcpy(pConn->secret, pRpc->secret, TSDB_KEY_LEN); - strcpy(pConn->meterId, pRpc->meterId); - - // if it is client, it shall set up connection first - if (taosOpenConn[pRpc->connType]) { - pConn->chandle = (*taosOpenConn[pRpc->connType])(pRpc->shandle, pConn, pConn->peerIpstr, pConn->peerPort); - if (pConn->chandle) { - tTrace("%s pConn:%p, rpc connection is set up, sid:%d id:%s ip:%s:%hu localPort:%d", pRpc->label, - pConn, pConn->sid, pRpc->meterId, pConn->peerIpstr, pConn->peerPort, pConn->localPort); - } else { - tError("%s pConn:%p, failed to set up nw connection to ip:%s:%hu", pRpc->label, pConn, - pConn->sid, pRpc->meterId, pConn->peerIpstr, pConn->peerPort); - terrno = TSDB_CODE_NETWORK_UNAVAIL; - rpcCloseConn(pConn); - pConn = NULL; + if (pConn) { + strcpy(pConn->peerIpstr, peerIpStr); + pConn->peerIp = inet_addr(peerIpStr); + pConn->peerPort = peerPort; + strcpy(pConn->meterId, pRpc->meterId); + + if (taosOpenConn[pRpc->connType]) { + pConn->chandle = (*taosOpenConn[pRpc->connType])(pRpc->shandle, pConn, pConn->peerIpstr, pConn->peerPort); + if (pConn->chandle) { + tTrace("%s pConn:%p, rpc connection is set up, sid:%d id:%s ip:%s:%hu localPort:%d", pRpc->label, + pConn, pConn->sid, pRpc->meterId, pConn->peerIpstr, pConn->peerPort, pConn->localPort); + } else { + tError("%s pConn:%p, failed to set up nw connection to ip:%s:%hu", pRpc->label, pConn, + pConn->sid, pRpc->meterId, pConn->peerIpstr, pConn->peerPort); + terrno = TSDB_CODE_NETWORK_UNAVAIL; + rpcCloseConn(pConn); + pConn = NULL; + } } } @@ -432,10 +438,7 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort) static void rpcCloseConn(void *thandle) { SRpcConn *pConn = (SRpcConn *)thandle; - assert(pConn); - SRpcInfo *pRpc = pConn->pRpc; - assert(pRpc); pthread_mutex_lock(&pRpc->mutex); @@ -443,83 +446,111 @@ static void rpcCloseConn(void *thandle) { taosTmrStopA(&pConn->pTimer); taosTmrStopA(&pConn->pIdleTimer); - rpcFreeMsg(pConn->pRspMsg); - rpcFreeMsg(pConn->pReqMsg); - char hashstr[40] = {0}; - sprintf(hashstr, "%x:%x:%x", pConn->peerIp, pConn->peerUid, pConn->peerId); - taosDeleteStrHash(pRpc->hash, hashstr); + if ( pRpc->connType == TAOS_CONN_UDPS || TAOS_CONN_TCPS) { + char hashstr[40] = {0}; + sprintf(hashstr, "%x:%x:%x", pConn->peerIp, pConn->peerUid, pConn->peerId); + taosDeleteStrHash(pRpc->hash, hashstr); + rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg + } - if (pRpc->idPool) taosFreeId(pRpc->idPool, pConn->sid); + taosFreeId(pRpc->idPool, pConn->sid); - tTrace("%s pConn:%p, TAOS connection closed", pRpc->label, pConn); - memset(pConn, 0, sizeof(SRpcConn)); + // reset the link parameters + pConn->meterId[0] = 0; + pConn->outType = 0; + pConn->inType = 0; + pConn->inTranId = 0; + pConn->outTranId = 0; + pConn->pReqMsg = NULL; + pConn->reqMsgLen = 0; + pConn->pRspMsg = NULL; + pConn->rspMsgLen = 0; + pConn->pContext = NULL; + + tTrace("%s pConn:%p, rpc connection is closed", pRpc->label, pConn); pthread_mutex_unlock(&pRpc->mutex); } -static int rpcGetConnObj(int sid, char *meterId, SRpcInfo *pRpc, SRpcConn **ppConn, char req, char *hashstr) { - SRpcConn * pConn = NULL; +static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) { + SRpcConn *pConn = NULL; - if (sid == 0) { - if (req) { - int osid = sid; - SRpcConn **ppConn = (SRpcConn **)taosGetStrHashData(pRpc->hash, hashstr); - if (ppConn) pConn = *ppConn; - if (pConn == NULL) { - sid = taosAllocateId(pRpc->idPool); - if (sid <= 0) { - tError("%s maximum number of sessions:%d is reached", pRpc->label, pRpc->sessions); - return TSDB_CODE_MAX_SESSIONS; - } else { - tTrace("%s sid:%d, ID allocated, used:%d, old id:%d", pRpc->label, sid, - taosIdPoolNumOfUsed(pRpc->idPool), osid); - } - } else { - sid = pConn->sid; - tTrace("%s sid:%d id:%s, session is already there", pRpc->label, pConn->sid, pConn->meterId); - } - } else { - return TSDB_CODE_UNEXPECTED_RESPONSE; - } + int sid = taosAllocateId(pRpc->idPool); + if (sid <= 0) { + tError("%s maximum number of sessions:%d is reached", pRpc->label, pRpc->sessions); + terrno = TSDB_CODE_MAX_SESSIONS; } else { - if (pRpc->connList[sid].meterId[0] == 0) { - tError("%s sid:%d session is already released", pRpc->label, sid); - return TSDB_CODE_INVALID_VALUE; - } - } + tTrace("%s sid:%d, ID allocated, used:%d, old id:%d", pRpc->label, sid, taosIdPoolNumOfUsed(pRpc->idPool)); + + pConn = pRpc->connList + sid; + memset(pConn, 0, sizeof(SRpcConn)); + + pConn->pRpc = pRpc; + pConn->sid = sid; + pConn->tranId = (uint16_t)(rand() & 0xFFFF); + pConn->ownId = htonl(pConn->sid); + pConn->spi = pRpc->spi; + pConn->encrypt = pRpc->encrypt; + if (pConn->spi) memcpy(pConn->secret, pRpc->secret, TSDB_KEY_LEN); + } - pConn = pRpc->connList + sid; + return pConn; +} - if (pConn->meterId[0] == 0) { +static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *meterId, char *hashstr) { + SRpcConn *pConn; + + // check if it is already allocated + pConn = *(SRpcConn **)(taosGetStrHashData(pRpc->hash, hashstr)); + if (pConn) return pConn; + + int sid = taosAllocateId(pRpc->idPool); + if (sid <= 0) { + tError("%s maximum number of sessions:%d is reached", pRpc->label, pRpc->sessions); + terrno = TSDB_CODE_MAX_SESSIONS; + } else { + pConn = pRpc->connList + sid; memset(pConn, 0, sizeof(SRpcConn)); memcpy(pConn->meterId, meterId, tListLen(pConn->meterId)); pConn->pRpc = pRpc; pConn->sid = sid; pConn->tranId = (uint16_t)(rand() & 0xFFFF); pConn->ownId = htonl(pConn->sid); - if (pRpc->afp) { - int ret = (*pRpc->afp)(meterId, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey); - if (ret != 0) { - tWarn("%s pConn:%p, meterId not there", pRpc->label, pConn); - taosFreeId(pRpc->idPool, sid); // sid shall be released - memset(pConn, 0, sizeof(SRpcConn)); - return ret; - } + if (pRpc->afp && (*pRpc->afp)(meterId, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey)) { + tWarn("%s pConn:%p, meterId not there", pRpc->label, pConn); + taosFreeId(pRpc->idPool, sid); // sid shall be released + terrno = TSDB_CODE_INVALID_USER; + pConn = NULL; } + } + if (pConn) { taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn); - tTrace("%s pConn:%p, TAOS connection is allocated, sid:%d id:%s", pRpc->label, pConn, sid); + tTrace("%s pConn:%p, rpc connection is allocated, sid:%d id:%s", pRpc->label, pConn, sid); + } + + return pConn; +} + +static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *meterId, char *hashstr) { + SRpcConn *pConn = NULL; + + if (sid) { + pConn = pRpc->connList + sid; } else { + pConn = rpcAllocateServerConn(pRpc, meterId, hashstr); + } + + if (pConn) { if (memcmp(pConn->meterId, meterId, tListLen(pConn->meterId)) != 0) { tTrace("%s pConn:%p, meterId:%s is not matched, received:%s", pRpc->label, pConn, pConn->meterId, meterId); - return TSDB_CODE_MISMATCHED_METER_ID; + terrno = TSDB_CODE_MISMATCHED_METER_ID; + pConn = NULL; } } - *ppConn = pConn; - - return TSDB_CODE_SUCCESS; + return pConn; } SRpcConn *rpcSetConnToServer(void *shandle, SRpcIpSet ipSet) { @@ -600,8 +631,8 @@ static int rpcProcessRspHeader(SRpcConn *pConn, SRpcHeader *pHeader) { if (*pHeader->content == TSDB_CODE_ACTION_IN_PROGRESS || pHeader->tcp) { if (pConn->tretry <= tsRpcMaxRetry) { - tTrace("%s pConn:%p, peer is still processing the transaction", pRpc->label, pConn); pConn->tretry++; + tTrace("%s pConn:%p, peer is still processing the transaction", pRpc->label, pConn); taosTmrReset(rpcProcessRetryTimer, tsRpcProgressTime, pConn, pRpc->tmrCtrl, &pConn->pTimer); return TSDB_CODE_ALREADY_PROCESSED; } else { @@ -647,9 +678,8 @@ static int rpcProcessHeader(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int d } if (sid == 0) sprintf(hashstr, "%x:%x:%x", ip, pHeader->uid, pHeader->sourceId); - - code = rpcGetConnObj(sid, pHeader->meterId, pRpc, &pConn, rpcIsReq(pHeader->msgType), hashstr); - if (code != TSDB_CODE_SUCCESS) return code; + pConn = rpcGetConnObj(pRpc, sid, pHeader->meterId, hashstr); + if (pConn == NULL ) return terrno; *ppConn = pConn; sid = pConn->sid; @@ -739,6 +769,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader) { pHeader = rpcDecompressRpcMsg(pHeader); if ( rpcIsReq(pHeader->msgType) ) { + taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer); (*(pRpc->fp))(pHeader->msgType, pHeader->content, msgLen, pConn, 0); } else { // it's a response @@ -833,6 +864,8 @@ static void rpcSendReqToServer(SRpcConn *pConn, SRpcReqContext *pContext) { int msgLen = rpcMsgLenFromCont(pContext->contLen); char msgType = pContext->msgType; + pthread_mutex_lock(&pRpc->mutex); + // set the message header pHeader->version = 1; pHeader->msgType = msgType; @@ -854,6 +887,8 @@ static void rpcSendReqToServer(SRpcConn *pConn, SRpcReqContext *pContext) { pConn->reqMsgLen = msgLen; pConn->pContext = pContext; + pthread_mutex_unlock(&pRpc->mutex); + rpcSendDataToPeer(pConn, msg, msgLen); taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); } @@ -863,10 +898,6 @@ static void rpcSendDataToPeer(SRpcConn *pConn, void *data, int dataLen) { SRpcInfo *pRpc = pConn->pRpc; SRpcHeader *pHeader = (SRpcHeader *)data; - assert(data); - assert(dataLen>0); - assert(pHeader->msgType > 0); - dataLen = rpcAddAuthPart(pConn, data, dataLen); if ( rpcIsReq(pHeader->msgType)) { @@ -896,6 +927,7 @@ static void rpcProcessConnError(void *param, void *id) { SRpcInfo *pRpc = pContext->pRpc; if ( pContext->numOfRetry >= pContext->ipSet.numOfIps ) { + rpcFreeMsg(rpcHeaderFromCont(pContext->pCont)); // free the request msg char *rsp = calloc(1, RPC_MSG_OVERHEAD + sizeof(STaosRsp)); if ( rsp ) { STaosRsp *pRsp = (STaosRsp *)(rsp+sizeof(SRpcHeader)); @@ -922,13 +954,11 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { int reportDisc = 0; SRpcInfo *pRpc = pConn->pRpc; - assert(pRpc); + if (pRpc == NULL) return; // it means it is already released pthread_mutex_lock(&pRpc->mutex); - if (pConn->outType == 0) { - tTrace("%s pConn:%p, outtype is zero, it is already processed", pRpc->label, pConn); - } else { + if (pConn->outType && pConn->meterId[0]) { tTrace("%s pConn:%p, expected %s is not received", pRpc->label, pConn, taosMsg[(int)pConn->outType + 1]); pConn->pTimer = NULL; pConn->retry++; @@ -937,36 +967,62 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { tTrace("%s pConn:%p, re-send msg:%s to %s:%hu", pRpc->label, taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort); rpcSendDataToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); + taosTmrReset(rpcProcessRetryTimer, tsRpcTimer<retry, pConn, pRpc->tmrCtrl, &pConn->pTimer); } else { // close the connection tTrace("%s pConn:%p, failed to send msg:%s to %s:%hu", pRpc->label, pConn, taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort); reportDisc = 1; } + } else { + tTrace("%s pConn:%p, retry timer not processed", pRpc->label, pConn); } pthread_mutex_unlock(&pRpc->mutex); pConn->pContext->code = TSDB_CODE_NETWORK_UNAVAIL; - if (reportDisc) rpcProcessConnError(pConn->pContext, NULL); + if (reportDisc) { + rpcProcessConnError(pConn->pContext, NULL); + rpcCloseConn(pConn); + } } static void rpcProcessIdleTimer(void *param, void *tmrId) { SRpcConn *pConn = (SRpcConn *)param; SRpcInfo *pRpc = pConn->pRpc; - assert(pRpc); - if (pConn->pIdleTimer != tmrId) { - tTrace("%s pConn:%p, idle timer:%p already processed", pRpc->label, pConn, tmrId); - return; + pthread_mutex_lock(&pRpc->mutex); + + if (pConn->inType == 0 && pConn->meterId[0]) { + tTrace("%s pConn:%p, close the connection since no activity", pRpc->label, pConn); + rpcCloseConn(pConn); + } else { + tTrace("%s pConn:%p, idle timer:%p not processed", pRpc->label, pConn, tmrId); + } + + pthread_mutex_unlock(&pRpc->mutex); +} + +static void rpcProcessProgressTimer(void *param, void *tmrId) { + SRpcConn *pConn = (SRpcConn *)param; + SRpcInfo *pRpc = pConn->pRpc; + + pthread_mutex_lock(&pRpc->mutex); + + if (pConn->inType && pConn->meterId[0]) { + tTrace("%s pConn:%p, progress timer expired, send progress", pRpc->label, pConn); + rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS); + taosTmrReset(rpcProcessProgressTimer, tsRpcTimer<retry, pConn, pRpc->tmrCtrl, &pConn->pTimer); + } else { + tTrace("%s pConn:%p, progress timer:%p not processed", pRpc->label, pConn, tmrId); } - tTrace("%s pConn:%p, close the connection since no activity", pRpc->label, pConn); - rpcCloseConn(pConn); + pthread_mutex_unlock(&pRpc->mutex); } static void rpcFreeMsg(void *msg) { + if ( msg == NULL ) return; char *req = ((char *)msg) - sizeof(SRpcReqContext); free(req); } -- GitLab