提交 a2629a02 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

fix some bugs, rename files in rpc module

上级 789246a4
...@@ -199,37 +199,6 @@ typedef struct { ...@@ -199,37 +199,6 @@ typedef struct {
uint32_t ip[TSDB_MAX_MGMT_IPS]; uint32_t ip[TSDB_MAX_MGMT_IPS];
} SMgmtIpList; } SMgmtIpList;
typedef struct {
char version : 4;
char comp : 4;
char tcp : 2;
char spi : 3;
char encrypt : 3;
uint16_t tranId;
uint32_t uid; // for unique ID inside a client
uint32_t sourceId;
// internal part
uint32_t destId;
uint32_t destIp;
char meterId[TSDB_UNI_LEN];
uint16_t port; // for UDP only
char empty[1];
uint8_t msgType;
int32_t msgLen;
uint8_t content[0];
} STaosHeader;
typedef struct {
uint32_t timeStamp;
uint8_t auth[TSDB_AUTH_LEN];
} STaosDigest;
typedef struct {
unsigned char code;
char more[];
} STaosRsp, SMsgReply;
typedef struct { typedef struct {
uint32_t customerId; uint32_t customerId;
uint32_t osId; uint32_t osId;
......
...@@ -68,7 +68,7 @@ typedef struct { ...@@ -68,7 +68,7 @@ typedef struct {
void (*ufp)(void *ahandle, SRpcIpSet ipSet); void (*ufp)(void *ahandle, SRpcIpSet ipSet);
// call back to retrieve the client auth info // call back to retrieve the client auth info
int (*afp)(char *meterId, char *spi, char *encrypt, uint8_t *secret, uint8_t *ckey); int (*afp)(char *meterId, char *spi, char *encrypt, char *secret, char *ckey);
} SRpcInit; } SRpcInit;
void *rpcOpen(SRpcInit *pRpc); void *rpcOpen(SRpcInit *pRpc);
......
...@@ -13,8 +13,8 @@ ...@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef TDENGINE_CONN_CACHE_H #ifndef TDENGINE_RPC_CACHE_H
#define TDENGINE_CONN_CACHE_H #define TDENGINE_RPC_CACHE_H
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -29,4 +29,4 @@ void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user); ...@@ -29,4 +29,4 @@ void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user);
} }
#endif #endif
#endif // TDENGINE_CONN_CACHE_H #endif // TDENGINE_RPC_CACHE_H
...@@ -13,8 +13,8 @@ ...@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef _taos_tcp_client_header_ #ifndef _rpc_client_header_
#define _taos_tcp_client_header_ #define _rpc_client_header_
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
......
...@@ -20,11 +20,11 @@ ...@@ -20,11 +20,11 @@
extern "C" { extern "C" {
#endif #endif
void *taosOpenIpHash(int maxSessions); void *rpcOpenIpHash(int maxSessions);
void taosCloseIpHash(void *handle); void rpcCloseIpHash(void *handle);
void *taosAddIpHash(void *handle, void *pData, uint32_t ip, uint16_t port); void *rpcAddIpHash(void *handle, void *pData, uint32_t ip, uint16_t port);
void taosDeleteIpHash(void *handle, uint32_t ip, uint16_t port); void rpcDeleteIpHash(void *handle, uint32_t ip, uint16_t port);
void *taosGetIpHash(void *handle, uint32_t ip, uint16_t port); void *rpcGetIpHash(void *handle, uint32_t ip, uint16_t port);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_RPCHEAD_H
#define TDENGINE_RPCHEAD_H
#ifdef __cplusplus
extern "C" {
#endif
#pragma pack(push, 1)
typedef struct {
char version:4; // RPC version
char comp:4; // compression algorithm, 0:no compression 1:lz4
char tcp:2; // tcp flag
char spi:3; // security parameter index
char encrypt:3; // encrypt algorithm, 0: no encryption
uint16_t tranId; // transcation ID
uint32_t uid; // for unique ID inside a client
uint32_t sourceId; // source ID, an index for connection list
uint32_t destId; // destination ID, an index for connection list
char meterId[TSDB_UNI_LEN];
uint16_t port; // for UDP only, port may be changed
char empty[1]; // reserved
uint8_t msgType; // message type
int32_t msgLen; // message length including the header iteslf
int32_t code;
uint8_t content[0]; // message body starts from here
} SRpcHead;
typedef struct {
int32_t reserved;
int32_t contLen;
} SRpcComp;
typedef struct {
uint32_t timeStamp;
uint8_t auth[TSDB_AUTH_LEN];
} SRpcDigest;
#pragma pack(pop)
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_RPCHEAD_H
...@@ -13,8 +13,8 @@ ...@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef _taos_tcp_server_header_ #ifndef _rpc_server_header_
#define _taos_tcp_server_header_ #define _rpc_server_header_
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
......
...@@ -13,8 +13,8 @@ ...@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef _taos_udp_header_ #ifndef _rpc_udp_header_
#define _taos_udp_header_ #define _rpc_udp_header_
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -24,15 +24,15 @@ extern "C" { ...@@ -24,15 +24,15 @@ extern "C" {
void *taosInitUdpServer(char *ip, uint16_t port, char *label, int, void *fp, void *shandle); void *taosInitUdpServer(char *ip, uint16_t port, char *label, int, void *fp, void *shandle);
void *taosInitUdpClient(char *ip, uint16_t port, char *label, int, void *fp, void *shandle); void *taosInitUdpClient(char *ip, uint16_t port, char *label, int, void *fp, void *shandle);
void taosCleanUpUdpConnection(void *handle); void taosCleanUpUdpConnection(void *handle);
int taosSendUdpData(uint32_t ip, uint16_t port, char *data, int dataLen, void *chandle); int taosSendUdpData(uint32_t ip, uint16_t port, char *data, int dataLen, void *chandle);
void *taosOpenUdpConnection(void *shandle, void *thandle, char *ip, uint16_t port); void *taosOpenUdpConnection(void *shandle, void *thandle, char *ip, uint16_t port);
void taosFreeMsgHdr(void *hdr); void taosFreeMsgHdr(void *hdr);
int taosMsgHdrSize(void *hdr); int taosMsgHdrSize(void *hdr);
void taosSendMsgHdr(void *hdr, int fd); void taosSendMsgHdr(void *hdr, int fd);
void taosInitMsgHdr(void **hdr, void *dest, int maxPkts); void taosInitMsgHdr(void **hdr, void *dest, int maxPkts);
void taosSetMsgHdrData(void *hdr, char *data, int dataLen); void taosSetMsgHdrData(void *hdr, char *data, int dataLen);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
#include "ttime.h" #include "ttime.h"
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h" #include "tutil.h"
#include "tconncache.h" #include "rpcCache.h"
typedef struct _c_hash_t { typedef struct _c_hash_t {
uint32_t ip; uint32_t ip;
......
...@@ -17,8 +17,9 @@ ...@@ -17,8 +17,9 @@
#include "taosmsg.h" #include "taosmsg.h"
#include "tlog.h" #include "tlog.h"
#include "tsocket.h" #include "tsocket.h"
#include "ttcpclient.h"
#include "tutil.h" #include "tutil.h"
#include "rpcClient.h"
#include "rpcHead.h"
#ifndef EPOLLWAKEUP #ifndef EPOLLWAKEUP
#define EPOLLWAKEUP (1u << 29) #define EPOLLWAKEUP (1u << 29)
...@@ -152,15 +153,15 @@ static void *taosReadTcpData(void *param) { ...@@ -152,15 +153,15 @@ static void *taosReadTcpData(void *param) {
continue; continue;
} }
int headLen = taosReadMsg(pFdObj->fd, buffer, sizeof(STaosHeader)); int headLen = taosReadMsg(pFdObj->fd, buffer, sizeof(SRpcHead));
if (headLen != sizeof(STaosHeader)) { if (headLen != sizeof(SRpcHead)) {
tError("%s read error, headLen:%d", pTcp->label, headLen); tError("%s read error, headLen:%d", pTcp->label, headLen);
tfree(buffer); tfree(buffer);
taosCleanUpTcpFdObj(pFdObj); taosCleanUpTcpFdObj(pFdObj);
continue; continue;
} }
int dataLen = (int32_t)htonl((uint32_t)((STaosHeader *)buffer)->msgLen); int dataLen = (int32_t)htonl((uint32_t)((SRpcHead *)buffer)->msgLen);
if (dataLen > 1024) { if (dataLen > 1024) {
void *b = realloc(buffer, (size_t)dataLen); void *b = realloc(buffer, (size_t)dataLen);
if (NULL == b) { if (NULL == b) {
......
...@@ -32,7 +32,7 @@ typedef struct { ...@@ -32,7 +32,7 @@ typedef struct {
int maxSessions; int maxSessions;
} SHashObj; } SHashObj;
int taosHashIp(void *handle, uint32_t ip, uint16_t port) { int rpcHashIp(void *handle, uint32_t ip, uint16_t port) {
SHashObj *pObj = (SHashObj *)handle; SHashObj *pObj = (SHashObj *)handle;
int hash = 0; int hash = 0;
...@@ -45,7 +45,7 @@ int taosHashIp(void *handle, uint32_t ip, uint16_t port) { ...@@ -45,7 +45,7 @@ int taosHashIp(void *handle, uint32_t ip, uint16_t port) {
return hash; return hash;
} }
void *taosAddIpHash(void *handle, void *data, uint32_t ip, uint16_t port) { void *rpcAddIpHash(void *handle, void *data, uint32_t ip, uint16_t port) {
int hash; int hash;
SIpHash * pNode; SIpHash * pNode;
SHashObj *pObj; SHashObj *pObj;
...@@ -53,7 +53,7 @@ void *taosAddIpHash(void *handle, void *data, uint32_t ip, uint16_t port) { ...@@ -53,7 +53,7 @@ void *taosAddIpHash(void *handle, void *data, uint32_t ip, uint16_t port) {
pObj = (SHashObj *)handle; pObj = (SHashObj *)handle;
if (pObj == NULL || pObj->maxSessions == 0) return NULL; if (pObj == NULL || pObj->maxSessions == 0) return NULL;
hash = taosHashIp(pObj, ip, port); hash = rpcHashIp(pObj, ip, port);
pNode = (SIpHash *)taosMemPoolMalloc(pObj->ipHashMemPool); pNode = (SIpHash *)taosMemPoolMalloc(pObj->ipHashMemPool);
pNode->ip = ip; pNode->ip = ip;
pNode->port = port; pNode->port = port;
...@@ -68,7 +68,7 @@ void *taosAddIpHash(void *handle, void *data, uint32_t ip, uint16_t port) { ...@@ -68,7 +68,7 @@ void *taosAddIpHash(void *handle, void *data, uint32_t ip, uint16_t port) {
return pObj; return pObj;
} }
void taosDeleteIpHash(void *handle, uint32_t ip, uint16_t port) { void rpcDeleteIpHash(void *handle, uint32_t ip, uint16_t port) {
int hash; int hash;
SIpHash * pNode; SIpHash * pNode;
SHashObj *pObj; SHashObj *pObj;
...@@ -76,7 +76,7 @@ void taosDeleteIpHash(void *handle, uint32_t ip, uint16_t port) { ...@@ -76,7 +76,7 @@ void taosDeleteIpHash(void *handle, uint32_t ip, uint16_t port) {
pObj = (SHashObj *)handle; pObj = (SHashObj *)handle;
if (pObj == NULL || pObj->maxSessions == 0) return; if (pObj == NULL || pObj->maxSessions == 0) return;
hash = taosHashIp(pObj, ip, port); hash = rpcHashIp(pObj, ip, port);
pNode = pObj->ipHashList[hash]; pNode = pObj->ipHashList[hash];
while (pNode) { while (pNode) {
...@@ -100,7 +100,7 @@ void taosDeleteIpHash(void *handle, uint32_t ip, uint16_t port) { ...@@ -100,7 +100,7 @@ void taosDeleteIpHash(void *handle, uint32_t ip, uint16_t port) {
} }
} }
void *taosGetIpHash(void *handle, uint32_t ip, uint16_t port) { void *rpcGetIpHash(void *handle, uint32_t ip, uint16_t port) {
int hash; int hash;
SIpHash * pNode; SIpHash * pNode;
SHashObj *pObj; SHashObj *pObj;
...@@ -108,7 +108,7 @@ void *taosGetIpHash(void *handle, uint32_t ip, uint16_t port) { ...@@ -108,7 +108,7 @@ void *taosGetIpHash(void *handle, uint32_t ip, uint16_t port) {
pObj = (SHashObj *)handle; pObj = (SHashObj *)handle;
if (pObj == NULL || pObj->maxSessions == 0) return NULL; if (pObj == NULL || pObj->maxSessions == 0) return NULL;
hash = taosHashIp(pObj, ip, port); hash = rpcHashIp(pObj, ip, port);
pNode = pObj->ipHashList[hash]; pNode = pObj->ipHashList[hash];
while (pNode) { while (pNode) {
...@@ -124,7 +124,7 @@ void *taosGetIpHash(void *handle, uint32_t ip, uint16_t port) { ...@@ -124,7 +124,7 @@ void *taosGetIpHash(void *handle, uint32_t ip, uint16_t port) {
return NULL; return NULL;
} }
void *taosOpenIpHash(int maxSessions) { void *rpcOpenIpHash(int maxSessions) {
SIpHash **ipHashList; SIpHash **ipHashList;
mpool_h ipHashMemPool; mpool_h ipHashMemPool;
SHashObj *pObj; SHashObj *pObj;
...@@ -152,7 +152,7 @@ void *taosOpenIpHash(int maxSessions) { ...@@ -152,7 +152,7 @@ void *taosOpenIpHash(int maxSessions) {
return pObj; return pObj;
} }
void taosCloseIpHash(void *handle) { void rpcCloseIpHash(void *handle) {
SHashObj *pObj; SHashObj *pObj;
pObj = (SHashObj *)handle; pObj = (SHashObj *)handle;
......
...@@ -14,23 +14,24 @@ ...@@ -14,23 +14,24 @@
*/ */
#include "os.h" #include "os.h"
#include "shash.h"
#include "taosmsg.h"
#include "tidpool.h" #include "tidpool.h"
#include "tlog.h" #include "tlog.h"
#include "tmd5.h" #include "tmd5.h"
#include "tmempool.h" #include "tmempool.h"
#include "tsocket.h"
#include "ttcpclient.h"
#include "ttcpserver.h"
#include "ttime.h" #include "ttime.h"
#include "ttimer.h" #include "ttimer.h"
#include "tudp.h"
#include "tutil.h" #include "tutil.h"
#include "lz4.h" #include "lz4.h"
#include "tconncache.h"
#include "trpc.h"
#include "taoserror.h" #include "taoserror.h"
#include "tsocket.h"
#include "shash.h"
#include "taosmsg.h"
#include "rpcUdp.h"
#include "rpcCache.h"
#include "rpcClient.h"
#include "rpcServer.h"
#include "rpcHead.h"
#include "trpc.h"
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest)) #define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest))
#define rpcHeadFromCont(cont) ((SRpcHead *) (cont - sizeof(SRpcHead))) #define rpcHeadFromCont(cont) ((SRpcHead *) (cont - sizeof(SRpcHead)))
...@@ -51,11 +52,11 @@ typedef struct { ...@@ -51,11 +52,11 @@ typedef struct {
char meterId[TSDB_UNI_LEN]; // meter ID char meterId[TSDB_UNI_LEN]; // meter ID
char spi; // security parameter index char spi; // security parameter index
char encrypt; // encrypt algorithm char encrypt; // encrypt algorithm
uint8_t secret[TSDB_KEY_LEN]; // secret for the link char secret[TSDB_KEY_LEN]; // secret for the link
uint8_t ckey[TSDB_KEY_LEN]; // ciphering key char ckey[TSDB_KEY_LEN]; // ciphering key
void (*cfp)(char type, void *pCont, int contLen, void *ahandle, int32_t code); void (*cfp)(char type, void *pCont, int contLen, void *ahandle, int32_t code);
int (*afp)(char *meterId, char *spi, char *encrypt, uint8_t *secret, uint8_t *ckey); int (*afp)(char *meterId, char *spi, char *encrypt, char *secret, char *ckey);
void (*ufp)(void *ahandle, SRpcIpSet ipSet); void (*ufp)(void *ahandle, SRpcIpSet ipSet);
void *idPool; // handle to ID pool void *idPool; // handle to ID pool
...@@ -88,8 +89,8 @@ typedef struct _RpcConn { ...@@ -88,8 +89,8 @@ typedef struct _RpcConn {
char meterId[TSDB_UNI_LEN]; // user ID for the link char meterId[TSDB_UNI_LEN]; // user ID for the link
char spi; // security parameter index char spi; // security parameter index
char encrypt; // encryption, 0:1 char encrypt; // encryption, 0:1
uint8_t secret[TSDB_KEY_LEN]; // secret for the link char secret[TSDB_KEY_LEN]; // secret for the link
uint8_t ckey[TSDB_KEY_LEN]; // ciphering key char ckey[TSDB_KEY_LEN]; // ciphering key
uint16_t localPort; // for UDP only uint16_t localPort; // for UDP only
uint32_t peerUid; // peer UID uint32_t peerUid; // peer UID
uint32_t peerIp; // peer IP uint32_t peerIp; // peer IP
...@@ -114,39 +115,6 @@ typedef struct _RpcConn { ...@@ -114,39 +115,6 @@ typedef struct _RpcConn {
SRpcReqContext *pContext; // request context SRpcReqContext *pContext; // request context
} SRpcConn; } SRpcConn;
#pragma pack(push, 1)
typedef struct {
char version:4; // RPC version
char comp:4; // compression algorithm, 0:no compression 1:lz4
char tcp:2; // tcp flag
char spi:3; // security parameter index
char encrypt:3; // encrypt algorithm, 0: no encryption
uint16_t tranId; // transcation ID
uint32_t uid; // for unique ID inside a client
uint32_t sourceId; // source ID, an index for connection list
uint32_t destId; // destination ID, an index for connection list
char meterId[TSDB_UNI_LEN];
uint16_t port; // for UDP only, port may be changed
char empty[1]; // reserved
uint8_t msgType; // message type
int32_t msgLen; // message length including the header iteslf
int32_t code;
uint8_t content[0]; // message body starts from here
} SRpcHead;
typedef struct {
int32_t reserved;
int32_t contLen;
} SRpcComp;
typedef struct {
uint32_t timeStamp;
uint8_t auth[TSDB_AUTH_LEN];
} SRpcDigest;
#pragma pack(pop)
int tsRpcProgressTime = 10; // milliseocnds int tsRpcProgressTime = 10; // milliseocnds
// not configurable // not configurable
...@@ -196,7 +164,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *meterId, char *hash ...@@ -196,7 +164,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *meterId, char *hash
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *meterId, char *hashstr); static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *meterId, char *hashstr);
static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext); static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext);
static void rpcSendQuickRsp(SRpcConn *pConn, char code); static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code);
static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint32_t ip, uint16_t port, void *chandle); static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint32_t ip, uint16_t port, void *chandle);
static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen);
...@@ -509,10 +477,11 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) { ...@@ -509,10 +477,11 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) {
} }
static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *meterId, char *hashstr) { static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *meterId, char *hashstr) {
SRpcConn *pConn; SRpcConn *pConn = NULL;
// check if it is already allocated // check if it is already allocated
pConn = *(SRpcConn **)(taosGetStrHashData(pRpc->hash, hashstr)); SRpcConn **ppConn = (SRpcConn **)(taosGetStrHashData(pRpc->hash, hashstr));
if (ppConn) pConn = *ppConn;
if (pConn) return pConn; if (pConn) return pConn;
int sid = taosAllocateId(pRpc->idPool); int sid = taosAllocateId(pRpc->idPool);
...@@ -537,7 +506,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *meterId, char *hash ...@@ -537,7 +506,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *meterId, char *hash
if (pConn) { if (pConn) {
taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn); taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn);
tTrace("%s pConn:%p, rpc connection is allocated, sid:%d id:%s", pRpc->label, pConn, sid); tTrace("%s pConn:%p, rpc connection is allocated, sid:%d id:%s", pRpc->label, pConn, sid, pConn->meterId);
} }
return pConn; return pConn;
...@@ -660,7 +629,7 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -660,7 +629,7 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int rpcProcessHead(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int dataLen, uint32_t ip) { static int32_t rpcProcessHead(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int dataLen, uint32_t ip) {
int32_t sid, code = 0; int32_t sid, code = 0;
SRpcConn * pConn = NULL; SRpcConn * pConn = NULL;
char hashstr[40] = {0}; char hashstr[40] = {0};
...@@ -724,7 +693,7 @@ static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_ ...@@ -724,7 +693,7 @@ static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_
SRpcHead *pHead = (SRpcHead *)data; SRpcHead *pHead = (SRpcHead *)data;
SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcInfo *pRpc = (SRpcInfo *)shandle;
SRpcConn *pConn = NULL; SRpcConn *pConn = NULL;
uint8_t code = 0; int32_t code = 0;
tDump(data, dataLen); tDump(data, dataLen);
...@@ -750,7 +719,7 @@ static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_ ...@@ -750,7 +719,7 @@ static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_
pthread_mutex_unlock(&pRpc->mutex); pthread_mutex_unlock(&pRpc->mutex);
if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) { if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) {
tTrace("%s pConn:%p, %s received from 0x%x:%hu, parse code:%u len:%d source:0x%08x dest:0x%08x tranId:%d", tTrace("%s pConn:%p, %s received from 0x%x:%hu, parse code:%x len:%d source:0x%08x dest:0x%08x tranId:%d",
pRpc->label, pConn, taosMsg[pHead->msgType], ip, port, code, pRpc->label, pConn, taosMsg[pHead->msgType], ip, port, code,
dataLen, pHead->sourceId, pHead->destId, pHead->tranId); dataLen, pHead->sourceId, pHead->destId, pHead->tranId);
} }
...@@ -763,7 +732,7 @@ static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_ ...@@ -763,7 +732,7 @@ static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_
if (code != 0) { // parsing error if (code != 0) { // parsing error
if ( rpcIsReq(pHead->msgType) ) { if ( rpcIsReq(pHead->msgType) ) {
rpcSendErrorMsgToPeer(pRpc, data, code, ip, port, chandle); rpcSendErrorMsgToPeer(pRpc, data, code, ip, port, chandle);
tTrace("%s pConn:%p, %s is sent with error code:%u", pRpc->label, pConn, taosMsg[pHead->msgType+1], code); tTrace("%s pConn:%p, %s is sent with error code:%x", pRpc->label, pConn, taosMsg[pHead->msgType+1], code);
} }
} else { // parsing OK } else { // parsing OK
rpcProcessIncomingMsg(pConn, pHead); rpcProcessIncomingMsg(pConn, pHead);
...@@ -804,7 +773,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -804,7 +773,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
} }
} }
static void rpcSendQuickRsp(SRpcConn *pConn, char code) { static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) {
char msg[RPC_MSG_OVERHEAD]; char msg[RPC_MSG_OVERHEAD];
SRpcHead *pHead; SRpcHead *pHead;
...@@ -1131,7 +1100,7 @@ static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) { ...@@ -1131,7 +1100,7 @@ static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) {
pDigest->timeStamp = htonl(taosGetTimestampSec()); pDigest->timeStamp = htonl(taosGetTimestampSec());
msgLen += sizeof(SRpcDigest); msgLen += sizeof(SRpcDigest);
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
rpcBuildAuthHead((uint8_t *)pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret); rpcBuildAuthHead((uint8_t *)pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, (uint8_t *)pConn->secret);
} else { } else {
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
} }
...@@ -1142,7 +1111,7 @@ static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) { ...@@ -1142,7 +1111,7 @@ static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) {
static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
SRpcHead *pHead = (SRpcHead *)msg; SRpcHead *pHead = (SRpcHead *)msg;
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
int code = 0; int32_t code = 0;
if (pConn->spi == 0 ) return 0; if (pConn->spi == 0 ) return 0;
...@@ -1158,7 +1127,7 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { ...@@ -1158,7 +1127,7 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
delta, htonl(pDigest->timeStamp)); delta, htonl(pDigest->timeStamp));
code = TSDB_CODE_INVALID_TIME_STAMP; code = TSDB_CODE_INVALID_TIME_STAMP;
} else { } else {
if (rpcAuthenticateMsg((uint8_t *)pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) { if (rpcAuthenticateMsg((uint8_t *)pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, (uint8_t *)pConn->secret) < 0) {
tError("%s pConn:%p, authentication failed, msg discarded", pRpc->label, pConn); tError("%s pConn:%p, authentication failed, msg discarded", pRpc->label, pConn);
code = TSDB_CODE_AUTH_FAILURE; code = TSDB_CODE_AUTH_FAILURE;
} else { } else {
......
...@@ -14,12 +14,12 @@ ...@@ -14,12 +14,12 @@
*/ */
#include "os.h" #include "os.h"
#include "taosmsg.h"
#include "tlog.h" #include "tlog.h"
#include "tlog.h" #include "tlog.h"
#include "tsocket.h" #include "tsocket.h"
#include "ttcpserver.h"
#include "tutil.h" #include "tutil.h"
#include "rpcServer.h"
#include "rpcHead.h"
#define TAOS_IPv4ADDR_LEN 16 #define TAOS_IPv4ADDR_LEN 16
#ifndef EPOLLWAKEUP #ifndef EPOLLWAKEUP
...@@ -184,16 +184,16 @@ static void taosProcessTcpData(void *param) { ...@@ -184,16 +184,16 @@ static void taosProcessTcpData(void *param) {
} }
void *buffer = malloc(1024); void *buffer = malloc(1024);
int headLen = taosReadMsg(pFdObj->fd, buffer, sizeof(STaosHeader)); int headLen = taosReadMsg(pFdObj->fd, buffer, sizeof(SRpcHead));
if (headLen != sizeof(STaosHeader)) { if (headLen != sizeof(SRpcHead)) {
tError("%s read error, headLen:%d, errno:%d", pThreadObj->label, headLen, errno); tError("%s read error, headLen:%d, errno:%d", pThreadObj->label, headLen, errno);
taosCleanUpFdObj(pFdObj); taosCleanUpFdObj(pFdObj);
tfree(buffer); tfree(buffer);
continue; continue;
} }
int dataLen = (int32_t)htonl((uint32_t)((STaosHeader *)buffer)->msgLen); int dataLen = (int32_t)htonl((uint32_t)((SRpcHead *)buffer)->msgLen);
if (dataLen > 1024) buffer = realloc(buffer, (size_t)dataLen); if (dataLen > 1024) buffer = realloc(buffer, (size_t)dataLen);
int leftLen = dataLen - headLen; int leftLen = dataLen - headLen;
......
...@@ -14,15 +14,15 @@ ...@@ -14,15 +14,15 @@
*/ */
#include "os.h" #include "os.h"
#include "taosmsg.h"
#include "thash.h"
#include "thaship.h"
#include "tlog.h" #include "tlog.h"
#include "tsocket.h" #include "tsocket.h"
#include "tsystem.h" #include "tsystem.h"
#include "ttimer.h" #include "ttimer.h"
#include "tudp.h"
#include "tutil.h" #include "tutil.h"
#include "thash.h"
#include "rpcHaship.h"
#include "rpcUdp.h"
#include "rpcHead.h"
#define RPC_MAX_UDP_CONNS 256 #define RPC_MAX_UDP_CONNS 256
#define RPC_MAX_UDP_PKTS 1000 #define RPC_MAX_UDP_PKTS 1000
...@@ -104,6 +104,41 @@ typedef struct { ...@@ -104,6 +104,41 @@ typedef struct {
uint64_t hash; uint64_t hash;
} SHandleViaTcp; } SHandleViaTcp;
void taosFreeMsgHdr(void *hdr) {
struct msghdr *msgHdr = (struct msghdr *)hdr;
free(msgHdr->msg_iov);
}
int taosMsgHdrSize(void *hdr) {
struct msghdr *msgHdr = (struct msghdr *)hdr;
return (int)msgHdr->msg_iovlen;
}
void taosSendMsgHdr(void *hdr, int fd) {
struct msghdr *msgHdr = (struct msghdr *)hdr;
sendmsg(fd, msgHdr, 0);
msgHdr->msg_iovlen = 0;
}
void taosInitMsgHdr(void **hdr, void *dest, int maxPkts) {
struct msghdr *msgHdr = (struct msghdr *)malloc(sizeof(struct msghdr));
memset(msgHdr, 0, sizeof(struct msghdr));
*hdr = msgHdr;
struct sockaddr_in *destAdd = (struct sockaddr_in *)dest;
msgHdr->msg_name = destAdd;
msgHdr->msg_namelen = sizeof(struct sockaddr_in);
int size = (int)sizeof(struct iovec) * maxPkts;
msgHdr->msg_iov = (struct iovec *)malloc((size_t)size);
memset(msgHdr->msg_iov, 0, (size_t)size);
}
void taosSetMsgHdrData(void *hdr, char *data, int dataLen) {
struct msghdr *msgHdr = (struct msghdr *)hdr;
msgHdr->msg_iov[msgHdr->msg_iovlen].iov_base = data;
msgHdr->msg_iov[msgHdr->msg_iovlen].iov_len = (size_t)dataLen;
msgHdr->msg_iovlen++;
}
bool taosCheckHandleViaTcpValid(SHandleViaTcp *handleViaTcp) { bool taosCheckHandleViaTcpValid(SHandleViaTcp *handleViaTcp) {
return handleViaTcp->hash == taosHashUInt64(handleViaTcp->handle); return handleViaTcp->hash == taosHashUInt64(handleViaTcp->handle);
} }
...@@ -134,8 +169,8 @@ void taosProcessMonitorTimer(void *param, void *tmrId) { ...@@ -134,8 +169,8 @@ void taosProcessMonitorTimer(void *param, void *tmrId) {
} }
void *taosReadTcpData(void *argv) { void *taosReadTcpData(void *argv) {
SMonitor * pMonitor = (SMonitor *)argv; SMonitor *pMonitor = (SMonitor *)argv;
STaosHeader *pHead = (STaosHeader *)pMonitor->data; SRpcHead *pHead = (SRpcHead *)pMonitor->data;
SPacketInfo *pInfo = (SPacketInfo *)pHead->content; SPacketInfo *pInfo = (SPacketInfo *)pHead->content;
SUdpConnSet *pSet = pMonitor->pSet; SUdpConnSet *pSet = pMonitor->pSet;
int retLen, fd; int retLen, fd;
...@@ -189,7 +224,7 @@ void *taosReadTcpData(void *argv) { ...@@ -189,7 +224,7 @@ void *taosReadTcpData(void *argv) {
return NULL; return NULL;
} }
int taosReceivePacketViaTcp(uint32_t ip, STaosHeader *pHead, SUdpConn *pConn) { int taosReceivePacketViaTcp(uint32_t ip, SRpcHead *pHead, SUdpConn *pConn) {
SUdpConnSet * pSet = pConn->pSet; SUdpConnSet * pSet = pConn->pSet;
SPacketInfo * pInfo = (SPacketInfo *)pHead->content; SPacketInfo * pInfo = (SPacketInfo *)pHead->content;
int code = 0; int code = 0;
...@@ -200,7 +235,7 @@ int taosReceivePacketViaTcp(uint32_t ip, STaosHeader *pHead, SUdpConn *pConn) { ...@@ -200,7 +235,7 @@ int taosReceivePacketViaTcp(uint32_t ip, STaosHeader *pHead, SUdpConn *pConn) {
pHead->sourceId, pHead->destId, pHead->tranId); pHead->sourceId, pHead->destId, pHead->tranId);
SMonitor *pMonitor = (SMonitor *)calloc(1, sizeof(SMonitor)); SMonitor *pMonitor = (SMonitor *)calloc(1, sizeof(SMonitor));
pMonitor->dataLen = sizeof(STaosHeader) + sizeof(SPacketInfo); pMonitor->dataLen = sizeof(SRpcHead) + sizeof(SPacketInfo);
memcpy(pMonitor->data, pHead, (size_t)pMonitor->dataLen); memcpy(pMonitor->data, pHead, (size_t)pMonitor->dataLen);
pMonitor->pSet = pSet; pMonitor->pSet = pSet;
pMonitor->ip = ip; pMonitor->ip = ip;
...@@ -225,7 +260,7 @@ void *taosRecvUdpData(void *param) { ...@@ -225,7 +260,7 @@ void *taosRecvUdpData(void *param) {
unsigned int addLen, dataLen; unsigned int addLen, dataLen;
SUdpConn * pConn = (SUdpConn *)param; SUdpConn * pConn = (SUdpConn *)param;
uint16_t port; uint16_t port;
int minSize = sizeof(STaosHeader); int minSize = sizeof(SRpcHead);
memset(&sourceAdd, 0, sizeof(sourceAdd)); memset(&sourceAdd, 0, sizeof(sourceAdd));
addLen = sizeof(sourceAdd); addLen = sizeof(sourceAdd);
...@@ -237,7 +272,7 @@ void *taosRecvUdpData(void *param) { ...@@ -237,7 +272,7 @@ void *taosRecvUdpData(void *param) {
tTrace("%s msg is recv from 0x%x:%hu len:%d", pConn->label, sourceAdd.sin_addr.s_addr, ntohs(sourceAdd.sin_port), tTrace("%s msg is recv from 0x%x:%hu len:%d", pConn->label, sourceAdd.sin_addr.s_addr, ntohs(sourceAdd.sin_port),
dataLen); dataLen);
if (dataLen < sizeof(STaosHeader)) { if (dataLen < sizeof(SRpcHead)) {
tError("%s recvfrom failed, reason:%s\n", pConn->label, strerror(errno)); tError("%s recvfrom failed, reason:%s\n", pConn->label, strerror(errno));
continue; continue;
} }
...@@ -250,7 +285,7 @@ void *taosRecvUdpData(void *param) { ...@@ -250,7 +285,7 @@ void *taosRecvUdpData(void *param) {
char *msg = pConn->buffer; char *msg = pConn->buffer;
while (processedLen < (int)dataLen) { while (processedLen < (int)dataLen) {
leftLen = dataLen - processedLen; leftLen = dataLen - processedLen;
STaosHeader *pHead = (STaosHeader *)msg; SRpcHead *pHead = (SRpcHead *)msg;
msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
if (leftLen < minSize || msgLen > leftLen || msgLen < minSize) { if (leftLen < minSize || msgLen > leftLen || msgLen < minSize) {
tError("%s msg is messed up, dataLen:%d processedLen:%d count:%d msgLen:%d", pConn->label, dataLen, tError("%s msg is messed up, dataLen:%d processedLen:%d count:%d msgLen:%d", pConn->label, dataLen,
...@@ -259,7 +294,7 @@ void *taosRecvUdpData(void *param) { ...@@ -259,7 +294,7 @@ void *taosRecvUdpData(void *param) {
} }
if (pHead->tcp == 1) { if (pHead->tcp == 1) {
taosReceivePacketViaTcp(sourceAdd.sin_addr.s_addr, (STaosHeader *)msg, pConn); taosReceivePacketViaTcp(sourceAdd.sin_addr.s_addr, (SRpcHead *)msg, pConn);
} else { } else {
char *data = malloc((size_t)msgLen); char *data = malloc((size_t)msgLen);
memcpy(data, msg, (size_t)msgLen); memcpy(data, msg, (size_t)msgLen);
...@@ -282,7 +317,7 @@ void *taosTransferDataViaTcp(void *argv) { ...@@ -282,7 +317,7 @@ void *taosTransferDataViaTcp(void *argv) {
int connFd = pTransfer->fd; int connFd = pTransfer->fd;
int msgLen, retLen, leftLen; int msgLen, retLen, leftLen;
uint64_t handle; uint64_t handle;
STaosHeader *pHeader = NULL, head; SRpcHead *pHead = NULL, head;
SUdpConnSet *pSet = pTransfer->pSet; SUdpConnSet *pSet = pTransfer->pSet;
SHandleViaTcp handleViaTcp; SHandleViaTcp handleViaTcp;
...@@ -308,8 +343,8 @@ void *taosTransferDataViaTcp(void *argv) { ...@@ -308,8 +343,8 @@ void *taosTransferDataViaTcp(void *argv) {
if (handle == 0) { if (handle == 0) {
// receive a packet from client // receive a packet from client
tTrace("%s data will be received via TCP from 0x%x:%hu", pSet->label, pTransfer->ip, pTransfer->port); tTrace("%s data will be received via TCP from 0x%x:%hu", pSet->label, pTransfer->ip, pTransfer->port);
retLen = taosReadMsg(connFd, &head, sizeof(STaosHeader)); retLen = taosReadMsg(connFd, &head, sizeof(SRpcHead));
if (retLen != (int)sizeof(STaosHeader)) { if (retLen != (int)sizeof(SRpcHead)) {
tError("%s failed to read msg header, retLen:%d", pSet->label, retLen); tError("%s failed to read msg header, retLen:%d", pSet->label, retLen);
} else { } else {
SMonitor *pMonitor = (SMonitor *)calloc(1, sizeof(SMonitor)); SMonitor *pMonitor = (SMonitor *)calloc(1, sizeof(SMonitor));
...@@ -319,10 +354,10 @@ void *taosTransferDataViaTcp(void *argv) { ...@@ -319,10 +354,10 @@ void *taosTransferDataViaTcp(void *argv) {
free(pTransfer); free(pTransfer);
return NULL; return NULL;
} }
pMonitor->dataLen = sizeof(STaosHeader); pMonitor->dataLen = sizeof(SRpcHead);
memcpy(pMonitor->data, &head, (size_t)pMonitor->dataLen); memcpy(pMonitor->data, &head, (size_t)pMonitor->dataLen);
((STaosHeader *)pMonitor->data)->msgLen = (int32_t)htonl(sizeof(STaosHeader)); ((SRpcHead *)pMonitor->data)->msgLen = (int32_t)htonl(sizeof(SRpcHead));
((STaosHeader *)pMonitor->data)->tcp = 1; ((SRpcHead *)pMonitor->data)->tcp = 1;
pMonitor->ip = pTransfer->ip; pMonitor->ip = pTransfer->ip;
pMonitor->port = head.port; pMonitor->port = head.port;
pMonitor->pSet = pSet; pMonitor->pSet = pSet;
...@@ -337,8 +372,8 @@ void *taosTransferDataViaTcp(void *argv) { ...@@ -337,8 +372,8 @@ void *taosTransferDataViaTcp(void *argv) {
return NULL; return NULL;
} }
leftLen = msgLen - (int)sizeof(STaosHeader); leftLen = msgLen - (int)sizeof(SRpcHead);
retLen = taosReadMsg(connFd, buffer + sizeof(STaosHeader), leftLen); retLen = taosReadMsg(connFd, buffer + sizeof(SRpcHead), leftLen);
pMonitor->pSet = NULL; pMonitor->pSet = NULL;
if (retLen != leftLen) { if (retLen != leftLen) {
...@@ -349,7 +384,7 @@ void *taosTransferDataViaTcp(void *argv) { ...@@ -349,7 +384,7 @@ void *taosTransferDataViaTcp(void *argv) {
pTransfer->port, msgLen); pTransfer->port, msgLen);
pSet->index = (pSet->index + 1) % pSet->threads; pSet->index = (pSet->index + 1) % pSet->threads;
SUdpConn *pConn = pSet->udpConn + pSet->index; SUdpConn *pConn = pSet->udpConn + pSet->index;
memcpy(buffer, &head, sizeof(STaosHeader)); memcpy(buffer, &head, sizeof(SRpcHead));
(*pSet->fp)(buffer, msgLen, pTransfer->ip, head.port, pSet->shandle, NULL, pConn); (*pSet->fp)(buffer, msgLen, pTransfer->ip, head.port, pSet->shandle, NULL, pConn);
} }
...@@ -358,11 +393,11 @@ void *taosTransferDataViaTcp(void *argv) { ...@@ -358,11 +393,11 @@ void *taosTransferDataViaTcp(void *argv) {
} else { } else {
// send a packet to client // send a packet to client
tTrace("%s send packet to client via TCP, handle:0x%x", pSet->label, handle); tTrace("%s send packet to client via TCP, handle:0x%x", pSet->label, handle);
pHeader = (STaosHeader *)handle; pHead = (SRpcHead *)handle;
msgLen = (int32_t)htonl((uint32_t)pHeader->msgLen); msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
if (pHeader->tcp != 0 || msgLen < 1024) { if (pHead->tcp != 0 || msgLen < 1024) {
tError("%s invalid handle:%p, connection shall be closed", pSet->label, pHeader); tError("%s invalid handle:%p, connection shall be closed", pSet->label, pHead);
} else { } else {
SMonitor *pMonitor = (SMonitor *)calloc(1, sizeof(SMonitor)); SMonitor *pMonitor = (SMonitor *)calloc(1, sizeof(SMonitor));
if (NULL == pMonitor) { if (NULL == pMonitor) {
...@@ -371,12 +406,12 @@ void *taosTransferDataViaTcp(void *argv) { ...@@ -371,12 +406,12 @@ void *taosTransferDataViaTcp(void *argv) {
free(pTransfer); free(pTransfer);
return NULL; return NULL;
} }
pMonitor->dataLen = sizeof(STaosHeader); pMonitor->dataLen = sizeof(SRpcHead);
memcpy(pMonitor->data, (void *)handle, (size_t)pMonitor->dataLen); memcpy(pMonitor->data, (void *)handle, (size_t)pMonitor->dataLen);
STaosHeader *pThead = (STaosHeader *)pMonitor->data; SRpcHead *pThead = (SRpcHead *)pMonitor->data;
pThead->tcp = 1; pThead->tcp = 1;
pThead->msgType = (char)(pHeader->msgType - 1); pThead->msgType = (char)(pHead->msgType - 1);
pThead->msgLen = (int32_t)htonl(sizeof(STaosHeader)); pThead->msgLen = (int32_t)htonl(sizeof(SRpcHead));
uint32_t id = pThead->sourceId; pThead->sourceId = pThead->destId; pThead->destId = id; uint32_t id = pThead->sourceId; pThead->sourceId = pThead->destId; pThead->destId = id;
pMonitor->ip = pTransfer->ip; pMonitor->ip = pTransfer->ip;
pMonitor->port = pTransfer->port; pMonitor->port = pTransfer->port;
...@@ -522,7 +557,7 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v ...@@ -522,7 +557,7 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v
pConn->pSet = pSet; pConn->pSet = pSet;
pConn->signature = pConn; pConn->signature = pConn;
if (tsUdpDelay) { if (tsUdpDelay) {
pConn->hash = taosOpenIpHash(RPC_MAX_UDP_CONNS); pConn->hash = rpcOpenIpHash(RPC_MAX_UDP_CONNS);
pthread_mutex_init(&pConn->mutex, NULL); pthread_mutex_init(&pConn->mutex, NULL);
pConn->tmrCtrl = pSet->tmrCtrl; pConn->tmrCtrl = pSet->tmrCtrl;
} }
...@@ -575,7 +610,7 @@ void taosCleanUpUdpConnection(void *handle) { ...@@ -575,7 +610,7 @@ void taosCleanUpUdpConnection(void *handle) {
pthread_cancel(pConn->thread); pthread_cancel(pConn->thread);
taosCloseSocket(pConn->fd); taosCloseSocket(pConn->fd);
if (pConn->hash) { if (pConn->hash) {
taosCloseIpHash(pConn->hash); rpcCloseIpHash(pConn->hash);
pthread_mutex_destroy(&pConn->mutex); pthread_mutex_destroy(&pConn->mutex);
} }
} }
...@@ -608,7 +643,7 @@ void *taosOpenUdpConnection(void *shandle, void *thandle, char *ip, uint16_t por ...@@ -608,7 +643,7 @@ void *taosOpenUdpConnection(void *shandle, void *thandle, char *ip, uint16_t por
void taosRemoveUdpBuf(SUdpBuf *pBuf) { void taosRemoveUdpBuf(SUdpBuf *pBuf) {
taosTmrStopA(&pBuf->timer); taosTmrStopA(&pBuf->timer);
taosDeleteIpHash(pBuf->pConn->hash, pBuf->ip, pBuf->port); rpcDeleteIpHash(pBuf->pConn->hash, pBuf->ip, pBuf->port);
// tTrace("%s UDP buffer to:0x%lld:%d is removed", pBuf->pConn->label, // tTrace("%s UDP buffer to:0x%lld:%d is removed", pBuf->pConn->label,
// pBuf->ip, pBuf->port); // pBuf->ip, pBuf->port);
...@@ -671,13 +706,13 @@ int taosSendPacketViaTcp(uint32_t ip, uint16_t port, char *data, int dataLen, vo ...@@ -671,13 +706,13 @@ int taosSendPacketViaTcp(uint32_t ip, uint16_t port, char *data, int dataLen, vo
int code = -1, retLen, msgLen; int code = -1, retLen, msgLen;
char ipstr[64]; char ipstr[64];
char buffer[128]; char buffer[128];
STaosHeader *pHead; SRpcHead *pHead;
if (pSet->server) { if (pSet->server) {
// send from server // send from server
pHead = (STaosHeader *)buffer; pHead = (SRpcHead *)buffer;
memcpy(pHead, data, sizeof(STaosHeader)); memcpy(pHead, data, sizeof(SRpcHead));
pHead->tcp = 1; pHead->tcp = 1;
SPacketInfo *pInfo = (SPacketInfo *)pHead->content; SPacketInfo *pInfo = (SPacketInfo *)pHead->content;
...@@ -685,7 +720,7 @@ int taosSendPacketViaTcp(uint32_t ip, uint16_t port, char *data, int dataLen, vo ...@@ -685,7 +720,7 @@ int taosSendPacketViaTcp(uint32_t ip, uint16_t port, char *data, int dataLen, vo
pInfo->port = pSet->port; pInfo->port = pSet->port;
pInfo->msgLen = pHead->msgLen; pInfo->msgLen = pHead->msgLen;
msgLen = sizeof(STaosHeader) + sizeof(SPacketInfo); msgLen = sizeof(SRpcHead) + sizeof(SPacketInfo);
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
code = taosSendUdpData(ip, port, buffer, msgLen, chandle); code = taosSendUdpData(ip, port, buffer, msgLen, chandle);
tTrace("%s data from server will be sent via TCP:%hu, msgType:%d, length:%d, handle:0x%x", pSet->label, pInfo->port, tTrace("%s data from server will be sent via TCP:%hu, msgType:%d, length:%d, handle:0x%x", pSet->label, pInfo->port,
...@@ -696,16 +731,16 @@ int taosSendPacketViaTcp(uint32_t ip, uint16_t port, char *data, int dataLen, vo ...@@ -696,16 +731,16 @@ int taosSendPacketViaTcp(uint32_t ip, uint16_t port, char *data, int dataLen, vo
tTrace("%s data will be sent via TCP from client", pSet->label); tTrace("%s data will be sent via TCP from client", pSet->label);
// send a UDP header first to set up the connection // send a UDP header first to set up the connection
pHead = (STaosHeader *)buffer; pHead = (SRpcHead *)buffer;
memcpy(pHead, data, sizeof(STaosHeader)); memcpy(pHead, data, sizeof(SRpcHead));
pHead->tcp = 2; pHead->tcp = 2;
msgLen = sizeof(STaosHeader); msgLen = sizeof(SRpcHead);
pHead->msgLen = (int32_t)htonl(msgLen); pHead->msgLen = (int32_t)htonl(msgLen);
code = taosSendUdpData(ip, port, buffer, msgLen, chandle); code = taosSendUdpData(ip, port, buffer, msgLen, chandle);
//pHead = (STaosHeader *)data; //pHead = (SRpcHead *)data;
tinet_ntoa(ipstr, ip); tinet_ntoa(ipstr, ip);
int fd = taosOpenTcpClientSocket(ipstr, pConn->port, tsLocalIp); int fd = taosOpenTcpClientSocket(ipstr, pConn->port, tsLocalIp);
...@@ -762,10 +797,10 @@ int taosSendUdpData(uint32_t ip, uint16_t port, char *data, int dataLen, void *c ...@@ -762,10 +797,10 @@ int taosSendUdpData(uint32_t ip, uint16_t port, char *data, int dataLen, void *c
pthread_mutex_lock(&pConn->mutex); pthread_mutex_lock(&pConn->mutex);
pBuf = (SUdpBuf *)taosGetIpHash(pConn->hash, ip, port); pBuf = (SUdpBuf *)rpcGetIpHash(pConn->hash, ip, port);
if (pBuf == NULL) { if (pBuf == NULL) {
pBuf = taosCreateUdpBuf(pConn, ip, port); pBuf = taosCreateUdpBuf(pConn, ip, port);
taosAddIpHash(pConn->hash, pBuf, ip, port); rpcAddIpHash(pConn->hash, pBuf, ip, port);
} }
if ((pBuf->totalLen + dataLen > RPC_MAX_UDP_SIZE) || (taosMsgHdrSize(pBuf->msgHdr) >= RPC_MAX_UDP_PKTS)) { if ((pBuf->totalLen + dataLen > RPC_MAX_UDP_SIZE) || (taosMsgHdrSize(pBuf->msgHdr) >= RPC_MAX_UDP_PKTS)) {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
void taosFreeMsgHdr(void *hdr) {
struct msghdr *msgHdr = (struct msghdr *)hdr;
free(msgHdr->msg_iov);
}
int taosMsgHdrSize(void *hdr) {
struct msghdr *msgHdr = (struct msghdr *)hdr;
return (int)msgHdr->msg_iovlen;
}
void taosSendMsgHdr(void *hdr, int fd) {
struct msghdr *msgHdr = (struct msghdr *)hdr;
sendmsg(fd, msgHdr, 0);
msgHdr->msg_iovlen = 0;
}
void taosInitMsgHdr(void **hdr, void *dest, int maxPkts) {
struct msghdr *msgHdr = (struct msghdr *)malloc(sizeof(struct msghdr));
memset(msgHdr, 0, sizeof(struct msghdr));
*hdr = msgHdr;
struct sockaddr_in *destAdd = (struct sockaddr_in *)dest;
msgHdr->msg_name = destAdd;
msgHdr->msg_namelen = sizeof(struct sockaddr_in);
int size = (int)sizeof(struct iovec) * maxPkts;
msgHdr->msg_iov = (struct iovec *)malloc((size_t)size);
memset(msgHdr->msg_iov, 0, (size_t)size);
}
void taosSetMsgHdrData(void *hdr, char *data, int dataLen) {
struct msghdr *msgHdr = (struct msghdr *)hdr;
msgHdr->msg_iov[msgHdr->msg_iovlen].iov_base = data;
msgHdr->msg_iov[msgHdr->msg_iovlen].iov_len = (size_t)dataLen;
msgHdr->msg_iovlen++;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册