提交 93f6cd34 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

fix some bugs

上级 a2629a02
...@@ -168,7 +168,7 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code); ...@@ -168,7 +168,7 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code);
static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint32_t ip, uint16_t port, void *chandle); static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint32_t ip, uint16_t port, void *chandle);
static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen);
static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *chandle); static void *rpcProcessMsgFromPeer(void *msg, int msgLen, uint32_t ip, uint16_t port, void *shandle, void *thandle, void *chandle);
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead); static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead);
static void rpcProcessConnError(void *param, void *id); static void rpcProcessConnError(void *param, void *id);
static void rpcProcessRetryTimer(void *, void *); static void rpcProcessRetryTimer(void *, void *);
...@@ -323,14 +323,15 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) { ...@@ -323,14 +323,15 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) {
int msgLen = 0; int msgLen = 0;
SRpcConn *pConn = (SRpcConn *)handle; SRpcConn *pConn = (SRpcConn *)handle;
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
SRpcHead *pHead = rpcHeadFromCont(pCont);
char *msg = (char *)pHead;
if ( pCont == NULL ) { if ( pCont == NULL ) {
pCont = rpcMallocCont(0); pCont = rpcMallocCont(0);
contLen = 0; contLen = 0;
} }
SRpcHead *pHead = rpcHeadFromCont(pCont);
char *msg = (char *)pHead;
contLen = rpcCompressRpcMsg(pCont, contLen); contLen = rpcCompressRpcMsg(pCont, contLen);
msgLen = rpcMsgLenFromCont(contLen); msgLen = rpcMsgLenFromCont(contLen);
...@@ -689,17 +690,17 @@ static int32_t rpcProcessHead(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int ...@@ -689,17 +690,17 @@ static int32_t rpcProcessHead(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int
return code; return code;
} }
static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *chandle) { static void *rpcProcessMsgFromPeer(void *msg, int msgLen, uint32_t ip, uint16_t port, void *shandle, void *thandle, void *chandle) {
SRpcHead *pHead = (SRpcHead *)data; SRpcHead *pHead = (SRpcHead *)msg;
SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcInfo *pRpc = (SRpcInfo *)shandle;
SRpcConn *pConn = NULL; SRpcConn *pConn = NULL;
int32_t code = 0; int32_t code = 0;
tDump(data, dataLen); tDump(msg, msgLen);
pthread_mutex_lock(&pRpc->mutex); pthread_mutex_lock(&pRpc->mutex);
code = rpcProcessHead(pRpc, &pConn, data, dataLen, ip); code = rpcProcessHead(pRpc, &pConn, msg, msgLen, ip);
if (pConn) { if (pConn) {
// update connection info // update connection info
...@@ -721,7 +722,7 @@ static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_ ...@@ -721,7 +722,7 @@ static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_
if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) { if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) {
tTrace("%s pConn:%p, %s received from 0x%x:%hu, parse code:%x len:%d source:0x%08x dest:0x%08x tranId:%d", tTrace("%s pConn:%p, %s received from 0x%x:%hu, parse code:%x len:%d source:0x%08x dest:0x%08x tranId:%d",
pRpc->label, pConn, taosMsg[pHead->msgType], ip, port, code, pRpc->label, pConn, taosMsg[pHead->msgType], ip, port, code,
dataLen, pHead->sourceId, pHead->destId, pHead->tranId); msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
} }
if (pConn && pRpc->idleTime) { if (pConn && pRpc->idleTime) {
...@@ -731,7 +732,7 @@ static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_ ...@@ -731,7 +732,7 @@ static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_
if (code != TSDB_CODE_ALREADY_PROCESSED) { if (code != TSDB_CODE_ALREADY_PROCESSED) {
if (code != 0) { // parsing error if (code != 0) { // parsing error
if ( rpcIsReq(pHead->msgType) ) { if ( rpcIsReq(pHead->msgType) ) {
rpcSendErrorMsgToPeer(pRpc, data, code, ip, port, chandle); rpcSendErrorMsgToPeer(pRpc, msg, code, ip, port, chandle);
tTrace("%s pConn:%p, %s is sent with error code:%x", pRpc->label, pConn, taosMsg[pHead->msgType+1], code); tTrace("%s pConn:%p, %s is sent with error code:%x", pRpc->label, pConn, taosMsg[pHead->msgType+1], code);
} }
} else { // parsing OK } else { // parsing OK
...@@ -739,7 +740,7 @@ static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_ ...@@ -739,7 +740,7 @@ static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_
} }
} }
if ( code != 0 ) free (data); if ( code != 0 ) free (msg);
return pConn; return pConn;
} }
...@@ -766,8 +767,8 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -766,8 +767,8 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
rpcSendReqToServer(pRpc, pContext); rpcSendReqToServer(pRpc, pContext);
} else { } else {
rpcFreeOutMsg(rpcHeadFromCont(pContext->pCont)); // free the request msg rpcFreeOutMsg(rpcHeadFromCont(pContext->pCont)); // free the request msg
if ( pContext->ipSet.index != pContext->oldIndex || pContext->redirect ) if ( pRpc->ufp && (pContext->ipSet.index != pContext->oldIndex || pContext->redirect) )
(*pRpc->ufp)(pContext->ahandle, pContext->ipSet); (*pRpc->ufp)(pContext->ahandle, pContext->ipSet); // notify the update of ipSet
(*pRpc->cfp)(pHead->msgType, pCont, contLen, pContext->ahandle, pContext->ipSet.index); (*pRpc->cfp)(pHead->msgType, pCont, contLen, pContext->ahandle, pContext->ipSet.index);
} }
} }
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
#include <stdint.h> #include <stdint.h>
void processMsg(char type, void *pCont, int contLen, void *ahandle, int32_t code) { void processMsg(char type, void *pCont, int contLen, void *ahandle, int32_t code) {
dPrint("response is received, type:%d, contLen:%d code:%d, ahandle:%p", type, contLen, code, ahandle); dPrint("response is received, type:%d, contLen:%d code:%x, ahandle:%p", type, contLen, code, ahandle);
} }
int32_t main(int32_t argc, char *argv[]) { int32_t main(int32_t argc, char *argv[]) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册