diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index bc717ed88cda2a06ce8ce055e8aeb59b5b335f62..110377d1123ca8eaea21a6bb71b28d367802c846 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -235,7 +235,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; STscObj *pObj = pSql->pTscObj; - tscTrace("%p msg:%p is received from server", pSql, rpcMsg->pCont); + tscTrace("%p msg:%s is received from server", pSql, taosMsg[rpcMsg->msgType]); if (pSql->freed || pObj->signature != pObj) { tscTrace("%p sql is already released or DB connection is closed, freed:%d pObj:%p signature:%p", pSql, pSql->freed, diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index aaf71838bfe2cc3916f10c579bd89e34acb89553..aa8cd997858c7ad27a02d751b00e765fc176d327 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -92,8 +92,6 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) { char *pCont = (char *) pMsg->pCont; void *pVnode; - dTrace("dnode %s msg incoming, thandle:%p", taosMsg[pMsg->msgType], pMsg->handle); - while (leftLen > 0) { SMsgHead *pHead = (SMsgHead *) pCont; pHead->vgId = htonl(pHead->vgId); @@ -214,6 +212,7 @@ static void *dnodeProcessReadQueue(void *param) { continue; } + dTrace("%p, msg:%s will be processed", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]); int32_t code = vnodeProcessRead(pVnode, pReadMsg->rpcMsg.msgType, pReadMsg->pCont, pReadMsg->contLen, &pReadMsg->rspRet); dnodeSendRpcReadRsp(pVnode, pReadMsg, code); taosFreeQitem(pReadMsg); diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index 5de4c16c500258fe5f3d5d7588bc8782530d8791..879082f223e27537f8f0dbd8a4e6cc53fce5117b 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -200,6 +200,7 @@ static void *dnodeProcessWriteQueue(void *param) { pHead->msgType = pWrite->rpcMsg.msgType; pHead->version = 0; pHead->len = pWrite->contLen; + dTrace("%p, msg:%s will be processed", pWrite->rpcMsg.ahandle, taosMsg[pWrite->rpcMsg.msgType]); } else { pHead = (SWalHead *)item; } diff --git a/src/inc/trpc.h b/src/inc/trpc.h index eff210433f7d7bcc2f4a5ad1d12bd88ed59581be..16223b813a1a738f6b0213e8d6c4f62cd0c2290c 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -48,6 +48,7 @@ typedef struct { int contLen; int32_t code; void *handle; + void *ahandle; //app handle set by client, for debug purpose } SRpcMsg; typedef struct { diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index 804572b9ffbf52259a333c0e45bf44a5a1c0f160..d8bcf6724236d6e7bfa1e4372797a78ab95aa1e3 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -124,6 +124,8 @@ void mgmtDealyedAddToShellQueue(SQueuedMsg *queuedMsg) { void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { + mTrace("%p, msg:%s will be processed", rpcMsg->ahandle, taosMsg[rpcMsg->msgType]); + if (rpcMsg->pCont == NULL) { mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_MSG_LEN); return; diff --git a/src/rpc/inc/rpcHead.h b/src/rpc/inc/rpcHead.h index 8b5410a596ae64a27ebecc9e3591fa90de044323..520edadc7dd072849720cad53c7f6f4ba605a06c 100644 --- a/src/rpc/inc/rpcHead.h +++ b/src/rpc/inc/rpcHead.h @@ -49,6 +49,7 @@ typedef struct { char encrypt:3; // encrypt algorithm, 0: no encryption uint16_t tranId; // transcation ID uint32_t linkUid; // for unique connection ID assigned by client + uint64_t ahandle; // ahandle assigned by client uint32_t sourceId; // source ID, an index for connection list uint32_t destId; // destination ID, an index for connection list uint32_t destIp; // destination IP address, for NAT scenario diff --git a/src/rpc/src/rpcCache.c b/src/rpc/src/rpcCache.c index edbb9b3e12be6dc24acfbb60fff1f91fe8e1c898..7a96571ab9d1a220e98d6d48a817fcc251097374 100644 --- a/src/rpc/src/rpcCache.c +++ b/src/rpc/src/rpcCache.c @@ -146,7 +146,7 @@ void rpcAddConnIntoCache(void *handle, void *data, char *fqdn, uint16_t port, in rpcUnlockCache(pCache->lockedBy+hash); pCache->total++; - tTrace("%p %s:%hu:%d:%d:%p added into cache, connections:%d", data, fqdn, port, connType, hash, pNode, pCache->count[hash]); + // tTrace("%p %s:%hu:%d:%d:%p added into cache, connections:%d", data, fqdn, port, connType, hash, pNode, pCache->count[hash]); return; } @@ -200,9 +200,9 @@ void *rpcGetConnFromCache(void *handle, char *fqdn, uint16_t port, int8_t connTy rpcUnlockCache(pCache->lockedBy+hash); if (pData) { - tTrace("%p %s:%hu:%d:%d:%p retrieved from cache, connections:%d", pData, fqdn, port, connType, hash, pNode, pCache->count[hash]); + //tTrace("%p %s:%hu:%d:%d:%p retrieved from cache, connections:%d", pData, fqdn, port, connType, hash, pNode, pCache->count[hash]); } else { - tTrace("%s:%hu:%d:%d failed to retrieve conn from cache, connections:%d", fqdn, port, connType, hash, pCache->count[hash]); + //tTrace("%s:%hu:%d:%d failed to retrieve conn from cache, connections:%d", fqdn, port, connType, hash, pCache->count[hash]); } return pData; @@ -240,8 +240,8 @@ static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash pNext = pNode->next; pCache->total--; pCache->count[hash]--; - tTrace("%p %s:%hu:%d:%d:%p removed from cache, connections:%d", pNode->data, pNode->fqdn, pNode->port, pNode->connType, hash, pNode, - pCache->count[hash]); + //tTrace("%p %s:%hu:%d:%d:%p removed from cache, connections:%d", pNode->data, pNode->fqdn, pNode->port, pNode->connType, hash, pNode, + // pCache->count[hash]); taosMemPoolFree(pCache->connHashMemPool, (char *)pNode); pNode = pNext; } diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 7e80f0d282f0df08f0996d72ba2f4c3dbf1e3e2a..60fafb44bd71e32f10a2fa919a4592c2462ad15b 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -87,6 +87,7 @@ typedef struct { } SRpcReqContext; typedef struct SRpcConn { + char info[50];// debug info: label + pConn + ahandle int sid; // session ID uint32_t ownId; // own link ID uint32_t peerId; // peer link ID @@ -275,7 +276,7 @@ void *rpcOpen(const SRpcInit *pInit) { return NULL; } - tTrace("%s RPC is openned, numOfThreads:%d", pRpc->label, pRpc->numOfThreads); + tTrace("%s rpc is openned, threads:%d sessions:%d", pRpc->label, pRpc->numOfThreads, pInit->sessions); return pRpc; } @@ -299,7 +300,7 @@ void rpcClose(void *param) { tfree(pRpc->connList); pthread_mutex_destroy(&pRpc->mutex); - tTrace("%s RPC is closed", pRpc->label); + tTrace("%s rpc is closed", pRpc->label); tfree(pRpc); } @@ -374,8 +375,6 @@ void rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, const SRpcMsg *pMsg) void rpcSendResponse(const SRpcMsg *pRsp) { int msgLen = 0; SRpcConn *pConn = (SRpcConn *)pRsp->handle; - SRpcInfo *pRpc = pConn->pRpc; - SRpcMsg rpcMsg = *pRsp; SRpcMsg *pMsg = &rpcMsg; @@ -393,7 +392,7 @@ void rpcSendResponse(const SRpcMsg *pRsp) { rpcLockConn(pConn); if ( pConn->inType == 0 || pConn->user[0] == 0 ) { - tTrace("%s %p, connection is already released, rsp wont be sent", pRpc->label, pConn); + tTrace("%s, connection is already released, rsp wont be sent", pConn->info); rpcUnlockConn(pConn); return; } @@ -409,7 +408,8 @@ void rpcSendResponse(const SRpcMsg *pRsp) { pHead->linkUid = pConn->linkUid; pHead->port = htons(pConn->localPort); pHead->code = htonl(pMsg->code); - + pHead->ahandle = (uint64_t) pConn->ahandle; + // set pConn parameters pConn->inType = 0; @@ -491,6 +491,7 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, uint32_t peerIp = taosGetIpFromFqdn(peerFqdn); if (peerIp == -1) { tError("%s, failed to resolve FQDN:%s", pRpc->label, peerFqdn); + terrno = TSDB_CODE_APP_ERROR; return NULL; } @@ -506,11 +507,7 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, if (taosOpenConn[connType]) { void *shandle = (connType & RPC_CONN_TCP)? pRpc->tcphandle:pRpc->udphandle; pConn->chandle = (*taosOpenConn[connType])(shandle, pConn, pConn->peerIp, pConn->peerPort); - if (pConn->chandle) { - tTrace("%s %p, rpc connection is set up, sid:%d id:%s %s:%hu connType:%d", pRpc->label, - pConn, pConn->sid, pRpc->user, peerFqdn, pConn->peerPort, pConn->connType); - } else { - tError("%s %p, failed to set up connection to %s:%hu", pRpc->label, pConn, peerFqdn, pConn->peerPort); + if (pConn->chandle == NULL) { terrno = TSDB_CODE_NETWORK_UNAVAIL; rpcCloseConn(pConn); pConn = NULL; @@ -557,7 +554,7 @@ static void rpcCloseConn(void *thandle) { taosFreeId(pRpc->idPool, pConn->sid); pConn->pContext = NULL; - tTrace("%s %p, rpc connection is closed", pRpc->label, pConn); + tTrace("%s, rpc connection is closed", pConn->info); rpcUnlockConn(pConn); } @@ -619,7 +616,6 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { } if (terrno != 0) { - tWarn("%s %p, user not there or server not ready", pRpc->label, pConn); taosFreeId(pRpc->idPool, sid); // sid shall be released pConn = NULL; } @@ -634,8 +630,6 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { } taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES); - tTrace("%s %p, rpc connection is allocated, sid:%d id:%s port:%u", - pRpc->label, pConn, sid, pConn->user, pConn->localPort); } return pConn; @@ -660,7 +654,6 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) { if (pConn) { if (pConn->linkUid != pHead->linkUid) { - tTrace("%s %p, linkUid:0x%x not matched, received:0x%x", pRpc->label, pConn, pConn->linkUid, pHead->linkUid); terrno = TSDB_CODE_MISMATCHED_METER_ID; pConn = NULL; } @@ -677,21 +670,25 @@ static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) { pConn = rpcGetConnFromCache(pRpc->pCache, pIpSet->fqdn[pIpSet->inUse], pIpSet->port[pIpSet->inUse], pContext->connType); if ( pConn == NULL || pConn->user[0] == 0) { pConn = rpcOpenConn(pRpc, pIpSet->fqdn[pIpSet->inUse], pIpSet->port[pIpSet->inUse], pContext->connType); + } + + if (pConn) { + pConn->ahandle = pContext->ahandle; + sprintf(pConn->info, "%s %p %p", pRpc->label, pConn, pConn->ahandle); } else { - tTrace("%s %p, connection is retrieved from cache", pRpc->label, pConn); + tError("%s %p, failed to set up connection(%s)", pRpc->label, pContext->ahandle, tstrerror(terrno)); } return pConn; } static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) { - SRpcInfo *pRpc= pConn->pRpc; if (pConn->peerId == 0) { pConn->peerId = pHead->sourceId; } else { if (pConn->peerId != pHead->sourceId) { - tTrace("%s %p, source Id is changed, old:0x%08x new:0x%08x", pRpc->label, pConn, + tTrace("%s, source Id is changed, old:0x%08x new:0x%08x", pConn->info, pConn->peerId, pHead->sourceId); return TSDB_CODE_INVALID_VALUE; } @@ -700,17 +697,16 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) { if (pConn->inTranId == pHead->tranId) { if (pConn->inType == pHead->msgType) { if (pHead->code == 0) { - tTrace("%s %p, %s is retransmitted", pRpc->label, pConn, taosMsg[pHead->msgType]); + tTrace("%s, %s is retransmitted", pConn->info, taosMsg[pHead->msgType]); rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS); } else { // do nothing, it is heart beat from client } } else if (pConn->inType == 0) { - tTrace("%s %p, %s is already processed, tranId:%d", pRpc->label, pConn, - taosMsg[pHead->msgType], pConn->inTranId); + tTrace("%s, %s is already processed, tranId:%d", pConn->info, taosMsg[pHead->msgType], pConn->inTranId); rpcSendMsgToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen); // resend the response } else { - tTrace("%s %p, mismatched message %s and tranId", pRpc->label, pConn, taosMsg[pHead->msgType]); + tTrace("%s, mismatched message %s and tranId", pConn->info, taosMsg[pHead->msgType]); } // do not reply any message @@ -718,7 +714,7 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) { } if (pConn->inType != 0) { - tTrace("%s %p, last session is not finished, inTranId:%d tranId:%d", pRpc->label, pConn, + tTrace("%s, last session is not finished, inTranId:%d tranId:%d", pConn->info, pConn->inTranId, pHead->tranId); return TSDB_CODE_LAST_SESSION_NOT_FINISHED; } @@ -750,7 +746,7 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { if (pHead->code == TSDB_CODE_ACTION_IN_PROGRESS) { if (pConn->tretry <= tsRpcMaxRetry) { - tTrace("%s %p, peer is still processing the transaction", pRpc->label, pConn); + tTrace("%s, peer is still processing the transaction", pConn->info); pConn->tretry++; rpcSendReqHead(pConn); taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); @@ -789,7 +785,15 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { } pConn = rpcGetConnObj(pRpc, sid, pRecv); - if (pConn == NULL) return NULL; + if (pConn == NULL) { + tError("%s %p, failed to get connection obj(%s)", pRpc->label, pHead->ahandle, tstrerror(terrno)); + return NULL; + } else { + if (rpcIsReq(pHead->msgType)) { + pConn->ahandle = (void *)pHead->ahandle; + sprintf(pConn->info, "%s %p %p", pRpc->label, pConn, pConn->ahandle); + } + } rpcLockConn(pConn); sid = pConn->sid; @@ -826,7 +830,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { static void rpcProcessBrokenLink(SRpcConn *pConn) { SRpcInfo *pRpc = pConn->pRpc; - tTrace("%s %p, link is broken", pRpc->label, pConn); + tTrace("%s, link is broken", pConn->info); // pConn->chandle = NULL; if (pConn->outType) { @@ -837,7 +841,7 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { if (pConn->inType) { // if there are pending request, notify the app - tTrace("%s %p, connection is gone, notify the app", pRpc->label, pConn); + tTrace("%s, connection is gone, notify the app", pConn->info); /* SRpcMsg rpcMsg; rpcMsg.pCont = NULL; @@ -872,17 +876,17 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { pConn = rpcProcessMsgHead(pRpc, pRecv); if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) { - tTrace("%s %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", - pRpc->label, pConn, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, terrno, + tTrace("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", + pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, terrno, pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->code); } int32_t code = terrno; if (code != TSDB_CODE_ALREADY_PROCESSED) { if (code != 0) { // parsing error - if ( rpcIsReq(pHead->msgType) ) { + if (rpcIsReq(pHead->msgType)) { rpcSendErrorMsgToPeer(pRecv, code); - tTrace("%s %p, %s is sent with error code:%x", pRpc->label, pConn, taosMsg[pHead->msgType+1], code); + tTrace("%s %p %p, %s is sent with error code:%x", pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType+1], code); } } else { // parsing OK rpcProcessIncomingMsg(pConn, pHead); @@ -924,6 +928,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { rpcMsg.pCont = pHead->content; rpcMsg.msgType = pHead->msgType; rpcMsg.code = pHead->code; + rpcMsg.ahandle = pConn->ahandle; if ( rpcIsReq(pHead->msgType) ) { rpcMsg.handle = pConn; @@ -948,14 +953,14 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { pContext->redirect++; if (pContext->redirect > TSDB_MAX_REPLICA) { pHead->code = TSDB_CODE_NETWORK_UNAVAIL; - tWarn("%s %p, too many redirects, quit", pRpc->label, pConn); + tWarn("%s, too many redirects, quit", pConn->info); } } if (pHead->code == TSDB_CODE_REDIRECT) { pContext->numOfTry = 0; memcpy(&pContext->ipSet, pHead->content, sizeof(pContext->ipSet)); - tTrace("%s %p, redirect is received, numOfIps:%d", pRpc->label, pConn, pContext->ipSet.numOfIps); + tTrace("%s, redirect is received, numOfIps:%d", pConn->info, pContext->ipSet.numOfIps); for (int i=0; iipSet.numOfIps; ++i) pContext->ipSet.port[i] = htons(pContext->ipSet.port[i]); rpcSendReqToServer(pRpc, pContext); @@ -1061,6 +1066,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { return; } + pConn->ahandle = pContext->ahandle; rpcLockConn(pConn); // set the message header @@ -1074,6 +1080,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { pHead->destId = pConn->peerId; pHead->port = 0; pHead->linkUid = pConn->linkUid; + pHead->ahandle = (uint64_t)pConn->ahandle; if (!pConn->secured) memcpy(pHead->user, pConn->user, tListLen(pHead->user)); // set the connection parameters @@ -1091,29 +1098,27 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { int writtenLen = 0; - SRpcInfo *pRpc = pConn->pRpc; SRpcHead *pHead = (SRpcHead *)msg; msgLen = rpcAddAuthPart(pConn, msg, msgLen); if ( rpcIsReq(pHead->msgType)) { if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) - tTrace("%s %p, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d", - pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerFqdn, - pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId); + tTrace("%s, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d", + pConn->info, taosMsg[pHead->msgType], pConn->peerFqdn, pConn->peerPort, + msgLen, pHead->sourceId, pHead->destId, pHead->tranId); } else { if (pHead->code == 0) pConn->secured = 1; // for success response, set link as secured if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) - tTrace( "%s %p, %s is sent to 0x%x:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d", - pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerIp, pConn->peerPort, + tTrace("%s, %s is sent to 0x%x:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d", + pConn->info, taosMsg[pHead->msgType], pConn->peerIp, pConn->peerPort, htonl(pHead->code), msgLen, pHead->sourceId, pHead->destId, pHead->tranId); } writtenLen = (*taosSendData[pConn->connType])(pConn->peerIp, pConn->peerPort, pHead, msgLen, pConn->chandle); if (writtenLen != msgLen) { - tError("%s %p, failed to send, dataLen:%d writtenLen:%d, reason:%s", pRpc->label, pConn, - msgLen, writtenLen, strerror(errno)); + tError("%s, failed to send, msgLen:%d written:%d, reason:%s", pConn->info, msgLen, writtenLen, strerror(errno)); } tDump(msg, msgLen); @@ -1128,7 +1133,7 @@ static void rpcProcessConnError(void *param, void *id) { return; } - tTrace("%s connection error happens", pRpc->label); + tTrace("%s %p, connection error happens", pRpc->label, pContext->ahandle); if (pContext->numOfTry >= pContext->ipSet.numOfIps) { rpcMsg.msgType = pContext->msgType+1; @@ -1154,23 +1159,21 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { rpcLockConn(pConn); if (pConn->outType && pConn->user[0]) { - tTrace("%s %p, expected %s is not received", pRpc->label, pConn, taosMsg[(int)pConn->outType + 1]); + tTrace("%s, expected %s is not received", pConn->info, taosMsg[(int)pConn->outType + 1]); pConn->pTimer = NULL; pConn->retry++; if (pConn->retry < 4) { - tTrace("%s %p, re-send msg:%s to %s:%hu", pRpc->label, pConn, - taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort); + tTrace("%s, re-send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort); rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); } else { // close the connection - tTrace("%s %p, failed to send msg:%s to %s:%hu", pRpc->label, pConn, - taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort); + tTrace("%s, failed to send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort); reportDisc = 1; } } else { - tTrace("%s %p, retry timer not processed", pRpc->label, pConn); + tTrace("%s, retry timer not processed", pConn->info); } rpcUnlockConn(pConn); @@ -1187,10 +1190,10 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) { SRpcInfo *pRpc = pConn->pRpc; if (pConn->user[0]) { - tTrace("%s %p, close the connection since no activity", pRpc->label, pConn); + tTrace("%s, close the connection since no activity", pConn->info); if (pConn->inType && pRpc->cfp) { // if there are pending request, notify the app - tTrace("%s %p, notify the app, connection is gone", pRpc->label, pConn); + tTrace("%s, notify the app, connection is gone", pConn->info); /* SRpcMsg rpcMsg; rpcMsg.pCont = NULL; @@ -1203,7 +1206,7 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) { } rpcCloseConn(pConn); } else { - tTrace("%s %p, idle timer:%p not processed", pRpc->label, pConn, tmrId); + tTrace("%s, idle timer:%p not processed", pConn->info, tmrId); } } @@ -1214,11 +1217,11 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) { rpcLockConn(pConn); if (pConn->inType && pConn->user[0]) { - tTrace("%s %p, progress timer expired, send progress", pRpc->label, pConn); + tTrace("%s, progress timer expired, send progress", pConn->info); rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS); taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer); } else { - tTrace("%s %p, progress timer:%p not processed", pRpc->label, pConn, tmrId); + tTrace("%s, progress timer:%p not processed", pConn->info, tmrId); } rpcUnlockConn(pConn); @@ -1252,7 +1255,7 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { memcpy(pCont + overhead, buf, compLen); pHead->comp = 1; - tTrace("compress rpc msg, before:%d, after:%d", contLen, compLen); + //tTrace("compress rpc msg, before:%d, after:%d", contLen, compLen); finalLen = compLen + overhead; } else { finalLen = contLen; @@ -1286,7 +1289,7 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) { pNewHead->msgLen = rpcMsgLenFromCont(origLen); rpcFreeMsg(pHead); // free the compressed message buffer pHead = pNewHead; - tTrace("decompress rpc msg, compLen:%d, after:%d", compLen, contLen); + //tTrace("decompress rpc msg, compLen:%d, after:%d", compLen, contLen); } else { tError("failed to allocate memory to decompress msg, contLen:%d", contLen); } @@ -1343,7 +1346,6 @@ static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) { static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { SRpcHead *pHead = (SRpcHead *)msg; - SRpcInfo *pRpc = pConn->pRpc; int code = 0; if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)){ @@ -1371,20 +1373,20 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { delta = (int32_t)htonl(pDigest->timeStamp); delta -= (int32_t)taosGetTimestampSec(); if (abs(delta) > 900) { - tWarn("%s %p, time diff:%d is too big, msg discarded", pRpc->label, pConn, delta); + tWarn("%s, time diff:%d is too big, msg discarded", pConn->info, delta); code = TSDB_CODE_INVALID_TIME_STAMP; } else { if (rpcAuthenticateMsg(pHead, msgLen-TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) { - tError("%s %p, authentication failed, msg discarded", pRpc->label, pConn); + tError("%s, authentication failed, msg discarded", pConn->info); code = TSDB_CODE_AUTH_FAILURE; } else { pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest); if ( !rpcIsReq(pHead->msgType) ) pConn->secured = 1; // link is secured for client - tTrace("%s %p, message is authenticated", pRpc->label, pConn); + //tTrace("%s, message is authenticated", pConn->info); } } } else { - tTrace("%s %p, auth spi:%d not matched with received:%d", pRpc->label, pConn, pConn->spi, pHead->spi); + tError("%s, auth spi:%d not matched with received:%d", pConn->info, pConn->spi, pHead->spi); code = TSDB_CODE_AUTH_FAILURE; } diff --git a/src/rpc/src/rpcUdp.c b/src/rpc/src/rpcUdp.c index c551f6b1db1acfe847ce51ef70b1f883ed66cb05..677187e3b999c3e55301eea3efc3bf5b8a19395e 100644 --- a/src/rpc/src/rpcUdp.c +++ b/src/rpc/src/rpcUdp.c @@ -218,7 +218,7 @@ static void *taosRecvUdpData(void *param) { while (1) { dataLen = recvfrom(pConn->fd, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen); port = ntohs(sourceAdd.sin_port); - tTrace("%s msg is recv from 0x%x:%hu len:%d", pConn->label, sourceAdd.sin_addr.s_addr, port, dataLen); + //tTrace("%s msg is recv from 0x%x:%hu len:%d", pConn->label, sourceAdd.sin_addr.s_addr, port, dataLen); if (dataLen < sizeof(SRpcHead)) { tError("%s recvfrom failed, reason:%s\n", pConn->label, strerror(errno));