提交 def046eb 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

made minor changes

上级 770ca34e
......@@ -33,83 +33,42 @@ extern "C" {
#define TAOS_SOCKET_TYPE_NAME_TCP "tcp"
#define TAOS_SOCKET_TYPE_NAME_UDP "udp"
#define TAOS_ID_ASSIGNED 0
#define TAOS_ID_FREE 1
#define TAOS_ID_REALLOCATE 2
#define TAOS_CONN_SOCKET_TYPE_S() ((strcasecmp(tsSocketType, TAOS_SOCKET_TYPE_NAME_UDP) == 0)? TAOS_CONN_UDPS:TAOS_CONN_TCPS)
#define TAOS_CONN_SOCKET_TYPE_C() ((strcasecmp(tsSocketType, TAOS_SOCKET_TYPE_NAME_UDP) == 0)? TAOS_CONN_UDPC:TAOS_CONN_TCPC)
#define taosSendMsgToPeer(x, y, z) taosSendMsgToPeerH(x, y, z, NULL)
#define taosOpenRpcChann(x, y, z) taosOpenRpcChannWithQ(x,y,z,NULL)
#define taosBuildReqMsg(x, y) taosBuildReqMsgWithSize(x, y, 512)
#define taosBuildRspMsg(x, y) taosBuildRspMsgWithSize(x, y, 512)
extern int tsRpcHeadSize;
typedef struct {
char *localIp; // local IP used
uint16_t localPort; // local port
uint16_t localPort; // local port
char *label; // for debug purpose
int numOfThreads; // number of threads to handle connections
void *(*fp)(char *, void *, void *); // function to process the incoming msg
void *qhandle; // queue handle
int bits; // number of bits for sessionId
int numOfChanns; // number of channels
int sessionsPerChann; // number of sessions per channel
int idMgmt; // TAOS_ID_ASSIGNED, TAOS_ID_FREE
void *(*fp)(char type, char *pCont, int contLen, void *handle, int index); // function to process the incoming msg
int sessions; // number of sessions allowed
int connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
int idleTime; // milliseconds, 0 means idle timer is disabled
int noFree; // not free buffer
void (*efp)(int cid); // call back function to process not activated chann
int (*afp)(char *meterId, char *spi, char *encrypt, uint8_t *secret,
uint8_t *ckey); // call back to retrieve auth info
char *meterId; // meter ID
char spi; // security parameter index
char encrypt; // encrypt algorithm
char *secret; // key for authentication
char *ckey; // ciphering key
int (*afp) (char *meterId, char *spi, char *encrypt, uint8_t *secret, uint8_t *ckey); // call back to retrieve auth info
} SRpcInit;
typedef struct {
int cid; // channel ID
int sid; // session ID
char * meterId; // meter ID
uint32_t peerId; // peer link ID
void * shandle; // pointer returned by taosOpenRpc
void * ahandle; // handle provided by app
char * peerIp; // peer IP string
uint16_t peerPort; // peer port
char spi; // security parameter index
char encrypt; // encrypt algorithm
char * secret; // key for authentication
char * ckey; // ciphering key
} SRpcConnInit;
extern int tsRpcHeadSize;
void *taosOpenRpc(SRpcInit *pRpc);
void taosCloseRpc(void *);
int taosOpenRpcChannWithQ(void *handle, int cid, int sessions, void *qhandle);
void taosCloseRpcChann(void *handle, int cid);
void *taosOpenRpcConn(SRpcConnInit *pInit, uint8_t *code);
void taosCloseRpcConn(void *thandle);
void taosStopRpcConn(void *thandle);
int taosSendMsgToPeerH(void *thandle, char *pCont, int contLen, void *ahandle);
char *taosBuildReqHeader(void *param, char type, char *msg);
char *taosBuildReqMsgWithSize(void *, char type, int size);
char *taosBuildRspMsgWithSize(void *, char type, int size);
int taosSendSimpleRsp(void *thandle, char rsptype, char code);
int taosSetSecurityInfo(int cid, int sid, char *id, int spi, int encrypt, char *secret, char *ckey);
void taosGetRpcConnInfo(void *thandle, uint32_t *peerId, uint32_t *peerIp, uint16_t *peerPort, int *cid, int *sid);
int16_t index;
int16_t numOfIps;
uint32_t ip[TSDB_MAX_REPLICA];
} SRpcIpSet;
void *rpcOpen(SRpcInit *pRpc);
void rpcClose(void *);
char *rpcMallocCont(int contLen);
void rpcFreeCont(char *pCont);
void rpcSendRequest(void *thandle, SRpcIpSet ipSet, char msgType, void *pCont, int contLen, void *ahandle);
void rpcSendResponse(void *pConn, void *pCont, int contLen);
void rpcSendSimpleRsp(void *pConn, int32_t code);
int taosGetOutType(void *thandle);
#ifdef __cplusplus
}
......
......@@ -21,7 +21,7 @@
#include "tmd5.h"
#include "tmempool.h"
#include "trpc.h"
#include "taosdef.h"
#include "tsdb.h"
#include "tsocket.h"
#include "ttcpclient.h"
#include "ttcpserver.h"
......@@ -31,78 +31,106 @@
#include "tutil.h"
#include "lz4.h"
typedef struct _msg_node {
struct _msg_node *next;
void * ahandle;
int msgLen;
} SMsgNode;
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHeader) + sizeof(SRpcDigest))
#define rpcHeaderFromCont(cont) ((STaosHeader *) (cont - sizeof(SRpcHeader)))
#define rpcContFromHeader(msg) ( msg + sizeof(SRpcHeader))
#define rpcMsgLenFromCont(contLen) ( contLen + sizeof(SRpcHeader))
#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHeader))
#define rpcIsReq(type) (type & 1U)
typedef struct {
void * signature;
int chann; // channel ID
int sid; // session ID
uint32_t ownId; // own link ID
uint32_t peerId; // peer link ID
char meterId[TSDB_UNI_LEN];
char spi;
char encrypt;
uint8_t secret[TSDB_KEY_LEN];
uint8_t ckey[TSDB_KEY_LEN];
uint16_t localPort; // for UDP only
uint32_t peerUid;
uint32_t peerIp; // peer IP
uint16_t peerPort; // peer port
char peerIpstr[20]; // peer IP string
uint16_t tranId; // outgoing transcation ID, for build message
uint16_t outTranId; // outgoing transcation ID
uint16_t inTranId;
uint8_t outType;
char inType;
char closing;
char rspReceived;
void * chandle; // handle passed by TCP/UDP connection layer
void * ahandle; // handle returned by upper app layter
int retry;
int tretry; // total retry
void * pTimer;
void * pIdleTimer;
char * pRspMsg;
char * pQuickRsp;
int rspMsgLen;
SMsgNode * pMsgNode;
SMsgNode * pHead, *pTail;
struct rpc_server *pServer;
int sessions;
int numOfThreads;
int type;
int idleTime; // milliseconds;
uint16_t localPort;
char label[12];
char *meterId; // meter ID
char spi; // security parameter index
char encrypt; // encrypt algorithm
char *secret; // key for authentication
char *ckey; // ciphering key
void *(*fp)(char *, void *ahandle, void *thandle); // FP to call the application
int (*afp)(char *meterId, char *spi, char *encrypt, uint8_t *secret, uint8_t *ckey); // FP to retrieve auth info
SRpcConn *connList;
void *idPool;
void *tmrCtrl;
void *hash;
void *shandle; // returned handle from lower layer during initialization
void *pCache; // connection cache
pthread_mutex_t mutex;
} SRpcInfo;
typedef struct {
void *signature;
int sid; // session ID
uint32_t ownId; // own link ID
uint32_t peerId; // peer link ID
char meterId[TSDB_UNI_LEN];
char spi;
char encrypt;
uint8_t secret[TSDB_KEY_LEN];
uint8_t ckey[TSDB_KEY_LEN];
uint16_t localPort; // for UDP only
uint32_t peerUid;
uint32_t peerIp; // peer IP
uint16_t peerPort; // peer port
char peerIpstr[20]; // peer IP string
uint16_t tranId; // outgoing transcation ID, for build message
uint16_t outTranId; // outgoing transcation ID
uint16_t inTranId;
uint8_t outType;
char inType;
void *chandle; // handle passed by TCP/UDP connection layer
void *ahandle; // handle provided by upper app layter
int retry;
int tretry; // total retry
void *pTimer;
void *pIdleTimer;
char *pRspMsg; // including header
int rspMsgLen;
char *pReqMsg; // including header
int reqMsgLen;
SRpcInfo *pRpc;
} SRpcConn;
typedef struct {
int sessions;
void * qhandle; // for scheduler
SRpcConn * connList;
void * idPool;
void * tmrCtrl;
void * hash;
pthread_mutex_t mutex;
} SRpcChann;
typedef struct rpc_server {
void *shandle; // returned handle from lower layer during initialization
void *qhandle; // for scheduler
int bits; // number of bits for session ID
int mask;
int numOfChanns;
int numOfThreads;
int idMgmt; // ID management method
int type;
int idleTime; // milliseconds;
int noFree; // do not free the request msg when rsp is received
int index; // for UDP server, next thread for new connection
uint16_t localPort;
char label[12];
void *(*fp)(char *, void *ahandle, void *thandle);
void (*efp)(int); // FP to report error
int (*afp)(char *meterId, char *spi, char *encrypt, uint8_t *secret, uint8_t *ckey); // FP to retrieve auth info
SRpcChann *channList;
} STaosRpc;
SRpcIpSet ipSet;
void *ahandle;
SRpcInfo *pRpc;
char type;
char *pCont;
int contLen;
int numOfRetry;
char msg[];
} SRpcReqContext;
typedef struct {
char version : 4;
char comp : 4;
char tcp : 2;
char spi : 3;
char encrypt : 3;
uint16_t tranId;
uint32_t uid; // for unique ID inside a client
uint32_t sourceId;
uint32_t destId;
uint32_t destIp;
char meterId[TSDB_UNI_LEN];
uint16_t port; // for UDP only
char empty[1];
uint8_t msgType;
int32_t msgLen;
uint8_t content[0];
} SRpcHeader;
typedef struct {
uint32_t timeStamp;
uint8_t auth[TSDB_AUTH_LEN];
} SRpcDigest;
int tsRpcProgressTime = 10; // milliseocnds
......@@ -111,13 +139,25 @@ int tsRpcMaxRetry;
int tsRpcHeadSize;
void *(*taosInitConn[])(char *ip, uint16_t port, char *label, int threads, void *fp, void *shandle) = {
taosInitUdpServer, taosInitUdpClient, taosInitTcpServer, taosInitTcpClient};
taosInitUdpServer,
taosInitUdpClient,
taosInitTcpServer,
taosInitTcpClient
};
void (*taosCleanUpConn[])(void *thandle) = {taosCleanUpUdpConnection, taosCleanUpUdpConnection, taosCleanUpTcpServer,
taosCleanUpTcpClient};
void (*taosCleanUpConn[])(void *thandle) = {
taosCleanUpUdpConnection,
taosCleanUpUdpConnection,
taosCleanUpTcpServer,
taosCleanUpTcpClient
};
int (*taosSendData[])(uint32_t ip, uint16_t port, char *data, int len, void *chandle) = {
taosSendUdpData, taosSendUdpData, taosSendTcpServerData, taosSendTcpClientData};
taosSendUdpData,
taosSendUdpData,
taosSendTcpServerData,
taosSendTcpClientData
};
void *(*taosOpenConn[])(void *shandle, void *thandle, char *ip, uint16_t port) = {
taosOpenUdpConnection,
......@@ -126,27 +166,30 @@ void *(*taosOpenConn[])(void *shandle, void *thandle, char *ip, uint16_t port) =
taosOpenTcpClientConnection,
};
void (*taosCloseConn[])(void *chandle) = {NULL, NULL, taosCloseTcpServerConnection, taosCloseTcpClientConnection};
int taosReSendRspToPeer(SRpcConn *pConn);
void taosProcessTaosTimer(void *, void *);
void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *thandle,
void *chandle);
int taosSendDataToPeer(SRpcConn *pConn, char *data, int dataLen);
void taosProcessSchedMsg(SSchedMsg *pMsg);
int taosAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey);
int taosBuildAuthHeader(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey);
static int32_t taosCompressRpcMsg(char* pCont, int32_t contLen) {
STaosHeader* pHeader = (STaosHeader *)(pCont - sizeof(STaosHeader));
int32_t overhead = sizeof(int32_t) * 2;
int32_t finalLen = 0;
void (*taosCloseConn[])(void *chandle) = {
NULL,
NULL,
taosCloseTcpServerConnection,
taosCloseTcpClientConnection
};
int rpcResendRspToPeer(SRpcConn *pConn);
void rpcProcessRetryTimer(void *, void *);
void *rpcProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *thandle, void *chandle);
int rpcSendDataToPeer(SRpcConn *pConn, char *data, int dataLen);
int rpcAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey);
int rpcBuildAuthHeader(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey);
static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
SRpcHeader* pHeader = rpcHeaderFromCont(pCont);
int32_t overhead = sizeof(int32_t) * 2;
int32_t finalLen = 0;
if (!NEEDTO_COMPRESSS_MSG(contLen)) {
return contLen;
}
char *buf = malloc (contLen + overhead + 8); // 16 extra bytes
char *buf = malloc (contLen + overhead+8); // 16 extra bytes
if (buf == NULL) {
tError("failed to allocate memory for rpc msg compression, contLen:%d, reason:%s", contLen, strerror(errno));
return contLen;
......@@ -181,131 +224,97 @@ static int32_t taosCompressRpcMsg(char* pCont, int32_t contLen) {
return finalLen;
}
static STaosHeader* taosDecompressRpcMsg(STaosHeader* pHeader, SSchedMsg* pSchedMsg, int32_t msgLen) {
static SRpcHeader *rpcDecompressRpcMsg(SRpcHeader *pHeader) {
int overhead = sizeof(int32_t) * 2;
SRpcHeader *pNewHeader = NULL;
char *pCont = pHeader->content;
if (pHeader->comp) {
// decompress the content
assert(GET_INT32_VAL(pHeader->content) == 0);
if (pHeader->comp == 0) {
pSchedMsg->msg = (char *)(&(pHeader->destId));
return pHeader;
}
// decompress the content
assert(GET_INT32_VAL(pHeader->content) == 0);
// contLen is original message length before compression applied
int contLen = htonl(GET_INT32_VAL(pHeader->content + sizeof(int32_t)));
// prepare the temporary buffer to decompress message
char *buf = malloc(sizeof(STaosHeader) + contLen);
// contLen is original message length before compression applied
int contLen = htonl(GET_INT32_VAL(pCont + sizeof(int32_t)));
//tDump(pHeader->content, msgLen);
// prepare the temporary buffer to decompress message
char *buf = rpcMallocCont(contLen);
if (buf) {
int32_t originalLen = LZ4_decompress_safe((const char*)(pHeader->content + overhead), buf + sizeof(STaosHeader),
msgLen - overhead, contLen);
if (buf) {
pNewHeader = rpcHeaderFromCont(buf);
int compLen = rpcContLenFromMsg(pHeader->msgLen) - overhead;
int32_t originalLen = LZ4_decompress_safe((const char*)(pCont + overhead), buf, compLen, contLen);
assert(originalLen == contLen);
memcpy(buf, pHeader, sizeof(STaosHeader));
free(pHeader); // free the compressed message buffer
STaosHeader* pNewHeader = (STaosHeader *) buf;
pNewHeader->msgLen = originalLen + (int) sizeof(SIntMsg);
assert(originalLen == contLen);
pSchedMsg->msg = (char *)(&(pNewHeader->destId));
//tDump(pHeader->content, contLen);
return pNewHeader;
} else {
tError("failed to allocate memory to decompress msg, contLen:%d, reason:%s", contLen, strerror(errno));
pSchedMsg->msg = NULL;
memcpy(pNewHeader, pHeader, sizeof(SRpcHeader));
pNewHeader->msgLen = rpcMsgLenFromCont(originalLen);
free(pHeader); // free the compressed message buffer
pHeader = pNewHeader;
} else {
tError("failed to allocate memory to decompress msg, contLen:%d, reason:%s", contLen, strerror(errno));
}
}
return NULL;
return pHeader;
}
char *taosBuildReqHeader(void *param, char type, char *msg) {
STaosHeader *pHeader;
SRpcConn * pConn = (SRpcConn *)param;
char *rpcMallocCont(int size) {
char *pMsg = NULL;
if (pConn == NULL || pConn->signature != pConn) {
tError("pConn:%p, connection has to be openned first before building a message", pConn);
size += RPC_MSG_OVERHEAD;
pMsg = (char *)calloc(1, (size_t)size);
if (pMsg == NULL) {
tError("failed to malloc msg, size:%d", size);
return NULL;
}
pHeader = (STaosHeader *)(msg + sizeof(SMsgNode));
memset(pHeader, 0, sizeof(STaosHeader));
pHeader->version = 1;
pHeader->comp = 0;
pHeader->msgType = type;
pHeader->spi = 0;
pHeader->tcp = 0;
pHeader->encrypt = 0;
pHeader->tranId = atomic_add_fetch_16(&pConn->tranId, 1);
if (pHeader->tranId == 0) pHeader->tranId = atomic_add_fetch_16(&pConn->tranId, 1);
pHeader->sourceId = pConn->ownId;
pHeader->destId = pConn->peerId;
pHeader->port = 0;
pHeader->uid = (uint32_t)((int64_t)pConn + (int64_t)getpid());
return pMsg + sizeof(SRpcReqContext) + sizeof(SRpcHeader);
}
memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId));
void rpcFreeCont(char *cont) {
char *msg = cont - sizeof(SRpcHeader);
free(msg);
}
return (char *)pHeader->content;
static void rpcFreeMsg(char *msg) {
msg -= sizeof(SRpcReqContext);
free(msg);
}
char *taosBuildReqMsgWithSize(void *param, char type, int size) {
STaosHeader *pHeader;
char * pMsg;
SRpcConn * pConn = (SRpcConn *)param;
void rpcSendSimpleRsp(void *thandle, int_32 code) {
char *pMsg;
STaosRsp *pRsp;
int msgLen = sizeof(STaosRsp);
if (pConn == NULL || pConn->signature != pConn) {
tError("pConn:%p, connection has to be openned first before building a message", pConn);
return NULL;
if (thandle == NULL) {
tError("connection is gone, response could not be sent");
return;
}
size += sizeof(SMsgNode) + sizeof(STaosHeader) + sizeof(STaosDigest);
pMsg = (char *)malloc((size_t)size);
memset(pMsg, 0, (size_t)size);
pHeader = (STaosHeader *)(pMsg + sizeof(SMsgNode));
pHeader->version = 1;
pHeader->msgType = type;
pHeader->spi = 0;
pHeader->tcp = 0;
pHeader->encrypt = 0;
pHeader->tranId = atomic_add_fetch_32(&pConn->tranId, 1);
if (pHeader->tranId == 0) pHeader->tranId = atomic_add_fetch_32(&pConn->tranId, 1);
pHeader->sourceId = pConn->ownId;
pHeader->destId = pConn->peerId;
pMsg = rpcMallocCont(msgLen);
if (pMsg == NULL) return;
pHeader->uid = (uint32_t)((int64_t)pConn + (int64_t)getpid());
pRsp = (STaosRsp *)pMsg;
pRsp->code = htonl(code);
memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId));
taosSendResponse(thandle, pMsg, msgLen);
return (char *)pHeader->content;
return;
}
char *taosBuildRspMsgWithSize(void *param, char type, int size) {
STaosHeader *pHeader;
char * pMsg;
SRpcConn * pConn = (SRpcConn *)param;
if (pConn == NULL || pConn->signature != pConn) {
tError("pConn:%p, connection has to be opened first before building a message", pConn);
return NULL;
}
static void rpcSendQuickRsp(SRpcConn *pConn, char code) {
char msg[RPC_MSG_OVERHEAD + sizeof(STaosRsp)];
SRpcHeader *pHeader;
int msgLen;
size += sizeof(SMsgNode) + sizeof(STaosHeader) + sizeof(STaosDigest);
pMsg = (char *)malloc((size_t)size);
if (pMsg == NULL) {
tError("pConn:%p, malloc(%d) failed when building a type:%d message", pConn, size, type);
return NULL;
}
pRsp = (STaosRsp *)rpcContFromHeader(msg);
pRsp->code = htonl(code);
msgLen = sizeof(STaosRsp);
memset(pMsg, 0, (size_t)size);
pHeader = (STaosHeader *)pMsg;
// set msg header
memset(msg, 0, sizeof(SRpcHeader));
pHeader = (SRpcHeader *)msg;
pHeader->version = 1;
pHeader->msgType = type;
pHeader->msgType = pConn->inType+1;
pHeader->spi = 0;
pHeader->tcp = 0;
pHeader->encrypt = 0;
......@@ -315,334 +324,240 @@ char *taosBuildRspMsgWithSize(void *param, char type, int size) {
pHeader->uid = 0;
memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId));
return (char *)pHeader->content;
rpcSendDataToPeer(pConn, (char *)msg, msgLen);
}
int taosSendSimpleRsp(void *thandle, char rsptype, char code) {
char *pMsg, *pStart;
int msgLen;
if (thandle == NULL) {
tError("connection is gone, response could not be sent");
return -1;
}
pStart = taosBuildRspMsgWithSize(thandle, rsptype, 32);
if (pStart == NULL) {
tError("build rsp msg error, return null prt");
return -1;
}
pMsg = pStart;
*pMsg = code;
pMsg++;
msgLen = (int)(pMsg - pStart);
taosSendMsgToPeer(thandle, pStart, msgLen);
return msgLen;
}
int taosSendQuickRsp(void *thandle, char rsptype, char code) {
char * pCont;
int contLen;
STaosHeader *pHeader;
char * msg;
int msgLen;
SRpcConn * pConn = (SRpcConn *)thandle;
void *rpcOpen(SRpcInit *pInit) {
SRpcInfo *pRpc;
pCont = taosBuildRspMsgWithSize(thandle, rsptype, 32);
if (pCont == NULL) return 0;
*pCont = code;
contLen = 1;
tsRpcMaxRetry = tsRpcMaxTime * 1000 / tsRpcProgressTime;
tsRpcHeadSize = RPC_MSG_OVERHEAD;
pHeader = (STaosHeader *)(pCont - sizeof(STaosHeader));
msg = (char *)pHeader;
msgLen = contLen + (int32_t)sizeof(STaosHeader);
pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo));
if (pRpc == NULL) return NULL;
if (pConn->spi) {
// add auth part
pHeader->spi = pConn->spi;
STaosDigest *pDigest = (STaosDigest *)(pCont + contLen);
pDigest->timeStamp = htonl(taosGetTimestampSec());
msgLen += sizeof(STaosDigest);
pHeader->msgLen = (int32_t)htonl((uint32_t)msgLen);
taosBuildAuthHeader((uint8_t *)pHeader, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
} else {
pHeader->msgLen = (int32_t)htonl((uint32_t)msgLen);
strcpy(pRpc->label, pInit->label);
pRpc->fp = pInit->fp;
pRpc->type = pInit->connType;
pRpc->idleTime = pInit->idleTime;
pRpc->numOfThreads = pInit->numOfThreads;
if (pRpc->numOfThreads > TSDB_MAX_RPC_THREADS) {
pRpc->numOfThreads = TSDB_MAX_RPC_THREADS;
}
tfree(pConn->pQuickRsp);
pConn->pQuickRsp = msg;
taosSendDataToPeer(pConn, (char *)pHeader, msgLen);
return msgLen;
}
pRpc->localPort = pInit->localPort;
pRpc->afp = pInit->afp;
pRpc->sessions = pInit->session;
strcpy(pRpc->meterId, pInit->meterId);
pRpc->spi = pInit->spi;
strcpy(pRpc->secret, pInit->secret);
strcpy(pRpc->ckey, pInit->ckey);
pRpc->afp = pInit->afp;
pRpc->shandle = (*taosInitConn[pRpc->connType])(pRpc->localIp, pRpc->localPort, pRpc->label, pRpc->numOfThreads,
taosProcessDataFromPeer, pRpc);
if (pRpc->shandle == NULL) {
tError("%s failed to init network, %s:%d", pRpc->label, pRpc->localIp, pRpc->localPort);
taosCloseRpc(pRpc);
return NULL;
}
void *taosOpenRpc(SRpcInit *pRpc) {
STaosRpc *pServer;
size_t size = sizeof(SRpcConn) * sessions;
pRpc->connList = (SRpcConn *)calloc(1, size);
if (pRpc->connList == NULL) {
tError("%s failed to allocate memory for taos connections, size:%d", pRpc->label, size);
taosCloseRpc(pRpc);
return NULL;
}
tsRpcMaxRetry = tsRpcMaxTime * 1000 / tsRpcProgressTime;
tsRpcHeadSize = sizeof(STaosHeader) + sizeof(SMsgNode);
pServer = (STaosRpc *)malloc(sizeof(STaosRpc));
if (pServer == NULL) return NULL;
memset(pServer, 0, sizeof(STaosRpc));
pServer->bits = pRpc->bits;
pServer->mask = (1 << (pRpc->bits)) - 1;
pServer->numOfChanns = pRpc->numOfChanns;
strcpy(pServer->label, pRpc->label);
pServer->fp = pRpc->fp;
pServer->idMgmt = pRpc->idMgmt;
pServer->type = pRpc->connType;
pServer->idleTime = pRpc->idleTime;
pServer->noFree = pRpc->noFree;
pServer->numOfThreads = pRpc->numOfThreads;
if (pServer->numOfThreads > TSDB_MAX_RPC_THREADS) {
pServer->numOfThreads = TSDB_MAX_RPC_THREADS;
pRpc->numOfThreads = TSDB_MAX_RPC_THREADS;
pRpc->idPool = taosInitIdPool(sessions);
if (pRpc->idPool == NULL) {
tError("%s failed to init ID pool", pRpc->label);
taosCloseRpc(pRpc);
return NULL;
}
pServer->localPort = pRpc->localPort;
pServer->qhandle = pRpc->qhandle;
pServer->efp = pRpc->efp;
pServer->afp = pRpc->afp;
int size = (int)sizeof(SRpcChann) * pRpc->numOfChanns;
pServer->channList = (SRpcChann *)malloc((size_t)size);
if (pServer->channList == NULL) {
tError("%s, failed to malloc channList", pRpc->label);
tfree(pServer);
pRpc->tmrCtrl = taosTmrInit(sessions * 2 + 1, 50, 10000, pRpc->label);
if (pRpc->tmrCtrl == NULL) {
tError("%s failed to init timers", pRpc->label);
taosCloseRpc(pRpc);
return NULL;
}
memset(pServer->channList, 0, (size_t)size);
pServer->shandle = (*taosInitConn[pRpc->connType])(pRpc->localIp, pRpc->localPort, pRpc->label, pRpc->numOfThreads,
taosProcessDataFromPeer, pServer);
if (pServer->shandle == NULL) {
tError("%s, failed to init network, %s:%d", pRpc->label, pRpc->localIp, pRpc->localPort);
taosCloseRpc(pServer);
pRpc->hash = taosInitStrHash(sessions, sizeof(pRpc), taosHashString);
if (pRpc->hash == NULL) {
tError("%s failed to init string hash", pRpc->label);
taosCloseRpc(pRpc);
return NULL;
}
if (pServer->numOfChanns == 1) {
int retVal = taosOpenRpcChann(pServer, 0, pRpc->sessionsPerChann);
if (0 != retVal) {
tError("%s, failed to open rpc chann", pRpc->label);
taosCloseRpc(pServer);
return NULL;
}
pRpc->pCahche = taosOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, tsShellActivityTimer*1000);
if ( pRpc->pCache == NULL ) {
tError("%s failed to init connection cache", pRpc->label);
taosCloseRpc(pRpc);
return NULL;
}
pthread_mutex_init(&pRpc->mutex, NULL);
tTrace("%s RPC is openned, numOfThreads:%d", pRpc->label, pRpc->numOfThreads);
return pServer;
return pRpc;
}
int taosOpenRpcChannWithQ(void *handle, int cid, int sessions, void *qhandle) {
STaosRpc * pServer = (STaosRpc *)handle;
SRpcChann *pChann;
tTrace("cid:%d, handle:%p open rpc chann", cid, handle);
if (pServer == NULL) return -1;
if (cid >= pServer->numOfChanns || cid < 0) {
tError("%s: cid:%d, chann is out of range, max:%d", pServer->label, cid, pServer->numOfChanns);
return -1;
}
void rpcClose(void *param) {
SRpcInfo *pRpc = (SRpcInfo *)param;
pChann = pServer->channList + cid;
memset(pChann, 0, sizeof(SRpcChann));
size_t size = sizeof(SRpcConn) * sessions;
pChann->connList = (SRpcConn *)calloc(1, size);
if (pChann->connList == NULL) {
tError("%s cid:%d, failed to allocate memory for taos connections, size:%d", pServer->label, cid, size);
return -1;
}
(*taosCleanUpConn[pRpc->type])(pRpc->shandle);
if (pServer->idMgmt == TAOS_ID_FREE) {
pChann->idPool = taosInitIdPool(sessions);
if (pChann->idPool == NULL) {
tError("%s cid:%d, failed to init ID pool", pServer->label, cid);
return -1;
for (int i = 0; i < pRpc->sessions; ++i) {
if (pRpc->connList[i].signature != NULL) {
taosCloseRpcConn((void *)(pRpc->connList + i));
}
}
pChann->tmrCtrl = taosTmrInit(sessions * 2 + 1, 50, 10000, pServer->label);
if (pChann->tmrCtrl == NULL) {
tError("%s cid:%d, failed to init timers", pServer->label, cid);
return -1;
}
pChann->hash = taosInitStrHash(sessions, sizeof(pChann), taosHashString);
if (pChann->hash == NULL) {
tError("%s cid:%d, failed to init string hash", pServer->label, cid);
return -1;
}
taosCleanUpStrHash(pRpc->hash);
taosTmrCleanUp(pRpc->tmrCtrl);
taosIdPoolCleanUp(pRpc->idPool);
taosCloseConnCache(pRpc->pCache);
pthread_mutex_init(&pChann->mutex, NULL);
pChann->sessions = sessions;
pChann->qhandle = qhandle ? qhandle : pServer->qhandle;
return TSDB_CODE_SUCCESS;
tfree(pRpc->connList);
pthread_mutex_destroy(&pRpc->mutex);
tfree(pRpc);
}
void taosCloseRpcChann(void *handle, int cid) {
STaosRpc * pServer = (STaosRpc *)handle;
SRpcChann *pChann;
tTrace("cid:%d, handle:%p close rpc chann", cid, handle);
static SRpcConn *rpcOpenConn(SRpcConnInit *pInit) {
SRpcConn *pConn;
SRpcInfo *pRpc = (SRpcInfo *)pInit->shandle;
if (pServer == NULL) return;
if (cid >= pServer->numOfChanns || cid < 0) {
tError("%s cid:%d, chann is out of range, max:%d", pServer->label, cid, pServer->numOfChanns);
return;
}
if ( (uint8_t)(rpcGetConn(pInit->sid, pInit->meterId, pRpc, &pConn, 1, NULL)) != 0 )
return NULL;
pChann = pServer->channList + cid;
if (pConn->peerId == 0) pConn->peerId = pRpc->peerId;
strcpy(pConn->peerIpstr, pInit->peerIp);
pConn->peerIp = inet_addr(pInit->peerIp);
pConn->peerPort = pInit->peerPort;
pConn->ahandle = pInit->ahandle;
pConn->spi = pRpc->spi;
pConn->encrypt = pRpc->encrypt;
if (pConn->spi) memcpy(pConn->secret, pRpc->secret, TSDB_KEY_LEN);
for (int i = 0; i < pChann->sessions; ++i) {
if (pChann->connList[i].signature != NULL) {
taosCloseRpcConn((void *)(pChann->connList + i));
// if it is client, it shall set up connection first
if (taosOpenConn[pRpc->type]) {
pConn->chandle = (*taosOpenConn[pRpc->type])(pRpc->shandle, pConn, pConn->peerIpstr, pConn->peerPort);
if (pConn->chandle) {
tTrace("%s pConn:%p, rpc connection is set up, sid:%d id:%s ip:%s:%hu localPort:%d", pRpc->label,
pConn, pConn->sid, pInit->meterId, pConn->peerIpstr, pConn->peerPort, pConn->localPort);
} else {
tError("%s pConn:%p, failed to set up nw connection to ip:%s:%hu", pRpc->label, pConn,
pConn->sid, pInit->meterId, pConn->peerIpstr, pConn->peerPort);
terrorno = TSDB_CODE_NETWORK_UNAVAIL;
rpcCloseConn(pConn);
pConn = NULL;
}
}
taosCleanUpStrHash(pChann->hash);
taosTmrCleanUp(pChann->tmrCtrl);
taosIdPoolCleanUp(pChann->idPool);
tfree(pChann->connList);
pthread_mutex_destroy(&pChann->mutex);
memset(pChann, 0, sizeof(SRpcChann));
return pConn;
}
void taosCloseRpcConn(void *thandle) {
static void rpcCloseConn(void *thandle) {
SRpcConn *pConn = (SRpcConn *)thandle;
if (pConn == NULL) return;
STaosRpc *pServer = pConn->pServer;
if (pConn->signature != thandle || pServer == NULL) return;
if (pConn->closing) return;
SRpcChann *pChann = pServer->channList + pConn->chann;
SRpcInfo *pRpc = pConn->pRpc;
if (pConn->signature != thandle || pRpc == NULL) return;
pthread_mutex_lock(&pChann->mutex);
pthread_mutex_lock(&pRpc->mutex);
pConn->closing = 1;
pConn->signature = NULL;
if (taosCloseConn[pServer->type]) (*taosCloseConn[pServer->type])(pConn->chandle);
if (taosCloseConn[pRpc->type]) (*taosCloseConn[pRpc->type])(pConn->chandle);
taosTmrStopA(&pConn->pTimer);
taosTmrStopA(&pConn->pIdleTimer);
tfree(pConn->pRspMsg);
if (pServer->noFree == 0) free(pConn->pMsgNode);
pConn->pMsgNode = NULL;
tfree(pConn->pQuickRsp);
SMsgNode *pMsgNode;
while (pConn->pHead) {
pMsgNode = pConn->pHead;
pConn->pHead = pConn->pHead->next;
memset(pMsgNode, 0, sizeof(SMsgNode));
if (pServer->noFree == 0) free(pMsgNode);
}
rpcFreeMsg(pConn->pRspMsg);
rpcFreeMsg(pConn-pReqMsg);
char hashstr[40] = {0};
sprintf(hashstr, "%x:%x:%x", pConn->peerIp, pConn->peerUid, pConn->peerId);
taosDeleteStrHash(pChann->hash, hashstr);
taosDeleteStrHash(pRpc->hash, hashstr);
tTrace("%s cid:%d sid:%d id:%s, TAOS connection closed, pConn:%p", pServer->label, pConn->chann, pConn->sid,
tTrace("%s pConn:%p, TAOS connection closed", pRpc->label, pConn->sid,
pConn->meterId, pConn);
int freeId = pConn->sid;
memset(pConn, 0, sizeof(SRpcConn));
if (pChann->idPool) taosFreeId(pChann->idPool, freeId);
if (pRpc->idPool) taosFreeId(pRpc->idPool, freeId);
pthread_mutex_unlock(&pChann->mutex);
pthread_mutex_unlock(&pRpc->mutex);
}
int taosGetRpcConn(int chann, int sid, char *meterId, STaosRpc *pServer, SRpcConn **ppConn, char req, char *hashstr) {
static int rpcGetConn(int sid, char *meterId, SRpcInfo *pRpc, SRpcConn **ppConn, char req, char *hashstr) {
SRpcConn * pConn = NULL;
SRpcChann *pChann;
if (pServer == NULL) return -1;
pChann = pServer->channList + chann;
if (pServer->idMgmt == TAOS_ID_FREE) {
if (sid == 0) {
if (req) {
int osid = sid;
SRpcConn **ppConn = (SRpcConn **)taosGetStrHashData(pChann->hash, hashstr);
if (ppConn) pConn = *ppConn;
if (pConn == NULL) {
sid = taosAllocateId(pChann->idPool);
if (sid <= 0) {
tError("%s cid:%d, maximum number of sessions:%d is reached", pServer->label, chann, pChann->sessions);
return TSDB_CODE_MAX_SESSIONS;
} else {
tTrace("%s cid:%d sid:%d, ID allocated, used:%d, old id:%d", pServer->label, chann, sid,
taosIdPoolNumOfUsed(pChann->idPool), osid);
}
if (pRpc == NULL) return -1;
if (sid == 0) {
if (req) {
int osid = sid;
SRpcConn **ppConn = (SRpcConn **)taosGetStrHashData(pRpc->hash, hashstr);
if (ppConn) pConn = *ppConn;
if (pConn == NULL) {
sid = taosAllocateId(pRpc->idPool);
if (sid <= 0) {
tError("%s maximum number of sessions:%d is reached", pRpc->label, pRpc->sessions);
return TSDB_CODE_MAX_SESSIONS;
} else {
sid = pConn->sid;
tTrace("%s cid:%d sid:%d id:%s, session is already there", pServer->label, pConn->chann, pConn->sid,
pConn->meterId);
tTrace("%s sid:%d, ID allocated, used:%d, old id:%d", pRpc->label, sid,
taosIdPoolNumOfUsed(pRpc->idPool), osid);
}
} else {
return TSDB_CODE_UNEXPECTED_RESPONSE;
sid = pConn->sid;
tTrace("%s sid:%d id:%s, session is already there", pRpc->label, pConn->sid,
pConn->meterId);
}
} else {
if (pChann->connList[sid].signature == NULL) {
tError("%s cid:%d, sid:%d session is already released", pServer->label, chann, sid);
return TSDB_CODE_INVALID_VALUE;
}
}
}
return TSDB_CODE_UNEXPECTED_RESPONSE;
}
} else {
if (pRpc->connList[sid].signature == NULL) {
tError("%s sid:%d session is already released", pRpc->label, sid);
return TSDB_CODE_INVALID_VALUE;
}
}
pConn = pChann->connList + sid;
if (pChann == NULL || pChann->connList == NULL) {
tTrace("%s cid:%d sid:%d, connlist is null, received:%s", pServer->label, chann, sid, meterId);
return TSDB_CODE_MISMATCHED_METER_ID;
}
pConn = pRpc->connList + sid;
if (pConn->signature == NULL) {
memset(pConn, 0, sizeof(SRpcConn));
pConn->signature = pConn;
memcpy(pConn->meterId, meterId, tListLen(pConn->meterId));
pConn->pServer = pServer;
pConn->chann = chann;
pConn->pRpc = pRpc;
pConn->sid = sid;
pConn->tranId = (uint16_t)(rand() & 0xFFFF);
pConn->ownId = htonl((uint32_t)((pConn->chann << pServer->bits) + pConn->sid));
if (pServer->afp) {
int ret = (*pServer->afp)(meterId, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey);
pConn->ownId = htonl(pConn->sid);
if (pRpc->afp) {
int ret = (*pRpc->afp)(meterId, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey);
if (ret != 0) {
tWarn("%s cid:%d sid:%d id:%s, meterId not there pConn:%p", pServer->label, chann, sid, pConn->meterId, pConn);
taosFreeId(pChann->idPool, sid); // sid shall be released
tWarn("%s pConn:%p, meterId not there", pRpc->label, pConn);
taosFreeId(pRpc->idPool, sid); // sid shall be released
memset(pConn, 0, sizeof(SRpcConn));
return ret;
}
}
if ((pServer->type == TAOS_CONN_UDPC || pServer->type == TAOS_CONN_UDPS) && pServer->numOfThreads > 1 &&
pServer->localPort) {
if ((pRpc->type == TAOS_CONN_UDPC || pRpc->type == TAOS_CONN_UDPS) && pRpc->numOfThreads > 1 &&
pRpc->localPort) {
// UDP server, assign to new connection
pServer->index = (pServer->index + 1) % pServer->numOfThreads;
pConn->localPort = (int16_t)(pServer->localPort + pServer->index);
pRpc->index = (pRpc->index + 1) % pRpc->numOfThreads;
pConn->localPort = (int16_t)(pRpc->localPort + pRpc->index);
}
taosAddStrHash(pChann->hash, hashstr, (char *)&pConn);
tTrace("%s cid:%d sid:%d id:%s, TAOS connection is allocated, localPort:%d pConn:%p", pServer->label, chann, sid,
pConn->meterId, pConn->localPort, pConn);
taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn);
tTrace("%s pConn:%p, TAOS connection is allocated, sid:%d id:%s", pRpc->label, pConn, sid);
} else {
if (memcmp(pConn->meterId, meterId, tListLen(pConn->meterId)) != 0) {
tTrace("%s cid:%d sid:%d id:%s, meterId is not matched, received:%s", pServer->label, chann, sid, pConn->meterId,
meterId);
tTrace("%s pConn:%p, meterId:%s is not matched, received:%s", pRpc->label, pConn, pConn->meterId, meterId);
return TSDB_CODE_MISMATCHED_METER_ID;
}
}
......@@ -652,215 +567,159 @@ int taosGetRpcConn(int chann, int sid, char *meterId, STaosRpc *pServer, SRpcCon
return TSDB_CODE_SUCCESS;
}
void *taosOpenRpcConn(SRpcConnInit *pInit, uint8_t *code) {
SRpcConn *pConn;
STaosRpc *pServer = (STaosRpc *)pInit->shandle;
*code = (uint8_t)(taosGetRpcConn(pInit->cid, pInit->sid, pInit->meterId, pServer, &pConn, 1, NULL));
if (*code == TSDB_CODE_MAX_SESSIONS) *code = TSDB_CODE_MAX_CONNECTIONS;
if (*code != TSDB_CODE_SUCCESS) return NULL;
static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
SRpcHeader *pHeader = (SRpcHeader *)msg;
SRpcInfo *pRpc = pConn->pRpc;
int code = 0;
if (pConn->peerId == 0) pConn->peerId = pInit->peerId;
if (pConn->spi == 0 ) return 0;
strcpy(pConn->peerIpstr, pInit->peerIp);
pConn->peerIp = inet_addr(pInit->peerIp);
pConn->peerPort = pInit->peerPort;
pConn->ahandle = pInit->ahandle;
pConn->spi = pInit->spi;
pConn->encrypt = pInit->encrypt;
if (pConn->spi) memcpy(pConn->secret, pInit->secret, TSDB_KEY_LEN);
if (pHeader->spi == pConn->spi) {
// authentication
SRpcDigest *pDigest = (SRpcDigest *)((char *)pHeader + msgLen - sizeof(SRpcDigest));
// if it is client, it shall set up connection first
if (taosOpenConn[pServer->type]) {
pConn->chandle = (*taosOpenConn[pServer->type])(pServer->shandle, pConn, pConn->peerIpstr, pConn->peerPort);
if (pConn->chandle) {
tTrace("%s cid:%d sid:%d id:%s, nw connection is set up, ip:%s:%hu localPort:%d pConn:%p", pServer->label,
pConn->chann, pConn->sid, pInit->meterId, pConn->peerIpstr, pConn->peerPort, pConn->localPort, pConn);
int32_t delta;
delta = (int32_t)htonl(pDigest->timeStamp);
delta -= (int32_t)taosGetTimestampSec();
if (abs(delta) > 900) {
tWarn("%s pConn:%p, time diff:%d is too big, msg discarded, timestamp:%d", pRpc->label, pConn,
delta, htonl(pDigest->timeStamp));
code = TSDB_CODE_INVALID_TIME_STAMP;
} else {
tError("%s cid:%d sid:%d id:%s, failed to set up nw connection to ip:%s:%hu", pServer->label, pConn->chann,
pConn->sid, pInit->meterId, pConn->peerIpstr, pConn->peerPort);
*code = TSDB_CODE_NETWORK_UNAVAIL;
taosCloseRpcConn(pConn);
pConn = NULL;
if (rpcAuthenticateMsg((uint8_t *)pHeader, dataLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) {
char ipstr[24];
tinet_ntoa(ipstr, ip);
mLError("id:%s from %s, authentication failed", pHeader->meterId, ipstr);
tError("%s pConn:%p, authentication failed, msg discarded", pRpc->label, pConn);
code = TSDB_CODE_AUTH_FAILURE;
} else {
pHeader->msgLen -= sizeof(SRpcDigest);
}
}
} else {
// if it is request or response with code 0, msg shall be discarded
if (rpcIsReq(pHeader->msgType) || (pHeader->content[0] == 0)) {
tTrace("%s pConn:%p, auth spi not matched, msg discarded", pRpc->label, pConn);
code = TSDB_CODE_AUTH_FAILURE;
}
}
return pConn;
}
void taosCloseRpc(void *param) {
STaosRpc *pServer = (STaosRpc *)param;
(*taosCleanUpConn[pServer->type])(pServer->shandle);
for (int cid = 0; cid < pServer->numOfChanns; ++cid) taosCloseRpcChann(pServer, cid);
tfree(pServer->channList);
tfree(pServer);
return code;
}
int taosSetSecurityInfo(int chann, int sid, char *id, int spi, int encrypt, char *secret, char *ckey) {
/*
SRpcConn *pConn;
pConn = connList[chann*tsSessionsPerChann + sid];
static int rpcProcessReqHeader(SRpcConn *pConn, SRpcHeader *pHeader) {
int code = 0;
if ( pConn == NULL ) {
pConn = (SRpcConn *)sizeof(SRpcConn);
if ( pConn == NULL ) {
tError("failed to allocate memory for taosConn");
return -1;
if (pConn->peerId == 0) {
pConn->peerId = pHeader->sourceId;
} else {
if (pConn->peerId != pHeader->sourceId) {
tTrace("%s pConn:%p, source Id is changed, old:0x%08x new:0x%08x", pRpc->label, pConn,
pConn->peerId, pHeader->sourceId);
return TSDB_CODE_INVALID_VALUE;
}
memset(pConn, 0, sizeof(SRpcConn));
pConn->chann = chann;
pConn->sid = sid;
}
pConn->spi = spi;
pConn->encrypt = encrypt;
memcpy(pConn->secret, pConn->secret, TSDB_KEY_LEN);
memcpy(pConn->cipheringKey, ckey, TSDB_KEY_LEN);
memcpy(pConn->meterId, id, TSDB_TABLE_ID_LEN);
*/
return -1;
}
int taosSendDataToPeer(SRpcConn *pConn, char *data, int dataLen) {
int writtenLen = 0;
STaosRpc * pServer = pConn->pServer;
STaosHeader *pHeader = (STaosHeader *)data;
if (pConn->signature != pConn || pServer == NULL) return -1;
if (pConn->inTranId == pHeader->tranId) {
if (pConn->inType == pHeader->msgType) {
tTrace("%s pConn:%p, %s is retransmitted", pRpc->label, pConn, taosMsg[pHeader->msgType]);
taosSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS);
} else if (pConn->inType == 0) {
tTrace("%s pConn:%p, %s is already processed, tranId:%d", pRpc->label, pConn,
taosMsg[pHeader->msgType], pConn->inTranId);
rpcResendRspToPeer(pConn);
} else {
tTrace("%s pConn:%p, mismatched message %s and tranId", pRpc->label, pConn, taosMsg[pHeader->msgType]);
}
if (pHeader->msgType & 1) {
if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16))
tTrace("%s cid:%d sid:%d id:%s, %s is sent to %s:%hu, len:%d source:0x%08x dest:0x%08x tranId:%d pConn:%p",
pServer->label, pConn->chann, pConn->sid, pConn->meterId, taosMsg[pHeader->msgType], pConn->peerIpstr,
pConn->peerPort, dataLen, pHeader->sourceId, pHeader->destId, pHeader->tranId, pConn);
} else {
if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16))
tTrace(
"%s cid:%d sid:%d id:%s, %s is sent to %s:%hu, code:%u len:%d source:0x%08x dest:0x%08x tranId:%d pConn:%p",
pServer->label, pConn->chann, pConn->sid, pConn->meterId, taosMsg[pHeader->msgType], pConn->peerIpstr,
pConn->peerPort, (uint8_t)pHeader->content[0], dataLen, pHeader->sourceId, pHeader->destId, pHeader->tranId,
pConn);
}
// do not reply any message
return TSDB_CODE_ALREADY_PROCESSED;
}
writtenLen = (*taosSendData[pServer->type])(pConn->peerIp, pConn->peerPort, (char *)pHeader, dataLen, pConn->chandle);
if (pConn->inType != 0) {
tTrace("%s pConn:%p, last session is not finished, inTranId:%d tranId:%d", pRpc->label, pConn,
pConn->inTranId, pHeader->tranId);
return TSDB_CODE_LAST_SESSION_NOT_FINISHED;
}
if (writtenLen != dataLen)
tError("%s cid:%d sid:%d id:%s, dataLen:%d writtenLen:%d, not good, reason:%s", pServer->label, pConn->chann,
pConn->sid, pConn->meterId, dataLen, writtenLen, strerror(errno));
// assert ( writtenLen == dataLen );
tDump(data, dataLen);
pConn->inTranId = pHeader->tranId;
pConn->inType = pHeader->msgType;
return 0;
return 0;
}
void taosProcessResponse(SRpcConn *pConn) {
STaosHeader *pHeader;
char * msg = NULL;
int msgLen = 0;
static int rpcProcessRspHeader(SRpcConn *pConn, SRpcHeader *pHeader) {
pConn->peerId = pHeader->sourceId;
if (pConn == NULL) return;
STaosRpc *pServer = pConn->pServer;
if (pConn->signature != pConn || pServer == NULL) return;
SRpcChann *pChann = pServer->channList + pConn->chann;
pthread_mutex_lock(&pChann->mutex);
pConn->outType = 0;
pConn->rspReceived = 0;
if (pServer->noFree == 0) tfree(pConn->pMsgNode);
pConn->pMsgNode = NULL;
if (pConn->outType == 0 || pConn->pContext == NULL) {
return TSDB_CODE_UNEXPECTED_RESPONSE;
}
if (pConn->pHead) {
SMsgNode *pMsgNode = pConn->pHead;
// assert ( pMsgNode->msgLen >= sizeof(STaosHeader) && pMsgNode->msgLen < RPC_MAX_UDP_SIZE);
if (pMsgNode->msgLen >= sizeof(STaosHeader)) {
pConn->pMsgNode = pMsgNode;
pConn->pHead = pMsgNode->next;
if (pMsgNode->ahandle) pConn->ahandle = pMsgNode->ahandle;
if (pHeader->tranId != pConn->outTranId) {
return TSDB_CODE_INVALID_TRAN_ID;
}
pHeader = (STaosHeader *)((char *)pMsgNode + sizeof(SMsgNode));
pConn->outType = pHeader->msgType;
pConn->outTranId = pHeader->tranId;
if (pHeader->msgType != pConn->outType + 1) {
return TSDB_CODE_INVALID_RESPONSE_TYPE;
}
msg = (char *)pHeader;
msgLen = pMsgNode->msgLen;
if (*pHeader->content == TSDB_CODE_NOT_READY) {
return = TSDB_CODE_ALREADY_PROCESSED;
}
taosTmrStopA(&pConn->pTimer);
pConn->retry = 0;
if (*pHeader->content == TSDB_CODE_ACTION_IN_PROGRESS || pHeader->tcp) {
if (pConn->tretry <= tsRpcMaxRetry) {
tTrace("%s pConn:%p, peer is still processing the transaction", pRpc->label, pConn);
pConn->tretry++;
taosTmrReset(rpcProcessRetryTimer, tsRpcProgressTime, pConn, pRpc->tmrCtrl, &pConn->pTimer);
return TSDB_CODE_ALREADY_PROCESSED;
} else {
tError("%s cid:%d sid:%d id:%s, invalid msgLen:%d pConn:%p", pServer->label, pConn->chann, pConn->sid,
pConn->meterId, pMsgNode->msgLen, pConn);
pConn->pHead = NULL;
// peer still in processing, give up
*pHeader->content = TSDB_CODE_TOO_SLOW;
}
if (pConn->pHead == NULL) pConn->pTail = NULL;
}
if (msg) {
taosSendDataToPeer(pConn, msg, msgLen);
taosTmrReset(taosProcessTaosTimer, tsRpcTimer, pConn, pChann->tmrCtrl, &pConn->pTimer);
}
pthread_mutex_unlock(&pChann->mutex);
pConn->tretry = 0;
pConn->outType = 0;
pConn->pReqMsg = NULL;
pConn->pReqMsgLen = 0;
}
int taosProcessMsgHeader(STaosHeader *pHeader, SRpcConn **ppConn, STaosRpc *pServer, int dataLen, uint32_t ip,
static int rpcProcessHeader(SRpcHeader *pHeader, SRpcConn **ppConn, SRpcInfo *pRpc, int dataLen, uint32_t ip,
uint16_t port, void *chandle) {
int chann, sid, code = 0;
int sid, code = 0;
SRpcConn * pConn = NULL;
SRpcChann *pChann;
int msgLen;
char hashstr[40] = {0};
// int reSend = 0;
*ppConn = NULL;
uint32_t destId = htonl(pHeader->destId);
chann = destId >> pServer->bits;
sid = destId & pServer->mask;
uint32_t sid = htonl(pHeader->destId);
if (pHeader->msgType >= TSDB_MSG_TYPE_MAX || pHeader->msgType <= 0) {
tTrace("%s cid:%d sid:%d, invalid message type:%d", pServer->label, chann, sid, pHeader->msgType);
tTrace("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHeader->msgType);
return TSDB_CODE_INVALID_MSG_TYPE;
}
msgLen = (int32_t)htonl((uint32_t)pHeader->msgLen);
if (dataLen != msgLen) {
tTrace("%s cid:%d sid:%d, %s has invalid length, dataLen:%d, msgLen:%d", pServer->label, chann, sid,
pHeader->msgLen = (int32_t)htonl((uint32_t)pHeader->msgLen);
if (dataLen != pHeader->msgLen) {
tTrace("%s sid:%d, %s has invalid length, dataLen:%d, msgLen:%d", pRpc->label, sid,
taosMsg[pHeader->msgType], dataLen, msgLen);
return TSDB_CODE_INVALID_MSG_LEN;
}
if (chann < 0 || chann >= pServer->numOfChanns) {
tTrace("%s cid:%d sid:%d, chann is out of range, max:%d, %s discarded", pServer->label, chann, sid,
pServer->numOfChanns, taosMsg[pHeader->msgType]);
if (sid < 0 || sid >= pRpc->sessions) {
tTrace("%s sid:%d, sid is out of range, max sid:%d, %s discarded", pRpc->label, sid,
pRpc->sessions, taosMsg[pHeader->msgType]);
return TSDB_CODE_INVALID_SESSION_ID;
}
pChann = pServer->channList + chann;
if (pChann->sessions == 0) {
tTrace("%s cid:%d, chann is not activated yet, %s discarded", pServer->label, chann, taosMsg[pHeader->msgType]);
if (pServer->efp) (*(pServer->efp))(chann);
return TSDB_CODE_NOT_ACTIVE_SESSION;
}
if (sid < 0 || sid >= pChann->sessions) {
tTrace("%s cid:%d sid:%d, sid is out of range, max sid:%d, %s discarded", pServer->label, chann, sid,
pChann->sessions, taosMsg[pHeader->msgType]);
return TSDB_CODE_INVALID_SESSION_ID;
}
// if ( pHeader->tcp ) return TSDB_CODE_ALREADY_PROCESSED;
if (sid == 0) sprintf(hashstr, "%x:%x:%x", ip, pHeader->uid, pHeader->sourceId);
pthread_mutex_lock(&pChann->mutex);
code = taosGetRpcConn(chann, sid, pHeader->meterId, pServer, &pConn, pHeader->msgType & 1, hashstr);
if (code != TSDB_CODE_SUCCESS) goto _exit;
code = rpcGetConn(sid, pHeader->meterId, pRpc, &pConn, rpcIsReq(pHeader->msgType), hashstr);
if (code != TSDB_CODE_SUCCESS) return code;
*ppConn = pConn;
sid = pConn->sid;
......@@ -873,175 +732,44 @@ int taosProcessMsgHeader(STaosHeader *pHeader, SRpcConn **ppConn, STaosRpc *pSer
}
if (pHeader->uid) pConn->peerUid = pHeader->uid;
if (port) pConn->peerPort = port;
if (pHeader->port) // port maybe changed by the peer
pConn->peerPort = pHeader->port;
if (chandle) pConn->chandle = chandle;
if (pHeader->tcp) {
tTrace("%s cid:%d sid:%d id:%s, content will be transfered via TCP pConn:%p", pServer->label, chann, sid,
pConn->meterId, pConn);
if (pConn->outType) taosTmrReset(taosProcessTaosTimer, tsRpcTimer, pConn, pChann->tmrCtrl, &pConn->pTimer);
code = TSDB_CODE_ALREADY_PROCESSED;
goto _exit;
tTrace("%s pConn:%p, content will be transfered via TCP", pRpc->label, pConn);
if (pConn->outType) taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
return TSDB_CODE_ALREADY_PROCESSED;
}
if (pConn->spi != 0) {
if (pHeader->spi == pConn->spi) {
// authentication
STaosDigest *pDigest = (STaosDigest *)((char *)pHeader + dataLen - sizeof(STaosDigest));
int32_t delta;
delta = (int32_t)htonl(pDigest->timeStamp);
delta -= (int32_t)taosGetTimestampSec();
if (abs(delta) > 900) {
tWarn("%s cid:%d sid:%d id:%s, time diff:%d is too big, msg discarded pConn:%p, timestamp:%d", pServer->label,
chann, sid, pConn->meterId, delta, pConn, htonl(pDigest->timeStamp));
// the requirement of goldwind, should not return error in this case
code = TSDB_CODE_INVALID_TIME_STAMP;
goto _exit;
}
if (taosAuthenticateMsg((uint8_t *)pHeader, dataLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) {
char ipstr[24];
tinet_ntoa(ipstr, ip);
mLError("user:%s login from %s, authentication failed", pHeader->meterId, ipstr);
tError("%s cid:%d sid:%d id:%s, authentication failed, msg discarded pConn:%p", pServer->label, chann, sid,
pConn->meterId, pConn);
code = TSDB_CODE_AUTH_FAILURE;
goto _exit;
}
} else {
// if it is request or response with code 0, msg shall be discarded
if ((pHeader->msgType & 1) || (pHeader->content[0] == 0)) {
tTrace("%s cid:%d sid:%d id:%s, auth spi not matched, msg discarded pConn:%p", pServer->label, chann, sid,
pConn->meterId, pConn);
code = TSDB_CODE_AUTH_FAILURE;
goto _exit;
}
}
}
code = rpcCheckAuthentication(pConn, (char *)pHeader, dataLen);
if ( code != 0 ) return code;
if (pHeader->msgType != TSDB_MSG_TYPE_REG && pHeader->encrypt) {
// decrypt here
}
pHeader->destId = pConn->ownId; // destId maybe 0, it shall be changed
if (pHeader->msgType & 1) {
if (pConn->peerId == 0) {
pConn->peerId = pHeader->sourceId;
} else {
if (pConn->peerId != pHeader->sourceId) {
tTrace("%s cid:%d sid:%d id:%s, source Id is changed, old:0x%08x new:0x%08x pConn:%p", pServer->label, chann,
sid, pConn->meterId, pConn->peerId, pHeader->sourceId, pConn);
code = TSDB_CODE_INVALID_VALUE;
goto _exit;
}
}
if (pConn->inTranId == pHeader->tranId) {
if (pConn->inType == pHeader->msgType) {
tTrace("%s cid:%d sid:%d id:%s, %s is retransmitted, pConn:%p", pServer->label, chann, sid, pConn->meterId,
taosMsg[pHeader->msgType], pConn);
taosSendQuickRsp(pConn, (char)(pHeader->msgType + 1), TSDB_CODE_ACTION_IN_PROGRESS);
} else if (pConn->inType == 0) {
tTrace("%s cid:%d sid:%d id:%s, %s is already processed, tranId:%d pConn:%p", pServer->label, chann, sid,
pConn->meterId, taosMsg[pHeader->msgType], pConn->inTranId, pConn);
taosReSendRspToPeer(pConn);
} else {
tTrace("%s cid:%d sid:%d id:%s, mismatched message %s and tranId pConn:%p", pServer->label, chann, sid,
pConn->meterId, taosMsg[pHeader->msgType], pConn);
}
// do not reply any message
code = TSDB_CODE_ALREADY_PROCESSED;
goto _exit;
}
if (pConn->inType != 0) {
tTrace("%s cid:%d sid:%d id:%s, last session is not finished, inTranId:%d tranId:%d pConn:%p", pServer->label,
chann, sid, pConn->meterId, pConn->inTranId, pHeader->tranId, pConn);
code = TSDB_CODE_LAST_SESSION_NOT_FINISHED;
goto _exit;
}
pConn->inTranId = pHeader->tranId;
pConn->inType = pHeader->msgType;
if (sid == 0) // send a response first
taosSendQuickRsp(pConn, (char)(pConn->inType + 1), TSDB_CODE_ACTION_IN_PROGRESS);
if ( rpcIsReq(pHeader->msgType) ) {
code = rpcProcessReqHeader(pConn, pHeader);
} else {
// response from taos
pConn->peerId = pHeader->sourceId;
if (pConn->outType == 0) {
code = TSDB_CODE_UNEXPECTED_RESPONSE;
goto _exit;
}
if (pHeader->tranId != pConn->outTranId) {
code = TSDB_CODE_INVALID_TRAN_ID;
goto _exit;
}
if (pHeader->msgType != pConn->outType + 1) {
code = TSDB_CODE_INVALID_RESPONSE_TYPE;
goto _exit;
}
if (*pHeader->content == TSDB_CODE_NOT_READY) {
code = TSDB_CODE_ALREADY_PROCESSED;
goto _exit;
}
taosTmrStopA(&pConn->pTimer);
pConn->retry = 0;
if (*pHeader->content == TSDB_CODE_ACTION_IN_PROGRESS || pHeader->tcp) {
if (pConn->tretry <= tsRpcMaxRetry) {
tTrace("%s cid:%d sid:%d id:%s, peer is still processing the transaction, pConn:%p", pServer->label, chann, sid,
pHeader->meterId, pConn);
pConn->tretry++;
taosTmrReset(taosProcessTaosTimer, tsRpcProgressTime, pConn, pChann->tmrCtrl, &pConn->pTimer);
code = TSDB_CODE_ALREADY_PROCESSED;
goto _exit;
} else {
// peer still in processing, give up
*pHeader->content = TSDB_CODE_TOO_SLOW;
}
}
pConn->tretry = 0;
if (pConn->rspReceived) {
code = TSDB_CODE_UNEXPECTED_RESPONSE;
goto _exit;
} else {
pConn->rspReceived = 1;
}
code = rpcProcessRspHeader(pConn, pHeader);
}
_exit:
pthread_mutex_unlock(&pChann->mutex);
// if (reSend) taosReSendRspToPeer(pConn);
return code;
}
int taosBuildErrorMsgToPeer(char *pMsg, int code, char *pReply) {
STaosHeader *pRecvHeader, *pReplyHeader;
char * pContent;
void rpcSendErrorMsgToPeer(char *pMsg, int32 code, uint_32 ip, uint_16 port, void *chandle) {
SRpcHeader *pRecvHeader, *pReplyHeader;
char msg[sizeof(SRpcHeader) + sizeof(SRpcDigest) + sizeof(STaosRsp)];
STaosRsp *pRsp;
uint32_t timeStamp;
int msgLen;
pRecvHeader = (STaosHeader *)pMsg;
pReplyHeader = (STaosHeader *)pReply;
pRecvHeader = (SRpcHeader *)pMsg;
pReplyHeader = (SRpcHeader *)msg;
memset(msg, 0, sizeof(SRpcHeader));
pReplyHeader->version = pRecvHeader->version;
pReplyHeader->msgType = (char)(pRecvHeader->msgType + 1);
pReplyHeader->tcp = 0;
......@@ -1052,424 +780,372 @@ int taosBuildErrorMsgToPeer(char *pMsg, int code, char *pReply) {
pReplyHeader->destId = pRecvHeader->sourceId;
memcpy(pReplyHeader->meterId, pRecvHeader->meterId, tListLen(pReplyHeader->meterId));
pContent = (char *)pReplyHeader->content;
*pContent = (char)code;
pContent++;
pRsp = (STaosRsp *)pReplyHeader->content;
pRsp->code = htonl(code);
msgLen = sizeof(STaosRsp);
char *pContent = pRsp->more;
if (code == TSDB_CODE_INVALID_TIME_STAMP) {
// include a time stamp if client's time is not synchronized well
timeStamp = taosGetTimestampSec();
memcpy(pContent, &timeStamp, sizeof(timeStamp));
pContent += sizeof(timeStamp);
msgLen += sizeof(timeStamp);
}
msgLen = (int)(pContent - pReply);
pReplyHeader->msgLen = (int32_t)htonl((uint32_t)msgLen);
(*taosSendData[pRpc->type])(ip, port, pReply, msgLen, chandle);
return msgLen;
}
void taosReportDisconnection(SRpcChann *pChann, SRpcConn *pConn)
{
SSchedMsg schedMsg;
schedMsg.fp = taosProcessSchedMsg;
schedMsg.msg = NULL;
schedMsg.ahandle = pConn->ahandle;
schedMsg.thandle = pConn;
taosScheduleTask(pChann->qhandle, &schedMsg);
return;
}
void taosProcessIdleTimer(void *param, void *tmrId) {
void rpcProcessIdleTimer(void *param, void *tmrId) {
SRpcConn *pConn = (SRpcConn *)param;
if (pConn->signature != param) {
tError("idle timer pConn Signature:0x%x, pConn:0x%x not matched", pConn->signature, param);
return;
}
STaosRpc * pServer = pConn->pServer;
SRpcChann *pChann = pServer->channList + pConn->chann;
SRpcInfo * pRpc = pConn->pRpc;
if (pConn->pIdleTimer != tmrId) {
tTrace("%s cid:%d sid:%d id:%s, idle timer:%p already processed pConn:%p", pServer->label, pConn->chann, pConn->sid,
pConn->meterId, tmrId, pConn);
tTrace("%s pConn:%p, idle timer:%p already processed", pRpc->label, pConn, tmrId);
return;
}
int reportDisc = 0;
pthread_mutex_lock(&pChann->mutex);
tTrace("%s cid:%d sid:%d id:%s, close the connection since no activity pConn:%p", pServer->label, pConn->chann,
pConn->sid, pConn->meterId, pConn);
if (pConn->rspReceived == 0) {
pConn->rspReceived = 1;
reportDisc = 1;
}
pthread_mutex_unlock(&pChann->mutex);
if (reportDisc) taosReportDisconnection(pChann, pConn);
tTrace("%s pConn:%p, close the connection since no activity", pRpc->label, pConn);
rpcCloseConn(pConn);
}
void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *thandle,
void *rpcProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *thandle,
void *chandle) {
STaosHeader *pHeader;
SRpcHeader *pHeader = (SRpcHeader *)data;
uint8_t code;
SRpcConn * pConn = (SRpcConn *)thandle;
STaosRpc * pServer = (STaosRpc *)shandle;
SRpcConn *pConn = (SRpcConn *)thandle;
SRpcInfo *pRpc = (SRpcInfo *)shandle;
int msgLen;
char pReply[128];
SSchedMsg schedMsg;
int chann, sid;
SRpcChann * pChann = NULL;
tDump(data, dataLen);
if (ip == 0 && taosCloseConn[pServer->type]) {
// it means the connection is broken
if (pConn) {
pChann = pServer->channList + pConn->chann;
tTrace("%s cid:%d sid:%d id:%s, underlying link is gone pConn:%p", pServer->label, pConn->chann, pConn->sid,
pConn->meterId, pConn);
pConn->rspReceived = 1;
pConn->chandle = NULL;
taosReportDisconnection(pChann, pConn);
}
tfree(data);
if (ip == 0 && taosCloseConn[pRpc->type] && pConn) {
// it means the connection is broken, it only happens for TCP
tTrace("%s pConn:%p, underlying link is gone%p", pRpc->label, pConn);
pContext->terrno = TSDB_CODE_NETWORK_UNAVAIL;
taosTmrStart(taosProcessConnError, 0, pContext, pRpc->tmrCtrl);
return NULL;
}
pHeader = (STaosHeader *)data;
msgLen = (int32_t)htonl((uint32_t)pHeader->msgLen);
code = (uint8_t)taosProcessMsgHeader(pHeader, &pConn, pServer, dataLen, ip, port, chandle);
pHeader->destId = htonl(pHeader->destId);
chann = pHeader->destId >> pServer->bits;
sid = pHeader->destId & pServer->mask;
pthread_mutex_lock(&pRpc->mutex);
code = rpcProcessHeader(pHeader, &pConn, pRpc, dataLen, ip, port, chandle);
pthread_mutex_unlock(&pRpc->mutex);
if (pConn && pServer->idleTime) {
SRpcChann *pChann = pServer->channList + pConn->chann;
taosTmrReset(taosProcessIdleTimer, pServer->idleTime, pConn, pChann->tmrCtrl, &pConn->pIdleTimer);
if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) {
tTrace("%s pConn:%p, %s received from 0x%x:%hu, parse code:%u len:%d source:0x%08x dest:0x%08x tranId:%d",
pRpc->label, pConn, taosMsg[pHeader->msgType], ip, port, code,
dataLen, pHeader->sourceId, pHeader->destId, pHeader->tranId);
}
if (code == TSDB_CODE_ALREADY_PROCESSED) {
tTrace("%s cid:%d sid:%d id:%s, %s wont be processed, source:0x%08x dest:0x%08x tranId:%d pConn:%p", pServer->label,
chann, sid, pHeader->meterId, taosMsg[pHeader->msgType], pHeader->sourceId, htonl(pHeader->destId),
pHeader->tranId, pConn);
free(data);
return pConn;
if (pConn && pRpc->idleTime) {
taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
}
if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) {
tTrace(
"%s cid:%d sid:%d id:%s, %s received from 0x%x:%hu, parse code:%u, first:%u len:%d source:0x%08x dest:0x%08x "
"tranId:%d pConn:%p",
pServer->label, chann, sid, pHeader->meterId, taosMsg[pHeader->msgType], ip, port, code, pHeader->content[0],
dataLen, pHeader->sourceId, htonl(pHeader->destId), pHeader->tranId, pConn);
if (code != TSDB_CODE_ALREADY_PROCESSED) {
if (code != 0) { // parsing error
if ( rpcIsReq(pHeader->msgType) ) {
taosSendErrorMsgToPeer(data, code, ip, port, chandle);
tTrace("%s pConn:%p, %s is sent with error code:%u", pRpc->label, pConn, taosMsg[pHeader->msgType+1], code);
}
} else { // parsing OK
rpcProcessIncomingMsg(pConn, pHeader);
}
}
if (code != 0) {
// parsing error
if ( code != 0 ) free (data);
return pConn;
}
if (pHeader->msgType & 1U) {
memset(pReply, 0, sizeof(pReply));
msgLen = taosBuildErrorMsgToPeer(data, code, pReply);
(*taosSendData[pServer->type])(ip, port, pReply, msgLen, chandle);
tTrace("%s cid:%d sid:%d id:%s, %s is sent with error code:%u pConn:%p", pServer->label, chann, sid,
pHeader->meterId, taosMsg[pHeader->msgType + 1], code, pConn);
} else {
tTrace("%s cid:%d sid:%d id:%s, %s is received, parsing error:%u pConn:%p", pServer->label, chann, sid,
pHeader->meterId, taosMsg[pHeader->msgType], code, pConn);
}
void taosProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader) {
SRpcInfo *pRpc = pConn->pRpc;
int msgLen = rpcContLenFromHeader(pHeader->msgLen);
free(data);
pHeader = rpcDecompressRpcMsg(pHeader);
if ( rpcIsReq(msgType) ) {
(*(pRpc->fp))(pHeader->msgType, pHeader->content, msgLen, pConn);
} else {
// parsing OK
// it's a response
STaosRsp *pRsp = (STaosRsp *)msg;
int32_t code = htonl(pRsp->code);
// internal communication is based on TAOS protocol, a trick here to make it efficient
if (pHeader->spi) msgLen -= sizeof(STaosDigest);
msgLen -= (int)sizeof(STaosHeader);
pHeader->msgLen = msgLen + (int)sizeof(SIntMsg);
SRpcReqContext *pContext = pConn->pContext;
pConn->pContext = NULL;
if ((pHeader->msgType & 1U) == 0 && (pHeader->content[0] == TSDB_CODE_INVALID_VALUE)) {
schedMsg.msg = NULL; // connection shall be closed
} else {
pHeader = taosDecompressRpcMsg(pHeader, &schedMsg, msgLen);
}
taosAddConnToIntoCache(pRpc->pCache, pConn, pConn->peerIp, pConn->peerPort, pConn->meterId);
if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16U)) {
tTrace("%s cid:%d sid:%d id:%s, %s is put into queue, msgLen:%d pConn:%p pTimer:%p", pServer->label, chann, sid,
pHeader->meterId, taosMsg[pHeader->msgType], pHeader->msgLen, pConn, pConn->pTimer);
if (code == TSDB_CODE_NOT_MASTER) {
pContext->terrno = code;
taosTmrStart(taosProcessConnError, 0, pContext, pRpc->tmrCtrl);
} else {
rpcFreeMsg(rpcGetMsgFromCont(pContext->cont)); // free the request msg
(*(pRpc->fp))(pHeader->msgType, pHeader->content, msgLen, pContext->ahandle);
}
pChann = pServer->channList + pConn->chann;
schedMsg.fp = taosProcessSchedMsg;
schedMsg.ahandle = pConn->ahandle;
schedMsg.thandle = pConn;
taosScheduleTask(pChann->qhandle, &schedMsg);
}
return pConn;
}
int taosSendMsgToPeerH(void *thandle, char *pCont, int contLen, void *ahandle) {
STaosHeader *pHeader;
SMsgNode * pMsgNode;
char * msg;
int msgLen = 0;
SRpcConn * pConn = (SRpcConn *)thandle;
STaosRpc * pServer;
SRpcChann * pChann;
uint8_t msgType;
if (pConn == NULL) return -1;
if (pConn->signature != pConn) return -1;
pServer = pConn->pServer;
pChann = pServer->channList + pConn->chann;
pHeader = (STaosHeader *)(pCont - sizeof(STaosHeader));
pHeader->destIp = pConn->peerIp;
msg = (char *)pHeader;
SRpcConn *rpcGetConnToServer(void *shandle, SRpcIpSet ipSet) {
SRpcInfo *pRpc = (SRpcInfo *)shandle;
SRpcConn *pConn = taosGetConnFromCache(pRpc->pCache, ipSet.ip[index], pRpc->peerPort, pRpc->meterId);
if ( pConn == NULL ) {
SRpcConnInit connInit;
memset(&connInit, 0, sizeof(connInit));
connInit.sid = 0;
connInit.spi = pRpc->spi;
connInit.encrypt = pRpc->encrypt;
connInit.meterId = pRpc->user;
connInit.peerId = 0;
connInit.shandle = pRpc;
connInit.peerIp = ipstr;
connInit.peerPort = pRpc->peerPort;
pConn = rpcOpenConn(&connInit);
}
if ((pHeader->msgType & 1U) == 0 && pConn->localPort) pHeader->port = pConn->localPort;
contLen = taosCompressRpcMsg(pCont, contLen);
return pConn;
}
msgLen = contLen + (int32_t)sizeof(STaosHeader);
int taosAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) {
SRpcHeader *pHeader = (SRpcHeader *)msg;
if (pConn->spi) {
// add auth part
pHeader->spi = pConn->spi;
STaosDigest *pDigest = (STaosDigest *)(pCont + contLen);
SRpcDigest *pDigest = (SRpcDigest *)(msg + msgLen);
pDigest->timeStamp = htonl(taosGetTimestampSec());
msgLen += sizeof(STaosDigest);
msgLen += sizeof(SRpcDigest);
pHeader->msgLen = (int32_t)htonl((uint32_t)msgLen);
taosBuildAuthHeader((uint8_t *)pHeader, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
rpcBuildAuthHeader((uint8_t *)pHeader, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
} else {
pHeader->msgLen = (int32_t)htonl((uint32_t)msgLen);
}
pthread_mutex_lock(&pChann->mutex);
msgType = pHeader->msgType;
return msgLen;
}
if ((msgType & 1U) == 0) {
// response
pConn->inType = 0;
tfree(pConn->pRspMsg);
pConn->pRspMsg = msg;
pConn->rspMsgLen = msgLen;
int rpcSendDataToPeer(SRpcConn *pConn, char *data, int dataLen) {
int writtenLen = 0;
SRpcInfo *pRpc = pConn->pRpc;
SRpcHeader *pHeader = (SRpcHeader *)data;
int code = 0;
if (pHeader->content[0] == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--;
dataLen = taosAddAuthPart(pConn, data, dataLen);
if ( rpcIsReq(pHeader->msgType)) {
if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16))
tTrace("%s pConn:%p, %s is sent to %s:%hu, len:%d source:0x%08x dest:0x%08x tranId:%d",
pRpc->label, pConn, taosMsg[pHeader->msgType], pConn->peerIpstr,
pConn->peerPort, dataLen, pHeader->sourceId, pHeader->destId, pHeader->tranId);
} else {
// request
pMsgNode = (SMsgNode *)(pCont - sizeof(STaosHeader) - sizeof(SMsgNode));
pMsgNode->msgLen = msgLen;
pMsgNode->next = NULL;
pMsgNode->ahandle = ahandle;
if (pConn->outType) {
if (pConn->pTail) {
pConn->pTail->next = pMsgNode;
pConn->pTail = pMsgNode;
} else {
pConn->pTail = pMsgNode;
pConn->pHead = pMsgNode;
}
tTrace("%s cid:%d sid:%d id:%s, msg:%s is put into queue pConn:%p", pServer->label, pConn->chann, pConn->sid,
pConn->meterId, taosMsg[msgType], pConn);
msgLen = 0;
if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16))
tTrace( "%s pConn:%p, %s is sent to %s:%hu, code:%u len:%d source:0x%08x dest:0x%08x tranId:%d",
pRpc->label, pConn, taosMsg[pHeader->msgType], pConn->peerIpstr, pConn->peerPort,
(uint8_t)pHeader->content[0], dataLen, pHeader->sourceId, pHeader->destId, pHeader->tranId);
}
} else {
assert(pConn->pMsgNode == NULL);
if (pConn->pMsgNode) {
tError("%s cid:%d sid:%d id:%s, bug, there shall be no pengding req pConn:%p", pServer->label, pConn->chann,
pConn->sid, pConn->meterId, pConn);
}
writtenLen = (*taosSendData[pRpc->type])(pConn->peerIp, pConn->peerPort, (char *)pHeader, dataLen, pConn->chandle);
pConn->outType = msgType;
pConn->outTranId = pHeader->tranId;
pConn->pMsgNode = pMsgNode;
pConn->rspReceived = 0;
if (pMsgNode->ahandle) pConn->ahandle = pMsgNode->ahandle;
}
if (writtenLen != dataLen) {
tError("%s pConn:%p, failed to send, dataLen:%d writtenLen:%d, reason:%s", pRpc->label, pConn,
dataLen, writtenLen, strerror(errno));
code = -1;
}
tDump(data, dataLen);
if (msgLen) {
taosSendDataToPeer(pConn, (char *)pHeader, msgLen);
if (msgType & 1U) {
taosTmrReset(taosProcessTaosTimer, tsRpcTimer, pConn, pChann->tmrCtrl, &pConn->pTimer);
}
return code;
}
void rpcSendReqToOneServer(SRpcConn *pConn, SRpcReqContext *pContext) {
char *pHeader = rpcHeaderFromCont(pContext->pCont);
SRpcHeader *msg = (char *)pHeader;
int msgLen = rpcGetMsgLen(pContext->contLen);
char msgType = pContext->msgType;
// set the message header
pHeader->version = 1;
pHeader->msgType = msgType;
pHeader->tcp = 0;
pHeader->encrypt = 0;
pConn->tranId++;
if ( pConn->tranId == 0 ) pConn->tranId++;
pHeader->tranId = pConn->tranId;
pHeader->sourceId = pConn->ownId;
pHeader->destId = pConn->peerId;
pHeader->port = 0;
pHeader->uid = (uint32_t)((int64_t)pConn + (int64_t)getpid());
memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId));
// set the connection parameters
pConn->outType = msgType;
pConn->outTranId = pHeader->tranId;
pConn->pMsgNode = pMsgNode;
pConn->pReqMsg = msg;
pConn->reqMsgLen = msgLen;
pConn->context = pContext;
if ( rpcSendDataToPeer(pConn, msg, msgLen) < 0 ) {
taosReportError(pConn->pContext, terrno);
} else {
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
}
}
void rpcSendRequest(void *shandle, SRpcIpSet ipSet, char type, char *pCont, int contLen, void *ahandle) {
SRpcConn *pConn;
SRpcReqContext *pContext;
contLen = rpcCompressRpcMsg(pCont, contLen);
pContext = (SRpcReqContext *) (pCont-sizeof(SRpcHeader)-sizeof(SRpcReqContext));
pContext->ahandle = ahandle;
pContext->pRpc = (SRpcInfo *)shandle;
pContext->ipSet = ipSet;
pContext->contLen = contLen
pContext->pCont = pCont;
pContext->type = type;
pConn = rpcGetConnToServer(shandle, ipSet);
pContext->terrno = terrno;
if (pConn == NULL) taosTmrStart(taosProcessConnError, 0, pContext, pRpc->tmrCtrl);
rpcSendReqToOneServer(pConn, pContext);
return;
}
void rpcSendResponse(SRpcConn *pConn, char *pCont, int contLen) {
int msgLen = 0;
SRpcConn *pConn;
SRpcHeader *pHeader = rpcHeaderFromCont(pCont);
char *msg = (char *)pHeader;
contLen = rpcCompressRpcMsg(pCont, contLen);
msgLen = rpcMsgLenFromCont(contLen);
pthread_mutex_unlock(&pChann->mutex);
pthread_mutex_lock(&pRpc->mutex);
return contLen;
// set msg header
pHeader->version = 1;
pHeader->msgType = pConn->inType+1;
pHeader->spi = 0;
pHeader->tcp = 0;
pHeader->encrypt = 0;
pHeader->tranId = pConn->inTranId;
pHeader->sourceId = pConn->ownId;
pHeader->destId = pConn->peerId;
pHeader->uid = 0;
memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId));
// set pConn parameters
pConn->inType = 0;
rpcFreeMsg(pConn->pRspMsg);
pConn->pRspMsg = msg;
pConn->rspMsgLen = msgLen;
if (pHeader->content[0] == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--;
pthread_mutex_lock(&pRpc->mutex);
rpcSendDataToPeer(pConn, msg, msgLen);
return;
}
int taosReSendRspToPeer(SRpcConn *pConn) {
STaosHeader *pHeader;
int writtenLen;
STaosRpc * pServer = pConn->pServer;
static void rpcResendRspToPeer(SRpcConn *pConn) {
if (pConn->pRspMsg == NULL || pConn->rspMsgLen <= 0) {
tError("%s cid:%d sid:%d id:%s, rsp is null", pServer->label, pConn->chann, pConn->sid, pConn->meterId);
return -1;
if (pConn->pRspMsg == NULL || pConn->rspMsgLen <= 0 || pConn->rspMsgLen <= sizeof(SRpcHeader)) {
tError("%s pConn:%p, rsp is null", pRpc->label);
return;
}
pHeader = (STaosHeader *)pConn->pRspMsg;
if (pHeader->msgLen <= sizeof(SIntMsg) + 1 || pHeader->msgType <= 0) {
tError("%s cid:%d sid:%d id:%s, rsp is null, rspLen:%d, msgType:%d", pServer->label, pConn->chann, pConn->sid,
pConn->meterId, pHeader->msgLen, pHeader->msgType);
return -1;
SRpcHeader *pHeader = (SRpcHeader *)pConn->pRspMsg;
if (pHeader->msgType <= 0) {
tError("%s pConn:%p, msgType is messed up, rspLen:%d, msgType:%d", pRpc->label, pConn, pHeader->msgLen, pHeader->msgType);
return;
}
writtenLen =
(*taosSendData[pServer->type])(pConn->peerIp, pConn->peerPort, pConn->pRspMsg, pConn->rspMsgLen, pConn->chandle);
rpcSendDataToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen);
}
static void rpcProcessConnError(void *param, void *id) {
SRpcReqContext *pContext = (SRpcContext *)param;
if (writtenLen != pConn->rspMsgLen) {
tError("%s cid:%d sid:%d id:%s, failed to re-send %s, reason:%s pConn:%p", pServer->label, pConn->chann, pConn->sid,
pConn->meterId, taosMsg[(int)pHeader->msgType], strerror(errno), pConn);
if ( pContext->numOfRetry >= pContext->ipSet.numOfIps ) {
char *rsp = calloc(1, RPC_MSG_OVERHEAD + sizeof(STaosRsp));
if ( rsp ) {
STaosRsp *pRsp = (rsp+sizeof(SRpcHeader));
pRsp->code = pContext->terrno;
(*(pRpc->fp))(pContext->msgType+1, pRsp, sizeof(STaosRsp), pContext->ahandle);
} else {
tError("%s failed to malloc RSP", pRpc->label);
}
} else {
tTrace("%s cid:%d sid:%d id:%s, msg:%s is re-sent to %s:%hu, len:%d pConn:%p", pServer->label, pConn->chann,
pConn->sid, pConn->meterId, taosMsg[(int)pHeader->msgType], pConn->peerIpstr, pConn->peerPort,
pConn->rspMsgLen, pConn);
}
// move to next IP
pContext->ipSet.index++;
pContext->ipSet.index = pContext->ipSet.index % pContext->ipSet.numOfIps;
pConn = rpcGetConnToServer(pContext->pRpc, pContext->ipSet);
pContext->terrno = terrno;
if (pConn == NULL) taosTmrStart(taosProcessConnError, 0, pContext, pRpc->tmrCtrl);
return 0;
taosSendReqToOneServer(pConn, pContext);
}
}
void taosProcessTaosTimer(void *param, void *tmrId) {
STaosHeader *pHeader = NULL;
SRpcConn * pConn = (SRpcConn *)param;
int msgLen;
int reportDisc = 0;
static void rpcProcessRetryTimer(void *param, void *tmrId) {
SRpcConn *pConn = (SRpcConn *)param;
int reportDisc = 0;
if (pConn->signature != param) {
tError("pConn Signature:0x%x, pConn:0x%x not matched", pConn->signature, param);
return;
}
STaosRpc * pServer = pConn->pServer;
SRpcChann *pChann = pServer->channList + pConn->chann;
SRpcInfo *pRpc = pConn->pRpc;
if (pConn->pTimer != tmrId) {
tTrace("%s cid:%d sid:%d id:%s, timer:%p already processed pConn:%p", pServer->label, pConn->chann, pConn->sid,
pConn->meterId, tmrId, pConn);
tTrace("%s pConn:%p, timer:%p already processed%", pRpc->label, pConn);
return;
}
pthread_mutex_lock(&pChann->mutex);
pthread_mutex_lock(&pRpc->mutex);
if (pConn->rspReceived) {
tTrace("%s cid:%d sid:%d id:%s, rsp just received, pConn:%p", pServer->label, pConn->chann, pConn->sid,
pConn->meterId, pConn);
} else if (pConn->outType == 0) {
tTrace("%s cid:%d sid:%d id:%s, outtype is zero, pConn:%p", pServer->label, pConn->chann, pConn->sid,
pConn->meterId, pConn);
if (pConn->outType == 0) {
tTrace("%s pConn:%p, outtype is zero", pRpc->label, pConn);
} else {
tTrace("%s cid:%d sid:%d id:%s, expected %s is not received, pConn:%p", pServer->label, pConn->chann, pConn->sid,
pConn->meterId, taosMsg[(int)pConn->outType + 1], pConn);
tTrace("%s pConn:%p, expected %s is not received", pRpc->label, pConn, taosMsg[(int)pConn->outType + 1]);
pConn->pTimer = NULL;
pConn->retry++;
if (pConn->retry < 4) {
tTrace("%s cid:%d sid:%d id:%s, re-send msg:%s to %s:%hu pConn:%p", pServer->label, pConn->chann, pConn->sid,
pConn->meterId, taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort, pConn);
if (pConn->pMsgNode && pConn->pMsgNode->msgLen > 0) {
pHeader = (STaosHeader *)((char *)pConn->pMsgNode + sizeof(SMsgNode));
pHeader->destId = pConn->peerId;
msgLen = pConn->pMsgNode->msgLen;
if (pConn->spi) {
STaosDigest *pDigest = (STaosDigest *)(((char *)pHeader) + pConn->pMsgNode->msgLen - sizeof(STaosDigest));
pDigest->timeStamp = htonl(taosGetTimestampSec());
taosBuildAuthHeader((uint8_t *)pHeader, pConn->pMsgNode->msgLen - TSDB_AUTH_LEN, pDigest->auth,
pConn->secret);
}
tTrace("%s pConn:%p, re-send msg:%s to %s:%hu", pRpc->label,
taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort);
if (pConn->pReqMsg && pConn->pReqMsgLen > 0) {
rpcSendDataToPeer(pConn, pReqMsg, pReqMsgLen);
}
} else {
// close the connection
tTrace("%s cid:%d sid:%d id:%s, failed to send msg:%s to %s:%hu pConn:%p", pServer->label, pConn->chann,
pConn->sid, pConn->meterId, taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort, pConn);
if (pConn->rspReceived == 0) {
pConn->rspReceived = 1;
reportDisc = 1;
}
tTrace("%s pConn:%p, failed to send msg:%s to %s:%hu", pRpc->label, pConn,
taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort, pConn);
reportDisc = 1;
}
}
if (pHeader) {
(*taosSendData[pServer->type])(pConn->peerIp, pConn->peerPort, (char *)pHeader, msgLen, pConn->chandle);
taosTmrReset(taosProcessTaosTimer, tsRpcTimer<<pConn->retry, pConn, pChann->tmrCtrl, &pConn->pTimer);
}
pthread_mutex_unlock(&pChann->mutex);
if (reportDisc) taosReportDisconnection(pChann, pConn);
}
void taosGetRpcConnInfo(void *thandle, uint32_t *peerId, uint32_t *peerIp, uint16_t *peerPort, int *cid, int *sid) {
SRpcConn *pConn = (SRpcConn *)thandle;
*peerId = pConn->peerId;
*peerIp = pConn->peerIp;
*peerPort = pConn->peerPort;
*cid = pConn->chann;
*sid = pConn->sid;
}
int taosGetOutType(void *thandle) {
SRpcConn *pConn = (SRpcConn *)thandle;
if (pConn == NULL) return -1;
return pConn->outType;
}
void taosProcessSchedMsg(SSchedMsg *pMsg) {
SIntMsg * pHeader = (SIntMsg *)pMsg->msg;
SRpcConn *pConn = (SRpcConn *)pMsg->thandle;
if (pConn == NULL || pConn->signature != pMsg->thandle || pConn->pServer == NULL) return;
STaosRpc *pRpc = pConn->pServer;
void *ahandle = (*(pRpc->fp))(pMsg->msg, pMsg->ahandle, pMsg->thandle);
if (ahandle == NULL || pMsg->msg == NULL) {
taosCloseRpcConn(pConn);
} else {
pConn->ahandle = ahandle;
if (pHeader && ((pHeader->msgType & 1) == 0)) taosProcessResponse(pConn);
}
if (pMsg->msg) free(pMsg->msg - sizeof(STaosHeader) + sizeof(SIntMsg));
}
void taosStopRpcConn(void *thandle) {
SRpcConn * pConn = (SRpcConn *)thandle;
STaosRpc * pServer = pConn->pServer;
SRpcChann *pChann = pServer->channList + pConn->chann;
tTrace("%s cid:%d sid:%d id:%s, stop the connection pConn:%p", pServer->label, pConn->chann, pConn->sid,
pConn->meterId, pConn);
int reportDisc = 0;
pthread_mutex_lock(&pChann->mutex);
if (pConn->outType) {
pConn->rspReceived = 1;
reportDisc = 1;
pthread_mutex_unlock(&pChann->mutex);
} else {
pthread_mutex_unlock(&pChann->mutex);
taosCloseRpcConn(pConn);
}
pthread_mutex_unlock(&pRpc->mutex);
if (reportDisc) taosReportDisconnection(pChann, pConn);
pConn->terrno = TSDB_CODE_NETWORK_UNAVAIL;
if (reportDisc) taosProcessConnError(pConn->pContext, NULL);
}
int taosAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey) {
static int rpcAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey) {
MD5_CTX context;
int ret = -1;
......@@ -1484,7 +1160,7 @@ int taosAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey
return ret;
}
int taosBuildAuthHeader(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey) {
static int rpcBuildAuthHeader(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey) {
MD5_CTX context;
MD5Init(&context);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册