提交 005a7fda 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

fix the compiling errors

上级 af773a37
...@@ -43,7 +43,7 @@ typedef struct { ...@@ -43,7 +43,7 @@ typedef struct {
uint16_t localPort; // local port uint16_t localPort; // local port
char *label; // for debug purpose char *label; // for debug purpose
int numOfThreads; // number of threads to handle connections int numOfThreads; // number of threads to handle connections
void *(*fp)(char type, char *pCont, int contLen, void *handle, int index); // function to process the incoming msg void *(*fp)(char type, void *pCont, int contLen, void *handle, int index); // function to process the incoming msg
int sessions; // number of sessions allowed int sessions; // number of sessions allowed
int connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS int connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
int idleTime; // milliseconds, 0 means idle timer is disabled int idleTime; // milliseconds, 0 means idle timer is disabled
...@@ -58,13 +58,14 @@ typedef struct { ...@@ -58,13 +58,14 @@ typedef struct {
typedef struct { typedef struct {
int16_t index; int16_t index;
int16_t numOfIps; int16_t numOfIps;
uint32_t ip[TSDB_MAX_REPLICA]; uint16_t port;
char ipStr[TSDB_MAX_MPEERS][40];
} SRpcIpSet; } SRpcIpSet;
void *rpcOpen(SRpcInit *pRpc); void *rpcOpen(SRpcInit *pRpc);
void rpcClose(void *); void rpcClose(void *);
char *rpcMallocCont(int contLen); void *rpcMallocCont(int contLen);
void rpcFreeCont(char *pCont); void rpcFreeCont(void *pCont);
void rpcSendRequest(void *thandle, SRpcIpSet ipSet, char msgType, void *pCont, int contLen, void *ahandle); void rpcSendRequest(void *thandle, SRpcIpSet ipSet, char msgType, void *pCont, int contLen, void *ahandle);
void rpcSendResponse(void *pConn, void *pCont, int contLen); void rpcSendResponse(void *pConn, void *pCont, int contLen);
void rpcSendSimpleRsp(void *pConn, int32_t code); void rpcSendSimpleRsp(void *pConn, int32_t code);
......
...@@ -18,10 +18,10 @@ ...@@ -18,10 +18,10 @@
#include "tglobalcfg.h" #include "tglobalcfg.h"
#include "tlog.h" #include "tlog.h"
#include "tmempool.h" #include "tmempool.h"
#include "tsclient.h"
#include "ttime.h" #include "ttime.h"
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h" #include "tutil.h"
#include "tcache.h"
typedef struct _c_hash_t { typedef struct _c_hash_t {
uint32_t ip; uint32_t ip;
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#include "os.h" #include "os.h"
#include "tcache.h"
#include "shash.h" #include "shash.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tidpool.h" #include "tidpool.h"
...@@ -21,7 +22,6 @@ ...@@ -21,7 +22,6 @@
#include "tmd5.h" #include "tmd5.h"
#include "tmempool.h" #include "tmempool.h"
#include "trpc.h" #include "trpc.h"
#include "tsdb.h"
#include "tsocket.h" #include "tsocket.h"
#include "ttcpclient.h" #include "ttcpclient.h"
#include "ttcpserver.h" #include "ttcpserver.h"
...@@ -32,7 +32,7 @@ ...@@ -32,7 +32,7 @@
#include "lz4.h" #include "lz4.h"
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHeader) + sizeof(SRpcDigest)) #define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHeader) + sizeof(SRpcDigest))
#define rpcHeaderFromCont(cont) ((STaosHeader *) (cont - sizeof(SRpcHeader))) #define rpcHeaderFromCont(cont) ((SRpcHeader *) (cont - sizeof(SRpcHeader)))
#define rpcContFromHeader(msg) ( msg + sizeof(SRpcHeader)) #define rpcContFromHeader(msg) ( msg + sizeof(SRpcHeader))
#define rpcMsgLenFromCont(contLen) ( contLen + sizeof(SRpcHeader)) #define rpcMsgLenFromCont(contLen) ( contLen + sizeof(SRpcHeader))
#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHeader)) #define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHeader))
...@@ -43,7 +43,9 @@ typedef struct { ...@@ -43,7 +43,9 @@ typedef struct {
int numOfThreads; int numOfThreads;
int type; int type;
int idleTime; // milliseconds; int idleTime; // milliseconds;
uint32_t localIp;
uint16_t localPort; uint16_t localPort;
int connType;
char label[12]; char label[12];
char *meterId; // meter ID char *meterId; // meter ID
...@@ -52,9 +54,9 @@ typedef struct { ...@@ -52,9 +54,9 @@ typedef struct {
char *secret; // key for authentication char *secret; // key for authentication
char *ckey; // ciphering key char *ckey; // ciphering key
void *(*fp)(char *, void *ahandle, void *thandle); // FP to call the application void *(*fp)(char type, void *pCont, int contLen, void *handle, int index);
int (*afp)(char *meterId, char *spi, char *encrypt, uint8_t *secret, uint8_t *ckey); // FP to retrieve auth info int (*afp)(char *meterId, char *spi, char *encrypt, uint8_t *secret, uint8_t *ckey); // FP to retrieve auth info
SRpcConn *connList; struct _RpcConn *connList;
void *idPool; void *idPool;
void *tmrCtrl; void *tmrCtrl;
void *hash; void *hash;
...@@ -64,6 +66,18 @@ typedef struct { ...@@ -64,6 +66,18 @@ typedef struct {
} SRpcInfo; } SRpcInfo;
typedef struct { typedef struct {
SRpcIpSet ipSet;
void *ahandle;
SRpcInfo *pRpc;
char msgType;
char *pCont;
int contLen;
int numOfRetry;
int32_t code;
char msg[];
} SRpcReqContext;
typedef struct _RpcConn {
void *signature; void *signature;
int sid; // session ID int sid; // session ID
uint32_t ownId; // own link ID uint32_t ownId; // own link ID
...@@ -94,19 +108,9 @@ typedef struct { ...@@ -94,19 +108,9 @@ typedef struct {
char *pReqMsg; // including header char *pReqMsg; // including header
int reqMsgLen; int reqMsgLen;
SRpcInfo *pRpc; SRpcInfo *pRpc;
SRpcReqContext *pContext;
} SRpcConn; } SRpcConn;
typedef struct {
SRpcIpSet ipSet;
void *ahandle;
SRpcInfo *pRpc;
char type;
char *pCont;
int contLen;
int numOfRetry;
char msg[];
} SRpcReqContext;
typedef struct { typedef struct {
char version : 4; char version : 4;
char comp : 4; char comp : 4;
...@@ -173,15 +177,18 @@ void (*taosCloseConn[])(void *chandle) = { ...@@ -173,15 +177,18 @@ void (*taosCloseConn[])(void *chandle) = {
taosCloseTcpClientConnection taosCloseTcpClientConnection
}; };
int rpcResendRspToPeer(SRpcConn *pConn); static void rpcProcessRetryTimer(void *, void *);
void rpcProcessRetryTimer(void *, void *); static void *rpcProcessDataFromPeer(void *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *chandle);
void *rpcProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *thandle, void *chandle); static void rpcSendDataToPeer(SRpcConn *pConn, void *data, int dataLen);
int rpcSendDataToPeer(SRpcConn *pConn, char *data, int dataLen); static int rpcAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey);
int rpcAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey); static int rpcBuildAuthHeader(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey);
int rpcBuildAuthHeader(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey); static void rpcCloseConn(void *thandle);
static int rpcGetConn(int sid, char *meterId, SRpcInfo *pRpc, SRpcConn **ppConn, char req, char *hashstr);
static void rpcProcessConnError(void *param, void *id);
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader);
static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
SRpcHeader* pHeader = rpcHeaderFromCont(pCont); SRpcHeader *pHeader = rpcHeaderFromCont(pCont);
int32_t overhead = sizeof(int32_t) * 2; int32_t overhead = sizeof(int32_t) * 2;
int32_t finalLen = 0; int32_t finalLen = 0;
...@@ -227,7 +234,7 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { ...@@ -227,7 +234,7 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
static SRpcHeader *rpcDecompressRpcMsg(SRpcHeader *pHeader) { static SRpcHeader *rpcDecompressRpcMsg(SRpcHeader *pHeader) {
int overhead = sizeof(int32_t) * 2; int overhead = sizeof(int32_t) * 2;
SRpcHeader *pNewHeader = NULL; SRpcHeader *pNewHeader = NULL;
char *pCont = pHeader->content; uint8_t *pCont = pHeader->content;
if (pHeader->comp) { if (pHeader->comp) {
// decompress the content // decompress the content
...@@ -257,7 +264,7 @@ static SRpcHeader *rpcDecompressRpcMsg(SRpcHeader *pHeader) { ...@@ -257,7 +264,7 @@ static SRpcHeader *rpcDecompressRpcMsg(SRpcHeader *pHeader) {
return pHeader; return pHeader;
} }
char *rpcMallocCont(int size) { void *rpcMallocCont(int size) {
char *pMsg = NULL; char *pMsg = NULL;
size += RPC_MSG_OVERHEAD; size += RPC_MSG_OVERHEAD;
...@@ -270,17 +277,17 @@ char *rpcMallocCont(int size) { ...@@ -270,17 +277,17 @@ char *rpcMallocCont(int size) {
return pMsg + sizeof(SRpcReqContext) + sizeof(SRpcHeader); return pMsg + sizeof(SRpcReqContext) + sizeof(SRpcHeader);
} }
void rpcFreeCont(char *cont) { void rpcFreeCont(void *cont) {
char *msg = cont - sizeof(SRpcHeader); char *msg = ((char *)cont) - sizeof(SRpcHeader);
free(msg); free(msg);
} }
static void rpcFreeMsg(char *msg) { static void rpcFreeMsg(void *msg) {
msg -= sizeof(SRpcReqContext); char *req = ((char *)msg) - sizeof(SRpcReqContext);
free(msg); free(req);
} }
void rpcSendSimpleRsp(void *thandle, int_32 code) { void rpcSendSimpleRsp(void *thandle, int32_t code) {
char *pMsg; char *pMsg;
STaosRsp *pRsp; STaosRsp *pRsp;
int msgLen = sizeof(STaosRsp); int msgLen = sizeof(STaosRsp);
...@@ -296,7 +303,7 @@ void rpcSendSimpleRsp(void *thandle, int_32 code) { ...@@ -296,7 +303,7 @@ void rpcSendSimpleRsp(void *thandle, int_32 code) {
pRsp = (STaosRsp *)pMsg; pRsp = (STaosRsp *)pMsg;
pRsp->code = htonl(code); pRsp->code = htonl(code);
taosSendResponse(thandle, pMsg, msgLen); rpcSendResponse(thandle, pMsg, msgLen);
return; return;
} }
...@@ -305,6 +312,7 @@ static void rpcSendQuickRsp(SRpcConn *pConn, char code) { ...@@ -305,6 +312,7 @@ static void rpcSendQuickRsp(SRpcConn *pConn, char code) {
char msg[RPC_MSG_OVERHEAD + sizeof(STaosRsp)]; char msg[RPC_MSG_OVERHEAD + sizeof(STaosRsp)];
SRpcHeader *pHeader; SRpcHeader *pHeader;
int msgLen; int msgLen;
STaosRsp *pRsp;
pRsp = (STaosRsp *)rpcContFromHeader(msg); pRsp = (STaosRsp *)rpcContFromHeader(msg);
pRsp->code = htonl(code); pRsp->code = htonl(code);
...@@ -324,7 +332,7 @@ static void rpcSendQuickRsp(SRpcConn *pConn, char code) { ...@@ -324,7 +332,7 @@ static void rpcSendQuickRsp(SRpcConn *pConn, char code) {
pHeader->uid = 0; pHeader->uid = 0;
memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId)); memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId));
rpcSendDataToPeer(pConn, (char *)msg, msgLen); rpcSendDataToPeer(pConn, msg, msgLen);
} }
void *rpcOpen(SRpcInit *pInit) { void *rpcOpen(SRpcInit *pInit) {
...@@ -347,54 +355,54 @@ void *rpcOpen(SRpcInit *pInit) { ...@@ -347,54 +355,54 @@ void *rpcOpen(SRpcInit *pInit) {
pRpc->localPort = pInit->localPort; pRpc->localPort = pInit->localPort;
pRpc->afp = pInit->afp; pRpc->afp = pInit->afp;
pRpc->sessions = pInit->session; pRpc->sessions = pInit->sessions;
strcpy(pRpc->meterId, pInit->meterId); strcpy(pRpc->meterId, pInit->meterId);
pRpc->spi = pInit->spi; pRpc->spi = pInit->spi;
strcpy(pRpc->secret, pInit->secret); strcpy(pRpc->secret, pInit->secret);
strcpy(pRpc->ckey, pInit->ckey); strcpy(pRpc->ckey, pInit->ckey);
pRpc->afp = pInit->afp; pRpc->afp = pInit->afp;
pRpc->shandle = (*taosInitConn[pRpc->connType])(pRpc->localIp, pRpc->localPort, pRpc->label, pRpc->numOfThreads, pRpc->shandle = (*taosInitConn[pRpc->connType])(pRpc->localIp, pRpc->localPort, pRpc->label,
taosProcessDataFromPeer, pRpc); pRpc->numOfThreads, rpcProcessDataFromPeer, pRpc);
if (pRpc->shandle == NULL) { if (pRpc->shandle == NULL) {
tError("%s failed to init network, %s:%d", pRpc->label, pRpc->localIp, pRpc->localPort); tError("%s failed to init network, %s:%d", pRpc->label, pRpc->localIp, pRpc->localPort);
taosCloseRpc(pRpc); rpcClose(pRpc);
return NULL; return NULL;
} }
size_t size = sizeof(SRpcConn) * sessions; size_t size = sizeof(SRpcConn) * pRpc->sessions;
pRpc->connList = (SRpcConn *)calloc(1, size); pRpc->connList = (SRpcConn *)calloc(1, size);
if (pRpc->connList == NULL) { if (pRpc->connList == NULL) {
tError("%s failed to allocate memory for taos connections, size:%d", pRpc->label, size); tError("%s failed to allocate memory for taos connections, size:%d", pRpc->label, size);
taosCloseRpc(pRpc); rpcClose(pRpc);
return NULL; return NULL;
} }
pRpc->idPool = taosInitIdPool(sessions); pRpc->idPool = taosInitIdPool(pRpc->sessions);
if (pRpc->idPool == NULL) { if (pRpc->idPool == NULL) {
tError("%s failed to init ID pool", pRpc->label); tError("%s failed to init ID pool", pRpc->label);
taosCloseRpc(pRpc); rpcClose(pRpc);
return NULL; return NULL;
} }
pRpc->tmrCtrl = taosTmrInit(sessions * 2 + 1, 50, 10000, pRpc->label); pRpc->tmrCtrl = taosTmrInit(pRpc->sessions*2 + 1, 50, 10000, pRpc->label);
if (pRpc->tmrCtrl == NULL) { if (pRpc->tmrCtrl == NULL) {
tError("%s failed to init timers", pRpc->label); tError("%s failed to init timers", pRpc->label);
taosCloseRpc(pRpc); rpcClose(pRpc);
return NULL; return NULL;
} }
pRpc->hash = taosInitStrHash(sessions, sizeof(pRpc), taosHashString); pRpc->hash = taosInitStrHash(pRpc->sessions, sizeof(pRpc), taosHashString);
if (pRpc->hash == NULL) { if (pRpc->hash == NULL) {
tError("%s failed to init string hash", pRpc->label); tError("%s failed to init string hash", pRpc->label);
taosCloseRpc(pRpc); rpcClose(pRpc);
return NULL; return NULL;
} }
pRpc->pCahche = taosOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, tsShellActivityTimer*1000); pRpc->pCache = taosOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, tsShellActivityTimer*1000);
if ( pRpc->pCache == NULL ) { if ( pRpc->pCache == NULL ) {
tError("%s failed to init connection cache", pRpc->label); tError("%s failed to init connection cache", pRpc->label);
taosCloseRpc(pRpc); rpcClose(pRpc);
return NULL; return NULL;
} }
...@@ -411,8 +419,8 @@ void rpcClose(void *param) { ...@@ -411,8 +419,8 @@ void rpcClose(void *param) {
(*taosCleanUpConn[pRpc->type])(pRpc->shandle); (*taosCleanUpConn[pRpc->type])(pRpc->shandle);
for (int i = 0; i < pRpc->sessions; ++i) { for (int i = 0; i < pRpc->sessions; ++i) {
if (pRpc->connList[i].signature != NULL) { if (pRpc->connList[i].meterId[0]) {
taosCloseRpcConn((void *)(pRpc->connList + i)); rpcCloseConn((void *)(pRpc->connList + i));
} }
} }
...@@ -426,32 +434,30 @@ void rpcClose(void *param) { ...@@ -426,32 +434,30 @@ void rpcClose(void *param) {
tfree(pRpc); tfree(pRpc);
} }
static SRpcConn *rpcOpenConn(SRpcConnInit *pInit) { static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort) {
SRpcConn *pConn; SRpcConn *pConn;
SRpcInfo *pRpc = (SRpcInfo *)pInit->shandle;
if ( (uint8_t)(rpcGetConn(pInit->sid, pInit->meterId, pRpc, &pConn, 1, NULL)) != 0 ) if ( (uint8_t)(rpcGetConn(0, pRpc->meterId, pRpc, &pConn, 1, NULL)) != 0 )
return NULL; return NULL;
if (pConn->peerId == 0) pConn->peerId = pRpc->peerId; strcpy(pConn->peerIpstr, peerIpStr);
strcpy(pConn->peerIpstr, pInit->peerIp); pConn->peerIp = inet_addr(peerIpStr);
pConn->peerIp = inet_addr(pInit->peerIp); pConn->peerPort = peerPort;
pConn->peerPort = pInit->peerPort;
pConn->ahandle = pInit->ahandle;
pConn->spi = pRpc->spi; pConn->spi = pRpc->spi;
pConn->encrypt = pRpc->encrypt; pConn->encrypt = pRpc->encrypt;
if (pConn->spi) memcpy(pConn->secret, pRpc->secret, TSDB_KEY_LEN); if (pConn->spi) memcpy(pConn->secret, pRpc->secret, TSDB_KEY_LEN);
strcpy(pConn->meterId, pRpc->meterId);
// if it is client, it shall set up connection first // if it is client, it shall set up connection first
if (taosOpenConn[pRpc->type]) { if (taosOpenConn[pRpc->type]) {
pConn->chandle = (*taosOpenConn[pRpc->type])(pRpc->shandle, pConn, pConn->peerIpstr, pConn->peerPort); pConn->chandle = (*taosOpenConn[pRpc->type])(pRpc->shandle, pConn, pConn->peerIpstr, pConn->peerPort);
if (pConn->chandle) { if (pConn->chandle) {
tTrace("%s pConn:%p, rpc connection is set up, sid:%d id:%s ip:%s:%hu localPort:%d", pRpc->label, tTrace("%s pConn:%p, rpc connection is set up, sid:%d id:%s ip:%s:%hu localPort:%d", pRpc->label,
pConn, pConn->sid, pInit->meterId, pConn->peerIpstr, pConn->peerPort, pConn->localPort); pConn, pConn->sid, pRpc->meterId, pConn->peerIpstr, pConn->peerPort, pConn->localPort);
} else { } else {
tError("%s pConn:%p, failed to set up nw connection to ip:%s:%hu", pRpc->label, pConn, tError("%s pConn:%p, failed to set up nw connection to ip:%s:%hu", pRpc->label, pConn,
pConn->sid, pInit->meterId, pConn->peerIpstr, pConn->peerPort); pConn->sid, pRpc->meterId, pConn->peerIpstr, pConn->peerPort);
terrorno = TSDB_CODE_NETWORK_UNAVAIL; terrno = TSDB_CODE_NETWORK_UNAVAIL;
rpcCloseConn(pConn); rpcCloseConn(pConn);
pConn = NULL; pConn = NULL;
} }
...@@ -462,32 +468,28 @@ static SRpcConn *rpcOpenConn(SRpcConnInit *pInit) { ...@@ -462,32 +468,28 @@ static SRpcConn *rpcOpenConn(SRpcConnInit *pInit) {
static void rpcCloseConn(void *thandle) { static void rpcCloseConn(void *thandle) {
SRpcConn *pConn = (SRpcConn *)thandle; SRpcConn *pConn = (SRpcConn *)thandle;
if (pConn == NULL) return; assert(pConn);
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
if (pConn->signature != thandle || pRpc == NULL) return; assert(pRpc);
pthread_mutex_lock(&pRpc->mutex); pthread_mutex_lock(&pRpc->mutex);
pConn->signature = NULL;
if (taosCloseConn[pRpc->type]) (*taosCloseConn[pRpc->type])(pConn->chandle); if (taosCloseConn[pRpc->type]) (*taosCloseConn[pRpc->type])(pConn->chandle);
taosTmrStopA(&pConn->pTimer); taosTmrStopA(&pConn->pTimer);
taosTmrStopA(&pConn->pIdleTimer); taosTmrStopA(&pConn->pIdleTimer);
rpcFreeMsg(pConn->pRspMsg); rpcFreeMsg(pConn->pRspMsg);
rpcFreeMsg(pConn-pReqMsg); rpcFreeMsg(pConn->pReqMsg);
char hashstr[40] = {0}; char hashstr[40] = {0};
sprintf(hashstr, "%x:%x:%x", pConn->peerIp, pConn->peerUid, pConn->peerId); sprintf(hashstr, "%x:%x:%x", pConn->peerIp, pConn->peerUid, pConn->peerId);
taosDeleteStrHash(pRpc->hash, hashstr); taosDeleteStrHash(pRpc->hash, hashstr);
tTrace("%s pConn:%p, TAOS connection closed", pRpc->label, pConn->sid, if (pRpc->idPool) taosFreeId(pRpc->idPool, pConn->sid);
pConn->meterId, pConn);
int freeId = pConn->sid;
memset(pConn, 0, sizeof(SRpcConn));
if (pRpc->idPool) taosFreeId(pRpc->idPool, freeId); tTrace("%s pConn:%p, TAOS connection closed", pRpc->label, pConn);
memset(pConn, 0, sizeof(SRpcConn));
pthread_mutex_unlock(&pRpc->mutex); pthread_mutex_unlock(&pRpc->mutex);
} }
...@@ -495,8 +497,6 @@ static void rpcCloseConn(void *thandle) { ...@@ -495,8 +497,6 @@ static void rpcCloseConn(void *thandle) {
static int rpcGetConn(int sid, char *meterId, SRpcInfo *pRpc, SRpcConn **ppConn, char req, char *hashstr) { static int rpcGetConn(int sid, char *meterId, SRpcInfo *pRpc, SRpcConn **ppConn, char req, char *hashstr) {
SRpcConn * pConn = NULL; SRpcConn * pConn = NULL;
if (pRpc == NULL) return -1;
if (sid == 0) { if (sid == 0) {
if (req) { if (req) {
int osid = sid; int osid = sid;
...@@ -513,14 +513,13 @@ static int rpcGetConn(int sid, char *meterId, SRpcInfo *pRpc, SRpcConn **ppConn, ...@@ -513,14 +513,13 @@ static int rpcGetConn(int sid, char *meterId, SRpcInfo *pRpc, SRpcConn **ppConn,
} }
} else { } else {
sid = pConn->sid; sid = pConn->sid;
tTrace("%s sid:%d id:%s, session is already there", pRpc->label, pConn->sid, tTrace("%s sid:%d id:%s, session is already there", pRpc->label, pConn->sid, pConn->meterId);
pConn->meterId);
} }
} else { } else {
return TSDB_CODE_UNEXPECTED_RESPONSE; return TSDB_CODE_UNEXPECTED_RESPONSE;
} }
} else { } else {
if (pRpc->connList[sid].signature == NULL) { if (pRpc->connList[sid].meterId[0] == 0) {
tError("%s sid:%d session is already released", pRpc->label, sid); tError("%s sid:%d session is already released", pRpc->label, sid);
return TSDB_CODE_INVALID_VALUE; return TSDB_CODE_INVALID_VALUE;
} }
...@@ -528,9 +527,8 @@ static int rpcGetConn(int sid, char *meterId, SRpcInfo *pRpc, SRpcConn **ppConn, ...@@ -528,9 +527,8 @@ static int rpcGetConn(int sid, char *meterId, SRpcInfo *pRpc, SRpcConn **ppConn,
pConn = pRpc->connList + sid; pConn = pRpc->connList + sid;
if (pConn->signature == NULL) { if (pConn->meterId[0] == 0) {
memset(pConn, 0, sizeof(SRpcConn)); memset(pConn, 0, sizeof(SRpcConn));
pConn->signature = pConn;
memcpy(pConn->meterId, meterId, tListLen(pConn->meterId)); memcpy(pConn->meterId, meterId, tListLen(pConn->meterId));
pConn->pRpc = pRpc; pConn->pRpc = pRpc;
pConn->sid = sid; pConn->sid = sid;
...@@ -546,13 +544,6 @@ static int rpcGetConn(int sid, char *meterId, SRpcInfo *pRpc, SRpcConn **ppConn, ...@@ -546,13 +544,6 @@ static int rpcGetConn(int sid, char *meterId, SRpcInfo *pRpc, SRpcConn **ppConn,
} }
} }
if ((pRpc->type == TAOS_CONN_UDPC || pRpc->type == TAOS_CONN_UDPS) && pRpc->numOfThreads > 1 &&
pRpc->localPort) {
// UDP server, assign to new connection
pRpc->index = (pRpc->index + 1) % pRpc->numOfThreads;
pConn->localPort = (int16_t)(pRpc->localPort + pRpc->index);
}
taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn); taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn);
tTrace("%s pConn:%p, TAOS connection is allocated, sid:%d id:%s", pRpc->label, pConn, sid); tTrace("%s pConn:%p, TAOS connection is allocated, sid:%d id:%s", pRpc->label, pConn, sid);
} else { } else {
...@@ -569,8 +560,8 @@ static int rpcGetConn(int sid, char *meterId, SRpcInfo *pRpc, SRpcConn **ppConn, ...@@ -569,8 +560,8 @@ static int rpcGetConn(int sid, char *meterId, SRpcInfo *pRpc, SRpcConn **ppConn,
static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
SRpcHeader *pHeader = (SRpcHeader *)msg; SRpcHeader *pHeader = (SRpcHeader *)msg;
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
int code = 0; int code = 0;
if (pConn->spi == 0 ) return 0; if (pConn->spi == 0 ) return 0;
...@@ -586,10 +577,7 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { ...@@ -586,10 +577,7 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
delta, htonl(pDigest->timeStamp)); delta, htonl(pDigest->timeStamp));
code = TSDB_CODE_INVALID_TIME_STAMP; code = TSDB_CODE_INVALID_TIME_STAMP;
} else { } else {
if (rpcAuthenticateMsg((uint8_t *)pHeader, dataLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) { if (rpcAuthenticateMsg((uint8_t *)pHeader, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) {
char ipstr[24];
tinet_ntoa(ipstr, ip);
mLError("id:%s from %s, authentication failed", pHeader->meterId, ipstr);
tError("%s pConn:%p, authentication failed, msg discarded", pRpc->label, pConn); tError("%s pConn:%p, authentication failed, msg discarded", pRpc->label, pConn);
code = TSDB_CODE_AUTH_FAILURE; code = TSDB_CODE_AUTH_FAILURE;
} else { } else {
...@@ -608,7 +596,7 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { ...@@ -608,7 +596,7 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
} }
static int rpcProcessReqHeader(SRpcConn *pConn, SRpcHeader *pHeader) { static int rpcProcessReqHeader(SRpcConn *pConn, SRpcHeader *pHeader) {
int code = 0; SRpcInfo *pRpc= pConn->pRpc;
if (pConn->peerId == 0) { if (pConn->peerId == 0) {
pConn->peerId = pHeader->sourceId; pConn->peerId = pHeader->sourceId;
...@@ -627,7 +615,7 @@ static int rpcProcessReqHeader(SRpcConn *pConn, SRpcHeader *pHeader) { ...@@ -627,7 +615,7 @@ static int rpcProcessReqHeader(SRpcConn *pConn, SRpcHeader *pHeader) {
} else if (pConn->inType == 0) { } else if (pConn->inType == 0) {
tTrace("%s pConn:%p, %s is already processed, tranId:%d", pRpc->label, pConn, tTrace("%s pConn:%p, %s is already processed, tranId:%d", pRpc->label, pConn,
taosMsg[pHeader->msgType], pConn->inTranId); taosMsg[pHeader->msgType], pConn->inTranId);
rpcResendRspToPeer(pConn); rpcSendDataToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen); // resend the response
} else { } else {
tTrace("%s pConn:%p, mismatched message %s and tranId", pRpc->label, pConn, taosMsg[pHeader->msgType]); tTrace("%s pConn:%p, mismatched message %s and tranId", pRpc->label, pConn, taosMsg[pHeader->msgType]);
} }
...@@ -637,8 +625,8 @@ static int rpcProcessReqHeader(SRpcConn *pConn, SRpcHeader *pHeader) { ...@@ -637,8 +625,8 @@ static int rpcProcessReqHeader(SRpcConn *pConn, SRpcHeader *pHeader) {
} }
if (pConn->inType != 0) { if (pConn->inType != 0) {
tTrace("%s pConn:%p, last session is not finished, inTranId:%d tranId:%d", pRpc->label, pConn, tTrace("%s pConn:%p, last session is not finished, inTranId:%d tranId:%d", pRpc->label, pConn,
pConn->inTranId, pHeader->tranId); pConn->inTranId, pHeader->tranId);
return TSDB_CODE_LAST_SESSION_NOT_FINISHED; return TSDB_CODE_LAST_SESSION_NOT_FINISHED;
} }
...@@ -649,6 +637,7 @@ static int rpcProcessReqHeader(SRpcConn *pConn, SRpcHeader *pHeader) { ...@@ -649,6 +637,7 @@ static int rpcProcessReqHeader(SRpcConn *pConn, SRpcHeader *pHeader) {
} }
static int rpcProcessRspHeader(SRpcConn *pConn, SRpcHeader *pHeader) { static int rpcProcessRspHeader(SRpcConn *pConn, SRpcHeader *pHeader) {
SRpcInfo *pRpc = pConn->pRpc;
pConn->peerId = pHeader->sourceId; pConn->peerId = pHeader->sourceId;
if (pConn->outType == 0 || pConn->pContext == NULL) { if (pConn->outType == 0 || pConn->pContext == NULL) {
...@@ -664,7 +653,7 @@ static int rpcProcessRspHeader(SRpcConn *pConn, SRpcHeader *pHeader) { ...@@ -664,7 +653,7 @@ static int rpcProcessRspHeader(SRpcConn *pConn, SRpcHeader *pHeader) {
} }
if (*pHeader->content == TSDB_CODE_NOT_READY) { if (*pHeader->content == TSDB_CODE_NOT_READY) {
return = TSDB_CODE_ALREADY_PROCESSED; return TSDB_CODE_ALREADY_PROCESSED;
} }
taosTmrStopA(&pConn->pTimer); taosTmrStopA(&pConn->pTimer);
...@@ -685,18 +674,18 @@ static int rpcProcessRspHeader(SRpcConn *pConn, SRpcHeader *pHeader) { ...@@ -685,18 +674,18 @@ static int rpcProcessRspHeader(SRpcConn *pConn, SRpcHeader *pHeader) {
pConn->tretry = 0; pConn->tretry = 0;
pConn->outType = 0; pConn->outType = 0;
pConn->pReqMsg = NULL; pConn->pReqMsg = NULL;
pConn->pReqMsgLen = 0; pConn->reqMsgLen = 0;
} }
static int rpcProcessHeader(SRpcHeader *pHeader, SRpcConn **ppConn, SRpcInfo *pRpc, int dataLen, uint32_t ip, static int rpcProcessHeader(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int dataLen, uint32_t ip) {
uint16_t port, void *chandle) { int32_t sid, code = 0;
int sid, code = 0;
SRpcConn * pConn = NULL; SRpcConn * pConn = NULL;
int msgLen;
char hashstr[40] = {0}; char hashstr[40] = {0};
*ppConn = NULL; *ppConn = NULL;
uint32_t sid = htonl(pHeader->destId); SRpcHeader *pHeader = (SRpcHeader *)data;
sid = htonl(pHeader->destId);
if (pHeader->msgType >= TSDB_MSG_TYPE_MAX || pHeader->msgType <= 0) { if (pHeader->msgType >= TSDB_MSG_TYPE_MAX || pHeader->msgType <= 0) {
tTrace("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHeader->msgType); tTrace("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHeader->msgType);
...@@ -706,7 +695,7 @@ static int rpcProcessHeader(SRpcHeader *pHeader, SRpcConn **ppConn, SRpcInfo *pR ...@@ -706,7 +695,7 @@ static int rpcProcessHeader(SRpcHeader *pHeader, SRpcConn **ppConn, SRpcInfo *pR
pHeader->msgLen = (int32_t)htonl((uint32_t)pHeader->msgLen); pHeader->msgLen = (int32_t)htonl((uint32_t)pHeader->msgLen);
if (dataLen != pHeader->msgLen) { if (dataLen != pHeader->msgLen) {
tTrace("%s sid:%d, %s has invalid length, dataLen:%d, msgLen:%d", pRpc->label, sid, tTrace("%s sid:%d, %s has invalid length, dataLen:%d, msgLen:%d", pRpc->label, sid,
taosMsg[pHeader->msgType], dataLen, msgLen); taosMsg[pHeader->msgType], dataLen, pHeader->msgLen);
return TSDB_CODE_INVALID_MSG_LEN; return TSDB_CODE_INVALID_MSG_LEN;
} }
...@@ -724,18 +713,7 @@ static int rpcProcessHeader(SRpcHeader *pHeader, SRpcConn **ppConn, SRpcInfo *pR ...@@ -724,18 +713,7 @@ static int rpcProcessHeader(SRpcHeader *pHeader, SRpcConn **ppConn, SRpcInfo *pR
*ppConn = pConn; *ppConn = pConn;
sid = pConn->sid; sid = pConn->sid;
if (pConn->peerIp != ip) {
pConn->peerIp = ip;
char ipstr[20] = {0};
tinet_ntoa(ipstr, ip);
strcpy(pConn->peerIpstr, ipstr);
}
if (pHeader->uid) pConn->peerUid = pHeader->uid; if (pHeader->uid) pConn->peerUid = pHeader->uid;
if (port) pConn->peerPort = port;
if (pHeader->port) // port maybe changed by the peer
pConn->peerPort = pHeader->port;
if (chandle) pConn->chandle = chandle;
if (pHeader->tcp) { if (pHeader->tcp) {
tTrace("%s pConn:%p, content will be transfered via TCP", pRpc->label, pConn); tTrace("%s pConn:%p, content will be transfered via TCP", pRpc->label, pConn);
...@@ -759,7 +737,7 @@ static int rpcProcessHeader(SRpcHeader *pHeader, SRpcConn **ppConn, SRpcInfo *pR ...@@ -759,7 +737,7 @@ static int rpcProcessHeader(SRpcHeader *pHeader, SRpcConn **ppConn, SRpcInfo *pR
return code; return code;
} }
void rpcSendErrorMsgToPeer(char *pMsg, int32 code, uint_32 ip, uint_16 port, void *chandle) { void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint32_t ip, uint16_t port, void *chandle) {
SRpcHeader *pRecvHeader, *pReplyHeader; SRpcHeader *pRecvHeader, *pReplyHeader;
char msg[sizeof(SRpcHeader) + sizeof(SRpcDigest) + sizeof(STaosRsp)]; char msg[sizeof(SRpcHeader) + sizeof(SRpcDigest) + sizeof(STaosRsp)];
STaosRsp *pRsp; STaosRsp *pRsp;
...@@ -793,7 +771,7 @@ void rpcSendErrorMsgToPeer(char *pMsg, int32 code, uint_32 ip, uint_16 port, voi ...@@ -793,7 +771,7 @@ void rpcSendErrorMsgToPeer(char *pMsg, int32 code, uint_32 ip, uint_16 port, voi
} }
pReplyHeader->msgLen = (int32_t)htonl((uint32_t)msgLen); pReplyHeader->msgLen = (int32_t)htonl((uint32_t)msgLen);
(*taosSendData[pRpc->type])(ip, port, pReply, msgLen, chandle); (*taosSendData[pRpc->type])(ip, port, msg, msgLen, chandle);
return; return;
} }
...@@ -815,26 +793,33 @@ void rpcProcessIdleTimer(void *param, void *tmrId) { ...@@ -815,26 +793,33 @@ void rpcProcessIdleTimer(void *param, void *tmrId) {
rpcCloseConn(pConn); rpcCloseConn(pConn);
} }
void *rpcProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *thandle, static void *rpcProcessDataFromPeer(void *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *chandle) {
void *chandle) {
SRpcHeader *pHeader = (SRpcHeader *)data; SRpcHeader *pHeader = (SRpcHeader *)data;
uint8_t code;
SRpcConn *pConn = (SRpcConn *)thandle;
SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcInfo *pRpc = (SRpcInfo *)shandle;
int msgLen; SRpcConn *pConn = NULL;
uint8_t code = 0;
tDump(data, dataLen); tDump(data, dataLen);
if (ip == 0 && taosCloseConn[pRpc->type] && pConn) { pthread_mutex_lock(&pRpc->mutex);
// it means the connection is broken, it only happens for TCP
tTrace("%s pConn:%p, underlying link is gone%p", pRpc->label, pConn); code = rpcProcessHeader(pRpc, &pConn, data, dataLen, ip);
pContext->terrno = TSDB_CODE_NETWORK_UNAVAIL;
taosTmrStart(taosProcessConnError, 0, pContext, pRpc->tmrCtrl); if (pConn) {
return NULL; // update connection info
pConn->chandle = chandle;
if (pConn->peerIp != ip) {
pConn->peerIp = ip;
char ipstr[20] = {0};
tinet_ntoa(ipstr, ip);
strcpy(pConn->peerIpstr, ipstr);
}
if (port) pConn->peerPort = port;
if (pHeader->port) // port maybe changed by the peer
pConn->peerPort = pHeader->port;
} }
pthread_mutex_lock(&pRpc->mutex);
code = rpcProcessHeader(pHeader, &pConn, pRpc, dataLen, ip, port, chandle);
pthread_mutex_unlock(&pRpc->mutex); pthread_mutex_unlock(&pRpc->mutex);
if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) { if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) {
...@@ -850,7 +835,7 @@ void *rpcProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t port ...@@ -850,7 +835,7 @@ void *rpcProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t port
if (code != TSDB_CODE_ALREADY_PROCESSED) { if (code != TSDB_CODE_ALREADY_PROCESSED) {
if (code != 0) { // parsing error if (code != 0) { // parsing error
if ( rpcIsReq(pHeader->msgType) ) { if ( rpcIsReq(pHeader->msgType) ) {
taosSendErrorMsgToPeer(data, code, ip, port, chandle); rpcSendErrorMsgToPeer(pRpc, data, code, ip, port, chandle);
tTrace("%s pConn:%p, %s is sent with error code:%u", pRpc->label, pConn, taosMsg[pHeader->msgType+1], code); tTrace("%s pConn:%p, %s is sent with error code:%u", pRpc->label, pConn, taosMsg[pHeader->msgType+1], code);
} }
} else { // parsing OK } else { // parsing OK
...@@ -862,17 +847,17 @@ void *rpcProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t port ...@@ -862,17 +847,17 @@ void *rpcProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t port
return pConn; return pConn;
} }
void taosProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader) { static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader) {
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
int msgLen = rpcContLenFromHeader(pHeader->msgLen); int msgLen = rpcContFromHeader(pHeader->msgLen);
pHeader = rpcDecompressRpcMsg(pHeader); pHeader = rpcDecompressRpcMsg(pHeader);
if ( rpcIsReq(msgType) ) { if ( rpcIsReq(pHeader->msgType) ) {
(*(pRpc->fp))(pHeader->msgType, pHeader->content, msgLen, pConn); (*(pRpc->fp))(pHeader->msgType, pHeader->content, msgLen, pConn, 0);
} else { } else {
// it's a response // it's a response
STaosRsp *pRsp = (STaosRsp *)msg; STaosRsp *pRsp = (STaosRsp *)pHeader->content;
int32_t code = htonl(pRsp->code); int32_t code = htonl(pRsp->code);
SRpcReqContext *pContext = pConn->pContext; SRpcReqContext *pContext = pConn->pContext;
...@@ -880,12 +865,12 @@ void taosProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader) { ...@@ -880,12 +865,12 @@ void taosProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader) {
taosAddConnToIntoCache(pRpc->pCache, pConn, pConn->peerIp, pConn->peerPort, pConn->meterId); taosAddConnToIntoCache(pRpc->pCache, pConn, pConn->peerIp, pConn->peerPort, pConn->meterId);
if (code == TSDB_CODE_NOT_MASTER) { if (code == TSDB_CODE_NO_MASTER) {
pContext->terrno = code; pContext->code = code;
taosTmrStart(taosProcessConnError, 0, pContext, pRpc->tmrCtrl); taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl);
} else { } else {
rpcFreeMsg(rpcGetMsgFromCont(pContext->cont)); // free the request msg rpcFreeMsg(rpcHeaderFromCont(pContext->pCont)); // free the request msg
(*(pRpc->fp))(pHeader->msgType, pHeader->content, msgLen, pContext->ahandle); (*(pRpc->fp))(pHeader->msgType, pHeader->content, msgLen, pContext->ahandle, pContext->ipSet.index);
} }
} }
} }
...@@ -893,20 +878,10 @@ void taosProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader) { ...@@ -893,20 +878,10 @@ void taosProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader) {
SRpcConn *rpcGetConnToServer(void *shandle, SRpcIpSet ipSet) { SRpcConn *rpcGetConnToServer(void *shandle, SRpcIpSet ipSet) {
SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcInfo *pRpc = (SRpcInfo *)shandle;
SRpcConn *pConn = taosGetConnFromCache(pRpc->pCache, ipSet.ip[index], pRpc->peerPort, pRpc->meterId); SRpcConn *pConn = taosGetConnFromCache(pRpc->pCache, ipSet.ipStr[ipSet.index], ipSet.port, pRpc->meterId);
if ( pConn == NULL ) { if ( pConn == NULL ) {
SRpcConnInit connInit; pConn = rpcOpenConn(pRpc, ipSet.ipStr[ipSet.index], ipSet.port);
memset(&connInit, 0, sizeof(connInit));
connInit.sid = 0;
connInit.spi = pRpc->spi;
connInit.encrypt = pRpc->encrypt;
connInit.meterId = pRpc->user;
connInit.peerId = 0;
connInit.shandle = pRpc;
connInit.peerIp = ipstr;
connInit.peerPort = pRpc->peerPort;
pConn = rpcOpenConn(&connInit);
} }
return pConn; return pConn;
...@@ -930,11 +905,14 @@ int taosAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) { ...@@ -930,11 +905,14 @@ int taosAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) {
return msgLen; return msgLen;
} }
int rpcSendDataToPeer(SRpcConn *pConn, char *data, int dataLen) { static void rpcSendDataToPeer(SRpcConn *pConn, void *data, int dataLen) {
int writtenLen = 0; int writtenLen = 0;
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
SRpcHeader *pHeader = (SRpcHeader *)data; SRpcHeader *pHeader = (SRpcHeader *)data;
int code = 0;
assert(data);
assert(dataLen>0);
assert(pHeader->msgType > 0);
dataLen = taosAddAuthPart(pConn, data, dataLen); dataLen = taosAddAuthPart(pConn, data, dataLen);
...@@ -955,19 +933,16 @@ int rpcSendDataToPeer(SRpcConn *pConn, char *data, int dataLen) { ...@@ -955,19 +933,16 @@ int rpcSendDataToPeer(SRpcConn *pConn, char *data, int dataLen) {
if (writtenLen != dataLen) { if (writtenLen != dataLen) {
tError("%s pConn:%p, failed to send, dataLen:%d writtenLen:%d, reason:%s", pRpc->label, pConn, tError("%s pConn:%p, failed to send, dataLen:%d writtenLen:%d, reason:%s", pRpc->label, pConn,
dataLen, writtenLen, strerror(errno)); dataLen, writtenLen, strerror(errno));
code = -1;
} }
tDump(data, dataLen); tDump(data, dataLen);
return code;
} }
void rpcSendReqToOneServer(SRpcConn *pConn, SRpcReqContext *pContext) { void rpcSendReqToOneServer(SRpcConn *pConn, SRpcReqContext *pContext) {
SRpcHeader *pHeader = rpcHeaderFromCont(pContext->pCont);
char *pHeader = rpcHeaderFromCont(pContext->pCont); SRpcInfo *pRpc = pConn->pRpc;
SRpcHeader *msg = (char *)pHeader; char *msg = (char *)pHeader;
int msgLen = rpcGetMsgLen(pContext->contLen); int msgLen = rpcMsgLenFromCont(pContext->contLen);
char msgType = pContext->msgType; char msgType = pContext->msgType;
// set the message header // set the message header
...@@ -987,19 +962,16 @@ void rpcSendReqToOneServer(SRpcConn *pConn, SRpcReqContext *pContext) { ...@@ -987,19 +962,16 @@ void rpcSendReqToOneServer(SRpcConn *pConn, SRpcReqContext *pContext) {
// set the connection parameters // set the connection parameters
pConn->outType = msgType; pConn->outType = msgType;
pConn->outTranId = pHeader->tranId; pConn->outTranId = pHeader->tranId;
pConn->pMsgNode = pMsgNode;
pConn->pReqMsg = msg; pConn->pReqMsg = msg;
pConn->reqMsgLen = msgLen; pConn->reqMsgLen = msgLen;
pConn->context = pContext; pConn->pContext = pContext;
if ( rpcSendDataToPeer(pConn, msg, msgLen) < 0 ) { rpcSendDataToPeer(pConn, msg, msgLen);
taosReportError(pConn->pContext, terrno); taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
} else {
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
}
} }
void rpcSendRequest(void *shandle, SRpcIpSet ipSet, char type, char *pCont, int contLen, void *ahandle) { void rpcSendRequest(void *shandle, SRpcIpSet ipSet, char type, void *pCont, int contLen, void *ahandle) {
SRpcInfo *pRpc = (SRpcInfo *)shandle;
SRpcConn *pConn; SRpcConn *pConn;
SRpcReqContext *pContext; SRpcReqContext *pContext;
...@@ -1008,22 +980,23 @@ void rpcSendRequest(void *shandle, SRpcIpSet ipSet, char type, char *pCont, int ...@@ -1008,22 +980,23 @@ void rpcSendRequest(void *shandle, SRpcIpSet ipSet, char type, char *pCont, int
pContext->ahandle = ahandle; pContext->ahandle = ahandle;
pContext->pRpc = (SRpcInfo *)shandle; pContext->pRpc = (SRpcInfo *)shandle;
pContext->ipSet = ipSet; pContext->ipSet = ipSet;
pContext->contLen = contLen pContext->contLen = contLen;
pContext->pCont = pCont; pContext->pCont = pCont;
pContext->type = type; pContext->msgType = type;
pConn = rpcGetConnToServer(shandle, ipSet); pConn = rpcGetConnToServer(shandle, ipSet);
pContext->terrno = terrno; pContext->code = terrno;
if (pConn == NULL) taosTmrStart(taosProcessConnError, 0, pContext, pRpc->tmrCtrl); if (pConn == NULL) taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl);
rpcSendReqToOneServer(pConn, pContext); rpcSendReqToOneServer(pConn, pContext);
return; return;
} }
void rpcSendResponse(SRpcConn *pConn, char *pCont, int contLen) { void rpcSendResponse(void *handle, void *pCont, int contLen) {
int msgLen = 0; int msgLen = 0;
SRpcConn *pConn; SRpcConn *pConn = (SRpcConn *)handle;
SRpcInfo *pRpc = pConn->pRpc;
SRpcHeader *pHeader = rpcHeaderFromCont(pCont); SRpcHeader *pHeader = rpcHeaderFromCont(pCont);
char *msg = (char *)pHeader; char *msg = (char *)pHeader;
...@@ -1052,38 +1025,23 @@ void rpcSendResponse(SRpcConn *pConn, char *pCont, int contLen) { ...@@ -1052,38 +1025,23 @@ void rpcSendResponse(SRpcConn *pConn, char *pCont, int contLen) {
if (pHeader->content[0] == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--; if (pHeader->content[0] == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--;
pthread_mutex_lock(&pRpc->mutex); pthread_mutex_unlock(&pRpc->mutex);
rpcSendDataToPeer(pConn, msg, msgLen); rpcSendDataToPeer(pConn, msg, msgLen);
return; return;
} }
static void rpcResendRspToPeer(SRpcConn *pConn) {
if (pConn->pRspMsg == NULL || pConn->rspMsgLen <= 0 || pConn->rspMsgLen <= sizeof(SRpcHeader)) {
tError("%s pConn:%p, rsp is null", pRpc->label);
return;
}
SRpcHeader *pHeader = (SRpcHeader *)pConn->pRspMsg;
if (pHeader->msgType <= 0) {
tError("%s pConn:%p, msgType is messed up, rspLen:%d, msgType:%d", pRpc->label, pConn, pHeader->msgLen, pHeader->msgType);
return;
}
rpcSendDataToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen);
}
static void rpcProcessConnError(void *param, void *id) { static void rpcProcessConnError(void *param, void *id) {
SRpcReqContext *pContext = (SRpcContext *)param; SRpcReqContext *pContext = (SRpcReqContext *)param;
SRpcInfo *pRpc = pContext->pRpc;
if ( pContext->numOfRetry >= pContext->ipSet.numOfIps ) { if ( pContext->numOfRetry >= pContext->ipSet.numOfIps ) {
char *rsp = calloc(1, RPC_MSG_OVERHEAD + sizeof(STaosRsp)); char *rsp = calloc(1, RPC_MSG_OVERHEAD + sizeof(STaosRsp));
if ( rsp ) { if ( rsp ) {
STaosRsp *pRsp = (rsp+sizeof(SRpcHeader)); STaosRsp *pRsp = (STaosRsp *)(rsp+sizeof(SRpcHeader));
pRsp->code = pContext->terrno; pRsp->code = pContext->code;
(*(pRpc->fp))(pContext->msgType+1, pRsp, sizeof(STaosRsp), pContext->ahandle); (*(pRpc->fp))(pContext->msgType+1, pRsp, sizeof(STaosRsp), pContext->ahandle, 0);
} else { } else {
tError("%s failed to malloc RSP", pRpc->label); tError("%s failed to malloc RSP", pRpc->label);
} }
...@@ -1092,9 +1050,9 @@ static void rpcProcessConnError(void *param, void *id) { ...@@ -1092,9 +1050,9 @@ static void rpcProcessConnError(void *param, void *id) {
pContext->ipSet.index++; pContext->ipSet.index++;
pContext->ipSet.index = pContext->ipSet.index % pContext->ipSet.numOfIps; pContext->ipSet.index = pContext->ipSet.index % pContext->ipSet.numOfIps;
pConn = rpcGetConnToServer(pContext->pRpc, pContext->ipSet); SRpcConn *pConn = rpcGetConnToServer(pContext->pRpc, pContext->ipSet);
pContext->terrno = terrno; pContext->code = terrno;
if (pConn == NULL) taosTmrStart(taosProcessConnError, 0, pContext, pRpc->tmrCtrl); if (pConn == NULL) taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl);
taosSendReqToOneServer(pConn, pContext); taosSendReqToOneServer(pConn, pContext);
} }
...@@ -1104,22 +1062,13 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { ...@@ -1104,22 +1062,13 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) {
SRpcConn *pConn = (SRpcConn *)param; SRpcConn *pConn = (SRpcConn *)param;
int reportDisc = 0; int reportDisc = 0;
if (pConn->signature != param) {
tError("pConn Signature:0x%x, pConn:0x%x not matched", pConn->signature, param);
return;
}
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
assert(pRpc);
if (pConn->pTimer != tmrId) {
tTrace("%s pConn:%p, timer:%p already processed%", pRpc->label, pConn);
return;
}
pthread_mutex_lock(&pRpc->mutex); pthread_mutex_lock(&pRpc->mutex);
if (pConn->outType == 0) { if (pConn->outType == 0) {
tTrace("%s pConn:%p, outtype is zero", pRpc->label, pConn); tTrace("%s pConn:%p, outtype is zero, it is already processed", pRpc->label, pConn);
} else { } else {
tTrace("%s pConn:%p, expected %s is not received", pRpc->label, pConn, taosMsg[(int)pConn->outType + 1]); tTrace("%s pConn:%p, expected %s is not received", pRpc->label, pConn, taosMsg[(int)pConn->outType + 1]);
pConn->pTimer = NULL; pConn->pTimer = NULL;
...@@ -1128,21 +1077,19 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { ...@@ -1128,21 +1077,19 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) {
if (pConn->retry < 4) { if (pConn->retry < 4) {
tTrace("%s pConn:%p, re-send msg:%s to %s:%hu", pRpc->label, tTrace("%s pConn:%p, re-send msg:%s to %s:%hu", pRpc->label,
taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort); taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort);
if (pConn->pReqMsg && pConn->pReqMsgLen > 0) { rpcSendDataToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);
rpcSendDataToPeer(pConn, pReqMsg, pReqMsgLen);
}
} else { } else {
// close the connection // close the connection
tTrace("%s pConn:%p, failed to send msg:%s to %s:%hu", pRpc->label, pConn, tTrace("%s pConn:%p, failed to send msg:%s to %s:%hu", pRpc->label, pConn,
taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort, pConn); taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort);
reportDisc = 1; reportDisc = 1;
} }
} }
pthread_mutex_unlock(&pRpc->mutex); pthread_mutex_unlock(&pRpc->mutex);
pConn->terrno = TSDB_CODE_NETWORK_UNAVAIL; pConn->pContext->code = TSDB_CODE_NETWORK_UNAVAIL;
if (reportDisc) taosProcessConnError(pConn->pContext, NULL); if (reportDisc) rpcProcessConnError(pConn->pContext, NULL);
} }
static int rpcAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey) { static int rpcAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册