提交 1318212c 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

more optimization

上级 7fb3fd98
...@@ -35,37 +35,45 @@ extern "C" { ...@@ -35,37 +35,45 @@ extern "C" {
extern int tsRpcHeadSize; extern int tsRpcHeadSize;
typedef struct { typedef struct {
char *localIp; // local IP used int8_t index;
uint16_t localPort; // local port int8_t numOfIps;
char *label; // for debug purpose
int numOfThreads; // number of threads to handle connections
void *(*fp)(char type, void *pCont, int contLen, void *handle, int index); // function to process the incoming msg
int sessions; // number of sessions allowed
int connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
int idleTime; // milliseconds, 0 means idle timer is disabled
char *meterId; // meter ID
char spi; // security parameter index
char encrypt; // encrypt algorithm
char *secret; // key for authentication
char *ckey; // ciphering key
int (*afp) (char *meterId, char *spi, char *encrypt, uint8_t *secret, uint8_t *ckey); // call back to retrieve auth info
} SRpcInit;
typedef struct {
int16_t index;
int16_t numOfIps;
uint16_t port; uint16_t port;
uint32_t ip[TSDB_MAX_MPEERS]; uint32_t ip[TSDB_MAX_MPEERS];
char ipStr[TSDB_MAX_MPEERS][TSDB_IPv4ADDR_LEN];
} SRpcIpSet; } SRpcIpSet;
typedef struct {
char *localIp; // local IP used
uint16_t localPort; // local port
char *label; // for debug purpose
int numOfThreads; // number of threads to handle connections
int sessions; // number of sessions allowed
int connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
int idleTime; // milliseconds, 0 means idle timer is disabled
// the following is for client security only
char *meterId; // meter ID
char spi; // security parameter index
char encrypt; // encrypt algorithm
char *secret; // key for authentication
char *ckey; // ciphering key
// call back to process incoming msg
void (*cfp)(char type, void *pCont, int contLen, void *ahandle, int32_t code);
// call back to process notify the ipSet changes
void (*ufp)(void *ahandle, SRpcIpSet ipSet);
// call back to retrieve the client auth info
int (*afp)(char *meterId, char *spi, char *encrypt, uint8_t *secret, uint8_t *ckey);
} SRpcInit;
void *rpcOpen(SRpcInit *pRpc); void *rpcOpen(SRpcInit *pRpc);
void rpcClose(void *); void rpcClose(void *);
void *rpcMallocCont(int contLen); void *rpcMallocCont(int contLen);
void rpcFreeCont(void *pCont); void rpcFreeCont(void *pCont);
void rpcSendRequest(void *thandle, SRpcIpSet ipSet, char msgType, void *pCont, int contLen, void *ahandle); void rpcSendRequest(void *thandle, SRpcIpSet ipSet, char msgType, void *pCont, int contLen, void *ahandle);
void rpcSendResponse(void *pConn, void *pCont, int contLen); void rpcSendResponse(void *pConn, int32_t code, void *pCont, int contLen);
void rpcSendSimpleRsp(void *pConn, int32_t code); void rpcSendRedirectRsp(void *pConn, SRpcIpSet ipSet);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -33,15 +33,15 @@ ...@@ -33,15 +33,15 @@
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHeader) + sizeof(SRpcDigest)) #define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHeader) + sizeof(SRpcDigest))
#define rpcHeaderFromCont(cont) ((SRpcHeader *) (cont - sizeof(SRpcHeader))) #define rpcHeaderFromCont(cont) ((SRpcHeader *) (cont - sizeof(SRpcHeader)))
#define rpcContFromHeader(msg) ( msg + sizeof(SRpcHeader)) #define rpcContFromHeader(msg) (msg + sizeof(SRpcHeader))
#define rpcMsgLenFromCont(contLen) ( contLen + sizeof(SRpcHeader)) #define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHeader))
#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHeader)) #define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHeader))
#define rpcIsReq(type) (type & 1U) #define rpcIsReq(type) (type & 1U)
typedef struct { typedef struct {
int sessions; int sessions;
int numOfThreads; int numOfThreads;
int idleTime; // milliseconds; int idleTime; // milliseconds;
char localIp[TSDB_IPv4ADDR_LEN]; char localIp[TSDB_IPv4ADDR_LEN];
uint16_t localPort; uint16_t localPort;
int connType; int connType;
...@@ -53,27 +53,29 @@ typedef struct { ...@@ -53,27 +53,29 @@ typedef struct {
char *secret; // key for authentication char *secret; // key for authentication
char *ckey; // ciphering key char *ckey; // ciphering key
void *(*fp)(char type, void *pCont, int contLen, void *handle, int index); void (*cfp)(char type, void *pCont, int contLen, void *ahandle, int32_t code);
int (*afp)(char *meterId, char *spi, char *encrypt, uint8_t *secret, uint8_t *ckey); // FP to retrieve auth info int (*afp)(char *meterId, char *spi, char *encrypt, uint8_t *secret, uint8_t *ckey);
struct _RpcConn *connList; void (*ufp)(void *ahandle, SRpcIpSet ipSet);
void *idPool;
void *tmrCtrl; void *idPool; // handle to ID pool
void *hash; void *tmrCtrl; // handle to timer
void *hash; // handle returned by hash utility
void *shandle; // returned handle from lower layer during initialization void *shandle; // returned handle from lower layer during initialization
void *pCache; // connection cache void *pCache; // connection cache
pthread_mutex_t mutex; pthread_mutex_t mutex;
struct _RpcConn *connList; // connection list
} SRpcInfo; } SRpcInfo;
typedef struct { typedef struct {
SRpcIpSet ipSet; SRpcIpSet ipSet;
void *ahandle; void *ahandle; // handle provided by app
SRpcInfo *pRpc; SRpcInfo *pRpc; // associated SRpcInfo
char msgType; char msgType; // message type
char *pCont; char *pCont; // content provided by app
int contLen; int contLen; // content length
int numOfRetry; int numOfRetry; // number of retry for different servers
int32_t code; int32_t code; // error code
char msg[]; char msg[0]; // RpcHeader starts from here
} SRpcReqContext; } SRpcReqContext;
typedef struct _RpcConn { typedef struct _RpcConn {
...@@ -124,6 +126,7 @@ typedef struct { ...@@ -124,6 +126,7 @@ typedef struct {
char empty[1]; // reserved char empty[1]; // reserved
uint8_t msgType; // message type uint8_t msgType; // message type
int32_t msgLen; // message length including the header iteslf int32_t msgLen; // message length including the header iteslf
int32_t code;
uint8_t content[0]; // message body starts from here uint8_t content[0]; // message body starts from here
} SRpcHeader; } SRpcHeader;
...@@ -174,25 +177,25 @@ void (*taosCloseConn[])(void *chandle) = { ...@@ -174,25 +177,25 @@ void (*taosCloseConn[])(void *chandle) = {
}; };
static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort); static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort);
static void rpcCloseConn(void *thandle); static void rpcCloseConn(void *thandle);
static SRpcConn *rpcSetConnToServer(void *shandle, SRpcIpSet ipSet); static SRpcConn *rpcSetConnToServer(SRpcInfo *pRpc, SRpcIpSet ipSet);
static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc); static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc);
static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *meterId, char *hashstr); static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *meterId, char *hashstr);
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *meterId, char *hashstr); static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *meterId, char *hashstr);
static void rpcSendReqToServer(SRpcConn *pConn, SRpcReqContext *pContext); static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext);
static void rpcSendQuickRsp(SRpcConn *pConn, char code); static void rpcSendQuickRsp(SRpcConn *pConn, char code);
static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint32_t ip, uint16_t port, void *chandle); static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint32_t ip, uint16_t port, void *chandle);
static void rpcSendDataToPeer(SRpcConn *pConn, void *data, int dataLen); static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen);
static void *rpcProcessDataFromPeer(void *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *chandle); static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *chandle);
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader); static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader);
static void rpcProcessConnError(void *param, void *id); static void rpcProcessConnError(void *param, void *id);
static void rpcProcessRetryTimer(void *, void *); static void rpcProcessRetryTimer(void *, void *);
static void rpcProcessIdleTimer(void *param, void *tmrId); static void rpcProcessIdleTimer(void *param, void *tmrId);
static void rpcProcessProgressTimer(void *param, void *tmrId); static void rpcProcessProgressTimer(void *param, void *tmrId);
static void rpcFreeMsg(void *msg); static void rpcFreeOutMsg(void *msg);
static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen); static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen);
static SRpcHeader *rpcDecompressRpcMsg(SRpcHeader *pHeader); static SRpcHeader *rpcDecompressRpcMsg(SRpcHeader *pHeader);
static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen); static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen);
...@@ -208,7 +211,6 @@ void *rpcOpen(SRpcInit *pInit) { ...@@ -208,7 +211,6 @@ void *rpcOpen(SRpcInit *pInit) {
if (pRpc == NULL) return NULL; if (pRpc == NULL) return NULL;
strcpy(pRpc->label, pInit->label); strcpy(pRpc->label, pInit->label);
pRpc->fp = pInit->fp;
pRpc->connType = pInit->connType; pRpc->connType = pInit->connType;
pRpc->idleTime = pInit->idleTime; pRpc->idleTime = pInit->idleTime;
pRpc->numOfThreads = pInit->numOfThreads; pRpc->numOfThreads = pInit->numOfThreads;
...@@ -224,10 +226,12 @@ void *rpcOpen(SRpcInit *pInit) { ...@@ -224,10 +226,12 @@ void *rpcOpen(SRpcInit *pInit) {
pRpc->spi = pInit->spi; pRpc->spi = pInit->spi;
strcpy(pRpc->secret, pInit->secret); strcpy(pRpc->secret, pInit->secret);
strcpy(pRpc->ckey, pInit->ckey); strcpy(pRpc->ckey, pInit->ckey);
pRpc->ufp = pInit->ufp;
pRpc->cfp = pInit->cfp;
pRpc->afp = pInit->afp; pRpc->afp = pInit->afp;
pRpc->shandle = (*taosInitConn[pRpc->connType])(pRpc->localIp, pRpc->localPort, pRpc->label, pRpc->shandle = (*taosInitConn[pRpc->connType])(pRpc->localIp, pRpc->localPort, pRpc->label,
pRpc->numOfThreads, rpcProcessDataFromPeer, pRpc); pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc);
if (pRpc->shandle == NULL) { if (pRpc->shandle == NULL) {
tError("%s failed to init network, %s:%d", pRpc->label, pRpc->localIp, pRpc->localPort); tError("%s failed to init network, %s:%d", pRpc->label, pRpc->localIp, pRpc->localPort);
rpcClose(pRpc); rpcClose(pRpc);
...@@ -318,7 +322,6 @@ void rpcFreeCont(void *cont) { ...@@ -318,7 +322,6 @@ void rpcFreeCont(void *cont) {
void rpcSendRequest(void *shandle, SRpcIpSet ipSet, char type, void *pCont, int contLen, void *ahandle) { void rpcSendRequest(void *shandle, SRpcIpSet ipSet, char type, void *pCont, int contLen, void *ahandle) {
SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcInfo *pRpc = (SRpcInfo *)shandle;
SRpcConn *pConn;
SRpcReqContext *pContext; SRpcReqContext *pContext;
contLen = rpcCompressRpcMsg(pCont, contLen); contLen = rpcCompressRpcMsg(pCont, contLen);
...@@ -330,22 +333,23 @@ void rpcSendRequest(void *shandle, SRpcIpSet ipSet, char type, void *pCont, int ...@@ -330,22 +333,23 @@ void rpcSendRequest(void *shandle, SRpcIpSet ipSet, char type, void *pCont, int
pContext->pCont = pCont; pContext->pCont = pCont;
pContext->msgType = type; pContext->msgType = type;
pConn = rpcSetConnToServer(shandle, ipSet); rpcSendReqToServer(pRpc, pContext);
pContext->code = terrno;
if (pConn == NULL) taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl);
rpcSendReqToServer(pConn, pContext);
return; return;
} }
void rpcSendResponse(void *handle, void *pCont, int contLen) { void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) {
int msgLen = 0; int msgLen = 0;
SRpcConn *pConn = (SRpcConn *)handle; SRpcConn *pConn = (SRpcConn *)handle;
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
SRpcHeader *pHeader = rpcHeaderFromCont(pCont); SRpcHeader *pHeader = rpcHeaderFromCont(pCont);
char *msg = (char *)pHeader; char *msg = (char *)pHeader;
if ( pCont == NULL ) {
pCont = rpcMallocCont(0);
contLen = 0;
}
contLen = rpcCompressRpcMsg(pCont, contLen); contLen = rpcCompressRpcMsg(pCont, contLen);
msgLen = rpcMsgLenFromCont(contLen); msgLen = rpcMsgLenFromCont(contLen);
...@@ -367,13 +371,14 @@ void rpcSendResponse(void *handle, void *pCont, int contLen) { ...@@ -367,13 +371,14 @@ void rpcSendResponse(void *handle, void *pCont, int contLen) {
pHeader->sourceId = pConn->ownId; pHeader->sourceId = pConn->ownId;
pHeader->destId = pConn->peerId; pHeader->destId = pConn->peerId;
pHeader->uid = 0; pHeader->uid = 0;
pHeader->code = htonl(code);
memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId)); memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId));
// set pConn parameters // set pConn parameters
pConn->inType = 0; pConn->inType = 0;
// response message is released until new response is sent // response message is released until new response is sent
rpcFreeMsg(pConn->pRspMsg); rpcFreeOutMsg(pConn->pRspMsg);
pConn->pRspMsg = msg; pConn->pRspMsg = msg;
pConn->rspMsgLen = msgLen; pConn->rspMsgLen = msgLen;
if (pHeader->content[0] == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--; if (pHeader->content[0] == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--;
...@@ -381,28 +386,21 @@ void rpcSendResponse(void *handle, void *pCont, int contLen) { ...@@ -381,28 +386,21 @@ void rpcSendResponse(void *handle, void *pCont, int contLen) {
pthread_mutex_unlock(&pRpc->mutex); pthread_mutex_unlock(&pRpc->mutex);
taosTmrStopA(&pConn->pTimer); taosTmrStopA(&pConn->pTimer);
rpcSendDataToPeer(pConn, msg, msgLen); rpcSendMsgToPeer(pConn, msg, msgLen);
return; return;
} }
void rpcSendSimpleRsp(void *thandle, int32_t code) { void rpcSendRedirectRsp(void *thandle, SRpcIpSet ipSet) {
char *pMsg; char *pMsg;
STaosRsp *pRsp; int msgLen = sizeof(SRpcIpSet);
int msgLen = sizeof(STaosRsp);
if (thandle == NULL) {
tError("connection is gone, response could not be sent");
return;
}
pMsg = rpcMallocCont(msgLen); pMsg = rpcMallocCont(msgLen);
if (pMsg == NULL) return; if (pMsg == NULL) return;
pRsp = (STaosRsp *)pMsg; memcpy(pMsg, &ipSet, sizeof(ipSet));
pRsp->code = htonl(code);
rpcSendResponse(thandle, pMsg, msgLen); rpcSendResponse(thandle, TSDB_CODE_REDIRECT, pMsg, msgLen);
return; return;
} }
...@@ -442,34 +440,33 @@ static void rpcCloseConn(void *thandle) { ...@@ -442,34 +440,33 @@ static void rpcCloseConn(void *thandle) {
pthread_mutex_lock(&pRpc->mutex); pthread_mutex_lock(&pRpc->mutex);
if (taosCloseConn[pRpc->connType]) (*taosCloseConn[pRpc->connType])(pConn->chandle); if (pConn->meterId[0]) {
pConn->meterId[0] = 0;
if (taosCloseConn[pRpc->connType]) (*taosCloseConn[pRpc->connType])(pConn->chandle);
taosTmrStopA(&pConn->pTimer);
taosTmrStopA(&pConn->pIdleTimer);
if ( pRpc->connType == TAOS_CONN_UDPS || TAOS_CONN_TCPS) {
char hashstr[40] = {0};
sprintf(hashstr, "%x:%x:%x", pConn->peerIp, pConn->peerUid, pConn->peerId);
taosDeleteStrHash(pRpc->hash, hashstr);
rpcFreeOutMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg
pConn->pRspMsg = NULL;
pConn->inType = 0;
pConn->inTranId = 0;
} else {
pConn->outType = 0;
pConn->outTranId = 0;
pConn->pReqMsg = NULL;
}
taosTmrStopA(&pConn->pTimer); taosFreeId(pRpc->idPool, pConn->sid);
taosTmrStopA(&pConn->pIdleTimer); pConn->pContext = NULL;
if ( pRpc->connType == TAOS_CONN_UDPS || TAOS_CONN_TCPS) { tTrace("%s pConn:%p, rpc connection is closed", pRpc->label, pConn);
char hashstr[40] = {0};
sprintf(hashstr, "%x:%x:%x", pConn->peerIp, pConn->peerUid, pConn->peerId);
taosDeleteStrHash(pRpc->hash, hashstr);
rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg
} }
taosFreeId(pRpc->idPool, pConn->sid);
// reset the link parameters
pConn->meterId[0] = 0;
pConn->outType = 0;
pConn->inType = 0;
pConn->inTranId = 0;
pConn->outTranId = 0;
pConn->pReqMsg = NULL;
pConn->reqMsgLen = 0;
pConn->pRspMsg = NULL;
pConn->rspMsgLen = 0;
pConn->pContext = NULL;
tTrace("%s pConn:%p, rpc connection is closed", pRpc->label, pConn);
pthread_mutex_unlock(&pRpc->mutex); pthread_mutex_unlock(&pRpc->mutex);
} }
...@@ -553,13 +550,14 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *meterId, char *has ...@@ -553,13 +550,14 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *meterId, char *has
return pConn; return pConn;
} }
SRpcConn *rpcSetConnToServer(void *shandle, SRpcIpSet ipSet) { SRpcConn *rpcSetConnToServer(SRpcInfo *pRpc, SRpcIpSet ipSet) {
SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcConn *pConn;
SRpcConn *pConn = taosGetConnFromCache(pRpc->pCache, ipSet.ip[ipSet.index], ipSet.port, pRpc->meterId);
pConn = taosGetConnFromCache(pRpc->pCache, ipSet.ip[ipSet.index], ipSet.port, pRpc->meterId);
if ( pConn == NULL ) { if ( pConn == NULL ) {
pConn = rpcOpenConn(pRpc, ipSet.ipStr[ipSet.index], ipSet.port); char ipstr[20] = {0};
tinet_ntoa(ipstr, ipSet.ip[ipSet.index]);
pConn = rpcOpenConn(pRpc, ipstr, ipSet.port);
} }
return pConn; return pConn;
...@@ -585,7 +583,7 @@ static int rpcProcessReqHeader(SRpcConn *pConn, SRpcHeader *pHeader) { ...@@ -585,7 +583,7 @@ static int rpcProcessReqHeader(SRpcConn *pConn, SRpcHeader *pHeader) {
} else if (pConn->inType == 0) { } else if (pConn->inType == 0) {
tTrace("%s pConn:%p, %s is already processed, tranId:%d", pRpc->label, pConn, tTrace("%s pConn:%p, %s is already processed, tranId:%d", pRpc->label, pConn,
taosMsg[pHeader->msgType], pConn->inTranId); taosMsg[pHeader->msgType], pConn->inTranId);
rpcSendDataToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen); // resend the response rpcSendMsgToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen); // resend the response
} else { } else {
tTrace("%s pConn:%p, mismatched message %s and tranId", pRpc->label, pConn, taosMsg[pHeader->msgType]); tTrace("%s pConn:%p, mismatched message %s and tranId", pRpc->label, pConn, taosMsg[pHeader->msgType]);
} }
...@@ -658,13 +656,14 @@ static int rpcProcessHeader(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int d ...@@ -658,13 +656,14 @@ static int rpcProcessHeader(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int d
SRpcHeader *pHeader = (SRpcHeader *)data; SRpcHeader *pHeader = (SRpcHeader *)data;
sid = htonl(pHeader->destId); sid = htonl(pHeader->destId);
pHeader->code = htonl(pHeader->code);
pHeader->msgLen = (int32_t)htonl((uint32_t)pHeader->msgLen);
if (pHeader->msgType >= TSDB_MSG_TYPE_MAX || pHeader->msgType <= 0) { if (pHeader->msgType >= TSDB_MSG_TYPE_MAX || pHeader->msgType <= 0) {
tTrace("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHeader->msgType); tTrace("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHeader->msgType);
return TSDB_CODE_INVALID_MSG_TYPE; return TSDB_CODE_INVALID_MSG_TYPE;
} }
pHeader->msgLen = (int32_t)htonl((uint32_t)pHeader->msgLen);
if (dataLen != pHeader->msgLen) { if (dataLen != pHeader->msgLen) {
tTrace("%s sid:%d, %s has invalid length, dataLen:%d, msgLen:%d", pRpc->label, sid, tTrace("%s sid:%d, %s has invalid length, dataLen:%d, msgLen:%d", pRpc->label, sid,
taosMsg[pHeader->msgType], dataLen, pHeader->msgLen); taosMsg[pHeader->msgType], dataLen, pHeader->msgLen);
...@@ -708,7 +707,7 @@ static int rpcProcessHeader(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int d ...@@ -708,7 +707,7 @@ static int rpcProcessHeader(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int d
return code; return code;
} }
static void *rpcProcessDataFromPeer(void *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *chandle) { static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *chandle) {
SRpcHeader *pHeader = (SRpcHeader *)data; SRpcHeader *pHeader = (SRpcHeader *)data;
SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcInfo *pRpc = (SRpcInfo *)shandle;
SRpcConn *pConn = NULL; SRpcConn *pConn = NULL;
...@@ -764,42 +763,33 @@ static void *rpcProcessDataFromPeer(void *data, int dataLen, uint32_t ip, uint16 ...@@ -764,42 +763,33 @@ static void *rpcProcessDataFromPeer(void *data, int dataLen, uint32_t ip, uint16
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader) { static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader) {
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
int msgLen = rpcContFromHeader(pHeader->msgLen);
pHeader = rpcDecompressRpcMsg(pHeader); pHeader = rpcDecompressRpcMsg(pHeader);
int contLen = rpcContLenFromMsg(pHeader->msgLen);
uint8_t *pCont = pHeader->content;
if ( rpcIsReq(pHeader->msgType) ) { if ( rpcIsReq(pHeader->msgType) ) {
taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer); taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer);
(*(pRpc->fp))(pHeader->msgType, pHeader->content, msgLen, pConn, 0); (*(pRpc->cfp))(pHeader->msgType, pCont, contLen, pConn, 0);
} else { } else {
// it's a response // it's a response
STaosRsp *pRsp = (STaosRsp *)pHeader->content; int32_t code = pHeader->code;
int32_t code = htonl(pRsp->code);
SRpcReqContext *pContext = pConn->pContext; SRpcReqContext *pContext = pConn->pContext;
pConn->pContext = NULL; pConn->pContext = NULL;
taosAddConnIntoCache(pRpc->pCache, pConn, pConn->peerIp, pConn->peerPort, pConn->meterId); taosAddConnIntoCache(pRpc->pCache, pConn, pConn->peerIp, pConn->peerPort, pConn->meterId);
if (code == TSDB_CODE_NO_MASTER) { if (code == TSDB_CODE_REDIRECT) {
pContext->code = code; memcpy(&pContext->ipSet, pHeader->content, sizeof(pContext->ipSet));
taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl); rpcSendReqToServer(pRpc, pContext);
} else { } else {
rpcFreeMsg(rpcHeaderFromCont(pContext->pCont)); // free the request msg rpcFreeOutMsg(rpcHeaderFromCont(pContext->pCont)); // free the request msg
(*(pRpc->fp))(pHeader->msgType, pHeader->content, msgLen, pContext->ahandle, pContext->ipSet.index); (*(pRpc->cfp))(pHeader->msgType, pCont, contLen, pContext->ahandle, pContext->ipSet.index);
} }
} }
} }
static void rpcSendQuickRsp(SRpcConn *pConn, char code) { static void rpcSendQuickRsp(SRpcConn *pConn, char code) {
char msg[RPC_MSG_OVERHEAD + sizeof(STaosRsp)]; char msg[RPC_MSG_OVERHEAD];
SRpcHeader *pHeader; SRpcHeader *pHeader;
int msgLen;
STaosRsp *pRsp;
pRsp = (STaosRsp *)rpcContFromHeader(msg);
pRsp->code = htonl(code);
msgLen = sizeof(STaosRsp);
// set msg header // set msg header
memset(msg, 0, sizeof(SRpcHeader)); memset(msg, 0, sizeof(SRpcHeader));
...@@ -814,14 +804,14 @@ static void rpcSendQuickRsp(SRpcConn *pConn, char code) { ...@@ -814,14 +804,14 @@ static void rpcSendQuickRsp(SRpcConn *pConn, char code) {
pHeader->destId = pConn->peerId; pHeader->destId = pConn->peerId;
pHeader->uid = 0; pHeader->uid = 0;
memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId)); memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId));
pHeader->code = htonl(code);
rpcSendDataToPeer(pConn, msg, msgLen); rpcSendMsgToPeer(pConn, msg, 0);
} }
static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint32_t ip, uint16_t port, void *chandle) { static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint32_t ip, uint16_t port, void *chandle) {
SRpcHeader *pRecvHeader, *pReplyHeader; SRpcHeader *pRecvHeader, *pReplyHeader;
char msg[sizeof(SRpcHeader) + sizeof(SRpcDigest) + sizeof(STaosRsp)]; char msg[sizeof(SRpcHeader) + sizeof(SRpcDigest) + sizeof(uint32_t) ];
STaosRsp *pRsp;
uint32_t timeStamp; uint32_t timeStamp;
int msgLen; int msgLen;
...@@ -839,13 +829,12 @@ static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint ...@@ -839,13 +829,12 @@ static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint
pReplyHeader->destId = pRecvHeader->sourceId; pReplyHeader->destId = pRecvHeader->sourceId;
memcpy(pReplyHeader->meterId, pRecvHeader->meterId, tListLen(pReplyHeader->meterId)); memcpy(pReplyHeader->meterId, pRecvHeader->meterId, tListLen(pReplyHeader->meterId));
pRsp = (STaosRsp *)pReplyHeader->content; pReplyHeader->code = htonl(code);
pRsp->code = htonl(code); msgLen = sizeof(SRpcHeader);
msgLen = sizeof(STaosRsp);
char *pContent = pRsp->more;
if (code == TSDB_CODE_INVALID_TIME_STAMP) { if (code == TSDB_CODE_INVALID_TIME_STAMP) {
// include a time stamp if client's time is not synchronized well // include a time stamp if client's time is not synchronized well
uint8_t *pContent = pReplyHeader->content;
timeStamp = taosGetTimestampSec(); timeStamp = taosGetTimestampSec();
memcpy(pContent, &timeStamp, sizeof(timeStamp)); memcpy(pContent, &timeStamp, sizeof(timeStamp));
msgLen += sizeof(timeStamp); msgLen += sizeof(timeStamp);
...@@ -857,13 +846,19 @@ static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint ...@@ -857,13 +846,19 @@ static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint
return; return;
} }
static void rpcSendReqToServer(SRpcConn *pConn, SRpcReqContext *pContext) { static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
SRpcHeader *pHeader = rpcHeaderFromCont(pContext->pCont); SRpcHeader *pHeader = rpcHeaderFromCont(pContext->pCont);
SRpcInfo *pRpc = pConn->pRpc;
char *msg = (char *)pHeader; char *msg = (char *)pHeader;
int msgLen = rpcMsgLenFromCont(pContext->contLen); int msgLen = rpcMsgLenFromCont(pContext->contLen);
char msgType = pContext->msgType; char msgType = pContext->msgType;
SRpcConn *pConn = rpcSetConnToServer(pRpc, pContext->ipSet);
if (pConn == NULL) {
pContext->code = terrno;
taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl);
return;
}
pthread_mutex_lock(&pRpc->mutex); pthread_mutex_lock(&pRpc->mutex);
// set the message header // set the message header
...@@ -889,37 +884,37 @@ static void rpcSendReqToServer(SRpcConn *pConn, SRpcReqContext *pContext) { ...@@ -889,37 +884,37 @@ static void rpcSendReqToServer(SRpcConn *pConn, SRpcReqContext *pContext) {
pthread_mutex_unlock(&pRpc->mutex); pthread_mutex_unlock(&pRpc->mutex);
rpcSendDataToPeer(pConn, msg, msgLen); rpcSendMsgToPeer(pConn, msg, msgLen);
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
} }
static void rpcSendDataToPeer(SRpcConn *pConn, void *data, int dataLen) { static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
int writtenLen = 0; int writtenLen = 0;
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
SRpcHeader *pHeader = (SRpcHeader *)data; SRpcHeader *pHeader = (SRpcHeader *)msg;
dataLen = rpcAddAuthPart(pConn, data, dataLen); msgLen = rpcAddAuthPart(pConn, msg, msgLen);
if ( rpcIsReq(pHeader->msgType)) { if ( rpcIsReq(pHeader->msgType)) {
if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16))
tTrace("%s pConn:%p, %s is sent to %s:%hu, len:%d source:0x%08x dest:0x%08x tranId:%d", tTrace("%s pConn:%p, %s is sent to %s:%hu, len:%d source:0x%08x dest:0x%08x tranId:%d",
pRpc->label, pConn, taosMsg[pHeader->msgType], pConn->peerIpstr, pRpc->label, pConn, taosMsg[pHeader->msgType], pConn->peerIpstr,
pConn->peerPort, dataLen, pHeader->sourceId, pHeader->destId, pHeader->tranId); pConn->peerPort, msgLen, pHeader->sourceId, pHeader->destId, pHeader->tranId);
} else { } else {
if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16))
tTrace( "%s pConn:%p, %s is sent to %s:%hu, code:%u len:%d source:0x%08x dest:0x%08x tranId:%d", tTrace( "%s pConn:%p, %s is sent to %s:%hu, code:%u len:%d source:0x%08x dest:0x%08x tranId:%d",
pRpc->label, pConn, taosMsg[pHeader->msgType], pConn->peerIpstr, pConn->peerPort, pRpc->label, pConn, taosMsg[pHeader->msgType], pConn->peerIpstr, pConn->peerPort,
(uint8_t)pHeader->content[0], dataLen, pHeader->sourceId, pHeader->destId, pHeader->tranId); (uint8_t)pHeader->content[0], msgLen, pHeader->sourceId, pHeader->destId, pHeader->tranId);
} }
writtenLen = (*taosSendData[pRpc->connType])(pConn->peerIp, pConn->peerPort, (char *)pHeader, dataLen, pConn->chandle); writtenLen = (*taosSendData[pRpc->connType])(pConn->peerIp, pConn->peerPort, (char *)pHeader, msgLen, pConn->chandle);
if (writtenLen != dataLen) { if (writtenLen != msgLen) {
tError("%s pConn:%p, failed to send, dataLen:%d writtenLen:%d, reason:%s", pRpc->label, pConn, tError("%s pConn:%p, failed to send, dataLen:%d writtenLen:%d, reason:%s", pRpc->label, pConn,
dataLen, writtenLen, strerror(errno)); msgLen, writtenLen, strerror(errno));
} }
tDump(data, dataLen); tDump(msg, msgLen);
} }
static void rpcProcessConnError(void *param, void *id) { static void rpcProcessConnError(void *param, void *id) {
...@@ -927,34 +922,20 @@ static void rpcProcessConnError(void *param, void *id) { ...@@ -927,34 +922,20 @@ static void rpcProcessConnError(void *param, void *id) {
SRpcInfo *pRpc = pContext->pRpc; SRpcInfo *pRpc = pContext->pRpc;
if ( pContext->numOfRetry >= pContext->ipSet.numOfIps ) { if ( pContext->numOfRetry >= pContext->ipSet.numOfIps ) {
rpcFreeMsg(rpcHeaderFromCont(pContext->pCont)); // free the request msg rpcFreeOutMsg(rpcHeaderFromCont(pContext->pCont)); // free the request msg
char *rsp = calloc(1, RPC_MSG_OVERHEAD + sizeof(STaosRsp)); (*(pRpc->cfp))(pContext->msgType+1, NULL, 0, pContext->ahandle, pContext->code);
if ( rsp ) {
STaosRsp *pRsp = (STaosRsp *)(rsp+sizeof(SRpcHeader));
pRsp->code = pContext->code;
(*(pRpc->fp))(pContext->msgType+1, pRsp, sizeof(STaosRsp), pContext->ahandle, 0);
} else {
tError("%s failed to malloc RSP", pRpc->label);
}
} else { } else {
// move to next IP // move to next IP
pContext->ipSet.index++; pContext->ipSet.index++;
pContext->ipSet.index = pContext->ipSet.index % pContext->ipSet.numOfIps; pContext->ipSet.index = pContext->ipSet.index % pContext->ipSet.numOfIps;
rpcSendReqToServer(pRpc, pContext);
SRpcConn *pConn = rpcSetConnToServer(pContext->pRpc, pContext->ipSet);
pContext->code = terrno;
if (pConn == NULL) taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl);
rpcSendReqToServer(pConn, pContext);
} }
} }
static void rpcProcessRetryTimer(void *param, void *tmrId) { static void rpcProcessRetryTimer(void *param, void *tmrId) {
SRpcConn *pConn = (SRpcConn *)param; SRpcConn *pConn = (SRpcConn *)param;
int reportDisc = 0;
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
if (pRpc == NULL) return; // it means it is already released int reportDisc = 0;
pthread_mutex_lock(&pRpc->mutex); pthread_mutex_lock(&pRpc->mutex);
...@@ -966,7 +947,7 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { ...@@ -966,7 +947,7 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) {
if (pConn->retry < 4) { if (pConn->retry < 4) {
tTrace("%s pConn:%p, re-send msg:%s to %s:%hu", pRpc->label, tTrace("%s pConn:%p, re-send msg:%s to %s:%hu", pRpc->label,
taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort); taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort);
rpcSendDataToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer<<pConn->retry, pConn, pRpc->tmrCtrl, &pConn->pTimer); taosTmrReset(rpcProcessRetryTimer, tsRpcTimer<<pConn->retry, pConn, pRpc->tmrCtrl, &pConn->pTimer);
} else { } else {
// close the connection // close the connection
...@@ -990,18 +971,13 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { ...@@ -990,18 +971,13 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) {
static void rpcProcessIdleTimer(void *param, void *tmrId) { static void rpcProcessIdleTimer(void *param, void *tmrId) {
SRpcConn *pConn = (SRpcConn *)param; SRpcConn *pConn = (SRpcConn *)param;
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
assert(pRpc);
pthread_mutex_lock(&pRpc->mutex); if (pConn->meterId[0]) {
if (pConn->inType == 0 && pConn->meterId[0]) {
tTrace("%s pConn:%p, close the connection since no activity", pRpc->label, pConn); tTrace("%s pConn:%p, close the connection since no activity", pRpc->label, pConn);
rpcCloseConn(pConn); rpcCloseConn(pConn);
} else { } else {
tTrace("%s pConn:%p, idle timer:%p not processed", pRpc->label, pConn, tmrId); tTrace("%s pConn:%p, idle timer:%p not processed", pRpc->label, pConn, tmrId);
} }
pthread_mutex_unlock(&pRpc->mutex);
} }
static void rpcProcessProgressTimer(void *param, void *tmrId) { static void rpcProcessProgressTimer(void *param, void *tmrId) {
...@@ -1021,22 +997,27 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) { ...@@ -1021,22 +997,27 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) {
pthread_mutex_unlock(&pRpc->mutex); pthread_mutex_unlock(&pRpc->mutex);
} }
static void rpcFreeMsg(void *msg) { static void rpcFreeOutMsg(void *msg) {
if ( msg == NULL ) return; if ( msg == NULL ) return;
char *req = ((char *)msg) - sizeof(SRpcReqContext); char *req = ((char *)msg) - sizeof(SRpcReqContext);
free(req); free(req);
} }
typedef struct {
int32_t reserved;
int32_t contLen;
} SRpcComp;
static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
SRpcHeader *pHeader = rpcHeaderFromCont(pCont); SRpcHeader *pHeader = rpcHeaderFromCont(pCont);
int32_t overhead = sizeof(int32_t) * 2;
int32_t finalLen = 0; int32_t finalLen = 0;
int overhead = sizeof(SRpcComp);
if (!NEEDTO_COMPRESSS_MSG(contLen)) { if (!NEEDTO_COMPRESSS_MSG(contLen)) {
return contLen; return contLen;
} }
char *buf = malloc (contLen + overhead+8); // 16 extra bytes char *buf = malloc (contLen + overhead + 8); // 8 extra bytes
if (buf == NULL) { if (buf == NULL) {
tError("failed to allocate memory for rpc msg compression, contLen:%d, reason:%s", contLen, strerror(errno)); tError("failed to allocate memory for rpc msg compression, contLen:%d, reason:%s", contLen, strerror(errno));
return contLen; return contLen;
...@@ -1049,20 +1030,15 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { ...@@ -1049,20 +1030,15 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
* The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
*/ */
if (compLen < contLen - overhead) { if (compLen < contLen - overhead) {
//tDump(pCont, contLen); SRpcComp *pComp = (SRpcComp *)pCont;
int32_t *pLen = (int32_t *)pCont; pComp->reserved = 0;
pComp->contLen = htonl(contLen);
*pLen = 0; // first 4 bytes must be zero
pLen = (int32_t *)(pCont + sizeof(int32_t));
*pLen = htonl(contLen); // contLen is encoded in second 4 bytes
memcpy(pCont + overhead, buf, compLen); memcpy(pCont + overhead, buf, compLen);
pHeader->comp = 1; pHeader->comp = 1;
tTrace("compress rpc msg, before:%d, after:%d", contLen, compLen); tTrace("compress rpc msg, before:%d, after:%d", contLen, compLen);
finalLen = compLen + overhead; finalLen = compLen + overhead;
//tDump(pCont, contLen);
} else { } else {
finalLen = contLen; finalLen = contLen;
} }
...@@ -1072,16 +1048,15 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { ...@@ -1072,16 +1048,15 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
} }
static SRpcHeader *rpcDecompressRpcMsg(SRpcHeader *pHeader) { static SRpcHeader *rpcDecompressRpcMsg(SRpcHeader *pHeader) {
int overhead = sizeof(int32_t) * 2; int overhead = sizeof(SRpcComp);
SRpcHeader *pNewHeader = NULL; SRpcHeader *pNewHeader = NULL;
uint8_t *pCont = pHeader->content; uint8_t *pCont = pHeader->content;
SRpcComp *pComp = (SRpcComp *)pHeader->content;
if (pHeader->comp) { if (pHeader->comp) {
// decompress the content // decompress the content
assert(GET_INT32_VAL(pHeader->content) == 0); assert(pComp->reserved == 0);
int contLen = htonl(pComp->contLen);
// contLen is original message length before compression applied
int contLen = htonl(GET_INT32_VAL(pCont + sizeof(int32_t)));
// prepare the temporary buffer to decompress message // prepare the temporary buffer to decompress message
char *buf = rpcMallocCont(contLen); char *buf = rpcMallocCont(contLen);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册