提交 84eeb2ab 编写于 作者: dengyihao's avatar dengyihao

Merge branch 'rpc' into 3.0

/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_RPCHEAD_H
#define TDENGINE_RPCHEAD_H
#include <tdef.h>
#ifdef __cplusplus
extern "C" {
#endif
#define RPC_CONN_TCP 2
extern int tsRpcOverhead;
typedef struct {
void* msg;
int msgLen;
uint32_t ip;
uint16_t port;
int connType;
void* shandle;
void* thandle;
void* chandle;
} SRecvInfo;
#pragma pack(push, 1)
typedef struct {
char version : 4; // RPC version
char comp : 4; // compression algorithm, 0:no compression 1:lz4
char resflag : 2; // reserved bits
char spi : 3; // security parameter index
char encrypt : 3; // encrypt algorithm, 0: no encryption
uint16_t tranId; // transcation ID
uint32_t linkUid; // for unique connection ID assigned by client
uint64_t ahandle; // ahandle assigned by client
uint32_t sourceId; // source ID, an index for connection list
uint32_t destId; // destination ID, an index for connection list
uint32_t destIp; // destination IP address, for NAT scenario
char user[TSDB_UNI_LEN]; // user ID
uint16_t port; // for UDP only, port may be changed
char empty[1]; // reserved
uint16_t msgType; // message type
int32_t msgLen; // message length including the header iteslf
uint32_t msgVer;
int32_t code; // code in response message
uint8_t content[0]; // message body starts from here
} SRpcHead;
typedef struct {
int32_t reserved;
int32_t contLen;
} SRpcComp;
typedef struct {
uint32_t timeStamp;
uint8_t auth[TSDB_AUTH_LEN];
} SRpcDigest;
#pragma pack(pop)
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_RPCHEAD_H
......@@ -20,9 +20,8 @@
extern "C" {
#endif
#ifdef __cplusplus
}
#endif
#endif /*_TD_TRANSPORT_H_*/
\ No newline at end of file
#endif /*_TD_TRANSPORT_H_*/
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_RPC_LOG_H
#define TDENGINE_RPC_LOG_H
#ifdef __cplusplus
extern "C" {
#endif
#include "tlog.h"
#define tFatal(...) \
{ \
if (rpcDebugFlag & DEBUG_FATAL) { \
taosPrintLog("RPC FATAL ", DEBUG_FATAL, rpcDebugFlag, __VA_ARGS__); \
} \
}
#define tError(...) \
{ \
if (rpcDebugFlag & DEBUG_ERROR) { \
taosPrintLog("RPC ERROR ", DEBUG_ERROR, rpcDebugFlag, __VA_ARGS__); \
} \
}
#define tWarn(...) \
{ \
if (rpcDebugFlag & DEBUG_WARN) { \
taosPrintLog("RPC WARN ", DEBUG_WARN, rpcDebugFlag, __VA_ARGS__); \
} \
}
#define tInfo(...) \
{ \
if (rpcDebugFlag & DEBUG_INFO) { \
taosPrintLog("RPC ", DEBUG_INFO, rpcDebugFlag, __VA_ARGS__); \
} \
}
#define tDebug(...) \
{ \
if (rpcDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("RPC ", DEBUG_DEBUG, rpcDebugFlag, __VA_ARGS__); \
} \
}
#define tTrace(...) \
{ \
if (rpcDebugFlag & DEBUG_TRACE) { \
taosPrintLog("RPC ", DEBUG_TRACE, rpcDebugFlag, __VA_ARGS__); \
} \
}
#define tDump(x, y) \
{ \
if (rpcDebugFlag & DEBUG_DUMP) { \
taosDumpData((unsigned char *)x, y); \
} \
}
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_RPC_LOG_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _rpc_tcp_header_
#define _rpc_tcp_header_
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif
void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle);
void taosStopTcpServer(void *param);
void taosCleanUpTcpServer(void *param);
void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *fp, void *shandle);
void taosStopTcpClient(void *chandle);
void taosCleanUpTcpClient(void *chandle);
void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port);
void taosCloseTcpConnection(void *chandle);
int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle);
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _rpc_udp_header_
#define _rpc_udp_header_
#ifdef __cplusplus
extern "C" {
#endif
#include "taosdef.h"
void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int, void *fp, void *shandle);
void taosStopUdpConnection(void *handle);
void taosCleanUpUdpConnection(void *handle);
int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *chandle);
void *taosOpenUdpConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port);
void taosFreeMsgHdr(void *hdr);
int taosMsgHdrSize(void *hdr);
void taosSendMsgHdr(void *hdr, TdFilePtr pFile);
void taosInitMsgHdr(void **hdr, void *dest, int maxPkts);
void taosSetMsgHdrData(void *hdr, char *data, int dataLen);
#ifdef __cplusplus
}
#endif
#endif
......@@ -12,7 +12,8 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifdef USE_UV
#ifndef _TD_TRANSPORT_COMM_H
#define _TD_TRANSPORT_COMM_H
#ifdef __cplusplus
extern "C" {
......@@ -22,9 +23,6 @@ extern "C" {
#include "lz4.h"
#include "os.h"
#include "osSocket.h"
#include "rpcCache.h"
#include "rpcHead.h"
#include "rpcLog.h"
#include "taoserror.h"
#include "tglobal.h"
#include "thash.h"
......@@ -33,6 +31,7 @@ extern "C" {
#include "tmd5.h"
#include "tmempool.h"
#include "tmsg.h"
#include "transLog.h"
#include "transportInt.h"
#include "tref.h"
#include "trpc.h"
......@@ -193,12 +192,7 @@ typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } Co
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
#define RPC_RESERVE_SIZE (sizeof(STranConnCtx))
#define RPC_MSG_OVERHEAD (sizeof(SRpcHead) + sizeof(SRpcDigest))
#define rpcHeadFromCont(cont) ((SRpcHead*)((char*)cont - sizeof(SRpcHead)))
#define rpcContFromHead(msg) (msg + sizeof(SRpcHead))
#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead))
#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead))
#define rpcIsReq(type) (type & 1U)
#define rpcIsReq(type) (type & 1U)
#define TRANS_RESERVE_SIZE (sizeof(STranConnCtx))
......@@ -209,15 +203,14 @@ typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } Co
#define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead));
#define transIsReq(type) (type & 1U)
int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey);
void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen);
SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead);
int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey);
void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
bool transCompressMsg(char* msg, int32_t len, int32_t* flen);
bool transDecompressMsg(char* msg, int32_t len, int32_t* flen);
// int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey);
// void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
//// int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen);
//
// int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey);
// void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
// bool transCompressMsg(char* msg, int32_t len, int32_t* flen);
// bool transDecompressMsg(char* msg, int32_t len, int32_t* flen);
void transFreeMsg(void* msg);
......@@ -365,4 +358,4 @@ void transThreadOnce();
}
#endif
#endif
#endif // _TD_TRANSPORT_COMM_H
......@@ -13,22 +13,26 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_RPC_CACHE_H
#define TDENGINE_RPC_CACHE_H
#include <stdint.h>
#ifndef _TD_TRANSPORT_LOG_H
#define _TD_TRANSPORT_LOG_H
#ifdef __cplusplus
extern "C" {
#endif
void *rpcOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer);
void rpcCloseConnCache(void *handle);
void rpcAddConnIntoCache(void *handle, void *data, char *fqdn, uint16_t port, int8_t connType);
void *rpcGetConnFromCache(void *handle, char *fqdn, uint16_t port, int8_t connType);
// clang-format off
#include "tlog.h"
#define tFatal(...) do {if (rpcDebugFlag & DEBUG_FATAL){ taosPrintLog("RPC FATAL ", DEBUG_FATAL, rpcDebugFlag, __VA_ARGS__); }} while (0)
#define tError(...)do { if (rpcDebugFlag & DEBUG_ERROR){ taosPrintLog("RPC ERROR ", DEBUG_ERROR, rpcDebugFlag, __VA_ARGS__); } } while(0)
#define tWarn(...) do { if (rpcDebugFlag & DEBUG_WARN) { taosPrintLog("RPC WARN ", DEBUG_WARN, rpcDebugFlag, __VA_ARGS__); }} while(0)
#define tInfo(...) do { if (rpcDebugFlag & DEBUG_INFO) { taosPrintLog("RPC ", DEBUG_INFO, rpcDebugFlag, __VA_ARGS__); }} while(0)
#define tDebug(...) do {if (rpcDebugFlag & DEBUG_DEBUG){ taosPrintLog("RPC ", DEBUG_DEBUG, rpcDebugFlag, __VA_ARGS__); }} while(0)
#define tTrace(...) do {if (rpcDebugFlag & DEBUG_TRACE){ taosPrintLog("RPC ", DEBUG_TRACE, rpcDebugFlag, __VA_ARGS__); }} while(0)
#define tDump(x, y) do {if (rpcDebugFlag & DEBUG_DUMP) { taosDumpData((unsigned char *)x, y); } } while(0)
// clang-format on
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_RPC_CACHE_H
#endif // __TRANS_LOG_H
......@@ -21,14 +21,12 @@
#endif
#include "lz4.h"
#include "os.h"
#include "rpcCache.h"
#include "rpcHead.h"
#include "rpcLog.h"
#include "taoserror.h"
#include "tglobal.h"
#include "thash.h"
#include "tidpool.h"
#include "tmsg.h"
#include "transLog.h"
#include "tref.h"
#include "trpc.h"
#include "ttimer.h"
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "rpcCache.h"
#include "os.h"
#include "rpcLog.h"
#include "taosdef.h"
#include "tglobal.h"
#include "tmempool.h"
#include "ttimer.h"
#include "tutil.h"
typedef struct SConnHash {
char fqdn[TSDB_FQDN_LEN];
uint16_t port;
char connType;
struct SConnHash *prev;
struct SConnHash *next;
void * data;
uint64_t time;
} SConnHash;
typedef struct {
SConnHash ** connHashList;
mpool_h connHashMemPool;
int maxSessions;
int total;
int * count;
int64_t keepTimer;
TdThreadMutex mutex;
void (*cleanFp)(void *);
void * tmrCtrl;
void * pTimer;
int64_t *lockedBy;
} SConnCache;
static int rpcHashConn(void *handle, char *fqdn, uint16_t port, int8_t connType);
static void rpcLockCache(int64_t *lockedBy);
static void rpcUnlockCache(int64_t *lockedBy);
static void rpcCleanConnCache(void *handle, void *tmrId);
static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash, uint64_t time);
void *rpcOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer) {
SConnHash **connHashList;
mpool_h connHashMemPool;
SConnCache *pCache;
connHashMemPool = taosMemPoolInit(maxSessions, sizeof(SConnHash));
if (connHashMemPool == 0) return NULL;
connHashList = taosMemoryCalloc(sizeof(SConnHash *), maxSessions);
if (connHashList == 0) {
taosMemPoolCleanUp(connHashMemPool);
return NULL;
}
pCache = taosMemoryMalloc(sizeof(SConnCache));
if (pCache == NULL) {
taosMemPoolCleanUp(connHashMemPool);
taosMemoryFree(connHashList);
return NULL;
}
memset(pCache, 0, sizeof(SConnCache));
pCache->count = taosMemoryCalloc(sizeof(int), maxSessions);
pCache->total = 0;
pCache->keepTimer = keepTimer;
pCache->maxSessions = maxSessions;
pCache->connHashMemPool = connHashMemPool;
pCache->connHashList = connHashList;
pCache->cleanFp = cleanFp;
pCache->tmrCtrl = tmrCtrl;
pCache->lockedBy = taosMemoryCalloc(sizeof(int64_t), maxSessions);
taosTmrReset(rpcCleanConnCache, (int32_t)(pCache->keepTimer * 2), pCache, pCache->tmrCtrl, &pCache->pTimer);
taosThreadMutexInit(&pCache->mutex, NULL);
return pCache;
}
void rpcCloseConnCache(void *handle) {
SConnCache *pCache;
pCache = (SConnCache *)handle;
if (pCache == NULL || pCache->maxSessions == 0) return;
taosThreadMutexLock(&pCache->mutex);
taosTmrStopA(&(pCache->pTimer));
if (pCache->connHashMemPool) taosMemPoolCleanUp(pCache->connHashMemPool);
taosMemoryFreeClear(pCache->connHashList);
taosMemoryFreeClear(pCache->count);
taosMemoryFreeClear(pCache->lockedBy);
taosThreadMutexUnlock(&pCache->mutex);
taosThreadMutexDestroy(&pCache->mutex);
memset(pCache, 0, sizeof(SConnCache));
taosMemoryFree(pCache);
}
void rpcAddConnIntoCache(void *handle, void *data, char *fqdn, uint16_t port, int8_t connType) {
int hash;
SConnHash * pNode;
SConnCache *pCache;
uint64_t time = taosGetTimestampMs();
pCache = (SConnCache *)handle;
assert(pCache);
assert(data);
hash = rpcHashConn(pCache, fqdn, port, connType);
pNode = (SConnHash *)taosMemPoolMalloc(pCache->connHashMemPool);
tstrncpy(pNode->fqdn, fqdn, sizeof(pNode->fqdn));
pNode->port = port;
pNode->connType = connType;
pNode->data = data;
pNode->prev = NULL;
pNode->time = time;
rpcLockCache(pCache->lockedBy + hash);
pNode->next = pCache->connHashList[hash];
if (pCache->connHashList[hash] != NULL) (pCache->connHashList[hash])->prev = pNode;
pCache->connHashList[hash] = pNode;
pCache->count[hash]++;
rpcRemoveExpiredNodes(pCache, pNode->next, hash, time);
rpcUnlockCache(pCache->lockedBy + hash);
pCache->total++;
// tTrace("%p %s:%hu:%d:%d:%p added into cache, connections:%d", data, fqdn, port, connType, hash, pNode,
// pCache->count[hash]);
return;
}
void *rpcGetConnFromCache(void *handle, char *fqdn, uint16_t port, int8_t connType) {
int hash;
SConnHash * pNode;
SConnCache *pCache;
void * pData = NULL;
pCache = (SConnCache *)handle;
assert(pCache);
uint64_t time = taosGetTimestampMs();
hash = rpcHashConn(pCache, fqdn, port, connType);
rpcLockCache(pCache->lockedBy + hash);
pNode = pCache->connHashList[hash];
while (pNode) {
if (time >= pCache->keepTimer + pNode->time) {
rpcRemoveExpiredNodes(pCache, pNode, hash, time);
pNode = NULL;
break;
}
if (strcmp(pNode->fqdn, fqdn) == 0 && pNode->port == port && pNode->connType == connType) break;
pNode = pNode->next;
}
if (pNode) {
rpcRemoveExpiredNodes(pCache, pNode->next, hash, time);
if (pNode->prev) {
pNode->prev->next = pNode->next;
} else {
pCache->connHashList[hash] = pNode->next;
}
if (pNode->next) {
pNode->next->prev = pNode->prev;
}
pData = pNode->data;
taosMemPoolFree(pCache->connHashMemPool, (char *)pNode);
pCache->total--;
pCache->count[hash]--;
}
rpcUnlockCache(pCache->lockedBy + hash);
if (pData) {
// tTrace("%p %s:%hu:%d:%d:%p retrieved from cache, connections:%d", pData, fqdn, port, connType, hash, pNode,
// pCache->count[hash]);
} else {
// tTrace("%s:%hu:%d:%d failed to retrieve conn from cache, connections:%d", fqdn, port, connType, hash,
// pCache->count[hash]);
}
return pData;
}
static void rpcCleanConnCache(void *handle, void *tmrId) {
int hash;
SConnHash * pNode;
SConnCache *pCache;
pCache = (SConnCache *)handle;
if (pCache == NULL || pCache->maxSessions == 0) return;
if (pCache->pTimer != tmrId) return;
taosThreadMutexLock(&pCache->mutex);
uint64_t time = taosGetTimestampMs();
for (hash = 0; hash < pCache->maxSessions; ++hash) {
rpcLockCache(pCache->lockedBy + hash);
pNode = pCache->connHashList[hash];
rpcRemoveExpiredNodes(pCache, pNode, hash, time);
rpcUnlockCache(pCache->lockedBy + hash);
}
// tTrace("timer, total connections in cache:%d", pCache->total);
taosTmrReset(rpcCleanConnCache, (int32_t)(pCache->keepTimer * 2), pCache, pCache->tmrCtrl, &pCache->pTimer);
taosThreadMutexUnlock(&pCache->mutex);
}
static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash, uint64_t time) {
if (pNode == NULL || (time < pCache->keepTimer + pNode->time)) return;
SConnHash *pPrev = pNode->prev, *pNext;
while (pNode) {
(*pCache->cleanFp)(pNode->data);
pNext = pNode->next;
pCache->total--;
pCache->count[hash]--;
// tTrace("%p %s:%hu:%d:%d:%p removed from cache, connections:%d", pNode->data, pNode->fqdn, pNode->port,
// pNode->connType, hash, pNode,
// pCache->count[hash]);
taosMemPoolFree(pCache->connHashMemPool, (char *)pNode);
pNode = pNext;
}
if (pPrev)
pPrev->next = NULL;
else
pCache->connHashList[hash] = NULL;
}
static int rpcHashConn(void *handle, char *fqdn, uint16_t port, int8_t connType) {
SConnCache *pCache = (SConnCache *)handle;
int hash = 0;
char * temp = fqdn;
while (*temp) {
hash += *temp;
++temp;
}
hash += port;
hash += connType;
hash = hash % pCache->maxSessions;
return hash;
}
static void rpcLockCache(int64_t *lockedBy) {
int64_t tid = taosGetSelfPthreadId();
int i = 0;
while (atomic_val_compare_exchange_64(lockedBy, 0, tid) != 0) {
if (++i % 100 == 0) {
sched_yield();
}
}
}
static void rpcUnlockCache(int64_t *lockedBy) {
int64_t tid = taosGetSelfPthreadId();
if (atomic_val_compare_exchange_64(lockedBy, tid, 0) != tid) {
assert(false);
}
}
此差异已折叠。
此差异已折叠。
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "rpcUdp.h"
#include "os.h"
#include "rpcHead.h"
#include "rpcLog.h"
#include "taosdef.h"
#include "taoserror.h"
#include "ttimer.h"
#include "tutil.h"
#ifndef USE_UV
#define RPC_MAX_UDP_CONNS 256
#define RPC_MAX_UDP_PKTS 1000
#define RPC_UDP_BUF_TIME 5 // mseconds
#define RPC_MAX_UDP_SIZE 65480
typedef struct {
int index;
TdSocketPtr pSocket;
uint16_t port; // peer port
uint16_t localPort; // local port
char label[TSDB_LABEL_LEN]; // copy from udpConnSet;
TdThread thread;
void *hash;
void *shandle; // handle passed by upper layer during server initialization
void *pSet;
void *(*processData)(SRecvInfo *pRecv);
char *buffer; // buffer to receive data
} SUdpConn;
typedef struct {
int index;
int server;
uint32_t ip; // local IP
uint16_t port; // local Port
void * shandle; // handle passed by upper layer during server initialization
int threads;
char label[TSDB_LABEL_LEN];
void *(*fp)(SRecvInfo *pPacket);
SUdpConn udpConn[];
} SUdpConnSet;
static void *taosRecvUdpData(void *param);
void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads, void *fp, void *shandle) {
SUdpConn * pConn;
SUdpConnSet *pSet;
int size = (int)sizeof(SUdpConnSet) + threads * (int)sizeof(SUdpConn);
pSet = (SUdpConnSet *)taosMemoryMalloc((size_t)size);
if (pSet == NULL) {
tError("%s failed to allocate UdpConn", label);
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
}
memset(pSet, 0, (size_t)size);
pSet->ip = ip;
pSet->port = port;
pSet->shandle = shandle;
pSet->fp = fp;
pSet->threads = threads;
tstrncpy(pSet->label, label, sizeof(pSet->label));
TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
int i;
uint16_t ownPort;
for (i = 0; i < threads; ++i) {
pConn = pSet->udpConn + i;
ownPort = (port ? port + i : 0);
pConn->pSocket = taosOpenUdpSocket(ip, ownPort);
if (pConn->pSocket == NULL) {
tError("%s failed to open UDP socket %x:%hu", label, ip, port);
break;
}
pConn->buffer = taosMemoryMalloc(RPC_MAX_UDP_SIZE);
if (NULL == pConn->buffer) {
tError("%s failed to malloc recv buffer", label);
break;
}
struct sockaddr_in sin;
unsigned int addrlen = sizeof(sin);
if (taosGetSocketName(pConn->pSocket, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET &&
addrlen == sizeof(sin)) {
pConn->localPort = (uint16_t)ntohs(sin.sin_port);
}
tstrncpy(pConn->label, label, sizeof(pConn->label));
pConn->shandle = shandle;
pConn->processData = fp;
pConn->index = i;
pConn->pSet = pSet;
int code = taosThreadCreate(&pConn->thread, &thAttr, taosRecvUdpData, pConn);
if (code != 0) {
tError("%s failed to create thread to process UDP data(%s)", label, strerror(errno));
break;
}
}
taosThreadAttrDestroy(&thAttr);
if (i != threads) {
terrno = TAOS_SYSTEM_ERROR(errno);
taosCleanUpUdpConnection(pSet);
return NULL;
}
tDebug("%s UDP connection is initialized, ip:%x:%hu threads:%d", label, ip, port, threads);
return pSet;
}
void taosStopUdpConnection(void *handle) {
SUdpConnSet *pSet = (SUdpConnSet *)handle;
SUdpConn * pConn;
if (pSet == NULL) return;
for (int i = 0; i < pSet->threads; ++i) {
pConn = pSet->udpConn + i;
if (pConn->pSocket != NULL) taosShutDownSocketRDWR(pConn->pSocket);
if (pConn->pSocket != NULL) taosCloseSocket(&pConn->pSocket);
pConn->pSocket = NULL;
}
for (int i = 0; i < pSet->threads; ++i) {
pConn = pSet->udpConn + i;
if (taosCheckPthreadValid(pConn->thread)) {
taosThreadJoin(pConn->thread, NULL);
}
taosMemoryFreeClear(pConn->buffer);
// tTrace("%s UDP thread is closed, index:%d", pConn->label, i);
}
tDebug("%s UDP is stopped", pSet->label);
}
void taosCleanUpUdpConnection(void *handle) {
SUdpConnSet *pSet = (SUdpConnSet *)handle;
SUdpConn * pConn;
if (pSet == NULL) return;
for (int i = 0; i < pSet->threads; ++i) {
pConn = pSet->udpConn + i;
if (pConn->pSocket != NULL) taosCloseSocket(&pConn->pSocket);
}
tDebug("%s UDP is cleaned up", pSet->label);
taosMemoryFreeClear(pSet);
}
void *taosOpenUdpConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) {
SUdpConnSet *pSet = (SUdpConnSet *)shandle;
pSet->index = (pSet->index + 1) % pSet->threads;
SUdpConn *pConn = pSet->udpConn + pSet->index;
pConn->port = port;
tDebug("%s UDP connection is setup, ip:%x:%hu localPort:%hu", pConn->label, ip, port, pConn->localPort);
return pConn;
}
static void *taosRecvUdpData(void *param) {
SUdpConn * pConn = param;
struct sockaddr_in sourceAdd;
ssize_t dataLen;
unsigned int addLen;
uint16_t port;
SRecvInfo recvInfo;
memset(&sourceAdd, 0, sizeof(sourceAdd));
addLen = sizeof(sourceAdd);
tDebug("%s UDP thread is created, index:%d", pConn->label, pConn->index);
char *msg = pConn->buffer;
setThreadName("recvUdpData");
while (1) {
dataLen = taosReadFromSocket(pConn->pSocket, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen);
if (dataLen <= 0) {
tDebug("%s UDP socket was closed, exiting(%s), dataLen:%d", pConn->label, strerror(errno), (int32_t)dataLen);
// for windows usage, remote shutdown also returns - 1 in windows client
if (pConn->pSocket == NULL) {
break;
} else {
continue;
}
}
port = ntohs(sourceAdd.sin_port);
if (dataLen < sizeof(SRpcHead)) {
tError("%s recvfrom failed(%s)", pConn->label, strerror(errno));
continue;
}
int32_t size = dataLen + tsRpcOverhead;
char * tmsg = taosMemoryMalloc(size);
if (NULL == tmsg) {
tError("%s failed to allocate memory, size:%" PRId64, pConn->label, (int64_t)dataLen);
continue;
} else {
tTrace("UDP malloc mem:%p size:%d", tmsg, size);
}
tmsg += tsRpcOverhead; // overhead for SRpcReqContext
memcpy(tmsg, msg, dataLen);
recvInfo.msg = tmsg;
recvInfo.msgLen = dataLen;
recvInfo.ip = sourceAdd.sin_addr.s_addr;
recvInfo.port = port;
recvInfo.shandle = pConn->shandle;
recvInfo.thandle = NULL;
recvInfo.chandle = pConn;
recvInfo.connType = 0;
(*(pConn->processData))(&recvInfo);
}
return NULL;
}
int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *chandle) {
SUdpConn *pConn = (SUdpConn *)chandle;
if (pConn == NULL) return -1;
struct sockaddr_in destAdd;
memset(&destAdd, 0, sizeof(destAdd));
destAdd.sin_family = AF_INET;
destAdd.sin_addr.s_addr = ip;
destAdd.sin_port = htons(port);
int ret = taosSendto(pConn->pSocket, data, (size_t)dataLen, 0, (struct sockaddr *)&destAdd, sizeof(destAdd));
return ret;
}
#endif
\ No newline at end of file
......@@ -15,9 +15,9 @@
//#define _DEFAULT_SOURCE
#include "os.h"
#include "rpcLog.h"
#include "tglobal.h"
#include "tqueue.h"
#include "transLog.h"
#include "trpc.h"
int msgSize = 128;
......
......@@ -14,9 +14,9 @@
*/
#include <tdatablock.h>
#include "os.h"
#include "rpcLog.h"
#include "taoserror.h"
#include "tglobal.h"
#include "transLog.h"
#include "trpc.h"
#include "tutil.h"
......
......@@ -15,9 +15,9 @@
//#define _DEFAULT_SOURCE
#include "os.h"
#include "rpcLog.h"
#include "tglobal.h"
#include "tqueue.h"
#include "transLog.h"
#include "trpc.h"
int msgSize = 128;
......
......@@ -14,9 +14,9 @@
*/
#include <tdatablock.h>
#include "os.h"
#include "rpcLog.h"
#include "taoserror.h"
#include "tglobal.h"
#include "transLog.h"
#include "trpc.h"
#include "tutil.h"
......
......@@ -15,10 +15,10 @@
#include <gtest/gtest.h>
#include <cstdio>
#include <cstring>
#include "rpcLog.h"
#include "tdatablock.h"
#include "tglobal.h"
#include "tlog.h"
#include "transLog.h"
#include "trpc.h"
using namespace std;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册