提交 95d7105e 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

tune the debug info

上级 714d8ea9
...@@ -235,7 +235,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { ...@@ -235,7 +235,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
STscObj *pObj = pSql->pTscObj; STscObj *pObj = pSql->pTscObj;
tscTrace("%p msg:%p is received from server", pSql, rpcMsg->pCont); tscTrace("%p msg:%s is received from server", pSql, taosMsg[rpcMsg->msgType]);
if (pSql->freed || pObj->signature != pObj) { if (pSql->freed || pObj->signature != pObj) {
tscTrace("%p sql is already released or DB connection is closed, freed:%d pObj:%p signature:%p", pSql, pSql->freed, tscTrace("%p sql is already released or DB connection is closed, freed:%d pObj:%p signature:%p", pSql, pSql->freed,
......
...@@ -92,8 +92,6 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) { ...@@ -92,8 +92,6 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) {
char *pCont = (char *) pMsg->pCont; char *pCont = (char *) pMsg->pCont;
void *pVnode; void *pVnode;
dTrace("dnode %s msg incoming, thandle:%p", taosMsg[pMsg->msgType], pMsg->handle);
while (leftLen > 0) { while (leftLen > 0) {
SMsgHead *pHead = (SMsgHead *) pCont; SMsgHead *pHead = (SMsgHead *) pCont;
pHead->vgId = htonl(pHead->vgId); pHead->vgId = htonl(pHead->vgId);
...@@ -214,6 +212,7 @@ static void *dnodeProcessReadQueue(void *param) { ...@@ -214,6 +212,7 @@ static void *dnodeProcessReadQueue(void *param) {
continue; continue;
} }
dTrace("%p, msg:%s will be processed", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]);
int32_t code = vnodeProcessRead(pVnode, pReadMsg->rpcMsg.msgType, pReadMsg->pCont, pReadMsg->contLen, &pReadMsg->rspRet); int32_t code = vnodeProcessRead(pVnode, pReadMsg->rpcMsg.msgType, pReadMsg->pCont, pReadMsg->contLen, &pReadMsg->rspRet);
dnodeSendRpcReadRsp(pVnode, pReadMsg, code); dnodeSendRpcReadRsp(pVnode, pReadMsg, code);
taosFreeQitem(pReadMsg); taosFreeQitem(pReadMsg);
......
...@@ -200,6 +200,7 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -200,6 +200,7 @@ static void *dnodeProcessWriteQueue(void *param) {
pHead->msgType = pWrite->rpcMsg.msgType; pHead->msgType = pWrite->rpcMsg.msgType;
pHead->version = 0; pHead->version = 0;
pHead->len = pWrite->contLen; pHead->len = pWrite->contLen;
dTrace("%p, msg:%s will be processed", pWrite->rpcMsg.ahandle, taosMsg[pWrite->rpcMsg.msgType]);
} else { } else {
pHead = (SWalHead *)item; pHead = (SWalHead *)item;
} }
......
...@@ -48,6 +48,7 @@ typedef struct { ...@@ -48,6 +48,7 @@ typedef struct {
int contLen; int contLen;
int32_t code; int32_t code;
void *handle; void *handle;
void *ahandle; //app handle set by client, for debug purpose
} SRpcMsg; } SRpcMsg;
typedef struct { typedef struct {
......
...@@ -124,6 +124,8 @@ void mgmtDealyedAddToShellQueue(SQueuedMsg *queuedMsg) { ...@@ -124,6 +124,8 @@ void mgmtDealyedAddToShellQueue(SQueuedMsg *queuedMsg) {
void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
mTrace("%p, msg:%s will be processed", rpcMsg->ahandle, taosMsg[rpcMsg->msgType]);
if (rpcMsg->pCont == NULL) { if (rpcMsg->pCont == NULL) {
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_MSG_LEN); mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_MSG_LEN);
return; return;
......
...@@ -49,6 +49,7 @@ typedef struct { ...@@ -49,6 +49,7 @@ typedef struct {
char encrypt:3; // encrypt algorithm, 0: no encryption char encrypt:3; // encrypt algorithm, 0: no encryption
uint16_t tranId; // transcation ID uint16_t tranId; // transcation ID
uint32_t linkUid; // for unique connection ID assigned by client uint32_t linkUid; // for unique connection ID assigned by client
uint64_t ahandle; // ahandle assigned by client
uint32_t sourceId; // source ID, an index for connection list uint32_t sourceId; // source ID, an index for connection list
uint32_t destId; // destination ID, an index for connection list uint32_t destId; // destination ID, an index for connection list
uint32_t destIp; // destination IP address, for NAT scenario uint32_t destIp; // destination IP address, for NAT scenario
......
...@@ -146,7 +146,7 @@ void rpcAddConnIntoCache(void *handle, void *data, char *fqdn, uint16_t port, in ...@@ -146,7 +146,7 @@ void rpcAddConnIntoCache(void *handle, void *data, char *fqdn, uint16_t port, in
rpcUnlockCache(pCache->lockedBy+hash); rpcUnlockCache(pCache->lockedBy+hash);
pCache->total++; pCache->total++;
tTrace("%p %s:%hu:%d:%d:%p added into cache, connections:%d", data, fqdn, port, connType, hash, pNode, pCache->count[hash]); // tTrace("%p %s:%hu:%d:%d:%p added into cache, connections:%d", data, fqdn, port, connType, hash, pNode, pCache->count[hash]);
return; return;
} }
...@@ -200,9 +200,9 @@ void *rpcGetConnFromCache(void *handle, char *fqdn, uint16_t port, int8_t connTy ...@@ -200,9 +200,9 @@ void *rpcGetConnFromCache(void *handle, char *fqdn, uint16_t port, int8_t connTy
rpcUnlockCache(pCache->lockedBy+hash); rpcUnlockCache(pCache->lockedBy+hash);
if (pData) { if (pData) {
tTrace("%p %s:%hu:%d:%d:%p retrieved from cache, connections:%d", pData, fqdn, port, connType, hash, pNode, pCache->count[hash]); //tTrace("%p %s:%hu:%d:%d:%p retrieved from cache, connections:%d", pData, fqdn, port, connType, hash, pNode, pCache->count[hash]);
} else { } else {
tTrace("%s:%hu:%d:%d failed to retrieve conn from cache, connections:%d", fqdn, port, connType, hash, pCache->count[hash]); //tTrace("%s:%hu:%d:%d failed to retrieve conn from cache, connections:%d", fqdn, port, connType, hash, pCache->count[hash]);
} }
return pData; return pData;
...@@ -240,8 +240,8 @@ static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash ...@@ -240,8 +240,8 @@ static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash
pNext = pNode->next; pNext = pNode->next;
pCache->total--; pCache->total--;
pCache->count[hash]--; pCache->count[hash]--;
tTrace("%p %s:%hu:%d:%d:%p removed from cache, connections:%d", pNode->data, pNode->fqdn, pNode->port, pNode->connType, hash, pNode, //tTrace("%p %s:%hu:%d:%d:%p removed from cache, connections:%d", pNode->data, pNode->fqdn, pNode->port, pNode->connType, hash, pNode,
pCache->count[hash]); // pCache->count[hash]);
taosMemPoolFree(pCache->connHashMemPool, (char *)pNode); taosMemPoolFree(pCache->connHashMemPool, (char *)pNode);
pNode = pNext; pNode = pNext;
} }
......
...@@ -87,6 +87,7 @@ typedef struct { ...@@ -87,6 +87,7 @@ typedef struct {
} SRpcReqContext; } SRpcReqContext;
typedef struct SRpcConn { typedef struct SRpcConn {
char info[50];// debug info: label + pConn + ahandle
int sid; // session ID int sid; // session ID
uint32_t ownId; // own link ID uint32_t ownId; // own link ID
uint32_t peerId; // peer link ID uint32_t peerId; // peer link ID
...@@ -275,7 +276,7 @@ void *rpcOpen(const SRpcInit *pInit) { ...@@ -275,7 +276,7 @@ void *rpcOpen(const SRpcInit *pInit) {
return NULL; return NULL;
} }
tTrace("%s RPC is openned, numOfThreads:%d", pRpc->label, pRpc->numOfThreads); tTrace("%s rpc is openned, threads:%d sessions:%d", pRpc->label, pRpc->numOfThreads, pInit->sessions);
return pRpc; return pRpc;
} }
...@@ -299,7 +300,7 @@ void rpcClose(void *param) { ...@@ -299,7 +300,7 @@ void rpcClose(void *param) {
tfree(pRpc->connList); tfree(pRpc->connList);
pthread_mutex_destroy(&pRpc->mutex); pthread_mutex_destroy(&pRpc->mutex);
tTrace("%s RPC is closed", pRpc->label); tTrace("%s rpc is closed", pRpc->label);
tfree(pRpc); tfree(pRpc);
} }
...@@ -374,8 +375,6 @@ void rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, const SRpcMsg *pMsg) ...@@ -374,8 +375,6 @@ void rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, const SRpcMsg *pMsg)
void rpcSendResponse(const SRpcMsg *pRsp) { void rpcSendResponse(const SRpcMsg *pRsp) {
int msgLen = 0; int msgLen = 0;
SRpcConn *pConn = (SRpcConn *)pRsp->handle; SRpcConn *pConn = (SRpcConn *)pRsp->handle;
SRpcInfo *pRpc = pConn->pRpc;
SRpcMsg rpcMsg = *pRsp; SRpcMsg rpcMsg = *pRsp;
SRpcMsg *pMsg = &rpcMsg; SRpcMsg *pMsg = &rpcMsg;
...@@ -393,7 +392,7 @@ void rpcSendResponse(const SRpcMsg *pRsp) { ...@@ -393,7 +392,7 @@ void rpcSendResponse(const SRpcMsg *pRsp) {
rpcLockConn(pConn); rpcLockConn(pConn);
if ( pConn->inType == 0 || pConn->user[0] == 0 ) { if ( pConn->inType == 0 || pConn->user[0] == 0 ) {
tTrace("%s %p, connection is already released, rsp wont be sent", pRpc->label, pConn); tTrace("%s, connection is already released, rsp wont be sent", pConn->info);
rpcUnlockConn(pConn); rpcUnlockConn(pConn);
return; return;
} }
...@@ -409,7 +408,8 @@ void rpcSendResponse(const SRpcMsg *pRsp) { ...@@ -409,7 +408,8 @@ void rpcSendResponse(const SRpcMsg *pRsp) {
pHead->linkUid = pConn->linkUid; pHead->linkUid = pConn->linkUid;
pHead->port = htons(pConn->localPort); pHead->port = htons(pConn->localPort);
pHead->code = htonl(pMsg->code); pHead->code = htonl(pMsg->code);
pHead->ahandle = (uint64_t) pConn->ahandle;
// set pConn parameters // set pConn parameters
pConn->inType = 0; pConn->inType = 0;
...@@ -491,6 +491,7 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, ...@@ -491,6 +491,7 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort,
uint32_t peerIp = taosGetIpFromFqdn(peerFqdn); uint32_t peerIp = taosGetIpFromFqdn(peerFqdn);
if (peerIp == -1) { if (peerIp == -1) {
tError("%s, failed to resolve FQDN:%s", pRpc->label, peerFqdn); tError("%s, failed to resolve FQDN:%s", pRpc->label, peerFqdn);
terrno = TSDB_CODE_APP_ERROR;
return NULL; return NULL;
} }
...@@ -506,11 +507,7 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, ...@@ -506,11 +507,7 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort,
if (taosOpenConn[connType]) { if (taosOpenConn[connType]) {
void *shandle = (connType & RPC_CONN_TCP)? pRpc->tcphandle:pRpc->udphandle; void *shandle = (connType & RPC_CONN_TCP)? pRpc->tcphandle:pRpc->udphandle;
pConn->chandle = (*taosOpenConn[connType])(shandle, pConn, pConn->peerIp, pConn->peerPort); pConn->chandle = (*taosOpenConn[connType])(shandle, pConn, pConn->peerIp, pConn->peerPort);
if (pConn->chandle) { if (pConn->chandle == NULL) {
tTrace("%s %p, rpc connection is set up, sid:%d id:%s %s:%hu connType:%d", pRpc->label,
pConn, pConn->sid, pRpc->user, peerFqdn, pConn->peerPort, pConn->connType);
} else {
tError("%s %p, failed to set up connection to %s:%hu", pRpc->label, pConn, peerFqdn, pConn->peerPort);
terrno = TSDB_CODE_NETWORK_UNAVAIL; terrno = TSDB_CODE_NETWORK_UNAVAIL;
rpcCloseConn(pConn); rpcCloseConn(pConn);
pConn = NULL; pConn = NULL;
...@@ -557,7 +554,7 @@ static void rpcCloseConn(void *thandle) { ...@@ -557,7 +554,7 @@ static void rpcCloseConn(void *thandle) {
taosFreeId(pRpc->idPool, pConn->sid); taosFreeId(pRpc->idPool, pConn->sid);
pConn->pContext = NULL; pConn->pContext = NULL;
tTrace("%s %p, rpc connection is closed", pRpc->label, pConn); tTrace("%s, rpc connection is closed", pConn->info);
rpcUnlockConn(pConn); rpcUnlockConn(pConn);
} }
...@@ -619,7 +616,6 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { ...@@ -619,7 +616,6 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
} }
if (terrno != 0) { if (terrno != 0) {
tWarn("%s %p, user not there or server not ready", pRpc->label, pConn);
taosFreeId(pRpc->idPool, sid); // sid shall be released taosFreeId(pRpc->idPool, sid); // sid shall be released
pConn = NULL; pConn = NULL;
} }
...@@ -634,8 +630,6 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { ...@@ -634,8 +630,6 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
} }
taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES); taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES);
tTrace("%s %p, rpc connection is allocated, sid:%d id:%s port:%u",
pRpc->label, pConn, sid, pConn->user, pConn->localPort);
} }
return pConn; return pConn;
...@@ -660,7 +654,6 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) { ...@@ -660,7 +654,6 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) {
if (pConn) { if (pConn) {
if (pConn->linkUid != pHead->linkUid) { if (pConn->linkUid != pHead->linkUid) {
tTrace("%s %p, linkUid:0x%x not matched, received:0x%x", pRpc->label, pConn, pConn->linkUid, pHead->linkUid);
terrno = TSDB_CODE_MISMATCHED_METER_ID; terrno = TSDB_CODE_MISMATCHED_METER_ID;
pConn = NULL; pConn = NULL;
} }
...@@ -677,21 +670,25 @@ static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) { ...@@ -677,21 +670,25 @@ static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) {
pConn = rpcGetConnFromCache(pRpc->pCache, pIpSet->fqdn[pIpSet->inUse], pIpSet->port[pIpSet->inUse], pContext->connType); pConn = rpcGetConnFromCache(pRpc->pCache, pIpSet->fqdn[pIpSet->inUse], pIpSet->port[pIpSet->inUse], pContext->connType);
if ( pConn == NULL || pConn->user[0] == 0) { if ( pConn == NULL || pConn->user[0] == 0) {
pConn = rpcOpenConn(pRpc, pIpSet->fqdn[pIpSet->inUse], pIpSet->port[pIpSet->inUse], pContext->connType); pConn = rpcOpenConn(pRpc, pIpSet->fqdn[pIpSet->inUse], pIpSet->port[pIpSet->inUse], pContext->connType);
}
if (pConn) {
pConn->ahandle = pContext->ahandle;
sprintf(pConn->info, "%s %p %p", pRpc->label, pConn, pConn->ahandle);
} else { } else {
tTrace("%s %p, connection is retrieved from cache", pRpc->label, pConn); tError("%s %p, failed to set up connection(%s)", pRpc->label, pContext->ahandle, tstrerror(terrno));
} }
return pConn; return pConn;
} }
static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) { static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) {
SRpcInfo *pRpc= pConn->pRpc;
if (pConn->peerId == 0) { if (pConn->peerId == 0) {
pConn->peerId = pHead->sourceId; pConn->peerId = pHead->sourceId;
} else { } else {
if (pConn->peerId != pHead->sourceId) { if (pConn->peerId != pHead->sourceId) {
tTrace("%s %p, source Id is changed, old:0x%08x new:0x%08x", pRpc->label, pConn, tTrace("%s, source Id is changed, old:0x%08x new:0x%08x", pConn->info,
pConn->peerId, pHead->sourceId); pConn->peerId, pHead->sourceId);
return TSDB_CODE_INVALID_VALUE; return TSDB_CODE_INVALID_VALUE;
} }
...@@ -700,17 +697,16 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -700,17 +697,16 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) {
if (pConn->inTranId == pHead->tranId) { if (pConn->inTranId == pHead->tranId) {
if (pConn->inType == pHead->msgType) { if (pConn->inType == pHead->msgType) {
if (pHead->code == 0) { if (pHead->code == 0) {
tTrace("%s %p, %s is retransmitted", pRpc->label, pConn, taosMsg[pHead->msgType]); tTrace("%s, %s is retransmitted", pConn->info, taosMsg[pHead->msgType]);
rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS); rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS);
} else { } else {
// do nothing, it is heart beat from client // do nothing, it is heart beat from client
} }
} else if (pConn->inType == 0) { } else if (pConn->inType == 0) {
tTrace("%s %p, %s is already processed, tranId:%d", pRpc->label, pConn, tTrace("%s, %s is already processed, tranId:%d", pConn->info, taosMsg[pHead->msgType], pConn->inTranId);
taosMsg[pHead->msgType], pConn->inTranId);
rpcSendMsgToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen); // resend the response rpcSendMsgToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen); // resend the response
} else { } else {
tTrace("%s %p, mismatched message %s and tranId", pRpc->label, pConn, taosMsg[pHead->msgType]); tTrace("%s, mismatched message %s and tranId", pConn->info, taosMsg[pHead->msgType]);
} }
// do not reply any message // do not reply any message
...@@ -718,7 +714,7 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -718,7 +714,7 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) {
} }
if (pConn->inType != 0) { if (pConn->inType != 0) {
tTrace("%s %p, last session is not finished, inTranId:%d tranId:%d", pRpc->label, pConn, tTrace("%s, last session is not finished, inTranId:%d tranId:%d", pConn->info,
pConn->inTranId, pHead->tranId); pConn->inTranId, pHead->tranId);
return TSDB_CODE_LAST_SESSION_NOT_FINISHED; return TSDB_CODE_LAST_SESSION_NOT_FINISHED;
} }
...@@ -750,7 +746,7 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -750,7 +746,7 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
if (pHead->code == TSDB_CODE_ACTION_IN_PROGRESS) { if (pHead->code == TSDB_CODE_ACTION_IN_PROGRESS) {
if (pConn->tretry <= tsRpcMaxRetry) { if (pConn->tretry <= tsRpcMaxRetry) {
tTrace("%s %p, peer is still processing the transaction", pRpc->label, pConn); tTrace("%s, peer is still processing the transaction", pConn->info);
pConn->tretry++; pConn->tretry++;
rpcSendReqHead(pConn); rpcSendReqHead(pConn);
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
...@@ -789,7 +785,15 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { ...@@ -789,7 +785,15 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
} }
pConn = rpcGetConnObj(pRpc, sid, pRecv); pConn = rpcGetConnObj(pRpc, sid, pRecv);
if (pConn == NULL) return NULL; if (pConn == NULL) {
tError("%s %p, failed to get connection obj(%s)", pRpc->label, pHead->ahandle, tstrerror(terrno));
return NULL;
} else {
if (rpcIsReq(pHead->msgType)) {
pConn->ahandle = (void *)pHead->ahandle;
sprintf(pConn->info, "%s %p %p", pRpc->label, pConn, pConn->ahandle);
}
}
rpcLockConn(pConn); rpcLockConn(pConn);
sid = pConn->sid; sid = pConn->sid;
...@@ -826,7 +830,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { ...@@ -826,7 +830,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
static void rpcProcessBrokenLink(SRpcConn *pConn) { static void rpcProcessBrokenLink(SRpcConn *pConn) {
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
tTrace("%s %p, link is broken", pRpc->label, pConn); tTrace("%s, link is broken", pConn->info);
// pConn->chandle = NULL; // pConn->chandle = NULL;
if (pConn->outType) { if (pConn->outType) {
...@@ -837,7 +841,7 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { ...@@ -837,7 +841,7 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) {
if (pConn->inType) { if (pConn->inType) {
// if there are pending request, notify the app // if there are pending request, notify the app
tTrace("%s %p, connection is gone, notify the app", pRpc->label, pConn); tTrace("%s, connection is gone, notify the app", pConn->info);
/* /*
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
rpcMsg.pCont = NULL; rpcMsg.pCont = NULL;
...@@ -872,17 +876,17 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { ...@@ -872,17 +876,17 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
pConn = rpcProcessMsgHead(pRpc, pRecv); pConn = rpcProcessMsgHead(pRpc, pRecv);
if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) { if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) {
tTrace("%s %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", tTrace("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x",
pRpc->label, pConn, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, terrno, pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, terrno,
pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->code); pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->code);
} }
int32_t code = terrno; int32_t code = terrno;
if (code != TSDB_CODE_ALREADY_PROCESSED) { if (code != TSDB_CODE_ALREADY_PROCESSED) {
if (code != 0) { // parsing error if (code != 0) { // parsing error
if ( rpcIsReq(pHead->msgType) ) { if (rpcIsReq(pHead->msgType)) {
rpcSendErrorMsgToPeer(pRecv, code); rpcSendErrorMsgToPeer(pRecv, code);
tTrace("%s %p, %s is sent with error code:%x", pRpc->label, pConn, taosMsg[pHead->msgType+1], code); tTrace("%s %p %p, %s is sent with error code:%x", pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType+1], code);
} }
} else { // parsing OK } else { // parsing OK
rpcProcessIncomingMsg(pConn, pHead); rpcProcessIncomingMsg(pConn, pHead);
...@@ -924,6 +928,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -924,6 +928,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
rpcMsg.pCont = pHead->content; rpcMsg.pCont = pHead->content;
rpcMsg.msgType = pHead->msgType; rpcMsg.msgType = pHead->msgType;
rpcMsg.code = pHead->code; rpcMsg.code = pHead->code;
rpcMsg.ahandle = pConn->ahandle;
if ( rpcIsReq(pHead->msgType) ) { if ( rpcIsReq(pHead->msgType) ) {
rpcMsg.handle = pConn; rpcMsg.handle = pConn;
...@@ -948,14 +953,14 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -948,14 +953,14 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
pContext->redirect++; pContext->redirect++;
if (pContext->redirect > TSDB_MAX_REPLICA) { if (pContext->redirect > TSDB_MAX_REPLICA) {
pHead->code = TSDB_CODE_NETWORK_UNAVAIL; pHead->code = TSDB_CODE_NETWORK_UNAVAIL;
tWarn("%s %p, too many redirects, quit", pRpc->label, pConn); tWarn("%s, too many redirects, quit", pConn->info);
} }
} }
if (pHead->code == TSDB_CODE_REDIRECT) { if (pHead->code == TSDB_CODE_REDIRECT) {
pContext->numOfTry = 0; pContext->numOfTry = 0;
memcpy(&pContext->ipSet, pHead->content, sizeof(pContext->ipSet)); memcpy(&pContext->ipSet, pHead->content, sizeof(pContext->ipSet));
tTrace("%s %p, redirect is received, numOfIps:%d", pRpc->label, pConn, pContext->ipSet.numOfIps); tTrace("%s, redirect is received, numOfIps:%d", pConn->info, pContext->ipSet.numOfIps);
for (int i=0; i<pContext->ipSet.numOfIps; ++i) for (int i=0; i<pContext->ipSet.numOfIps; ++i)
pContext->ipSet.port[i] = htons(pContext->ipSet.port[i]); pContext->ipSet.port[i] = htons(pContext->ipSet.port[i]);
rpcSendReqToServer(pRpc, pContext); rpcSendReqToServer(pRpc, pContext);
...@@ -1061,6 +1066,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { ...@@ -1061,6 +1066,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
return; return;
} }
pConn->ahandle = pContext->ahandle;
rpcLockConn(pConn); rpcLockConn(pConn);
// set the message header // set the message header
...@@ -1074,6 +1080,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { ...@@ -1074,6 +1080,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
pHead->destId = pConn->peerId; pHead->destId = pConn->peerId;
pHead->port = 0; pHead->port = 0;
pHead->linkUid = pConn->linkUid; pHead->linkUid = pConn->linkUid;
pHead->ahandle = (uint64_t)pConn->ahandle;
if (!pConn->secured) memcpy(pHead->user, pConn->user, tListLen(pHead->user)); if (!pConn->secured) memcpy(pHead->user, pConn->user, tListLen(pHead->user));
// set the connection parameters // set the connection parameters
...@@ -1091,29 +1098,27 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { ...@@ -1091,29 +1098,27 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
int writtenLen = 0; int writtenLen = 0;
SRpcInfo *pRpc = pConn->pRpc;
SRpcHead *pHead = (SRpcHead *)msg; SRpcHead *pHead = (SRpcHead *)msg;
msgLen = rpcAddAuthPart(pConn, msg, msgLen); msgLen = rpcAddAuthPart(pConn, msg, msgLen);
if ( rpcIsReq(pHead->msgType)) { if ( rpcIsReq(pHead->msgType)) {
if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16))
tTrace("%s %p, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d", tTrace("%s, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d",
pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerFqdn, pConn->info, taosMsg[pHead->msgType], pConn->peerFqdn, pConn->peerPort,
pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId); msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
} else { } else {
if (pHead->code == 0) pConn->secured = 1; // for success response, set link as secured if (pHead->code == 0) pConn->secured = 1; // for success response, set link as secured
if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16))
tTrace( "%s %p, %s is sent to 0x%x:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d", tTrace("%s, %s is sent to 0x%x:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d",
pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerIp, pConn->peerPort, pConn->info, taosMsg[pHead->msgType], pConn->peerIp, pConn->peerPort,
htonl(pHead->code), msgLen, pHead->sourceId, pHead->destId, pHead->tranId); htonl(pHead->code), msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
} }
writtenLen = (*taosSendData[pConn->connType])(pConn->peerIp, pConn->peerPort, pHead, msgLen, pConn->chandle); writtenLen = (*taosSendData[pConn->connType])(pConn->peerIp, pConn->peerPort, pHead, msgLen, pConn->chandle);
if (writtenLen != msgLen) { if (writtenLen != msgLen) {
tError("%s %p, failed to send, dataLen:%d writtenLen:%d, reason:%s", pRpc->label, pConn, tError("%s, failed to send, msgLen:%d written:%d, reason:%s", pConn->info, msgLen, writtenLen, strerror(errno));
msgLen, writtenLen, strerror(errno));
} }
tDump(msg, msgLen); tDump(msg, msgLen);
...@@ -1128,7 +1133,7 @@ static void rpcProcessConnError(void *param, void *id) { ...@@ -1128,7 +1133,7 @@ static void rpcProcessConnError(void *param, void *id) {
return; return;
} }
tTrace("%s connection error happens", pRpc->label); tTrace("%s %p, connection error happens", pRpc->label, pContext->ahandle);
if (pContext->numOfTry >= pContext->ipSet.numOfIps) { if (pContext->numOfTry >= pContext->ipSet.numOfIps) {
rpcMsg.msgType = pContext->msgType+1; rpcMsg.msgType = pContext->msgType+1;
...@@ -1154,23 +1159,21 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { ...@@ -1154,23 +1159,21 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) {
rpcLockConn(pConn); rpcLockConn(pConn);
if (pConn->outType && pConn->user[0]) { if (pConn->outType && pConn->user[0]) {
tTrace("%s %p, expected %s is not received", pRpc->label, pConn, taosMsg[(int)pConn->outType + 1]); tTrace("%s, expected %s is not received", pConn->info, taosMsg[(int)pConn->outType + 1]);
pConn->pTimer = NULL; pConn->pTimer = NULL;
pConn->retry++; pConn->retry++;
if (pConn->retry < 4) { if (pConn->retry < 4) {
tTrace("%s %p, re-send msg:%s to %s:%hu", pRpc->label, pConn, tTrace("%s, re-send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort);
taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort);
rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
} else { } else {
// close the connection // close the connection
tTrace("%s %p, failed to send msg:%s to %s:%hu", pRpc->label, pConn, tTrace("%s, failed to send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort);
taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort);
reportDisc = 1; reportDisc = 1;
} }
} else { } else {
tTrace("%s %p, retry timer not processed", pRpc->label, pConn); tTrace("%s, retry timer not processed", pConn->info);
} }
rpcUnlockConn(pConn); rpcUnlockConn(pConn);
...@@ -1187,10 +1190,10 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) { ...@@ -1187,10 +1190,10 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) {
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
if (pConn->user[0]) { if (pConn->user[0]) {
tTrace("%s %p, close the connection since no activity", pRpc->label, pConn); tTrace("%s, close the connection since no activity", pConn->info);
if (pConn->inType && pRpc->cfp) { if (pConn->inType && pRpc->cfp) {
// if there are pending request, notify the app // if there are pending request, notify the app
tTrace("%s %p, notify the app, connection is gone", pRpc->label, pConn); tTrace("%s, notify the app, connection is gone", pConn->info);
/* /*
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
rpcMsg.pCont = NULL; rpcMsg.pCont = NULL;
...@@ -1203,7 +1206,7 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) { ...@@ -1203,7 +1206,7 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) {
} }
rpcCloseConn(pConn); rpcCloseConn(pConn);
} else { } else {
tTrace("%s %p, idle timer:%p not processed", pRpc->label, pConn, tmrId); tTrace("%s, idle timer:%p not processed", pConn->info, tmrId);
} }
} }
...@@ -1214,11 +1217,11 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) { ...@@ -1214,11 +1217,11 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) {
rpcLockConn(pConn); rpcLockConn(pConn);
if (pConn->inType && pConn->user[0]) { if (pConn->inType && pConn->user[0]) {
tTrace("%s %p, progress timer expired, send progress", pRpc->label, pConn); tTrace("%s, progress timer expired, send progress", pConn->info);
rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS); rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS);
taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer); taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer);
} else { } else {
tTrace("%s %p, progress timer:%p not processed", pRpc->label, pConn, tmrId); tTrace("%s, progress timer:%p not processed", pConn->info, tmrId);
} }
rpcUnlockConn(pConn); rpcUnlockConn(pConn);
...@@ -1252,7 +1255,7 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { ...@@ -1252,7 +1255,7 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
memcpy(pCont + overhead, buf, compLen); memcpy(pCont + overhead, buf, compLen);
pHead->comp = 1; pHead->comp = 1;
tTrace("compress rpc msg, before:%d, after:%d", contLen, compLen); //tTrace("compress rpc msg, before:%d, after:%d", contLen, compLen);
finalLen = compLen + overhead; finalLen = compLen + overhead;
} else { } else {
finalLen = contLen; finalLen = contLen;
...@@ -1286,7 +1289,7 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) { ...@@ -1286,7 +1289,7 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) {
pNewHead->msgLen = rpcMsgLenFromCont(origLen); pNewHead->msgLen = rpcMsgLenFromCont(origLen);
rpcFreeMsg(pHead); // free the compressed message buffer rpcFreeMsg(pHead); // free the compressed message buffer
pHead = pNewHead; pHead = pNewHead;
tTrace("decompress rpc msg, compLen:%d, after:%d", compLen, contLen); //tTrace("decompress rpc msg, compLen:%d, after:%d", compLen, contLen);
} else { } else {
tError("failed to allocate memory to decompress msg, contLen:%d", contLen); tError("failed to allocate memory to decompress msg, contLen:%d", contLen);
} }
...@@ -1343,7 +1346,6 @@ static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) { ...@@ -1343,7 +1346,6 @@ static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) {
static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
SRpcHead *pHead = (SRpcHead *)msg; SRpcHead *pHead = (SRpcHead *)msg;
SRpcInfo *pRpc = pConn->pRpc;
int code = 0; int code = 0;
if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)){ if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)){
...@@ -1371,20 +1373,20 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { ...@@ -1371,20 +1373,20 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
delta = (int32_t)htonl(pDigest->timeStamp); delta = (int32_t)htonl(pDigest->timeStamp);
delta -= (int32_t)taosGetTimestampSec(); delta -= (int32_t)taosGetTimestampSec();
if (abs(delta) > 900) { if (abs(delta) > 900) {
tWarn("%s %p, time diff:%d is too big, msg discarded", pRpc->label, pConn, delta); tWarn("%s, time diff:%d is too big, msg discarded", pConn->info, delta);
code = TSDB_CODE_INVALID_TIME_STAMP; code = TSDB_CODE_INVALID_TIME_STAMP;
} else { } else {
if (rpcAuthenticateMsg(pHead, msgLen-TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) { if (rpcAuthenticateMsg(pHead, msgLen-TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) {
tError("%s %p, authentication failed, msg discarded", pRpc->label, pConn); tError("%s, authentication failed, msg discarded", pConn->info);
code = TSDB_CODE_AUTH_FAILURE; code = TSDB_CODE_AUTH_FAILURE;
} else { } else {
pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest); pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest);
if ( !rpcIsReq(pHead->msgType) ) pConn->secured = 1; // link is secured for client if ( !rpcIsReq(pHead->msgType) ) pConn->secured = 1; // link is secured for client
tTrace("%s %p, message is authenticated", pRpc->label, pConn); //tTrace("%s, message is authenticated", pConn->info);
} }
} }
} else { } else {
tTrace("%s %p, auth spi:%d not matched with received:%d", pRpc->label, pConn, pConn->spi, pHead->spi); tError("%s, auth spi:%d not matched with received:%d", pConn->info, pConn->spi, pHead->spi);
code = TSDB_CODE_AUTH_FAILURE; code = TSDB_CODE_AUTH_FAILURE;
} }
......
...@@ -218,7 +218,7 @@ static void *taosRecvUdpData(void *param) { ...@@ -218,7 +218,7 @@ static void *taosRecvUdpData(void *param) {
while (1) { while (1) {
dataLen = recvfrom(pConn->fd, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen); dataLen = recvfrom(pConn->fd, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen);
port = ntohs(sourceAdd.sin_port); port = ntohs(sourceAdd.sin_port);
tTrace("%s msg is recv from 0x%x:%hu len:%d", pConn->label, sourceAdd.sin_addr.s_addr, port, dataLen); //tTrace("%s msg is recv from 0x%x:%hu len:%d", pConn->label, sourceAdd.sin_addr.s_addr, port, dataLen);
if (dataLen < sizeof(SRpcHead)) { if (dataLen < sizeof(SRpcHead)) {
tError("%s recvfrom failed, reason:%s\n", pConn->label, strerror(errno)); tError("%s recvfrom failed, reason:%s\n", pConn->label, strerror(errno));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册