diff --git a/src/rpc/src/trpc.c b/src/rpc/src/trpc.c index 32e9a42a38b8be4663dd19c84e140d762884d9a7..bc27e7290abd2da1f166df7873f01759760b75cc 100644 --- a/src/rpc/src/trpc.c +++ b/src/rpc/src/trpc.c @@ -59,7 +59,7 @@ typedef struct { uint8_t secret[TSDB_KEY_LEN]; uint8_t ckey[TSDB_KEY_LEN]; - short localPort; // for UDP only + short localPort; // for UDP only uint32_t peerUid; uint32_t peerIp; // peer IP short peerPort; // peer port @@ -164,8 +164,8 @@ char *taosBuildReqHeader(void *param, char type, char *msg) { pHeader->spi = 0; pHeader->tcp = 0; pHeader->encrypt = 0; - if (pConn->tranId == 0) __sync_add_and_fetch_32(&pConn->tranId, 1); pHeader->tranId = __sync_add_and_fetch_32(&pConn->tranId, 1); + if (pHeader->tranId == 0) pHeader->tranId = __sync_add_and_fetch_32(&pConn->tranId, 1); pHeader->sourceId = pConn->ownId; pHeader->destId = pConn->peerId; @@ -196,8 +196,8 @@ char *taosBuildReqMsgWithSize(void *param, char type, int size) { pHeader->spi = 0; pHeader->tcp = 0; pHeader->encrypt = 0; - if (pConn->tranId == 0) __sync_add_and_fetch_32(&pConn->tranId, 1); pHeader->tranId = __sync_add_and_fetch_32(&pConn->tranId, 1); + if (pHeader->tranId == 0) pHeader->tranId = __sync_add_and_fetch_32(&pConn->tranId, 1); pHeader->sourceId = pConn->ownId; pHeader->destId = pConn->peerId; @@ -219,6 +219,11 @@ char *taosBuildRspMsgWithSize(void *param, char type, int size) { size += sizeof(SMsgNode) + sizeof(STaosHeader) + sizeof(STaosDigest); pMsg = (char *)malloc((size_t)size); + if (pMsg == NULL) { + tError("pConn:%p, malloc(%d) failed when building a type:%d message", pConn, size, type); + return NULL; + } + memset(pMsg, 0, (size_t)size); pHeader = (STaosHeader *)pMsg; pHeader->version = 1; @@ -324,6 +329,11 @@ void *taosOpenRpc(SRpcInit *pRpc) { int size = (int)sizeof(SRpcChann) * pRpc->numOfChanns; pServer->channList = (SRpcChann *)malloc((size_t)size); + if (pServer->channList == NULL) { + tError("%s, failed to malloc channList", pRpc->label); + tfree(pServer); + return NULL; + } memset(pServer->channList, 0, (size_t)size); pServer->shandle = (*taosInitConn[pRpc->connType])(pRpc->localIp, pRpc->localPort, pRpc->label, pRpc->numOfThreads, @@ -334,7 +344,14 @@ void *taosOpenRpc(SRpcInit *pRpc) { return NULL; } - if (pServer->numOfChanns == 1) taosOpenRpcChann(pServer, 0, pRpc->sessionsPerChann); + if (pServer->numOfChanns == 1) { + int retVal = taosOpenRpcChann(pServer, 0, pRpc->sessionsPerChann); + if (0 != retVal) { + tError("%s, failed to open rpc chann", pRpc->label); + taosCloseRpc(pServer); + return NULL; + } + } tTrace("%s RPC is openned, numOfThreads:%d", pRpc->label, pRpc->numOfThreads); @@ -354,13 +371,12 @@ int taosOpenRpcChannWithQ(void *handle, int cid, int sessions, void *qhandle) { pChann = pServer->channList + cid; memset(pChann, 0, sizeof(SRpcChann)); - int size = (int)sizeof(SRpcConn) * sessions; - pChann->connList = (SRpcConn *)malloc((size_t)size); + size_t size = sizeof(SRpcConn) * sessions; + pChann->connList = (SRpcConn *)calloc(1, size); if (pChann->connList == NULL) { - tError("%s cid:%d, failed to allocate memory for taos connections", pServer->label, cid); + tError("%s cid:%d, failed to allocate memory for taos connections, size:%d", pServer->label, cid, size); return -1; } - memset(pChann->connList, 0, (size_t)size); if (pServer->idMgmt == TAOS_ID_FREE) { pChann->idPool = taosInitIdPool(sessions); @@ -387,7 +403,7 @@ int taosOpenRpcChannWithQ(void *handle, int cid, int sessions, void *qhandle) { pChann->qhandle = qhandle ? qhandle : pServer->qhandle; - return 0; + return TSDB_CODE_SUCCESS; } void taosCloseRpcChann(void *handle, int cid) { @@ -472,7 +488,7 @@ int taosGetRpcConn(int chann, int sid, char *meterId, STaosRpc *pServer, SRpcCon pChann = pServer->channList + chann; if (pServer->idMgmt == TAOS_ID_FREE) { - if ((sid == 0) || (pChann->connList[sid].signature == NULL)) { + if (sid == 0) { if (req) { int osid = sid; SRpcConn **ppConn = (SRpcConn **)taosGetStrHashData(pChann->hash, hashstr); @@ -494,7 +510,12 @@ int taosGetRpcConn(int chann, int sid, char *meterId, STaosRpc *pServer, SRpcCon } else { return TSDB_CODE_UNEXPECTED_RESPONSE; } - } + } else { + if (pChann->connList[sid].signature == NULL) { + tError("%s cid:%d, sid:%d session is already released", pServer->label, chann, sid); + return TSDB_CODE_INVALID_VALUE; + } + } } pConn = pChann->connList + sid; @@ -515,8 +536,7 @@ int taosGetRpcConn(int chann, int sid, char *meterId, STaosRpc *pServer, SRpcCon if (pServer->afp) { int ret = (*pServer->afp)(meterId, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey); if (ret != 0) { - tTrace("%s cid:%d sid:%d id:%s, meterId not there, localPort:%d pConn:%p", pServer->label, chann, sid, pConn->meterId, - pConn->localPort, pConn); + tTrace("%s cid:%d sid:%d id:%s, meterId not there pConn:%p", pServer->label, chann, sid, pConn->meterId, pConn); return ret; } } @@ -529,8 +549,8 @@ int taosGetRpcConn(int chann, int sid, char *meterId, STaosRpc *pServer, SRpcCon } taosAddStrHash(pChann->hash, hashstr, (char *)&pConn); - tTrace("%s cid:%d sid:%d id:%s, TAOS connection is allocated, localPort:%d pConn:%p", - pServer->label, chann, sid, pConn->meterId, pConn->localPort, pConn); + tTrace("%s cid:%d sid:%d id:%s, TAOS connection is allocated, localPort:%d pConn:%p", pServer->label, chann, sid, + pConn->meterId, pConn->localPort, pConn); } else { if (memcmp(pConn->meterId, meterId, tListLen(pConn->meterId)) != 0) { tTrace("%s cid:%d sid:%d id:%s, meterId is not matched, received:%s", pServer->label, chann, sid, pConn->meterId, @@ -566,9 +586,8 @@ void *taosOpenRpcConn(SRpcConnInit *pInit, uint8_t *code) { if (taosOpenConn[pServer->type]) { pConn->chandle = (*taosOpenConn[pServer->type])(pServer->shandle, pConn, pConn->peerIpstr, pConn->peerPort); if (pConn->chandle) { - tTrace("%s cid:%d sid:%d id:%s, nw connection is set up, ip:%s:%hu localPort:%d pConn:%p", - pServer->label, pConn->chann, pConn->sid, pInit->meterId, pConn->peerIpstr, pConn->peerPort, - pConn->localPort, pConn); + tTrace("%s cid:%d sid:%d id:%s, nw connection is set up, ip:%s:%hu localPort:%d pConn:%p", pServer->label, + pConn->chann, pConn->sid, pInit->meterId, pConn->peerIpstr, pConn->peerPort, pConn->localPort, pConn); } else { tError("%s cid:%d sid:%d id:%s, failed to set up nw connection to ip:%s:%hu", pServer->label, pConn->chann, pConn->sid, pInit->meterId, pConn->peerIpstr, pConn->peerPort); @@ -629,18 +648,16 @@ int taosSendDataToPeer(SRpcConn *pConn, char *data, int dataLen) { if (pHeader->msgType & 1) { if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) - tTrace( - "%s cid:%d sid:%d id:%s, %s is sent to %s:%hu, len:%d tranId:%d " - "pConn:%p", - pServer->label, pConn->chann, pConn->sid, pConn->meterId, taosMsg[pHeader->msgType], pConn->peerIpstr, - pConn->peerPort, dataLen, pHeader->tranId, pConn); + tTrace("%s cid:%d sid:%d id:%s, %s is sent to %s:%hu, len:%d source:0x%08x dest:0x%08x tranId:%d pConn:%p", + pServer->label, pConn->chann, pConn->sid, pConn->meterId, taosMsg[pHeader->msgType], pConn->peerIpstr, + pConn->peerPort, dataLen, pHeader->sourceId, pHeader->destId, pHeader->tranId, pConn); } else { if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) tTrace( - "%s cid:%d sid:%d id:%s, %s is sent to %s:%hu, code:%u len:%d " - "tranId:%d pConn:%p", + "%s cid:%d sid:%d id:%s, %s is sent to %s:%hu, code:%u len:%d source:0x%08x dest:0x%08x tranId:%d pConn:%p", pServer->label, pConn->chann, pConn->sid, pConn->meterId, taosMsg[pHeader->msgType], pConn->peerIpstr, - pConn->peerPort, (uint8_t)pHeader->content[0], dataLen, pHeader->tranId, pConn); + pConn->peerPort, (uint8_t)pHeader->content[0], dataLen, pHeader->sourceId, pHeader->destId, pHeader->tranId, + pConn); } writtenLen = (*taosSendData[pServer->type])(pConn->peerIp, pConn->peerPort, (char *)pHeader, dataLen, pConn->chandle); @@ -650,8 +667,6 @@ int taosSendDataToPeer(SRpcConn *pConn, char *data, int dataLen) { pConn->sid, pConn->meterId, dataLen, writtenLen, strerror(errno)); // assert ( writtenLen == dataLen ); tDump(data, dataLen); - tTrace("%s msg sent, len:%d source:0x%08x dest:0x%08x tranId:%d pConn:%p", pServer->label, dataLen, pHeader->sourceId, - pHeader->destId, pHeader->tranId, pConn); return 0; } @@ -697,12 +712,13 @@ void taosProcessResponse(SRpcConn *pConn) { if (pConn->pHead == NULL) pConn->pTail = NULL; } - pthread_mutex_unlock(&pChann->mutex); - if (msg) { taosSendDataToPeer(pConn, msg, msgLen); taosTmrReset(taosProcessTaosTimer, tsRpcTimer, pConn, pChann->tmrCtrl, &pConn->pTimer); } + + pthread_mutex_unlock(&pChann->mutex); + } int taosProcessMsgHeader(STaosHeader *pHeader, SRpcConn **ppConn, STaosRpc *pServer, int dataLen, uint32_t ip, @@ -712,7 +728,7 @@ int taosProcessMsgHeader(STaosHeader *pHeader, SRpcConn **ppConn, STaosRpc *pSer SRpcChann *pChann; int msgLen; char hashstr[40] = {0}; - int reSend = 0; + // int reSend = 0; *ppConn = NULL; uint32_t destId = htonl(pHeader->destId); @@ -794,24 +810,24 @@ int taosProcessMsgHeader(STaosHeader *pHeader, SRpcConn **ppConn, STaosRpc *pSer delta = (int32_t)htonl(pDigest->timeStamp); delta -= (int32_t)taosGetTimestampSec(); if (abs(delta) > 900) { - tWarn("%s cid:%d sid:%d id:%s, time diff:%d is too big, msg discarded pConn:%p, timestamp:%d", - pServer->label, chann, sid, pConn->meterId, delta, pConn, htonl(pDigest->timeStamp)); + tWarn("%s cid:%d sid:%d id:%s, time diff:%d is too big, msg discarded pConn:%p, timestamp:%d", pServer->label, + chann, sid, pConn->meterId, delta, pConn, htonl(pDigest->timeStamp)); // the requirement of goldwind, should not return error in this case code = TSDB_CODE_INVALID_TIME_STAMP; goto _exit; } if (taosAuthenticateMsg((uint8_t *)pHeader, dataLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) { - tTrace("%s cid:%d sid:%d id:%s, authentication failed, msg discarded pConn:%p", - pServer->label, chann, sid, pConn->meterId, pConn); + tTrace("%s cid:%d sid:%d id:%s, authentication failed, msg discarded pConn:%p", pServer->label, chann, sid, + pConn->meterId, pConn); code = TSDB_CODE_AUTH_FAILURE; goto _exit; } } else { // if it is request or response with code 0, msg shall be discarded if ((pHeader->msgType & 1) || (pHeader->content[0] == 0)) { - tTrace("%s cid:%d sid:%d id:%s, auth spi not matched, msg discarded pConn:%p", - pServer->label, chann, sid, pConn->meterId, pConn); + tTrace("%s cid:%d sid:%d id:%s, auth spi not matched, msg discarded pConn:%p", pServer->label, chann, sid, + pConn->meterId, pConn); code = TSDB_CODE_AUTH_FAILURE; goto _exit; } @@ -829,9 +845,9 @@ int taosProcessMsgHeader(STaosHeader *pHeader, SRpcConn **ppConn, STaosRpc *pSer pConn->peerId = pHeader->sourceId; } else { if (pConn->peerId != pHeader->sourceId) { - tTrace("%s cid:%d sid:%d id:%s, source Id is changed, old:0x%08x new:0x%08x pConn:%p", - pServer->label, chann, sid, pConn->meterId, pConn->peerId, pHeader->sourceId, pConn); - code = TSDB_CODE_SESSION_ALREADY_EXIST; + tTrace("%s cid:%d sid:%d id:%s, source Id is changed, old:0x%08x new:0x%08x pConn:%p", pServer->label, chann, + sid, pConn->meterId, pConn->peerId, pHeader->sourceId, pConn); + code = TSDB_CODE_INVALID_VALUE; goto _exit; } } @@ -842,9 +858,9 @@ int taosProcessMsgHeader(STaosHeader *pHeader, SRpcConn **ppConn, STaosRpc *pSer taosMsg[pHeader->msgType], pConn); taosSendQuickRsp(pConn, (char)(pHeader->msgType + 1), TSDB_CODE_ACTION_IN_PROGRESS); } else if (pConn->inType == 0) { - tTrace("%s cid:%d sid:%d id:%s, %s is already processed, tranId:%d pConn:%p", - pServer->label, chann, sid, pConn->meterId, taosMsg[pHeader->msgType], pConn->inTranId, pConn); - reSend = 1; + tTrace("%s cid:%d sid:%d id:%s, %s is already processed, tranId:%d pConn:%p", pServer->label, chann, sid, + pConn->meterId, taosMsg[pHeader->msgType], pConn->inTranId, pConn); + taosReSendRspToPeer(pConn); } else { tTrace("%s cid:%d sid:%d id:%s, mismatched message %s and tranId pConn:%p", pServer->label, chann, sid, pConn->meterId, taosMsg[pHeader->msgType], pConn); @@ -856,8 +872,8 @@ int taosProcessMsgHeader(STaosHeader *pHeader, SRpcConn **ppConn, STaosRpc *pSer } if (pConn->inType != 0) { - tTrace("%s cid:%d sid:%d id:%s, last session is not finished, inTranId:%d tranId:%d pConn:%p", - pServer->label, chann, sid, pConn->meterId, pConn->inTranId, pHeader->tranId, pConn); + tTrace("%s cid:%d sid:%d id:%s, last session is not finished, inTranId:%d tranId:%d pConn:%p", pServer->label, + chann, sid, pConn->meterId, pConn->inTranId, pHeader->tranId, pConn); code = TSDB_CODE_LAST_SESSION_NOT_FINISHED; goto _exit; } @@ -897,8 +913,8 @@ int taosProcessMsgHeader(STaosHeader *pHeader, SRpcConn **ppConn, STaosRpc *pSer if (*pHeader->content == TSDB_CODE_ACTION_IN_PROGRESS || pHeader->tcp) { if (pConn->tretry <= tsRpcMaxRetry) { - tTrace("%s cid:%d sid:%d id:%s, peer is still processing the transaction, pConn:%p", - pServer->label, chann, sid, pHeader->meterId, pConn); + tTrace("%s cid:%d sid:%d id:%s, peer is still processing the transaction, pConn:%p", pServer->label, chann, sid, + pHeader->meterId, pConn); pConn->tretry++; taosTmrReset(taosProcessTaosTimer, tsRpcProgressTime, pConn, pChann->tmrCtrl, &pConn->pTimer); code = TSDB_CODE_ALREADY_PROCESSED; @@ -921,7 +937,7 @@ int taosProcessMsgHeader(STaosHeader *pHeader, SRpcConn **ppConn, STaosRpc *pSer _exit: pthread_mutex_unlock(&pChann->mutex); - if (reSend) taosReSendRspToPeer(pConn); + // if (reSend) taosReSendRspToPeer(pConn); return code; } @@ -1028,8 +1044,6 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, short port, } pHeader = (STaosHeader *)data; - tTrace("%s msg received, len:%d source:0x%08x dest:0x%08x tranId:%d", pServer->label, dataLen, pHeader->sourceId, - pHeader->destId, pHeader->tranId); msgLen = (int32_t)htonl((uint32_t)pHeader->msgLen); code = (uint8_t)taosProcessMsgHeader(pHeader, &pConn, pServer, dataLen, ip, port, chandle); @@ -1044,16 +1058,19 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, short port, } if (code == TSDB_CODE_ALREADY_PROCESSED) { - tTrace("%s cid:%d sid:%d id:%s, %s wont be processed tranId:%d pConn:%p", pServer->label, chann, sid, - pHeader->meterId, taosMsg[pHeader->msgType], pHeader->tranId, pConn); + tTrace("%s cid:%d sid:%d id:%s, %s wont be processed, source:0x%08x dest:0x%08x tranId:%d pConn:%p", pServer->label, + chann, sid, pHeader->meterId, taosMsg[pHeader->msgType], pHeader->sourceId, htonl(pHeader->destId), + pHeader->tranId, pConn); free(data); return pConn; } if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) { - tTrace("%s cid:%d sid:%d id:%s, %s received from 0x%x:%hu, parse code:%u, first:%u len:%d tranId:%d pConn:%p", - pServer->label, chann, sid, pHeader->meterId, taosMsg[pHeader->msgType], ip, port, code, pHeader->content[0], - dataLen, pHeader->tranId, pConn); + tTrace( + "%s cid:%d sid:%d id:%s, %s received from 0x%x:%hu, parse code:%u, first:%u len:%d source:0x%08x dest:0x%08x " + "tranId:%d pConn:%p", + pServer->label, chann, sid, pHeader->meterId, taosMsg[pHeader->msgType], ip, port, code, pHeader->content[0], + dataLen, pHeader->sourceId, htonl(pHeader->destId), pHeader->tranId, pConn); } if (code != 0) { @@ -1085,9 +1102,8 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, short port, } if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) { - tTrace("%s cid:%d sid:%d id:%s, %s is put into queue, msgLen:%d pConn:%p pTimer:%p", - pServer->label, chann, sid, pHeader->meterId, taosMsg[pHeader->msgType], pHeader->msgLen, pConn, - pConn->pTimer); + tTrace("%s cid:%d sid:%d id:%s, %s is put into queue, msgLen:%d pConn:%p pTimer:%p", pServer->label, chann, sid, + pHeader->meterId, taosMsg[pHeader->msgType], pHeader->msgLen, pConn, pConn->pTimer); } pChann = pServer->channList + pConn->chann; @@ -1168,8 +1184,8 @@ int taosSendMsgToPeerH(void *thandle, char *pCont, int contLen, void *ahandle) { } else { assert(pConn->pMsgNode == NULL); if (pConn->pMsgNode) { - tError("%s cid:%d sid:%d id:%s, bug, there shall be no pengding req pConn:%p", - pServer->label, pConn->chann, pConn->sid, pConn->meterId, pConn); + tError("%s cid:%d sid:%d id:%s, bug, there shall be no pengding req pConn:%p", pServer->label, pConn->chann, + pConn->sid, pConn->meterId, pConn); } pConn->outType = msgType; @@ -1180,8 +1196,6 @@ int taosSendMsgToPeerH(void *thandle, char *pCont, int contLen, void *ahandle) { } } - pthread_mutex_unlock(&pChann->mutex); - if (msgLen) { taosSendDataToPeer(pConn, (char *)pHeader, msgLen); if (msgType & 1U) { @@ -1189,6 +1203,8 @@ int taosSendMsgToPeerH(void *thandle, char *pCont, int contLen, void *ahandle) { } } + pthread_mutex_unlock(&pChann->mutex); + return contLen; } @@ -1288,12 +1304,13 @@ void taosProcessTaosTimer(void *param, void *tmrId) { } } - pthread_mutex_unlock(&pChann->mutex); - if (pHeader) { (*taosSendData[pServer->type])(pConn->peerIp, pConn->peerPort, (char *)pHeader, msgLen, pConn->chandle); taosTmrReset(taosProcessTaosTimer, tsRpcTimer, pConn, pChann->tmrCtrl, &pConn->pTimer); } + + pthread_mutex_unlock(&pChann->mutex); + } void taosGetRpcConnInfo(void *thandle, uint32_t *peerId, uint32_t *peerIp, short *peerPort, int *cid, int *sid) {