提交 baf6b0fa 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

more bug fix

上级 20003818
...@@ -65,7 +65,7 @@ typedef struct { ...@@ -65,7 +65,7 @@ typedef struct {
void (*cfp)(char type, void *pCont, int contLen, void *ahandle, int32_t code); void (*cfp)(char type, void *pCont, int contLen, void *ahandle, int32_t code);
// call back to process notify the ipSet changes // call back to process notify the ipSet changes
void (*ufp)(void *ahandle, SRpcIpSet ipSet); void (*ufp)(void *ahandle, SRpcIpSet *pIpSet);
// call back to retrieve the client auth info // call back to retrieve the client auth info
int (*afp)(char *meterId, char *spi, char *encrypt, char *secret, char *ckey); int (*afp)(char *meterId, char *spi, char *encrypt, char *secret, char *ckey);
...@@ -75,9 +75,9 @@ void *rpcOpen(SRpcInit *pRpc); ...@@ -75,9 +75,9 @@ void *rpcOpen(SRpcInit *pRpc);
void rpcClose(void *); void rpcClose(void *);
void *rpcMallocCont(int contLen); void *rpcMallocCont(int contLen);
void rpcFreeCont(void *pCont); void rpcFreeCont(void *pCont);
void rpcSendRequest(void *thandle, SRpcIpSet ipSet, char msgType, void *pCont, int contLen, void *ahandle); void rpcSendRequest(void *thandle, SRpcIpSet *pIpSet, char msgType, void *pCont, int contLen, void *ahandle);
void rpcSendResponse(void *pConn, int32_t code, void *pCont, int contLen); void rpcSendResponse(void *pConn, int32_t code, void *pCont, int contLen);
void rpcSendRedirectRsp(void *pConn, SRpcIpSet ipSet); void rpcSendRedirectRsp(void *pConn, SRpcIpSet *pIpSet);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -73,7 +73,7 @@ void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash, uint6 ...@@ -73,7 +73,7 @@ void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash, uint6
pNext = pNode->next; pNext = pNode->next;
pCache->total--; pCache->total--;
pCache->count[hash]--; pCache->count[hash]--;
tTrace("%p ip:0x%x:%hu:%d:%p removed, connections in cache:%d", pNode->data, pNode->ip, pNode->port, hash, pNode, tTrace("%p ip:0x%x:%hu:%d:%p removed from cache, connections:%d", pNode->data, pNode->ip, pNode->port, hash, pNode,
pCache->count[hash]); pCache->count[hash]);
taosMemPoolFree(pCache->connHashMemPool, (char *)pNode); taosMemPoolFree(pCache->connHashMemPool, (char *)pNode);
pNode = pNext; pNode = pNext;
...@@ -116,7 +116,7 @@ void rpcAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, c ...@@ -116,7 +116,7 @@ void rpcAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, c
pthread_mutex_unlock(&pCache->mutex); pthread_mutex_unlock(&pCache->mutex);
tTrace("%p ip:0x%x:%hu:%d:%p added, connections in cache:%d", data, ip, port, hash, pNode, pCache->count[hash]); tTrace("%p ip:0x%x:%hu:%d:%p added into cache, connections:%d", data, ip, port, hash, pNode, pCache->count[hash]);
return; return;
} }
...@@ -192,7 +192,7 @@ void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user) ...@@ -192,7 +192,7 @@ void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user)
pthread_mutex_unlock(&pCache->mutex); pthread_mutex_unlock(&pCache->mutex);
if (pData) { if (pData) {
tTrace("%p ip:0x%x:%hu:%d:%p retrieved, connections in cache:%d", pData, ip, port, hash, pNode, pCache->count[hash]); tTrace("%p ip:0x%x:%hu:%d:%p retrieved from cache, connections:%d", pData, ip, port, hash, pNode, pCache->count[hash]);
} }
return pData; return pData;
......
...@@ -57,7 +57,7 @@ typedef struct { ...@@ -57,7 +57,7 @@ typedef struct {
void (*cfp)(char type, void *pCont, int contLen, void *ahandle, int32_t code); void (*cfp)(char type, void *pCont, int contLen, void *ahandle, int32_t code);
int (*afp)(char *meterId, char *spi, char *encrypt, char *secret, char *ckey); int (*afp)(char *meterId, char *spi, char *encrypt, char *secret, char *ckey);
void (*ufp)(void *ahandle, SRpcIpSet ipSet); void (*ufp)(void *ahandle, SRpcIpSet *pIpSet);
void *idPool; // handle to ID pool void *idPool; // handle to ID pool
void *tmrCtrl; // handle to timer void *tmrCtrl; // handle to timer
...@@ -193,11 +193,7 @@ void *rpcOpen(SRpcInit *pInit) { ...@@ -193,11 +193,7 @@ void *rpcOpen(SRpcInit *pInit) {
if(pInit->label) strcpy(pRpc->label, pInit->label); if(pInit->label) strcpy(pRpc->label, pInit->label);
pRpc->connType = pInit->connType; pRpc->connType = pInit->connType;
pRpc->idleTime = pInit->idleTime; pRpc->idleTime = pInit->idleTime;
pRpc->numOfThreads = pInit->numOfThreads; pRpc->numOfThreads = pInit->numOfThreads>TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS:pInit->numOfThreads;
if (pRpc->numOfThreads > TSDB_MAX_RPC_THREADS) {
pRpc->numOfThreads = TSDB_MAX_RPC_THREADS;
}
if (pInit->localIp) strcpy(pRpc->localIp, pInit->localIp); if (pInit->localIp) strcpy(pRpc->localIp, pInit->localIp);
pRpc->localPort = pInit->localPort; pRpc->localPort = pInit->localPort;
pRpc->afp = pInit->afp; pRpc->afp = pInit->afp;
...@@ -300,7 +296,7 @@ void rpcFreeCont(void *cont) { ...@@ -300,7 +296,7 @@ void rpcFreeCont(void *cont) {
free(msg); free(msg);
} }
void rpcSendRequest(void *shandle, SRpcIpSet ipSet, char type, void *pCont, int contLen, void *ahandle) { void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, char type, void *pCont, int contLen, void *ahandle) {
SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcInfo *pRpc = (SRpcInfo *)shandle;
SRpcReqContext *pContext; SRpcReqContext *pContext;
...@@ -308,11 +304,11 @@ void rpcSendRequest(void *shandle, SRpcIpSet ipSet, char type, void *pCont, int ...@@ -308,11 +304,11 @@ void rpcSendRequest(void *shandle, SRpcIpSet ipSet, char type, void *pCont, int
pContext = (SRpcReqContext *) (pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext)); pContext = (SRpcReqContext *) (pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext));
pContext->ahandle = ahandle; pContext->ahandle = ahandle;
pContext->pRpc = (SRpcInfo *)shandle; pContext->pRpc = (SRpcInfo *)shandle;
pContext->ipSet = ipSet; pContext->ipSet = *pIpSet;
pContext->contLen = contLen; pContext->contLen = contLen;
pContext->pCont = pCont; pContext->pCont = pCont;
pContext->msgType = type; pContext->msgType = type;
pContext->oldIndex = ipSet.index; pContext->oldIndex = pIpSet->index;
rpcSendReqToServer(pRpc, pContext); rpcSendReqToServer(pRpc, pContext);
...@@ -338,7 +334,7 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) { ...@@ -338,7 +334,7 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) {
pthread_mutex_lock(&pRpc->mutex); pthread_mutex_lock(&pRpc->mutex);
if ( pConn->inType == 0 || pConn->meterId[0] == 0 ) { if ( pConn->inType == 0 || pConn->meterId[0] == 0 ) {
tTrace("%s pConn:%p, connection is already released, rsp wont be sent", pRpc->label, pConn); tTrace("%s %p, connection is already released, rsp wont be sent", pRpc->label, pConn);
pthread_mutex_lock(&pRpc->mutex); pthread_mutex_lock(&pRpc->mutex);
return; return;
} }
...@@ -373,14 +369,14 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) { ...@@ -373,14 +369,14 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) {
return; return;
} }
void rpcSendRedirectRsp(void *thandle, SRpcIpSet ipSet) { void rpcSendRedirectRsp(void *thandle, SRpcIpSet *pIpSet) {
char *pMsg; char *pMsg;
int msgLen = sizeof(SRpcIpSet); int msgLen = sizeof(SRpcIpSet);
pMsg = rpcMallocCont(msgLen); pMsg = rpcMallocCont(msgLen);
if (pMsg == NULL) return; if (pMsg == NULL) return;
memcpy(pMsg, &ipSet, sizeof(ipSet)); memcpy(pMsg, pIpSet, sizeof(SRpcIpSet));
rpcSendResponse(thandle, TSDB_CODE_REDIRECT, pMsg, msgLen); rpcSendResponse(thandle, TSDB_CODE_REDIRECT, pMsg, msgLen);
...@@ -401,10 +397,10 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort) ...@@ -401,10 +397,10 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort)
if (taosOpenConn[pRpc->connType]) { if (taosOpenConn[pRpc->connType]) {
pConn->chandle = (*taosOpenConn[pRpc->connType])(pRpc->shandle, pConn, pConn->peerIpstr, pConn->peerPort); pConn->chandle = (*taosOpenConn[pRpc->connType])(pRpc->shandle, pConn, pConn->peerIpstr, pConn->peerPort);
if (pConn->chandle) { if (pConn->chandle) {
tTrace("%s pConn:%p, rpc connection is set up, sid:%d id:%s ip:%s:%hu localPort:%d", pRpc->label, tTrace("%s %p, rpc connection is set up, sid:%d id:%s ip:%s:%hu localPort:%d", pRpc->label,
pConn, pConn->sid, pRpc->meterId, pConn->peerIpstr, pConn->peerPort, pConn->localPort); pConn, pConn->sid, pRpc->meterId, pConn->peerIpstr, pConn->peerPort, pConn->localPort);
} else { } else {
tError("%s pConn:%p, failed to set up nw connection to ip:%s:%hu", pRpc->label, pConn, tError("%s %p, failed to set up nw connection to ip:%s:%hu", pRpc->label, pConn,
pConn->sid, pRpc->meterId, pConn->peerIpstr, pConn->peerPort); pConn->sid, pRpc->meterId, pConn->peerIpstr, pConn->peerPort);
terrno = TSDB_CODE_NETWORK_UNAVAIL; terrno = TSDB_CODE_NETWORK_UNAVAIL;
rpcCloseConn(pConn); rpcCloseConn(pConn);
...@@ -446,7 +442,7 @@ static void rpcCloseConn(void *thandle) { ...@@ -446,7 +442,7 @@ static void rpcCloseConn(void *thandle) {
taosFreeId(pRpc->idPool, pConn->sid); taosFreeId(pRpc->idPool, pConn->sid);
pConn->pContext = NULL; pConn->pContext = NULL;
tTrace("%s pConn:%p, rpc connection is closed", pRpc->label, pConn); tTrace("%s %p, rpc connection is closed", pRpc->label, pConn);
} }
pthread_mutex_unlock(&pRpc->mutex); pthread_mutex_unlock(&pRpc->mutex);
...@@ -460,8 +456,6 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) { ...@@ -460,8 +456,6 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) {
tError("%s maximum number of sessions:%d is reached", pRpc->label, pRpc->sessions); tError("%s maximum number of sessions:%d is reached", pRpc->label, pRpc->sessions);
terrno = TSDB_CODE_MAX_SESSIONS; terrno = TSDB_CODE_MAX_SESSIONS;
} else { } else {
tTrace("%s sid:%d, ID allocated, used:%d", pRpc->label, sid, taosIdPoolNumOfUsed(pRpc->idPool));
pConn = pRpc->connList + sid; pConn = pRpc->connList + sid;
memset(pConn, 0, sizeof(SRpcConn)); memset(pConn, 0, sizeof(SRpcConn));
...@@ -498,7 +492,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *meterId, char *hash ...@@ -498,7 +492,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *meterId, char *hash
pConn->tranId = (uint16_t)(rand() & 0xFFFF); pConn->tranId = (uint16_t)(rand() & 0xFFFF);
pConn->ownId = htonl(pConn->sid); pConn->ownId = htonl(pConn->sid);
if (pRpc->afp && (*pRpc->afp)(meterId, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey)) { if (pRpc->afp && (*pRpc->afp)(meterId, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey)) {
tWarn("%s pConn:%p, meterId not there", pRpc->label, pConn); tWarn("%s %p, meterId not there", pRpc->label, pConn);
taosFreeId(pRpc->idPool, sid); // sid shall be released taosFreeId(pRpc->idPool, sid); // sid shall be released
terrno = TSDB_CODE_INVALID_USER; terrno = TSDB_CODE_INVALID_USER;
pConn = NULL; pConn = NULL;
...@@ -507,7 +501,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *meterId, char *hash ...@@ -507,7 +501,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *meterId, char *hash
if (pConn) { if (pConn) {
taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn); taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn);
tTrace("%s pConn:%p, rpc connection is allocated, sid:%d id:%s", pRpc->label, pConn, sid, pConn->meterId); tTrace("%s %p, rpc connection is allocated, sid:%d id:%s", pRpc->label, pConn, sid, pConn->meterId);
} }
return pConn; return pConn;
...@@ -524,7 +518,7 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *meterId, char *has ...@@ -524,7 +518,7 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *meterId, char *has
if (pConn) { if (pConn) {
if (memcmp(pConn->meterId, meterId, tListLen(pConn->meterId)) != 0) { if (memcmp(pConn->meterId, meterId, tListLen(pConn->meterId)) != 0) {
tTrace("%s pConn:%p, meterId:%s is not matched, received:%s", pRpc->label, pConn, pConn->meterId, meterId); tTrace("%s %p, meterId:%s is not matched, received:%s", pRpc->label, pConn, pConn->meterId, meterId);
terrno = TSDB_CODE_MISMATCHED_METER_ID; terrno = TSDB_CODE_MISMATCHED_METER_ID;
pConn = NULL; pConn = NULL;
} }
...@@ -553,7 +547,7 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -553,7 +547,7 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) {
pConn->peerId = pHead->sourceId; pConn->peerId = pHead->sourceId;
} else { } else {
if (pConn->peerId != pHead->sourceId) { if (pConn->peerId != pHead->sourceId) {
tTrace("%s pConn:%p, source Id is changed, old:0x%08x new:0x%08x", pRpc->label, pConn, tTrace("%s %p, source Id is changed, old:0x%08x new:0x%08x", pRpc->label, pConn,
pConn->peerId, pHead->sourceId); pConn->peerId, pHead->sourceId);
return TSDB_CODE_INVALID_VALUE; return TSDB_CODE_INVALID_VALUE;
} }
...@@ -561,14 +555,14 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -561,14 +555,14 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) {
if (pConn->inTranId == pHead->tranId) { if (pConn->inTranId == pHead->tranId) {
if (pConn->inType == pHead->msgType) { if (pConn->inType == pHead->msgType) {
tTrace("%s pConn:%p, %s is retransmitted", pRpc->label, pConn, taosMsg[pHead->msgType]); tTrace("%s %p, %s is retransmitted", pRpc->label, pConn, taosMsg[pHead->msgType]);
rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS); rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS);
} else if (pConn->inType == 0) { } else if (pConn->inType == 0) {
tTrace("%s pConn:%p, %s is already processed, tranId:%d", pRpc->label, pConn, tTrace("%s %p, %s is already processed, tranId:%d", pRpc->label, pConn,
taosMsg[pHead->msgType], pConn->inTranId); taosMsg[pHead->msgType], pConn->inTranId);
rpcSendMsgToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen); // resend the response rpcSendMsgToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen); // resend the response
} else { } else {
tTrace("%s pConn:%p, mismatched message %s and tranId", pRpc->label, pConn, taosMsg[pHead->msgType]); tTrace("%s %p, mismatched message %s and tranId", pRpc->label, pConn, taosMsg[pHead->msgType]);
} }
// do not reply any message // do not reply any message
...@@ -576,7 +570,7 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -576,7 +570,7 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) {
} }
if (pConn->inType != 0) { if (pConn->inType != 0) {
tTrace("%s pConn:%p, last session is not finished, inTranId:%d tranId:%d", pRpc->label, pConn, tTrace("%s %p, last session is not finished, inTranId:%d tranId:%d", pRpc->label, pConn,
pConn->inTranId, pHead->tranId); pConn->inTranId, pHead->tranId);
return TSDB_CODE_LAST_SESSION_NOT_FINISHED; return TSDB_CODE_LAST_SESSION_NOT_FINISHED;
} }
...@@ -613,7 +607,7 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -613,7 +607,7 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
if (*pHead->content == TSDB_CODE_ACTION_IN_PROGRESS || pHead->tcp) { if (*pHead->content == TSDB_CODE_ACTION_IN_PROGRESS || pHead->tcp) {
if (pConn->tretry <= tsRpcMaxRetry) { if (pConn->tretry <= tsRpcMaxRetry) {
pConn->tretry++; pConn->tretry++;
tTrace("%s pConn:%p, peer is still processing the transaction", pRpc->label, pConn); tTrace("%s %p, peer is still processing the transaction", pRpc->label, pConn);
taosTmrReset(rpcProcessRetryTimer, tsRpcProgressTime, pConn, pRpc->tmrCtrl, &pConn->pTimer); taosTmrReset(rpcProcessRetryTimer, tsRpcProgressTime, pConn, pRpc->tmrCtrl, &pConn->pTimer);
return TSDB_CODE_ALREADY_PROCESSED; return TSDB_CODE_ALREADY_PROCESSED;
} else { } else {
...@@ -669,7 +663,7 @@ static int32_t rpcProcessHead(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int ...@@ -669,7 +663,7 @@ static int32_t rpcProcessHead(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int
if (pHead->uid) pConn->peerUid = pHead->uid; if (pHead->uid) pConn->peerUid = pHead->uid;
if (pHead->tcp) { if (pHead->tcp) {
tTrace("%s pConn:%p, content will be transfered via TCP", pRpc->label, pConn); tTrace("%s %p, content will be transfered via TCP", pRpc->label, pConn);
if (pConn->outType) taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); if (pConn->outType) taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
return TSDB_CODE_ALREADY_PROCESSED; return TSDB_CODE_ALREADY_PROCESSED;
} }
...@@ -690,14 +684,32 @@ static int32_t rpcProcessHead(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int ...@@ -690,14 +684,32 @@ static int32_t rpcProcessHead(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int
return code; return code;
} }
static void rpcProcessBrokenLink(SRpcConn *pConn) {
SRpcInfo *pRpc = pConn->pRpc;
if (pConn->outType) {
SRpcReqContext *pContext = pConn->pContext;
pContext->code = TSDB_CODE_NETWORK_UNAVAIL;
taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl);
}
rpcCloseConn(pConn);
}
static void *rpcProcessMsgFromPeer(void *msg, int msgLen, uint32_t ip, uint16_t port, void *shandle, void *thandle, void *chandle) { static void *rpcProcessMsgFromPeer(void *msg, int msgLen, uint32_t ip, uint16_t port, void *shandle, void *thandle, void *chandle) {
SRpcHead *pHead = (SRpcHead *)msg; SRpcHead *pHead = (SRpcHead *)msg;
SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcInfo *pRpc = (SRpcInfo *)shandle;
SRpcConn *pConn = NULL; SRpcConn *pConn = (SRpcConn *)thandle;
int32_t code = 0; int32_t code = 0;
tDump(msg, msgLen); tDump(msg, msgLen);
if (ip==0 && pConn) {
rpcProcessBrokenLink(pConn);
tfree(msg);
return NULL;
}
pthread_mutex_lock(&pRpc->mutex); pthread_mutex_lock(&pRpc->mutex);
code = rpcProcessHead(pRpc, &pConn, msg, msgLen, ip); code = rpcProcessHead(pRpc, &pConn, msg, msgLen, ip);
...@@ -720,7 +732,7 @@ static void *rpcProcessMsgFromPeer(void *msg, int msgLen, uint32_t ip, uint16_t ...@@ -720,7 +732,7 @@ static void *rpcProcessMsgFromPeer(void *msg, int msgLen, uint32_t ip, uint16_t
pthread_mutex_unlock(&pRpc->mutex); pthread_mutex_unlock(&pRpc->mutex);
if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) { if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) {
tTrace("%s pConn:%p, %s received from 0x%x:%hu, parse code:%x len:%d source:0x%08x dest:0x%08x tranId:%d", tTrace("%s %p, %s received from 0x%x:%hu, parse code:%x len:%d source:0x%08x dest:0x%08x tranId:%d",
pRpc->label, pConn, taosMsg[pHead->msgType], ip, port, code, pRpc->label, pConn, taosMsg[pHead->msgType], ip, port, code,
msgLen, pHead->sourceId, pHead->destId, pHead->tranId); msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
} }
...@@ -733,7 +745,7 @@ static void *rpcProcessMsgFromPeer(void *msg, int msgLen, uint32_t ip, uint16_t ...@@ -733,7 +745,7 @@ static void *rpcProcessMsgFromPeer(void *msg, int msgLen, uint32_t ip, uint16_t
if (code != 0) { // parsing error if (code != 0) { // parsing error
if ( rpcIsReq(pHead->msgType) ) { if ( rpcIsReq(pHead->msgType) ) {
rpcSendErrorMsgToPeer(pRpc, msg, code, ip, port, chandle); rpcSendErrorMsgToPeer(pRpc, msg, code, ip, port, chandle);
tTrace("%s pConn:%p, %s is sent with error code:%x", pRpc->label, pConn, taosMsg[pHead->msgType+1], code); tTrace("%s %p, %s is sent with error code:%x", pRpc->label, pConn, taosMsg[pHead->msgType+1], code);
} }
} else { // parsing OK } else { // parsing OK
rpcProcessIncomingMsg(pConn, pHead); rpcProcessIncomingMsg(pConn, pHead);
...@@ -764,12 +776,13 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -764,12 +776,13 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
pContext->redirect = 1; pContext->redirect = 1;
pContext->numOfTry = 0; pContext->numOfTry = 0;
memcpy(&pContext->ipSet, pHead->content, sizeof(pContext->ipSet)); memcpy(&pContext->ipSet, pHead->content, sizeof(pContext->ipSet));
tTrace("%s %p, redirect is received, numOfIps:%d", pRpc->label, pConn, pContext->ipSet.numOfIps);
rpcSendReqToServer(pRpc, pContext); rpcSendReqToServer(pRpc, pContext);
} else { } else {
rpcFreeOutMsg(rpcHeadFromCont(pContext->pCont)); // free the request msg
if ( pRpc->ufp && (pContext->ipSet.index != pContext->oldIndex || pContext->redirect) ) if ( pRpc->ufp && (pContext->ipSet.index != pContext->oldIndex || pContext->redirect) )
(*pRpc->ufp)(pContext->ahandle, pContext->ipSet); // notify the update of ipSet (*pRpc->ufp)(pContext->ahandle, &pContext->ipSet); // notify the update of ipSet
(*pRpc->cfp)(pHead->msgType, pCont, contLen, pContext->ahandle, code); (*pRpc->cfp)(pHead->msgType, pCont, contLen, pContext->ahandle, code);
rpcFreeOutMsg(rpcHeadFromCont(pContext->pCont)); // free the request msg
} }
} }
} }
...@@ -885,12 +898,12 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { ...@@ -885,12 +898,12 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
if ( rpcIsReq(pHead->msgType)) { if ( rpcIsReq(pHead->msgType)) {
if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16))
tTrace("%s pConn:%p, %s is sent to %s:%hu, len:%d source:0x%08x dest:0x%08x tranId:%d", tTrace("%s %p, %s is sent to %s:%hu, len:%d source:0x%08x dest:0x%08x tranId:%d",
pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerIpstr, pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerIpstr,
pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId); pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
} else { } else {
if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16))
tTrace( "%s pConn:%p, %s is sent to %s:%hu, code:%u len:%d source:0x%08x dest:0x%08x tranId:%d", tTrace( "%s %p, %s is sent to %s:%hu, code:%u len:%d source:0x%08x dest:0x%08x tranId:%d",
pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerIpstr, pConn->peerPort, pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerIpstr, pConn->peerPort,
(uint8_t)pHead->content[0], msgLen, pHead->sourceId, pHead->destId, pHead->tranId); (uint8_t)pHead->content[0], msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
} }
...@@ -898,7 +911,7 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { ...@@ -898,7 +911,7 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
writtenLen = (*taosSendData[pRpc->connType])(pConn->peerIp, pConn->peerPort, (char *)pHead, msgLen, pConn->chandle); writtenLen = (*taosSendData[pRpc->connType])(pConn->peerIp, pConn->peerPort, (char *)pHead, msgLen, pConn->chandle);
if (writtenLen != msgLen) { if (writtenLen != msgLen) {
tError("%s pConn:%p, failed to send, dataLen:%d writtenLen:%d, reason:%s", pRpc->label, pConn, tError("%s %p, failed to send, dataLen:%d writtenLen:%d, reason:%s", pRpc->label, pConn,
msgLen, writtenLen, strerror(errno)); msgLen, writtenLen, strerror(errno));
} }
...@@ -928,23 +941,23 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { ...@@ -928,23 +941,23 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) {
pthread_mutex_lock(&pRpc->mutex); pthread_mutex_lock(&pRpc->mutex);
if (pConn->outType && pConn->meterId[0]) { if (pConn->outType && pConn->meterId[0]) {
tTrace("%s pConn:%p, expected %s is not received", pRpc->label, pConn, taosMsg[(int)pConn->outType + 1]); tTrace("%s %p, expected %s is not received", pRpc->label, pConn, taosMsg[(int)pConn->outType + 1]);
pConn->pTimer = NULL; pConn->pTimer = NULL;
pConn->retry++; pConn->retry++;
if (pConn->retry < 4) { if (pConn->retry < 4) {
tTrace("%s pConn:%p, re-send msg:%s to %s:%hu", pRpc->label, pConn, tTrace("%s %p, re-send msg:%s to %s:%hu", pRpc->label, pConn,
taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort); taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort);
rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer<<pConn->retry, pConn, pRpc->tmrCtrl, &pConn->pTimer); taosTmrReset(rpcProcessRetryTimer, tsRpcTimer<<pConn->retry, pConn, pRpc->tmrCtrl, &pConn->pTimer);
} else { } else {
// close the connection // close the connection
tTrace("%s pConn:%p, failed to send msg:%s to %s:%hu", pRpc->label, pConn, tTrace("%s %p, failed to send msg:%s to %s:%hu", pRpc->label, pConn,
taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort); taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort);
reportDisc = 1; reportDisc = 1;
} }
} else { } else {
tTrace("%s pConn:%p, retry timer not processed", pRpc->label, pConn); tTrace("%s %p, retry timer not processed", pRpc->label, pConn);
} }
pthread_mutex_unlock(&pRpc->mutex); pthread_mutex_unlock(&pRpc->mutex);
...@@ -961,10 +974,10 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) { ...@@ -961,10 +974,10 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) {
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
if (pConn->meterId[0]) { if (pConn->meterId[0]) {
tTrace("%s pConn:%p, close the connection since no activity", pRpc->label, pConn); tTrace("%s %p, close the connection since no activity", pRpc->label, pConn);
rpcCloseConn(pConn); rpcCloseConn(pConn);
} else { } else {
tTrace("%s pConn:%p, idle timer:%p not processed", pRpc->label, pConn, tmrId); tTrace("%s %p, idle timer:%p not processed", pRpc->label, pConn, tmrId);
} }
} }
...@@ -975,11 +988,11 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) { ...@@ -975,11 +988,11 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) {
pthread_mutex_lock(&pRpc->mutex); pthread_mutex_lock(&pRpc->mutex);
if (pConn->inType && pConn->meterId[0]) { if (pConn->inType && pConn->meterId[0]) {
tTrace("%s pConn:%p, progress timer expired, send progress", pRpc->label, pConn); tTrace("%s %p, progress timer expired, send progress", pRpc->label, pConn);
rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS); rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS);
taosTmrReset(rpcProcessProgressTimer, tsRpcTimer<<pConn->retry, pConn, pRpc->tmrCtrl, &pConn->pTimer); taosTmrReset(rpcProcessProgressTimer, tsRpcTimer<<pConn->retry, pConn, pRpc->tmrCtrl, &pConn->pTimer);
} else { } else {
tTrace("%s pConn:%p, progress timer:%p not processed", pRpc->label, pConn, tmrId); tTrace("%s %p, progress timer:%p not processed", pRpc->label, pConn, tmrId);
} }
pthread_mutex_unlock(&pRpc->mutex); pthread_mutex_unlock(&pRpc->mutex);
...@@ -1124,12 +1137,12 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { ...@@ -1124,12 +1137,12 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
delta = (int32_t)htonl(pDigest->timeStamp); delta = (int32_t)htonl(pDigest->timeStamp);
delta -= (int32_t)taosGetTimestampSec(); delta -= (int32_t)taosGetTimestampSec();
if (abs(delta) > 900) { if (abs(delta) > 900) {
tWarn("%s pConn:%p, time diff:%d is too big, msg discarded, timestamp:%d", pRpc->label, pConn, tWarn("%s %p, time diff:%d is too big, msg discarded, timestamp:%d", pRpc->label, pConn,
delta, htonl(pDigest->timeStamp)); delta, htonl(pDigest->timeStamp));
code = TSDB_CODE_INVALID_TIME_STAMP; code = TSDB_CODE_INVALID_TIME_STAMP;
} else { } else {
if (rpcAuthenticateMsg((uint8_t *)pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, (uint8_t *)pConn->secret) < 0) { if (rpcAuthenticateMsg((uint8_t *)pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, (uint8_t *)pConn->secret) < 0) {
tError("%s pConn:%p, authentication failed, msg discarded", pRpc->label, pConn); tError("%s %p, authentication failed, msg discarded", pRpc->label, pConn);
code = TSDB_CODE_AUTH_FAILURE; code = TSDB_CODE_AUTH_FAILURE;
} else { } else {
pHead->msgLen -= sizeof(SRpcDigest); pHead->msgLen -= sizeof(SRpcDigest);
...@@ -1138,7 +1151,7 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { ...@@ -1138,7 +1151,7 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
} else { } else {
// if it is request or response with code 0, msg shall be discarded // if it is request or response with code 0, msg shall be discarded
if (rpcIsReq(pHead->msgType) || (pHead->content[0] == 0)) { if (rpcIsReq(pHead->msgType) || (pHead->content[0] == 0)) {
tTrace("%s pConn:%p, auth spi not matched, msg discarded", pRpc->label, pConn); tTrace("%s %p, auth spi not matched, msg discarded", pRpc->label, pConn);
code = TSDB_CODE_AUTH_FAILURE; code = TSDB_CODE_AUTH_FAILURE;
} }
} }
......
...@@ -17,10 +17,15 @@ ...@@ -17,10 +17,15 @@
#include "os.h" #include "os.h"
#include "tlog.h" #include "tlog.h"
#include "trpc.h" #include "trpc.h"
#include "taoserror.h"
#include <stdint.h> #include <stdint.h>
void processMsg(char type, void *pCont, int contLen, void *ahandle, int32_t code) { void processMsg(char type, void *pCont, int contLen, void *ahandle, int32_t code) {
dPrint("response is received, type:%d, contLen:%d code:%x, ahandle:%p", type, contLen, code, ahandle); dPrint("response is received, type:%d, contLen:%d code:%x:%s", type, contLen, code, tstrerror(code));
}
void processUpdate(void *handle, SRpcIpSet *pIpSet) {
dPrint("ip set is changed, index:%d", pIpSet->index);
} }
int32_t main(int32_t argc, char *argv[]) { int32_t main(int32_t argc, char *argv[]) {
...@@ -35,6 +40,7 @@ int32_t main(int32_t argc, char *argv[]) { ...@@ -35,6 +40,7 @@ int32_t main(int32_t argc, char *argv[]) {
rpcInit.label = "APP"; rpcInit.label = "APP";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
rpcInit.cfp = processMsg; rpcInit.cfp = processMsg;
rpcInit.ufp = processUpdate;
rpcInit.sessions = 1000; rpcInit.sessions = 1000;
rpcInit.connType = TAOS_CONN_UDPC; rpcInit.connType = TAOS_CONN_UDPC;
rpcInit.idleTime = 2000; rpcInit.idleTime = 2000;
...@@ -52,11 +58,11 @@ int32_t main(int32_t argc, char *argv[]) { ...@@ -52,11 +58,11 @@ int32_t main(int32_t argc, char *argv[]) {
ipSet.numOfIps = 2; ipSet.numOfIps = 2;
ipSet.index = 0; ipSet.index = 0;
ipSet.port = 7000; ipSet.port = 7000;
ipSet.ip[0] = inet_addr("127.0.0.1"); ipSet.ip[0] = inet_addr("192.168.0.1");
ipSet.ip[1] = inet_addr("192.168.0.1"); ipSet.ip[1] = inet_addr("127.0.0.1");
void *cont = rpcMallocCont(100); void *cont = rpcMallocCont(100);
rpcSendRequest(pRpc, ipSet, 1, cont, 100, 1); rpcSendRequest(pRpc, &ipSet, 1, cont, 100, 1);
getchar(); getchar();
......
...@@ -22,7 +22,17 @@ ...@@ -22,7 +22,17 @@
void processMsg(char type, void *pCont, int contLen, void *ahandle, int32_t code) { void processMsg(char type, void *pCont, int contLen, void *ahandle, int32_t code) {
dPrint("request is received, type:%d, contLen:%d", type, contLen); dPrint("request is received, type:%d, contLen:%d", type, contLen);
void *rsp = rpcMallocCont(128); void *rsp = rpcMallocCont(128);
rpcSendResponse(ahandle, 1, rsp, 128);
//rpcSendResponse(ahandle, 1, rsp, 128);
SRpcIpSet ipSet;
ipSet.numOfIps = 1;
ipSet.index = 0;
ipSet.port = 7000;
ipSet.ip[0] = inet_addr("192.168.0.2");
rpcSendRedirectRsp(ahandle, &ipSet);
rpcFreeCont(pCont); rpcFreeCont(pCont);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册