diff --git a/include/libs/transport/rpcHead.h b/include/libs/transport/rpcHead.h deleted file mode 100644 index 5ddf1a83c96271abba9aefc254b9cee442c375cc..0000000000000000000000000000000000000000 --- a/include/libs/transport/rpcHead.h +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef TDENGINE_RPCHEAD_H -#define TDENGINE_RPCHEAD_H - -#include -#ifdef __cplusplus -extern "C" { -#endif - -#define RPC_CONN_TCP 2 - -extern int tsRpcOverhead; - -typedef struct { - void* msg; - int msgLen; - uint32_t ip; - uint16_t port; - int connType; - void* shandle; - void* thandle; - void* chandle; -} SRecvInfo; - -#pragma pack(push, 1) - -typedef struct { - char version : 4; // RPC version - char comp : 4; // compression algorithm, 0:no compression 1:lz4 - char resflag : 2; // reserved bits - char spi : 3; // security parameter index - char encrypt : 3; // encrypt algorithm, 0: no encryption - uint16_t tranId; // transcation ID - uint32_t linkUid; // for unique connection ID assigned by client - uint64_t ahandle; // ahandle assigned by client - uint32_t sourceId; // source ID, an index for connection list - uint32_t destId; // destination ID, an index for connection list - uint32_t destIp; // destination IP address, for NAT scenario - char user[TSDB_UNI_LEN]; // user ID - uint16_t port; // for UDP only, port may be changed - char empty[1]; // reserved - uint16_t msgType; // message type - int32_t msgLen; // message length including the header iteslf - uint32_t msgVer; - int32_t code; // code in response message - uint8_t content[0]; // message body starts from here -} SRpcHead; - -typedef struct { - int32_t reserved; - int32_t contLen; -} SRpcComp; - -typedef struct { - uint32_t timeStamp; - uint8_t auth[TSDB_AUTH_LEN]; -} SRpcDigest; - -#pragma pack(pop) - -#ifdef __cplusplus -} -#endif - -#endif // TDENGINE_RPCHEAD_H diff --git a/include/libs/transport/transport.h b/include/libs/transport/transport.h index f5ffc125eaccfcd6431d29cc379b04ed88657d4d..136e8b35cde07e299cceb0d9268bd066cf40b2a3 100644 --- a/include/libs/transport/transport.h +++ b/include/libs/transport/transport.h @@ -20,9 +20,8 @@ extern "C" { #endif - #ifdef __cplusplus } #endif -#endif /*_TD_TRANSPORT_H_*/ \ No newline at end of file +#endif /*_TD_TRANSPORT_H_*/ diff --git a/source/libs/transport/inc/rpcCache.h b/source/libs/transport/inc/rpcCache.h deleted file mode 100644 index 5148f5e23c928026c32f623e67fb7e576abd5604..0000000000000000000000000000000000000000 --- a/source/libs/transport/inc/rpcCache.h +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef TDENGINE_RPC_CACHE_H -#define TDENGINE_RPC_CACHE_H - -#include - -#ifdef __cplusplus -extern "C" { -#endif - -void *rpcOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer); -void rpcCloseConnCache(void *handle); -void rpcAddConnIntoCache(void *handle, void *data, char *fqdn, uint16_t port, int8_t connType); -void *rpcGetConnFromCache(void *handle, char *fqdn, uint16_t port, int8_t connType); - -#ifdef __cplusplus -} -#endif - -#endif // TDENGINE_RPC_CACHE_H diff --git a/source/libs/transport/inc/rpcLog.h b/source/libs/transport/inc/rpcLog.h deleted file mode 100644 index a5db866932c5228741de88b3a165def88950f238..0000000000000000000000000000000000000000 --- a/source/libs/transport/inc/rpcLog.h +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef TDENGINE_RPC_LOG_H -#define TDENGINE_RPC_LOG_H - -#ifdef __cplusplus -extern "C" { -#endif - -#include "tlog.h" - -#define tFatal(...) \ - { \ - if (rpcDebugFlag & DEBUG_FATAL) { \ - taosPrintLog("RPC FATAL ", DEBUG_FATAL, rpcDebugFlag, __VA_ARGS__); \ - } \ - } -#define tError(...) \ - { \ - if (rpcDebugFlag & DEBUG_ERROR) { \ - taosPrintLog("RPC ERROR ", DEBUG_ERROR, rpcDebugFlag, __VA_ARGS__); \ - } \ - } -#define tWarn(...) \ - { \ - if (rpcDebugFlag & DEBUG_WARN) { \ - taosPrintLog("RPC WARN ", DEBUG_WARN, rpcDebugFlag, __VA_ARGS__); \ - } \ - } -#define tInfo(...) \ - { \ - if (rpcDebugFlag & DEBUG_INFO) { \ - taosPrintLog("RPC ", DEBUG_INFO, rpcDebugFlag, __VA_ARGS__); \ - } \ - } -#define tDebug(...) \ - { \ - if (rpcDebugFlag & DEBUG_DEBUG) { \ - taosPrintLog("RPC ", DEBUG_DEBUG, rpcDebugFlag, __VA_ARGS__); \ - } \ - } -#define tTrace(...) \ - { \ - if (rpcDebugFlag & DEBUG_TRACE) { \ - taosPrintLog("RPC ", DEBUG_TRACE, rpcDebugFlag, __VA_ARGS__); \ - } \ - } -#define tDump(x, y) \ - { \ - if (rpcDebugFlag & DEBUG_DUMP) { \ - taosDumpData((unsigned char *)x, y); \ - } \ - } - -#ifdef __cplusplus -} -#endif - -#endif // TDENGINE_RPC_LOG_H diff --git a/source/libs/transport/inc/rpcTcp.h b/source/libs/transport/inc/rpcTcp.h deleted file mode 100644 index ad42307516462521055575d11bef38273b125fe1..0000000000000000000000000000000000000000 --- a/source/libs/transport/inc/rpcTcp.h +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _rpc_tcp_header_ -#define _rpc_tcp_header_ -#include - -#ifdef __cplusplus -extern "C" { -#endif - -void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle); -void taosStopTcpServer(void *param); -void taosCleanUpTcpServer(void *param); - -void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *fp, void *shandle); -void taosStopTcpClient(void *chandle); -void taosCleanUpTcpClient(void *chandle); -void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port); - -void taosCloseTcpConnection(void *chandle); -int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle); - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/source/libs/transport/inc/rpcUdp.h b/source/libs/transport/inc/rpcUdp.h deleted file mode 100644 index 0c651d07ed4662305e107c09a7312f372aa98339..0000000000000000000000000000000000000000 --- a/source/libs/transport/inc/rpcUdp.h +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _rpc_udp_header_ -#define _rpc_udp_header_ - -#ifdef __cplusplus -extern "C" { -#endif - -#include "taosdef.h" - -void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int, void *fp, void *shandle); -void taosStopUdpConnection(void *handle); -void taosCleanUpUdpConnection(void *handle); -int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *chandle); -void *taosOpenUdpConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port); - -void taosFreeMsgHdr(void *hdr); -int taosMsgHdrSize(void *hdr); -void taosSendMsgHdr(void *hdr, TdFilePtr pFile); -void taosInitMsgHdr(void **hdr, void *dest, int maxPkts); -void taosSetMsgHdrData(void *hdr, char *data, int dataLen); - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index e4e79ccf476ce9a1253db536f623431810541ad5..3f82d6e2d85d2a345db7ed7a5bc2f1938a2eda62 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -12,7 +12,8 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#ifdef USE_UV +#ifndef _TD_TRANSPORT_COMM_H +#define _TD_TRANSPORT_COMM_H #ifdef __cplusplus extern "C" { @@ -22,9 +23,6 @@ extern "C" { #include "lz4.h" #include "os.h" #include "osSocket.h" -#include "rpcCache.h" -#include "rpcHead.h" -#include "rpcLog.h" #include "taoserror.h" #include "tglobal.h" #include "thash.h" @@ -33,6 +31,7 @@ extern "C" { #include "tmd5.h" #include "tmempool.h" #include "tmsg.h" +#include "transLog.h" #include "transportInt.h" #include "tref.h" #include "trpc.h" @@ -193,12 +192,7 @@ typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } Co #define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) #define RPC_RESERVE_SIZE (sizeof(STranConnCtx)) -#define RPC_MSG_OVERHEAD (sizeof(SRpcHead) + sizeof(SRpcDigest)) -#define rpcHeadFromCont(cont) ((SRpcHead*)((char*)cont - sizeof(SRpcHead))) -#define rpcContFromHead(msg) (msg + sizeof(SRpcHead)) -#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead)) -#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead)) -#define rpcIsReq(type) (type & 1U) +#define rpcIsReq(type) (type & 1U) #define TRANS_RESERVE_SIZE (sizeof(STranConnCtx)) @@ -209,15 +203,14 @@ typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } Co #define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead)); #define transIsReq(type) (type & 1U) -int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey); -void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey); -int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen); -SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead); - -int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey); -void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey); -bool transCompressMsg(char* msg, int32_t len, int32_t* flen); -bool transDecompressMsg(char* msg, int32_t len, int32_t* flen); +// int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey); +// void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey); +//// int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen); +// +// int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey); +// void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey); +// bool transCompressMsg(char* msg, int32_t len, int32_t* flen); +// bool transDecompressMsg(char* msg, int32_t len, int32_t* flen); void transFreeMsg(void* msg); @@ -365,4 +358,4 @@ void transThreadOnce(); } #endif -#endif +#endif // _TD_TRANSPORT_COMM_H diff --git a/source/libs/transport/inc/transLog.h b/source/libs/transport/inc/transLog.h new file mode 100644 index 0000000000000000000000000000000000000000..ebf231cfcff11aea08158264da5b71e77a429692 --- /dev/null +++ b/source/libs/transport/inc/transLog.h @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_TRANSPORT_LOG_H +#define _TD_TRANSPORT_LOG_H + +#ifdef __cplusplus +extern "C" { +#endif + +// clang-format off +#include "tlog.h" + +#define tFatal(...) do {if (rpcDebugFlag & DEBUG_FATAL){ taosPrintLog("RPC FATAL ", DEBUG_FATAL, rpcDebugFlag, __VA_ARGS__); }} while (0) +#define tError(...)do { if (rpcDebugFlag & DEBUG_ERROR){ taosPrintLog("RPC ERROR ", DEBUG_ERROR, rpcDebugFlag, __VA_ARGS__); } } while(0) +#define tWarn(...) do { if (rpcDebugFlag & DEBUG_WARN) { taosPrintLog("RPC WARN ", DEBUG_WARN, rpcDebugFlag, __VA_ARGS__); }} while(0) +#define tInfo(...) do { if (rpcDebugFlag & DEBUG_INFO) { taosPrintLog("RPC ", DEBUG_INFO, rpcDebugFlag, __VA_ARGS__); }} while(0) +#define tDebug(...) do {if (rpcDebugFlag & DEBUG_DEBUG){ taosPrintLog("RPC ", DEBUG_DEBUG, rpcDebugFlag, __VA_ARGS__); }} while(0) +#define tTrace(...) do {if (rpcDebugFlag & DEBUG_TRACE){ taosPrintLog("RPC ", DEBUG_TRACE, rpcDebugFlag, __VA_ARGS__); }} while(0) +#define tDump(x, y) do {if (rpcDebugFlag & DEBUG_DUMP) { taosDumpData((unsigned char *)x, y); } } while(0) +// clang-format on +#ifdef __cplusplus +} +#endif + +#endif // __TRANS_LOG_H diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index c8972067d824c04b088dd2762b8e19abc7af044d..a498571f33cb09cdf93cb04b3fd862dffa13bebd 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -21,14 +21,12 @@ #endif #include "lz4.h" #include "os.h" -#include "rpcCache.h" -#include "rpcHead.h" -#include "rpcLog.h" #include "taoserror.h" #include "tglobal.h" #include "thash.h" #include "tidpool.h" #include "tmsg.h" +#include "transLog.h" #include "tref.h" #include "trpc.h" #include "ttimer.h" diff --git a/source/libs/transport/src/rpcCache.c b/source/libs/transport/src/rpcCache.c deleted file mode 100644 index e02de2d8b96f628d8eb07355690b968840378863..0000000000000000000000000000000000000000 --- a/source/libs/transport/src/rpcCache.c +++ /dev/null @@ -1,294 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "rpcCache.h" -#include "os.h" -#include "rpcLog.h" -#include "taosdef.h" -#include "tglobal.h" -#include "tmempool.h" -#include "ttimer.h" -#include "tutil.h" - -typedef struct SConnHash { - char fqdn[TSDB_FQDN_LEN]; - uint16_t port; - char connType; - struct SConnHash *prev; - struct SConnHash *next; - void * data; - uint64_t time; -} SConnHash; - -typedef struct { - SConnHash ** connHashList; - mpool_h connHashMemPool; - int maxSessions; - int total; - int * count; - int64_t keepTimer; - TdThreadMutex mutex; - void (*cleanFp)(void *); - void * tmrCtrl; - void * pTimer; - int64_t *lockedBy; -} SConnCache; - -static int rpcHashConn(void *handle, char *fqdn, uint16_t port, int8_t connType); -static void rpcLockCache(int64_t *lockedBy); -static void rpcUnlockCache(int64_t *lockedBy); -static void rpcCleanConnCache(void *handle, void *tmrId); -static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash, uint64_t time); - -void *rpcOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer) { - SConnHash **connHashList; - mpool_h connHashMemPool; - SConnCache *pCache; - - connHashMemPool = taosMemPoolInit(maxSessions, sizeof(SConnHash)); - if (connHashMemPool == 0) return NULL; - - connHashList = taosMemoryCalloc(sizeof(SConnHash *), maxSessions); - if (connHashList == 0) { - taosMemPoolCleanUp(connHashMemPool); - return NULL; - } - - pCache = taosMemoryMalloc(sizeof(SConnCache)); - if (pCache == NULL) { - taosMemPoolCleanUp(connHashMemPool); - taosMemoryFree(connHashList); - return NULL; - } - memset(pCache, 0, sizeof(SConnCache)); - - pCache->count = taosMemoryCalloc(sizeof(int), maxSessions); - pCache->total = 0; - pCache->keepTimer = keepTimer; - pCache->maxSessions = maxSessions; - pCache->connHashMemPool = connHashMemPool; - pCache->connHashList = connHashList; - pCache->cleanFp = cleanFp; - pCache->tmrCtrl = tmrCtrl; - pCache->lockedBy = taosMemoryCalloc(sizeof(int64_t), maxSessions); - taosTmrReset(rpcCleanConnCache, (int32_t)(pCache->keepTimer * 2), pCache, pCache->tmrCtrl, &pCache->pTimer); - - taosThreadMutexInit(&pCache->mutex, NULL); - - return pCache; -} - -void rpcCloseConnCache(void *handle) { - SConnCache *pCache; - - pCache = (SConnCache *)handle; - if (pCache == NULL || pCache->maxSessions == 0) return; - - taosThreadMutexLock(&pCache->mutex); - - taosTmrStopA(&(pCache->pTimer)); - - if (pCache->connHashMemPool) taosMemPoolCleanUp(pCache->connHashMemPool); - - taosMemoryFreeClear(pCache->connHashList); - taosMemoryFreeClear(pCache->count); - taosMemoryFreeClear(pCache->lockedBy); - - taosThreadMutexUnlock(&pCache->mutex); - - taosThreadMutexDestroy(&pCache->mutex); - - memset(pCache, 0, sizeof(SConnCache)); - taosMemoryFree(pCache); -} - -void rpcAddConnIntoCache(void *handle, void *data, char *fqdn, uint16_t port, int8_t connType) { - int hash; - SConnHash * pNode; - SConnCache *pCache; - - uint64_t time = taosGetTimestampMs(); - - pCache = (SConnCache *)handle; - assert(pCache); - assert(data); - - hash = rpcHashConn(pCache, fqdn, port, connType); - pNode = (SConnHash *)taosMemPoolMalloc(pCache->connHashMemPool); - tstrncpy(pNode->fqdn, fqdn, sizeof(pNode->fqdn)); - pNode->port = port; - pNode->connType = connType; - pNode->data = data; - pNode->prev = NULL; - pNode->time = time; - - rpcLockCache(pCache->lockedBy + hash); - - pNode->next = pCache->connHashList[hash]; - if (pCache->connHashList[hash] != NULL) (pCache->connHashList[hash])->prev = pNode; - pCache->connHashList[hash] = pNode; - - pCache->count[hash]++; - rpcRemoveExpiredNodes(pCache, pNode->next, hash, time); - - rpcUnlockCache(pCache->lockedBy + hash); - - pCache->total++; - // tTrace("%p %s:%hu:%d:%d:%p added into cache, connections:%d", data, fqdn, port, connType, hash, pNode, - // pCache->count[hash]); - - return; -} - -void *rpcGetConnFromCache(void *handle, char *fqdn, uint16_t port, int8_t connType) { - int hash; - SConnHash * pNode; - SConnCache *pCache; - void * pData = NULL; - - pCache = (SConnCache *)handle; - assert(pCache); - - uint64_t time = taosGetTimestampMs(); - - hash = rpcHashConn(pCache, fqdn, port, connType); - rpcLockCache(pCache->lockedBy + hash); - - pNode = pCache->connHashList[hash]; - while (pNode) { - if (time >= pCache->keepTimer + pNode->time) { - rpcRemoveExpiredNodes(pCache, pNode, hash, time); - pNode = NULL; - break; - } - - if (strcmp(pNode->fqdn, fqdn) == 0 && pNode->port == port && pNode->connType == connType) break; - - pNode = pNode->next; - } - - if (pNode) { - rpcRemoveExpiredNodes(pCache, pNode->next, hash, time); - - if (pNode->prev) { - pNode->prev->next = pNode->next; - } else { - pCache->connHashList[hash] = pNode->next; - } - - if (pNode->next) { - pNode->next->prev = pNode->prev; - } - - pData = pNode->data; - taosMemPoolFree(pCache->connHashMemPool, (char *)pNode); - pCache->total--; - pCache->count[hash]--; - } - - rpcUnlockCache(pCache->lockedBy + hash); - - if (pData) { - // tTrace("%p %s:%hu:%d:%d:%p retrieved from cache, connections:%d", pData, fqdn, port, connType, hash, pNode, - // pCache->count[hash]); - } else { - // tTrace("%s:%hu:%d:%d failed to retrieve conn from cache, connections:%d", fqdn, port, connType, hash, - // pCache->count[hash]); - } - - return pData; -} - -static void rpcCleanConnCache(void *handle, void *tmrId) { - int hash; - SConnHash * pNode; - SConnCache *pCache; - - pCache = (SConnCache *)handle; - if (pCache == NULL || pCache->maxSessions == 0) return; - if (pCache->pTimer != tmrId) return; - - taosThreadMutexLock(&pCache->mutex); - uint64_t time = taosGetTimestampMs(); - - for (hash = 0; hash < pCache->maxSessions; ++hash) { - rpcLockCache(pCache->lockedBy + hash); - pNode = pCache->connHashList[hash]; - rpcRemoveExpiredNodes(pCache, pNode, hash, time); - rpcUnlockCache(pCache->lockedBy + hash); - } - - // tTrace("timer, total connections in cache:%d", pCache->total); - taosTmrReset(rpcCleanConnCache, (int32_t)(pCache->keepTimer * 2), pCache, pCache->tmrCtrl, &pCache->pTimer); - taosThreadMutexUnlock(&pCache->mutex); -} - -static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash, uint64_t time) { - if (pNode == NULL || (time < pCache->keepTimer + pNode->time)) return; - - SConnHash *pPrev = pNode->prev, *pNext; - - while (pNode) { - (*pCache->cleanFp)(pNode->data); - pNext = pNode->next; - pCache->total--; - pCache->count[hash]--; - // tTrace("%p %s:%hu:%d:%d:%p removed from cache, connections:%d", pNode->data, pNode->fqdn, pNode->port, - // pNode->connType, hash, pNode, - // pCache->count[hash]); - taosMemPoolFree(pCache->connHashMemPool, (char *)pNode); - pNode = pNext; - } - - if (pPrev) - pPrev->next = NULL; - else - pCache->connHashList[hash] = NULL; -} - -static int rpcHashConn(void *handle, char *fqdn, uint16_t port, int8_t connType) { - SConnCache *pCache = (SConnCache *)handle; - int hash = 0; - char * temp = fqdn; - - while (*temp) { - hash += *temp; - ++temp; - } - - hash += port; - hash += connType; - - hash = hash % pCache->maxSessions; - - return hash; -} - -static void rpcLockCache(int64_t *lockedBy) { - int64_t tid = taosGetSelfPthreadId(); - int i = 0; - while (atomic_val_compare_exchange_64(lockedBy, 0, tid) != 0) { - if (++i % 100 == 0) { - sched_yield(); - } - } -} - -static void rpcUnlockCache(int64_t *lockedBy) { - int64_t tid = taosGetSelfPthreadId(); - if (atomic_val_compare_exchange_64(lockedBy, tid, 0) != tid) { - assert(false); - } -} diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c deleted file mode 100644 index 0e425970da7a600a2ac85431de4d4d4621f80a2d..0000000000000000000000000000000000000000 --- a/source/libs/transport/src/rpcMain.c +++ /dev/null @@ -1,1682 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "lz4.h" -#include "transportInt.h" -#include "os.h" -#include "rpcCache.h" -#include "rpcHead.h" -#include "rpcLog.h" -#include "rpcTcp.h" -#include "rpcUdp.h" -#include "taoserror.h" -#include "tglobal.h" -#include "thash.h" -#include "tidpool.h" -#include "tmd5.h" -#include "tmempool.h" -#include "tmsg.h" -#include "tref.h" -#include "trpc.h" -#include "ttimer.h" -#include "tutil.h" - -static TdThreadOnce tsRpcInitOnce = PTHREAD_ONCE_INIT; - -int tsRpcMaxUdpSize = 15000; // bytes -int tsProgressTimer = 100; -// not configurable -int tsRpcMaxRetry; -int tsRpcHeadSize; -int tsRpcOverhead; - -SHashObj *tsFqdnHash; - -#ifndef USE_UV - -typedef struct { - int sessions; // number of sessions allowed - int numOfThreads; // number of threads to process incoming messages - int idleTime; // milliseconds; - uint16_t localPort; - int8_t connType; - int index; // for UDP server only, round robin for multiple threads - char label[TSDB_LABEL_LEN]; - - char user[TSDB_UNI_LEN]; // meter ID - char spi; // security parameter index - char encrypt; // encrypt algorithm - char secret[TSDB_PASSWORD_LEN]; // secret for the link - char ckey[TSDB_PASSWORD_LEN]; // ciphering key - - void (*cfp)(void *parent, SRpcMsg *, SEpSet *); - int (*afp)(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey); - - int32_t refCount; - void * parent; - void * idPool; // handle to ID pool - void * tmrCtrl; // handle to timer - SHashObj * hash; // handle returned by hash utility - void * tcphandle; // returned handle from TCP initialization - void * udphandle; // returned handle from UDP initialization - void * pCache; // connection cache - TdThreadMutex mutex; - struct SRpcConn *connList; // connection list -} SRpcInfo; - -typedef struct { - SRpcInfo * pRpc; // associated SRpcInfo - SEpSet epSet; // ip list provided by app - void * ahandle; // handle provided by app - struct SRpcConn *pConn; // pConn allocated - tmsg_t msgType; // message type - uint8_t * pCont; // content provided by app - int32_t contLen; // content length - int32_t code; // error code - int16_t numOfTry; // number of try for different servers - int8_t oldInUse; // server EP inUse passed by app - int8_t redirect; // flag to indicate redirect - int8_t connType; // connection type - int64_t rid; // refId returned by taosAddRef - SRpcMsg * pRsp; // for synchronous API - tsem_t * pSem; // for synchronous API - SEpSet * pSet; // for synchronous API - char msg[0]; // RpcHead starts from here -} SRpcReqContext; - -#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest)) -#define rpcHeadFromCont(cont) ((SRpcHead *)((char *)cont - sizeof(SRpcHead))) -#define rpcContFromHead(msg) (msg + sizeof(SRpcHead)) -#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead)) -#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead)) -#define rpcIsReq(type) (type & 1U) - -typedef struct SRpcConn { - char info[48]; // debug info: label + pConn + ahandle - int sid; // session ID - uint32_t ownId; // own link ID - uint32_t peerId; // peer link ID - char user[TSDB_UNI_LEN]; // user ID for the link - char spi; // security parameter index - char encrypt; // encryption, 0:1 - char secret[TSDB_PASSWORD_LEN]; // secret for the link - char ckey[TSDB_PASSWORD_LEN]; // ciphering key - char secured; // if set to 1, no authentication - uint16_t localPort; // for UDP only - uint32_t linkUid; // connection unique ID assigned by client - uint32_t peerIp; // peer IP - uint16_t peerPort; // peer port - char peerFqdn[TSDB_FQDN_LEN]; // peer FQDN or ip string - uint16_t tranId; // outgoing transcation ID, for build message - uint16_t outTranId; // outgoing transcation ID - uint16_t inTranId; // transcation ID for incoming msg - tmsg_t outType; // message type for outgoing request - tmsg_t inType; // message type for incoming request - void * chandle; // handle passed by TCP/UDP connection layer - void * ahandle; // handle provided by upper app layter - int retry; // number of retry for sending request - int tretry; // total retry - void * pTimer; // retry timer to monitor the response - void * pIdleTimer; // idle timer - char * pRspMsg; // response message including header - int rspMsgLen; // response messag length - char * pReqMsg; // request message including header - int reqMsgLen; // request message length - SRpcInfo * pRpc; // the associated SRpcInfo - int8_t connType; // connection type - int64_t lockedBy; // lock for connection - SRpcReqContext *pContext; // request context -} SRpcConn; - -static int tsRpcRefId = -1; -static int32_t tsRpcNum = 0; - -// static TdThreadOnce tsRpcInit = PTHREAD_ONCE_INIT; - -// server:0 client:1 tcp:2 udp:0 -#define RPC_CONN_UDPS 0 -#define RPC_CONN_UDPC 1 -#define RPC_CONN_TCPS 2 -#define RPC_CONN_TCPC 3 - -void *(*taosInitConn[])(uint32_t ip, uint16_t port, char *label, int threads, void *fp, void *shandle) = { - taosInitUdpConnection, taosInitUdpConnection, taosInitTcpServer, taosInitTcpClient}; - -void (*taosCleanUpConn[])(void *thandle) = {taosCleanUpUdpConnection, taosCleanUpUdpConnection, taosCleanUpTcpServer, - taosCleanUpTcpClient}; - -void (*taosStopConn[])(void *thandle) = { - taosStopUdpConnection, - taosStopUdpConnection, - taosStopTcpServer, - taosStopTcpClient, -}; - -int (*taosSendData[])(uint32_t ip, uint16_t port, void *data, int len, void *chandle) = { - taosSendUdpData, taosSendUdpData, taosSendTcpData, taosSendTcpData}; - -void *(*taosOpenConn[])(void *shandle, void *thandle, uint32_t ip, uint16_t port) = { - taosOpenUdpConnection, - taosOpenUdpConnection, - NULL, - taosOpenTcpClientConnection, -}; - -void (*taosCloseConn[])(void *chandle) = {NULL, NULL, taosCloseTcpConnection, taosCloseTcpConnection}; - -static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, int8_t connType); -static void rpcCloseConn(void *thandle); -static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext); -static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc); -static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv); -static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv); - -static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext); -static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code); -static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code); -static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); -static void rpcSendReqHead(SRpcConn *pConn); - -static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv); -static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext); -static void rpcProcessConnError(void *param, void *id); -static void rpcProcessRetryTimer(void *, void *); -static void rpcProcessIdleTimer(void *param, void *tmrId); -static void rpcProcessProgressTimer(void *param, void *tmrId); - -static void rpcFreeMsg(void *msg); -static int32_t rpcCompressRpcMsg(char *pCont, int32_t contLen); -static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead); -static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen); -static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen); -static void rpcLockConn(SRpcConn *pConn); -static void rpcUnlockConn(SRpcConn *pConn); -static void rpcAddRef(SRpcInfo *pRpc); -static void rpcDecRef(SRpcInfo *pRpc); - -static void rpcFree(void *p) { - tTrace("free mem: %p", p); - taosMemoryFree(p); -} - -static void rpcInitImp(void) { - tsProgressTimer = tsRpcTimer / 2; - tsRpcMaxRetry = tsRpcMaxTime * 1000 / tsProgressTimer; - tsRpcHeadSize = RPC_MSG_OVERHEAD; - tsRpcOverhead = sizeof(SRpcReqContext); - - tsRpcRefId = taosOpenRef(200, rpcFree); - - tsFqdnHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); -} - -int32_t rpcInit() { - taosThreadOnce(&tsRpcInitOnce, rpcInitImp); - return 0; -} - -void rpcCleanup(void) { - taosCloseRef(tsRpcRefId); - taosHashClear(tsFqdnHash); - taosHashCleanup(tsFqdnHash); - tsFqdnHash = NULL; - tsRpcRefId = -1; -} - -void *rpcOpen(const SRpcInit *pInit) { - SRpcInfo *pRpc; - - // taosThreadOnce(&tsRpcInit, rpcInit); - - pRpc = (SRpcInfo *)taosMemoryCalloc(1, sizeof(SRpcInfo)); - if (pRpc == NULL) return NULL; - - if (pInit->label) tstrncpy(pRpc->label, pInit->label, tListLen(pInit->label)); - - pRpc->connType = pInit->connType; - if (pRpc->connType == TAOS_CONN_CLIENT) { - pRpc->numOfThreads = pInit->numOfThreads; - if (pRpc->numOfThreads >= 10) { - pRpc->numOfThreads = 10; - } - } else { - pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; - } - pRpc->idleTime = pInit->idleTime; - pRpc->localPort = pInit->localPort; - pRpc->afp = pInit->afp; - pRpc->sessions = pInit->sessions + 1; - if (pInit->user) tstrncpy(pRpc->user, pInit->user, sizeof(pRpc->user)); - if (pInit->secret) memcpy(pRpc->secret, pInit->secret, sizeof(pRpc->secret)); - if (pInit->ckey) tstrncpy(pRpc->ckey, pInit->ckey, sizeof(pRpc->ckey)); - pRpc->spi = pInit->spi; - pRpc->cfp = pInit->cfp; - pRpc->afp = pInit->afp; - pRpc->parent = pInit->parent; - pRpc->refCount = 1; - - atomic_add_fetch_32(&tsRpcNum, 1); - - size_t size = sizeof(SRpcConn) * pRpc->sessions; - pRpc->connList = (SRpcConn *)taosMemoryCalloc(1, size); - if (pRpc->connList == NULL) { - tError("%s failed to allocate memory for taos connections, size:%" PRId64, pRpc->label, (int64_t)size); - rpcClose(pRpc); - return NULL; - } - - pRpc->idPool = taosInitIdPool(pRpc->sessions - 1); - if (pRpc->idPool == NULL) { - tError("%s failed to init ID pool", pRpc->label); - rpcClose(pRpc); - return NULL; - } - - pRpc->tmrCtrl = taosTmrInit(pRpc->sessions * 2 + 1, 50, 10000, pRpc->label); - if (pRpc->tmrCtrl == NULL) { - tError("%s failed to init timers", pRpc->label); - rpcClose(pRpc); - return NULL; - } - - if (pRpc->connType == TAOS_CONN_SERVER) { - pRpc->hash = taosHashInit(pRpc->sessions, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, true); - if (pRpc->hash == NULL) { - tError("%s failed to init string hash", pRpc->label); - rpcClose(pRpc); - return NULL; - } - } else { - pRpc->pCache = rpcOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, pRpc->idleTime * 20); - if (pRpc->pCache == NULL) { - tError("%s failed to init connection cache", pRpc->label); - rpcClose(pRpc); - return NULL; - } - } - - taosThreadMutexInit(&pRpc->mutex, NULL); - - pRpc->tcphandle = (*taosInitConn[pRpc->connType | RPC_CONN_TCP])(0, pRpc->localPort, pRpc->label, pRpc->numOfThreads, - rpcProcessMsgFromPeer, pRpc); - pRpc->udphandle = - (*taosInitConn[pRpc->connType])(0, pRpc->localPort, pRpc->label, pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc); - - if (pRpc->tcphandle == NULL || pRpc->udphandle == NULL) { - tError("%s failed to init network, port:%d", pRpc->label, pRpc->localPort); - rpcClose(pRpc); - return NULL; - } - - tDebug("%s rpc is opened, threads:%d sessions:%d", pRpc->label, pRpc->numOfThreads, pInit->sessions); - - return pRpc; -} - -void rpcClose(void *param) { - SRpcInfo *pRpc = (SRpcInfo *)param; - - // stop connection to outside first - (*taosStopConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle); - (*taosStopConn[pRpc->connType])(pRpc->udphandle); - - // close all connections - for (int i = 0; i < pRpc->sessions; ++i) { - if (pRpc->connList && pRpc->connList[i].user[0]) { - rpcCloseConn((void *)(pRpc->connList + i)); - } - } - - // clean up - (*taosCleanUpConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle); - (*taosCleanUpConn[pRpc->connType])(pRpc->udphandle); - - tDebug("%s rpc is closed", pRpc->label); - rpcDecRef(pRpc); -} - -void *rpcMallocCont(int contLen) { - int size = contLen + RPC_MSG_OVERHEAD; - - char *start = (char *)taosMemoryCalloc(1, (size_t)size); - if (start == NULL) { - tError("failed to malloc msg, size:%d", size); - return NULL; - } else { - tTrace("malloc mem:%p size:%d", start, size); - } - - return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); -} - -void rpcFreeCont(void *cont) { - if (cont) { - char *temp = ((char *)cont) - sizeof(SRpcHead) - sizeof(SRpcReqContext); - taosMemoryFree(temp); - tTrace("free mem: %p", temp); - } -} - -void *rpcReallocCont(void *ptr, int contLen) { - if (ptr == NULL) return rpcMallocCont(contLen); - - char *start = ((char *)ptr) - sizeof(SRpcReqContext) - sizeof(SRpcHead); - if (contLen == 0) { - taosMemoryFree(start); - return NULL; - } - - int size = contLen + RPC_MSG_OVERHEAD; - start = taosMemoryRealloc(start, size); - if (start == NULL) { - tError("failed to realloc cont, size:%d", size); - return NULL; - } - - return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); -} - -void rpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) { - SRpcInfo * pRpc = (SRpcInfo *)shandle; - SRpcReqContext *pContext; - - int contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen); - pContext = (SRpcReqContext *)((char *)pMsg->pCont - sizeof(SRpcHead) - sizeof(SRpcReqContext)); - pContext->ahandle = pMsg->ahandle; - pContext->pRpc = (SRpcInfo *)shandle; - pContext->epSet = *pEpSet; - pContext->contLen = contLen; - pContext->pCont = pMsg->pCont; - pContext->msgType = pMsg->msgType; - pContext->oldInUse = pEpSet->inUse; - - pContext->connType = RPC_CONN_UDPC; - if (contLen > tsRpcMaxUdpSize || tsRpcForceTcp) pContext->connType = RPC_CONN_TCPC; - - // connection type is application specific. - // for TDengine, all the query, show commands shall have TCP connection - tmsg_t type = pMsg->msgType; - if (type == TDMT_VND_QUERY || type == TDMT_MND_SHOW_RETRIEVE || type == TDMT_VND_FETCH || - type == TDMT_MND_VGROUP_LIST || type == TDMT_VND_TABLES_META || type == TDMT_VND_TABLE_META || - type == TDMT_MND_SHOW || type == TDMT_MND_STATUS || type == TDMT_VND_ALTER_TABLE) - pContext->connType = RPC_CONN_TCPC; - - pContext->rid = taosAddRef(tsRpcRefId, pContext); - if (pRid) *pRid = pContext->rid; - - rpcSendReqToServer(pRpc, pContext); -} - -void rpcSendResponse(const SRpcMsg *pRsp) { - ASSERT(pRsp->handle != NULL); - - int msgLen = 0; - SRpcConn *pConn = (SRpcConn *)pRsp->handle; - SRpcMsg rpcMsg = *pRsp; - SRpcMsg * pMsg = &rpcMsg; - SRpcInfo *pRpc = pConn->pRpc; - - if (pMsg->pCont == NULL) { - pMsg->pCont = rpcMallocCont(0); - pMsg->contLen = 0; - } - - SRpcHead *pHead = rpcHeadFromCont(pMsg->pCont); - char * msg = (char *)pHead; - - pMsg->contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen); - msgLen = rpcMsgLenFromCont(pMsg->contLen); - - rpcLockConn(pConn); - - if (pConn->inType == 0 || pConn->user[0] == 0) { - tError("%s, connection is already released, rsp wont be sent", pConn->info); - rpcUnlockConn(pConn); - rpcFreeCont(pMsg->pCont); - rpcDecRef(pRpc); - return; - } - - // set msg header - pHead->version = 1; - pHead->msgType = pConn->inType + 1; - pHead->spi = pConn->spi; - pHead->encrypt = pConn->encrypt; - pHead->tranId = pConn->inTranId; - pHead->sourceId = pConn->ownId; - pHead->destId = pConn->peerId; - pHead->linkUid = pConn->linkUid; - pHead->port = htons(pConn->localPort); - pHead->code = htonl(pMsg->code); - pHead->ahandle = (uint64_t)pConn->ahandle; - - // set pConn parameters - pConn->inType = 0; - - // response message is released until new response is sent - rpcFreeMsg(pConn->pRspMsg); - pConn->pRspMsg = msg; - pConn->rspMsgLen = msgLen; - if (pMsg->code == TSDB_CODE_RPC_ACTION_IN_PROGRESS) pConn->inTranId--; - - // stop the progress timer - taosTmrStopA(&pConn->pTimer); - - // set the idle timer to monitor the activity - taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime * 30, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer); - rpcSendMsgToPeer(pConn, msg, msgLen); - - // if not set to secured, set it expcet NOT_READY case, since client wont treat it as secured - if (pConn->secured == 0 && pMsg->code != TSDB_CODE_RPC_NOT_READY) pConn->secured = 1; // connection shall be secured - - if (pConn->pReqMsg) rpcFreeCont(pConn->pReqMsg); - pConn->pReqMsg = NULL; - pConn->reqMsgLen = 0; - - rpcUnlockConn(pConn); - rpcDecRef(pRpc); // decrease the referene count - - return; -} - -void rpcSendRedirectRsp(void *thandle, const SEpSet *pEpSet) { - SRpcMsg rpcMsg; - memset(&rpcMsg, 0, sizeof(rpcMsg)); - - rpcMsg.contLen = sizeof(SEpSet); - rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); - if (rpcMsg.pCont == NULL) return; - - memcpy(rpcMsg.pCont, pEpSet, sizeof(SEpSet)); - - rpcMsg.code = TSDB_CODE_RPC_REDIRECT; - rpcMsg.handle = thandle; - - rpcSendResponse(&rpcMsg); - - return; -} - -int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { - SRpcConn *pConn = (SRpcConn *)thandle; - if (pConn->user[0] == 0) return -1; - - pInfo->clientIp = pConn->peerIp; - pInfo->clientPort = pConn->peerPort; - // pInfo->serverIp = pConn->destIp; - - tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user)); - return 0; -} - -void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { - SRpcReqContext *pContext; - pContext = (SRpcReqContext *)((char *)pMsg->pCont - sizeof(SRpcHead) - sizeof(SRpcReqContext)); - - memset(pRsp, 0, sizeof(SRpcMsg)); - - tsem_t sem; - tsem_init(&sem, 0, 0); - pContext->pSem = &sem; - pContext->pRsp = pRsp; - pContext->pSet = pEpSet; - - rpcSendRequest(shandle, pEpSet, pMsg, NULL); - - tsem_wait(&sem); - tsem_destroy(&sem); - - return; -} - -// this API is used by server app to keep an APP context in case connection is broken -int rpcReportProgress(void *handle, char *pCont, int contLen) { - SRpcConn *pConn = (SRpcConn *)handle; - int code = 0; - - rpcLockConn(pConn); - - if (pConn->user[0]) { - // pReqMsg and reqMsgLen is re-used to store the context from app server - pConn->pReqMsg = pCont; - pConn->reqMsgLen = contLen; - } else { - tDebug("%s, rpc connection is already released", pConn->info); - rpcFreeCont(pCont); - code = -1; - } - - rpcUnlockConn(pConn); - return code; -} - -void rpcCancelRequest(int64_t rid) { - SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rid); - if (pContext == NULL) return; - - rpcCloseConn(pContext->pConn); - - taosReleaseRef(tsRpcRefId, rid); -} - -static void rpcFreeMsg(void *msg) { - if (msg) { - char *temp = (char *)msg - sizeof(SRpcReqContext); - taosMemoryFree(temp); - tTrace("free mem: %p", temp); - } -} - -static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, int8_t connType) { - SRpcConn *pConn; - - uint32_t peerIp = 0; - uint32_t *pPeerIp = taosHashGet(tsFqdnHash, peerFqdn, strlen(peerFqdn) + 1); - if (pPeerIp != NULL) { - peerIp = *pPeerIp; - } else { - peerIp = taosGetIpv4FromFqdn(peerFqdn); - if (peerIp != 0xFFFFFFFF) { - taosHashPut(tsFqdnHash, peerFqdn, strlen(peerFqdn) + 1, &peerIp, sizeof(peerIp)); - } - } - - if (peerIp == 0xFFFFFFFF) { - tError("%s, failed to resolve FQDN:%s", pRpc->label, peerFqdn); - terrno = TSDB_CODE_RPC_FQDN_ERROR; - return NULL; - } - - pConn = rpcAllocateClientConn(pRpc); - - if (pConn) { - tstrncpy(pConn->peerFqdn, peerFqdn, sizeof(pConn->peerFqdn)); - pConn->peerIp = peerIp; - pConn->peerPort = peerPort; - tstrncpy(pConn->user, pRpc->user, sizeof(pConn->user)); - pConn->connType = connType; - - if (taosOpenConn[connType]) { - void *shandle = (connType & RPC_CONN_TCP) ? pRpc->tcphandle : pRpc->udphandle; - pConn->chandle = (*taosOpenConn[connType])(shandle, pConn, pConn->peerIp, pConn->peerPort); - if (pConn->chandle == NULL) { - tError("failed to connect to:%s:%d", taosIpStr(pConn->peerIp), pConn->peerPort); - - terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL; - rpcCloseConn(pConn); - pConn = NULL; - } - } - } - - return pConn; -} - -static void rpcReleaseConn(SRpcConn *pConn) { - SRpcInfo *pRpc = pConn->pRpc; - if (pConn->user[0] == 0) return; - - pConn->user[0] = 0; - if (taosCloseConn[pConn->connType]) (*taosCloseConn[pConn->connType])(pConn->chandle); - - taosTmrStopA(&pConn->pTimer); - taosTmrStopA(&pConn->pIdleTimer); - - if (pRpc->connType == TAOS_CONN_SERVER) { - char hashstr[40] = {0}; - size_t size = snprintf(hashstr, sizeof(hashstr), "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId, - pConn->connType); - taosHashRemove(pRpc->hash, hashstr, size); - rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg - pConn->pRspMsg = NULL; - - // if server has ever reported progress, free content - if (pConn->pReqMsg) rpcFreeCont(pConn->pReqMsg); // do not use rpcFreeMsg - } else { - // if there is an outgoing message, free it - if (pConn->outType && pConn->pReqMsg) { - SRpcReqContext *pContext = pConn->pContext; - if (pContext) { - if (pContext->pRsp) { - // for synchronous API, post semaphore to unblock app - pContext->pRsp->code = TSDB_CODE_RPC_APP_ERROR; - pContext->pRsp->pCont = NULL; - pContext->pRsp->contLen = 0; - tsem_post(pContext->pSem); - } - pContext->pConn = NULL; - taosRemoveRef(tsRpcRefId, pContext->rid); - } else { - assert(0); - } - } - } - - // memset could not be used, since lockeBy can not be reset - pConn->inType = 0; - pConn->outType = 0; - pConn->inTranId = 0; - pConn->outTranId = 0; - pConn->secured = 0; - pConn->peerId = 0; - pConn->peerIp = 0; - pConn->peerPort = 0; - pConn->pReqMsg = NULL; - pConn->reqMsgLen = 0; - pConn->pContext = NULL; - pConn->chandle = NULL; - - taosFreeId(pRpc->idPool, pConn->sid); - tDebug("%s, rpc connection is released", pConn->info); -} - -static void rpcCloseConn(void *thandle) { - SRpcConn *pConn = (SRpcConn *)thandle; - if (pConn == NULL) return; - - rpcLockConn(pConn); - - if (pConn->user[0]) rpcReleaseConn(pConn); - - rpcUnlockConn(pConn); -} - -static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) { - SRpcConn *pConn = NULL; - - int sid = taosAllocateId(pRpc->idPool); - if (sid <= 0) { - tError("%s maximum number of sessions:%d is reached", pRpc->label, pRpc->sessions); - terrno = TSDB_CODE_RPC_MAX_SESSIONS; - } else { - pConn = pRpc->connList + sid; - - pConn->pRpc = pRpc; - pConn->sid = sid; - pConn->tranId = (uint16_t)(taosRand() & 0xFFFF); - pConn->ownId = htonl(pConn->sid); - pConn->linkUid = (uint32_t)((int64_t)pConn + taosGetPId() + (int64_t)pConn->tranId); - pConn->spi = pRpc->spi; - pConn->encrypt = pRpc->encrypt; - if (pConn->spi) memcpy(pConn->secret, pRpc->secret, TSDB_PASSWORD_LEN); - tDebug("%s %p client connection is allocated, uid:0x%x", pRpc->label, pConn, pConn->linkUid); - } - - return pConn; -} - -static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { - SRpcConn *pConn = NULL; - char hashstr[40] = {0}; - SRpcHead *pHead = (SRpcHead *)pRecv->msg; - - size_t size = - snprintf(hashstr, sizeof(hashstr), "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType); - - // check if it is already allocated - SRpcConn **ppConn = (SRpcConn **)(taosHashGet(pRpc->hash, hashstr, size)); - if (ppConn) pConn = *ppConn; - if (pConn) { - pConn->secured = 0; - return pConn; - } - - // if code is not 0, it means it is simple reqhead, just ignore - if (pHead->code != 0) { - terrno = TSDB_CODE_RPC_ALREADY_PROCESSED; - return NULL; - } - - int sid = taosAllocateId(pRpc->idPool); - if (sid <= 0) { - tError("%s maximum number of sessions:%d is reached", pRpc->label, pRpc->sessions); - terrno = TSDB_CODE_RPC_MAX_SESSIONS; - } else { - pConn = pRpc->connList + sid; - memcpy(pConn->user, pHead->user, tListLen(pConn->user)); - pConn->pRpc = pRpc; - pConn->sid = sid; - pConn->tranId = (uint16_t)(taosRand() & 0xFFFF); - pConn->ownId = htonl(pConn->sid); - pConn->linkUid = pHead->linkUid; - if (pRpc->afp) { - if (pConn->user[0] == 0) { - terrno = TSDB_CODE_RPC_AUTH_REQUIRED; - } else { - terrno = (*pRpc->afp)(pRpc->parent, pConn->user, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey); - } - - if (terrno != 0) { - taosFreeId(pRpc->idPool, sid); // sid shall be released - pConn = NULL; - } - } - } - - if (pConn) { - if (pRecv->connType == RPC_CONN_UDPS && pRpc->numOfThreads > 1) { - // UDP server, assign to new connection - pRpc->index = (pRpc->index + 1) % pRpc->numOfThreads; - pConn->localPort = (pRpc->localPort + pRpc->index); - } - - taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES); - tDebug("%s %p server connection is allocated, uid:0x%x sid:%d key:%s spi:%d", pRpc->label, pConn, pConn->linkUid, - sid, hashstr, pConn->spi); - } - - return pConn; -} - -static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) { - SRpcConn *pConn = NULL; - SRpcHead *pHead = (SRpcHead *)pRecv->msg; - - if (sid) { - pConn = pRpc->connList + sid; - if (pConn->user[0] == 0) pConn = NULL; - } - - if (pConn == NULL) { - if (pRpc->connType == TAOS_CONN_SERVER) { - pConn = rpcAllocateServerConn(pRpc, pRecv); - } else { - terrno = TSDB_CODE_RPC_UNEXPECTED_RESPONSE; - } - } - - if (pConn) { - if (pConn->linkUid != pHead->linkUid) { - terrno = TSDB_CODE_RPC_MISMATCHED_LINK_ID; - tDebug("%s %p %p, linkUid:0x%x is not matched with received:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, - pConn->linkUid, pHead->linkUid); - pConn = NULL; - } - } - - return pConn; -} - -static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) { - SRpcConn *pConn; - SRpcInfo *pRpc = pContext->pRpc; - SEpSet * pEpSet = &pContext->epSet; - - pConn = rpcGetConnFromCache(pRpc->pCache, pEpSet->eps[pEpSet->inUse].fqdn, pEpSet->eps[pEpSet->inUse].port, - pContext->connType); - if (pConn == NULL || pConn->user[0] == 0) { - pConn = rpcOpenConn(pRpc, pEpSet->eps[pEpSet->inUse].fqdn, pEpSet->eps[pEpSet->inUse].port, pContext->connType); - } - - if (pConn) { - pConn->tretry = 0; - pConn->ahandle = pContext->ahandle; - snprintf(pConn->info, sizeof(pConn->info), "%s %p %p", pRpc->label, pConn, pConn->ahandle); - pConn->tretry = 0; - } else { - tError("%s %p, failed to set up connection(%s)", pRpc->label, pContext->ahandle, tstrerror(terrno)); - } - - return pConn; -} - -static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) { - if (pConn->peerId == 0) { - pConn->peerId = pHead->sourceId; - } else { - if (pConn->peerId != pHead->sourceId) { - tDebug("%s, source Id is changed, old:0x%08x new:0x%08x", pConn->info, pConn->peerId, pHead->sourceId); - return TSDB_CODE_RPC_INVALID_VALUE; - } - } - - if (pConn->inTranId == pHead->tranId) { - if (pConn->inType == pHead->msgType) { - if (pHead->code == 0) { - tDebug("%s, %s is retransmitted", pConn->info, TMSG_INFO(pHead->msgType)); - rpcSendQuickRsp(pConn, TSDB_CODE_RPC_ACTION_IN_PROGRESS); - } else { - // do nothing, it is heart beat from client - } - } else if (pConn->inType == 0) { - tDebug("%s, %s is already processed, tranId:%d", pConn->info, TMSG_INFO(pHead->msgType), pConn->inTranId); - rpcSendMsgToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen); // resend the response - } else { - tDebug("%s, mismatched message %s and tranId", pConn->info, TMSG_INFO(pHead->msgType)); - } - - // do not reply any message - return TSDB_CODE_RPC_ALREADY_PROCESSED; - } - - if (pConn->inType != 0) { - tDebug("%s, last session is not finished, inTranId:%d tranId:%d", pConn->info, pConn->inTranId, pHead->tranId); - return TSDB_CODE_RPC_LAST_SESSION_NOT_FINISHED; - } - - if (rpcContLenFromMsg(pHead->msgLen) <= 0) { - tDebug("%s, message body is empty, ignore", pConn->info); - return TSDB_CODE_RPC_APP_ERROR; - } - - pConn->inTranId = pHead->tranId; - pConn->inType = pHead->msgType; - - // start the progress timer to monitor the response from server app - if (pConn->connType != RPC_CONN_TCPS) - pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pConn->pRpc->tmrCtrl); - - return 0; -} - -static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { - SRpcInfo *pRpc = pConn->pRpc; - pConn->peerId = pHead->sourceId; - - if (pConn->outType == 0 || pConn->pContext == NULL) { - return TSDB_CODE_RPC_UNEXPECTED_RESPONSE; - } - - if (pHead->tranId != pConn->outTranId) { - return TSDB_CODE_RPC_INVALID_TRAN_ID; - } - - if (pHead->msgType != pConn->outType + 1) { - return TSDB_CODE_RPC_INVALID_RESPONSE_TYPE; - } - - taosTmrStopA(&pConn->pTimer); - pConn->retry = 0; - - if (pHead->code == TSDB_CODE_RPC_AUTH_REQUIRED && pRpc->spi) { - tDebug("%s, authentication shall be restarted", pConn->info); - pConn->secured = 0; - rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); - if (pConn->connType != RPC_CONN_TCPC) - pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl); - return TSDB_CODE_RPC_ALREADY_PROCESSED; - } - - if (pHead->code == TSDB_CODE_RPC_MISMATCHED_LINK_ID) { - tDebug("%s, mismatched linkUid, link shall be restarted", pConn->info); - pConn->secured = 0; - ((SRpcHead *)pConn->pReqMsg)->destId = 0; - rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); - if (pConn->connType != RPC_CONN_TCPC) - pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl); - return TSDB_CODE_RPC_ALREADY_PROCESSED; - } - - if (pHead->code == TSDB_CODE_RPC_ACTION_IN_PROGRESS) { - if (pConn->tretry <= tsRpcMaxRetry) { - tDebug("%s, peer is still processing the transaction, retry:%d", pConn->info, pConn->tretry); - pConn->tretry++; - rpcSendReqHead(pConn); - if (pConn->connType != RPC_CONN_TCPC) - pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl); - return TSDB_CODE_RPC_ALREADY_PROCESSED; - } else { - // peer still in processing, give up - tDebug("%s, server processing takes too long time, give up", pConn->info); - pHead->code = TSDB_CODE_RPC_TOO_SLOW; - } - } - - pConn->outType = 0; - pConn->pReqMsg = NULL; - pConn->reqMsgLen = 0; - SRpcReqContext *pContext = pConn->pContext; - - if (pHead->code == TSDB_CODE_RPC_REDIRECT) { - if (rpcContLenFromMsg(pHead->msgLen) < sizeof(SEpSet)) { - // if EpSet is not included in the msg, treat it as NOT_READY - pHead->code = TSDB_CODE_RPC_NOT_READY; - } else { - pContext->redirect++; - if (pContext->redirect > TSDB_MAX_REPLICA) { - pHead->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - tWarn("%s, too many redirects, quit", pConn->info); - } - } - } - - return TSDB_CODE_SUCCESS; -} - -static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqContext **ppContext) { - int32_t sid; - SRpcConn *pConn = NULL; - - SRpcHead *pHead = (SRpcHead *)pRecv->msg; - - sid = htonl(pHead->destId); - *ppContext = NULL; - - if (TMSG_INDEX(pHead->msgType) >= TDMT_MAX || TMSG_INDEX(pHead->msgType) <= 0) { - tDebug("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType); - terrno = TSDB_CODE_RPC_INVALID_MSG_TYPE; - return NULL; - } - - if (sid < 0 || sid >= pRpc->sessions) { - tDebug("%s sid:%d, sid is out of range, max sid:%d, %s discarded", pRpc->label, sid, pRpc->sessions, - TMSG_INFO(pHead->msgType)); - terrno = TSDB_CODE_RPC_INVALID_SESSION_ID; - return NULL; - } - - if (rpcIsReq(pHead->msgType) && htonl(pHead->msgVer) != tsVersion >> 8) { - tDebug("%s sid:%d, invalid client version:%x/%x %s", pRpc->label, sid, htonl(pHead->msgVer), tsVersion, - TMSG_INFO(pHead->msgType)); - terrno = TSDB_CODE_RPC_INVALID_VERSION; - return NULL; - } - - pConn = rpcGetConnObj(pRpc, sid, pRecv); - if (pConn == NULL) { - tDebug("%s %p, failed to get connection obj(%s)", pRpc->label, (void *)pHead->ahandle, tstrerror(terrno)); - return NULL; - } - - rpcLockConn(pConn); - - if (rpcIsReq(pHead->msgType)) { - pConn->ahandle = (void *)pHead->ahandle; - snprintf(pConn->info, sizeof(pConn->info), "%s %p %p", pRpc->label, pConn, pConn->ahandle); - } - - sid = pConn->sid; - if (pConn->chandle == NULL) pConn->chandle = pRecv->chandle; - pConn->peerIp = pRecv->ip; - pConn->peerPort = pRecv->port; - if (pHead->port) pConn->peerPort = htons(pHead->port); - - terrno = rpcCheckAuthentication(pConn, (char *)pHead, pRecv->msgLen); - - // code can be transformed only after authentication - pHead->code = htonl(pHead->code); - - if (terrno == 0) { - if (pHead->encrypt) { - // decrypt here - } - - if (rpcIsReq(pHead->msgType)) { - pConn->connType = pRecv->connType; - terrno = rpcProcessReqHead(pConn, pHead); - - // stop idle timer - taosTmrStopA(&pConn->pIdleTimer); - - // client shall send the request within tsRpcTime again for UDP, double it - if (pConn->connType != RPC_CONN_TCPS) - pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer * 2, pConn, pRpc->tmrCtrl); - } else { - terrno = rpcProcessRspHead(pConn, pHead); - *ppContext = pConn->pContext; - } - } - - rpcUnlockConn(pConn); - - return pConn; -} - -static void doRpcReportBrokenLinkToServer(void *param, void *id) { - SRpcMsg * pRpcMsg = (SRpcMsg *)(param); - SRpcConn *pConn = (SRpcConn *)(pRpcMsg->handle); - SRpcInfo *pRpc = pConn->pRpc; - (*(pRpc->cfp))(pRpc->parent, pRpcMsg, NULL); - taosMemoryFree(pRpcMsg); -} -static void rpcReportBrokenLinkToServer(SRpcConn *pConn) { - SRpcInfo *pRpc = pConn->pRpc; - if (pConn->pReqMsg == NULL) return; - - // if there are pending request, notify the app - rpcAddRef(pRpc); - tDebug("%s, notify the server app, connection is gone", pConn->info); - - SRpcMsg *rpcMsg = taosMemoryMalloc(sizeof(SRpcMsg)); - rpcMsg->pCont = pConn->pReqMsg; // pReqMsg is re-used to store the APP context from server - rpcMsg->contLen = pConn->reqMsgLen; // reqMsgLen is re-used to store the APP context length - rpcMsg->ahandle = pConn->ahandle; - rpcMsg->handle = pConn; - rpcMsg->msgType = pConn->inType; - rpcMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - pConn->pReqMsg = NULL; - pConn->reqMsgLen = 0; - if (pRpc->cfp) { - taosTmrStart(doRpcReportBrokenLinkToServer, 0, rpcMsg, pRpc->tmrCtrl); - } else { - taosMemoryFree(rpcMsg); - } -} - -static void rpcProcessBrokenLink(SRpcConn *pConn) { - if (pConn == NULL) return; - SRpcInfo *pRpc = pConn->pRpc; - tDebug("%s, link is broken", pConn->info); - - rpcLockConn(pConn); - - if (pConn->outType) { - SRpcReqContext *pContext = pConn->pContext; - pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - pContext->pConn = NULL; - pConn->pReqMsg = NULL; - taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl); - } - - if (pConn->inType) rpcReportBrokenLinkToServer(pConn); - - rpcReleaseConn(pConn); - rpcUnlockConn(pConn); -} - -static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { - SRpcHead *pHead = (SRpcHead *)pRecv->msg; - SRpcInfo *pRpc = (SRpcInfo *)pRecv->shandle; - SRpcConn *pConn = (SRpcConn *)pRecv->thandle; - - tDump(pRecv->msg, pRecv->msgLen); - - // underlying UDP layer does not know it is server or client - pRecv->connType = pRecv->connType | pRpc->connType; - - if (pRecv->msg == NULL) { - rpcProcessBrokenLink(pConn); - return NULL; - } - - terrno = 0; - SRpcReqContext *pContext; - pConn = rpcProcessMsgHead(pRpc, pRecv, &pContext); - - char ipstr[24] = {0}; - taosIpPort2String(pRecv->ip, pRecv->port, ipstr); - - if (TMSG_INDEX(pHead->msgType) >= 1 && TMSG_INDEX(pHead->msgType) < TDMT_MAX) { - tDebug("%s %p %p, %s received from %s, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, pConn, - (void *)pHead->ahandle, TMSG_INFO(pHead->msgType), ipstr, terrno, pRecv->msgLen, pHead->sourceId, - pHead->destId, pHead->tranId, pHead->code); - } else { - tDebug("%s %p %p, %d received from %s, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, pConn, - (void *)pHead->ahandle, pHead->msgType, ipstr, terrno, pRecv->msgLen, pHead->sourceId, pHead->destId, - pHead->tranId, pHead->code); - } - - int32_t code = terrno; - if (code != TSDB_CODE_RPC_ALREADY_PROCESSED) { - if (code != 0) { // parsing error - if (rpcIsReq(pHead->msgType)) { - rpcSendErrorMsgToPeer(pRecv, code); - if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE) { - rpcCloseConn(pConn); - } - if (TMSG_INDEX(pHead->msgType) + 1 > 1 && TMSG_INDEX(pHead->msgType) + 1 < TDMT_MAX) { - tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, - TMSG_INFO(pHead->msgType + 1), code); - } else { - tError("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, - TMSG_INFO(pHead->msgType), code); - } - } - } else { // msg is passed to app only parsing is ok - rpcProcessIncomingMsg(pConn, pHead, pContext); - } - } - - if (code) rpcFreeMsg(pRecv->msg); // parsing failed, msg shall be freed - return pConn; -} - -static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { - SRpcInfo *pRpc = pContext->pRpc; - - pContext->pConn = NULL; - if (pContext->pRsp) { - // for synchronous API - memcpy(pContext->pSet, &pContext->epSet, sizeof(SEpSet)); - memcpy(pContext->pRsp, pMsg, sizeof(SRpcMsg)); - tsem_post(pContext->pSem); - } else { - // for asynchronous API - SEpSet *pEpSet = NULL; - if (pContext->epSet.inUse != pContext->oldInUse || pContext->redirect) pEpSet = &pContext->epSet; - - (*pRpc->cfp)(pRpc->parent, pMsg, pEpSet); - } - - // free the request message - taosRemoveRef(tsRpcRefId, pContext->rid); -} - -static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) { - SRpcInfo *pRpc = pConn->pRpc; - SRpcMsg rpcMsg; - - pHead = rpcDecompressRpcMsg(pHead); - rpcMsg.contLen = rpcContLenFromMsg(pHead->msgLen); - rpcMsg.pCont = pHead->content; - rpcMsg.msgType = pHead->msgType; - rpcMsg.code = pHead->code; - - if (rpcIsReq(pHead->msgType)) { - rpcMsg.ahandle = pConn->ahandle; - rpcMsg.handle = pConn; - rpcAddRef(pRpc); // add the refCount for requests - - // notify the server app - (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL); - } else { - // it's a response - rpcMsg.handle = pContext; - rpcMsg.ahandle = pContext->ahandle; - pContext->pConn = NULL; - - // for UDP, port may be changed by server, the port in epSet shall be used for cache - if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) { - rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.eps[pContext->epSet.inUse].port, - pConn->connType); - } else { - rpcCloseConn(pConn); - } - - if (pHead->code == TSDB_CODE_RPC_REDIRECT) { - pContext->numOfTry = 0; - SEpSet *pEpSet = (SEpSet *)pHead->content; - if (pEpSet->numOfEps > 0) { - memcpy(&pContext->epSet, pHead->content, sizeof(pContext->epSet)); - tDebug("%s, redirect is received, numOfEps:%d inUse:%d", pConn->info, pContext->epSet.numOfEps, - pContext->epSet.inUse); - for (int i = 0; i < pContext->epSet.numOfEps; ++i) { - pContext->epSet.eps[i].port = htons(pContext->epSet.eps[i].port); - tDebug("%s, redirect is received, index:%d ep:%s:%u", pConn->info, i, pContext->epSet.eps[i].fqdn, - pContext->epSet.eps[i].port); - } - } - rpcSendReqToServer(pRpc, pContext); - rpcFreeCont(rpcMsg.pCont); - } else if (pHead->code == TSDB_CODE_RPC_NOT_READY || pHead->code == TSDB_CODE_APP_NOT_READY || - pHead->code == TSDB_CODE_NODE_OFFLINE) { - pContext->code = pHead->code; - rpcProcessConnError(pContext, NULL); - rpcFreeCont(rpcMsg.pCont); - } else { - rpcNotifyClient(pContext, &rpcMsg); - } - } -} - -static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) { - char msg[RPC_MSG_OVERHEAD]; - SRpcHead *pHead; - - // set msg header - memset(msg, 0, sizeof(SRpcHead)); - pHead = (SRpcHead *)msg; - pHead->version = 1; - pHead->msgType = pConn->inType + 1; - pHead->spi = pConn->spi; - pHead->encrypt = 0; - pHead->tranId = pConn->inTranId; - pHead->sourceId = pConn->ownId; - pHead->destId = pConn->peerId; - pHead->linkUid = pConn->linkUid; - pHead->ahandle = (uint64_t)pConn->ahandle; - memcpy(pHead->user, pConn->user, tListLen(pHead->user)); - pHead->code = htonl(code); - - rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead)); - pConn->secured = 1; // connection shall be secured -} - -static void rpcSendReqHead(SRpcConn *pConn) { - char msg[RPC_MSG_OVERHEAD]; - SRpcHead *pHead; - - // set msg header - memset(msg, 0, sizeof(SRpcHead)); - pHead = (SRpcHead *)msg; - pHead->version = 1; - pHead->msgType = pConn->outType; - pHead->msgVer = htonl(tsVersion >> 8); - pHead->spi = pConn->spi; - pHead->encrypt = 0; - pHead->tranId = pConn->outTranId; - pHead->sourceId = pConn->ownId; - pHead->destId = pConn->peerId; - pHead->linkUid = pConn->linkUid; - pHead->ahandle = (uint64_t)pConn->ahandle; - memcpy(pHead->user, pConn->user, tListLen(pHead->user)); - pHead->code = 1; - - rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead)); -} - -static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) { - SRpcHead *pRecvHead, *pReplyHead; - char msg[sizeof(SRpcHead) + sizeof(SRpcDigest) + sizeof(uint32_t)]; - uint32_t timeStamp; - int msgLen; - - pRecvHead = (SRpcHead *)pRecv->msg; - pReplyHead = (SRpcHead *)msg; - - memset(msg, 0, sizeof(SRpcHead)); - pReplyHead->version = pRecvHead->version; - pReplyHead->msgType = (tmsg_t)(pRecvHead->msgType + 1); - pReplyHead->spi = 0; - pReplyHead->encrypt = pRecvHead->encrypt; - pReplyHead->tranId = pRecvHead->tranId; - pReplyHead->sourceId = pRecvHead->destId; - pReplyHead->destId = pRecvHead->sourceId; - pReplyHead->linkUid = pRecvHead->linkUid; - pReplyHead->ahandle = pRecvHead->ahandle; - - pReplyHead->code = htonl(code); - msgLen = sizeof(SRpcHead); - - if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP) { - // include a time stamp if client's time is not synchronized well - uint8_t *pContent = pReplyHead->content; - timeStamp = htonl(taosGetTimestampSec()); - memcpy(pContent, &timeStamp, sizeof(timeStamp)); - msgLen += sizeof(timeStamp); - } - - pReplyHead->msgLen = (int32_t)htonl((uint32_t)msgLen); - (*taosSendData[pRecv->connType])(pRecv->ip, pRecv->port, msg, msgLen, pRecv->chandle); - - return; -} - -static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { - SRpcHead *pHead = rpcHeadFromCont(pContext->pCont); - char * msg = (char *)pHead; - int msgLen = rpcMsgLenFromCont(pContext->contLen); - tmsg_t msgType = pContext->msgType; - - pContext->numOfTry++; - SRpcConn *pConn = rpcSetupConnToServer(pContext); - if (pConn == NULL) { - pContext->code = terrno; - taosTmrStart(rpcProcessConnError, 1, pContext, pRpc->tmrCtrl); - return; - } - - pContext->pConn = pConn; - pConn->ahandle = pContext->ahandle; - rpcLockConn(pConn); - - // set the message header - pHead->version = 1; - pHead->msgVer = htonl(tsVersion >> 8); - pHead->msgType = msgType; - pHead->encrypt = 0; - pConn->tranId++; - if (pConn->tranId == 0) pConn->tranId++; - pHead->tranId = pConn->tranId; - pHead->sourceId = pConn->ownId; - pHead->destId = pConn->peerId; - pHead->port = 0; - pHead->linkUid = pConn->linkUid; - pHead->ahandle = (uint64_t)pConn->ahandle; - memcpy(pHead->user, pConn->user, tListLen(pHead->user)); - - // set the connection parameters - pConn->outType = msgType; - pConn->outTranId = pHead->tranId; - pConn->pReqMsg = msg; - pConn->reqMsgLen = msgLen; - pConn->pContext = pContext; - - rpcSendMsgToPeer(pConn, msg, msgLen); - if (pConn->connType != RPC_CONN_TCPC) - taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); - - rpcUnlockConn(pConn); -} - -static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { - int writtenLen = 0; - SRpcHead *pHead = (SRpcHead *)msg; - - msgLen = rpcAddAuthPart(pConn, msg, msgLen); - - if (rpcIsReq(pHead->msgType)) { - tDebug("%s, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d", pConn->info, TMSG_INFO(pHead->msgType), - pConn->peerFqdn, pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId); - } else { - if (pHead->code == 0) { - pConn->secured = 1; // for success response, set link as secured - } - - char ipport[40] = {0}; - taosIpPort2String(pConn->peerIp, pConn->peerPort, ipport); - tDebug("%s, %s is sent to %s, code:0x%x len:%d sig:0x%08x:0x%08x:%d", pConn->info, TMSG_INFO(pHead->msgType), - ipport, htonl(pHead->code), msgLen, pHead->sourceId, pHead->destId, pHead->tranId); - } - - // tTrace("connection type is: %d", pConn->connType); - writtenLen = (*taosSendData[pConn->connType])(pConn->peerIp, pConn->peerPort, pHead, msgLen, pConn->chandle); - - if (writtenLen != msgLen) { - tError("%s, failed to send, msgLen:%d written:%d, reason:%s", pConn->info, msgLen, writtenLen, strerror(errno)); - } - - tDump(msg, msgLen); -} - -static void rpcProcessConnError(void *param, void *id) { - SRpcReqContext *pContext = (SRpcReqContext *)param; - SRpcInfo * pRpc = pContext->pRpc; - SRpcMsg rpcMsg = {0}; - - if (pRpc == NULL) { - return; - } - - tDebug("%s %p, connection error happens", pRpc->label, pContext->ahandle); - - if (pContext->numOfTry >= pContext->epSet.numOfEps || pContext->msgType == TDMT_VND_FETCH) { - rpcMsg.msgType = pContext->msgType + 1; - rpcMsg.ahandle = pContext->ahandle; - rpcMsg.code = pContext->code; - rpcMsg.pCont = NULL; - rpcMsg.contLen = 0; - - rpcNotifyClient(pContext, &rpcMsg); - } else { - // move to next IP - pContext->epSet.inUse++; - pContext->epSet.inUse = pContext->epSet.inUse % pContext->epSet.numOfEps; - rpcSendReqToServer(pRpc, pContext); - } -} - -static void rpcProcessRetryTimer(void *param, void *tmrId) { - SRpcConn *pConn = (SRpcConn *)param; - SRpcInfo *pRpc = pConn->pRpc; - - rpcLockConn(pConn); - - if (pConn->outType && pConn->user[0]) { - tDebug("%s, expected %s is not received", pConn->info, TMSG_INFO((int)pConn->outType + 1)); - pConn->pTimer = NULL; - pConn->retry++; - - if (pConn->retry < 4) { - tDebug("%s, re-send msg:%s to %s:%hu", pConn->info, TMSG_INFO(pConn->outType), pConn->peerFqdn, pConn->peerPort); - rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); - pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl); - } else { - // close the connection - tDebug("%s, failed to send msg:%s to %s:%hu", pConn->info, TMSG_INFO(pConn->outType), pConn->peerFqdn, - pConn->peerPort); - if (pConn->pContext) { - pConn->pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - pConn->pContext->pConn = NULL; - pConn->pReqMsg = NULL; - taosTmrStart(rpcProcessConnError, 1, pConn->pContext, pRpc->tmrCtrl); - rpcReleaseConn(pConn); - } - } - } else { - tDebug("%s, retry timer not processed", pConn->info); - } - - rpcUnlockConn(pConn); -} - -static void rpcProcessIdleTimer(void *param, void *tmrId) { - SRpcConn *pConn = (SRpcConn *)param; - - rpcLockConn(pConn); - - if (pConn->user[0]) { - tDebug("%s, close the connection since no activity", pConn->info); - if (pConn->inType) rpcReportBrokenLinkToServer(pConn); - rpcReleaseConn(pConn); - } else { - tDebug("%s, idle timer:%p not processed", pConn->info, tmrId); - } - - rpcUnlockConn(pConn); -} - -static void rpcProcessProgressTimer(void *param, void *tmrId) { - SRpcConn *pConn = (SRpcConn *)param; - SRpcInfo *pRpc = pConn->pRpc; - - rpcLockConn(pConn); - - if (pConn->inType && pConn->user[0]) { - tDebug("%s, progress timer expired, send progress", pConn->info); - rpcSendQuickRsp(pConn, TSDB_CODE_RPC_ACTION_IN_PROGRESS); - pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pRpc->tmrCtrl); - } else { - tDebug("%s, progress timer:%p not processed", pConn->info, tmrId); - } - - rpcUnlockConn(pConn); -} - -static int32_t rpcCompressRpcMsg(char *pCont, int32_t contLen) { - SRpcHead *pHead = rpcHeadFromCont(pCont); - int32_t finalLen = 0; - int overhead = sizeof(SRpcComp); - - if (!NEEDTO_COMPRESSS_MSG(contLen)) { - return contLen; - } - - char *buf = taosMemoryMalloc(contLen + overhead + 8); // 8 extra bytes - if (buf == NULL) { - tError("failed to allocate memory for rpc msg compression, contLen:%d", contLen); - return contLen; - } - - int32_t compLen = LZ4_compress_default(pCont, buf, contLen, contLen + overhead); - tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", contLen, compLen, overhead); - - /* - * only the compressed size is less than the value of contLen - overhead, the compression is applied - * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message - */ - if (compLen > 0 && compLen < contLen - overhead) { - SRpcComp *pComp = (SRpcComp *)pCont; - pComp->reserved = 0; - pComp->contLen = htonl(contLen); - memcpy(pCont + overhead, buf, compLen); - - pHead->comp = 1; - tDebug("compress rpc msg, before:%d, after:%d", contLen, compLen); - finalLen = compLen + overhead; - } else { - finalLen = contLen; - } - - taosMemoryFree(buf); - return finalLen; -} - -static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) { - int overhead = sizeof(SRpcComp); - SRpcHead *pNewHead = NULL; - uint8_t * pCont = pHead->content; - SRpcComp *pComp = (SRpcComp *)pHead->content; - - if (pHead->comp) { - // decompress the content - assert(pComp->reserved == 0); - int contLen = htonl(pComp->contLen); - - // prepare the temporary buffer to decompress message - char *temp = (char *)taosMemoryMalloc(contLen + RPC_MSG_OVERHEAD); - pNewHead = (SRpcHead *)(temp + sizeof(SRpcReqContext)); // reserve SRpcReqContext - - if (pNewHead) { - int compLen = rpcContLenFromMsg(pHead->msgLen) - overhead; - int origLen = LZ4_decompress_safe((char *)(pCont + overhead), (char *)pNewHead->content, compLen, contLen); - assert(origLen == contLen); - - memcpy(pNewHead, pHead, sizeof(SRpcHead)); - pNewHead->msgLen = rpcMsgLenFromCont(origLen); - rpcFreeMsg(pHead); // free the compressed message buffer - pHead = pNewHead; - tTrace("decomp malloc mem:%p", temp); - } else { - tError("failed to allocate memory to decompress msg, contLen:%d", contLen); - } - } - - return pHead; -} - -static int rpcAuthenticateMsg(void *pMsg, int msgLen, void *pAuth, void *pKey) { - T_MD5_CTX context; - int ret = -1; - - tMD5Init(&context); - tMD5Update(&context, (uint8_t *)pKey, TSDB_PASSWORD_LEN); - tMD5Update(&context, (uint8_t *)pMsg, msgLen); - tMD5Update(&context, (uint8_t *)pKey, TSDB_PASSWORD_LEN); - tMD5Final(&context); - - if (memcmp(context.digest, pAuth, sizeof(context.digest)) == 0) ret = 0; - - return ret; -} - -static void rpcBuildAuthHead(void *pMsg, int msgLen, void *pAuth, void *pKey) { - T_MD5_CTX context; - - tMD5Init(&context); - tMD5Update(&context, (uint8_t *)pKey, TSDB_PASSWORD_LEN); - tMD5Update(&context, (uint8_t *)pMsg, msgLen); - tMD5Update(&context, (uint8_t *)pKey, TSDB_PASSWORD_LEN); - tMD5Final(&context); - - memcpy(pAuth, context.digest, sizeof(context.digest)); -} - -static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) { - SRpcHead *pHead = (SRpcHead *)msg; - - if (pConn->spi && pConn->secured == 0) { - // add auth part - pHead->spi = pConn->spi; - SRpcDigest *pDigest = (SRpcDigest *)(msg + msgLen); - pDigest->timeStamp = htonl(taosGetTimestampSec()); - msgLen += sizeof(SRpcDigest); - pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); - rpcBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret); - } else { - pHead->spi = 0; - pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); - } - - return msgLen; -} - -static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { - SRpcHead *pHead = (SRpcHead *)msg; - int code = 0; - - if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)) { - // secured link, or no authentication - pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); - // tTrace("%s, secured link, no auth is required", pConn->info); - return 0; - } - - if (!rpcIsReq(pHead->msgType)) { - // for response, if code is auth failure, it shall bypass the auth process - code = htonl(pHead->code); - if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE || - code == TSDB_CODE_RPC_INVALID_VERSION || code == TSDB_CODE_RPC_AUTH_REQUIRED || - code == TSDB_CODE_MND_USER_NOT_EXIST || code == TSDB_CODE_RPC_NOT_READY) { - pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); - // tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code); - return 0; - } - } - - code = 0; - if (pHead->spi == pConn->spi) { - // authentication - SRpcDigest *pDigest = (SRpcDigest *)((char *)pHead + msgLen - sizeof(SRpcDigest)); - - int32_t delta; - delta = (int32_t)htonl(pDigest->timeStamp); - delta -= (int32_t)taosGetTimestampSec(); - if (abs(delta) > 900) { - tWarn("%s, time diff:%d is too big, msg discarded", pConn->info, delta); - code = TSDB_CODE_RPC_INVALID_TIME_STAMP; - } else { - if (rpcAuthenticateMsg(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) { - tDebug("%s, authentication failed, msg discarded", pConn->info); - code = TSDB_CODE_RPC_AUTH_FAILURE; - } else { - pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest); - if (!rpcIsReq(pHead->msgType)) pConn->secured = 1; // link is secured for client - // tTrace("%s, message is authenticated", pConn->info); - } - } - } else { - tError("%s, auth spi:%d not matched with received:%d %p", pConn->info, pConn->spi, pHead->spi, pConn); - code = pHead->spi ? TSDB_CODE_RPC_AUTH_FAILURE : TSDB_CODE_RPC_AUTH_REQUIRED; - } - - return code; -} - -static void rpcLockConn(SRpcConn *pConn) { - int64_t tid = taosGetSelfPthreadId(); - int i = 0; - while (atomic_val_compare_exchange_64(&(pConn->lockedBy), 0, tid) != 0) { - if (++i % 1000 == 0) { - sched_yield(); - } - } -} - -static void rpcUnlockConn(SRpcConn *pConn) { - int64_t tid = taosGetSelfPthreadId(); - if (atomic_val_compare_exchange_64(&(pConn->lockedBy), tid, 0) != tid) { - assert(false); - } -} - -static void rpcAddRef(SRpcInfo *pRpc) { atomic_add_fetch_32(&pRpc->refCount, 1); } - -static void rpcDecRef(SRpcInfo *pRpc) { - if (atomic_sub_fetch_32(&pRpc->refCount, 1) == 0) { - rpcCloseConnCache(pRpc->pCache); - taosHashCleanup(pRpc->hash); - taosTmrCleanUp(pRpc->tmrCtrl); - taosIdPoolCleanUp(pRpc->idPool); - - taosMemoryFreeClear(pRpc->connList); - taosThreadMutexDestroy(&pRpc->mutex); - tDebug("%s rpc resources are released", pRpc->label); - taosMemoryFreeClear(pRpc); - - atomic_sub_fetch_32(&tsRpcNum, 1); - } -} -#endif diff --git a/source/libs/transport/src/rpcTcp.c b/source/libs/transport/src/rpcTcp.c deleted file mode 100644 index 52c5ddcf631e9a0cc3a307fab00122c5139fa5dd..0000000000000000000000000000000000000000 --- a/source/libs/transport/src/rpcTcp.c +++ /dev/null @@ -1,657 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "rpcTcp.h" -#include "os.h" -#include "rpcHead.h" -#include "rpcLog.h" -#include "taosdef.h" -#include "taoserror.h" -#include "tutil.h" - -#ifndef USE_UV -typedef struct SFdObj { - void * signature; - TdSocketPtr pSocket; // TCP socket FD - void * thandle; // handle from upper layer, like TAOS - uint32_t ip; - uint16_t port; - int16_t closedByApp; // 1: already closed by App - struct SThreadObj *pThreadObj; - struct SFdObj * prev; - struct SFdObj * next; -} SFdObj; - -typedef struct SThreadObj { - TdThread thread; - SFdObj * pHead; - TdThreadMutex mutex; - uint32_t ip; - bool stop; - TdEpollPtr pEpoll; - int numOfFds; - int threadId; - char label[TSDB_LABEL_LEN]; - void * shandle; // handle passed by upper layer during server initialization - void *(*processData)(SRecvInfo *pPacket); -} SThreadObj; - -typedef struct { - char label[TSDB_LABEL_LEN]; - int32_t index; - int numOfThreads; - SThreadObj **pThreadObj; -} SClientObj; - -typedef struct { - TdSocketServerPtr pSocketServer; - uint32_t ip; - uint16_t port; - int8_t stop; - int8_t reserve; - char label[TSDB_LABEL_LEN]; - int numOfThreads; - void * shandle; - SThreadObj **pThreadObj; - TdThread thread; -} SServerObj; - -static void * taosProcessTcpData(void *param); -static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, TdSocketPtr pSocket); -static void taosFreeFdObj(SFdObj *pFdObj); -static void taosReportBrokenLink(SFdObj *pFdObj); -static void * taosAcceptTcpConnection(void *arg); - -void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) { - SServerObj *pServerObj; - SThreadObj *pThreadObj; - - pServerObj = (SServerObj *)taosMemoryCalloc(sizeof(SServerObj), 1); - if (pServerObj == NULL) { - tError("TCP:%s no enough memory", label); - terrno = TAOS_SYSTEM_ERROR(errno); - return NULL; - } - - pServerObj->pSocketServer = NULL; - taosResetPthread(&pServerObj->thread); - pServerObj->ip = ip; - pServerObj->port = port; - tstrncpy(pServerObj->label, label, sizeof(pServerObj->label)); - pServerObj->numOfThreads = numOfThreads; - - pServerObj->pThreadObj = (SThreadObj **)taosMemoryCalloc(sizeof(SThreadObj *), numOfThreads); - if (pServerObj->pThreadObj == NULL) { - tError("TCP:%s no enough memory", label); - terrno = TAOS_SYSTEM_ERROR(errno); - taosMemoryFree(pServerObj); - return NULL; - } - - int code = 0; - TdThreadAttr thattr; - taosThreadAttrInit(&thattr); - taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE); - - // initialize parameters in case it may encounter error later - for (int i = 0; i < numOfThreads; ++i) { - pThreadObj = (SThreadObj *)taosMemoryCalloc(sizeof(SThreadObj), 1); - if (pThreadObj == NULL) { - tError("TCP:%s no enough memory", label); - terrno = TAOS_SYSTEM_ERROR(errno); - for (int j = 0; j < i; ++j) taosMemoryFree(pServerObj->pThreadObj[j]); - taosMemoryFree(pServerObj->pThreadObj); - taosMemoryFree(pServerObj); - return NULL; - } - - pServerObj->pThreadObj[i] = pThreadObj; - pThreadObj->pEpoll = NULL; - taosResetPthread(&pThreadObj->thread); - pThreadObj->processData = fp; - tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label)); - pThreadObj->shandle = shandle; - pThreadObj->stop = false; - } - - // initialize mutex, thread, fd which may fail - for (int i = 0; i < numOfThreads; ++i) { - pThreadObj = pServerObj->pThreadObj[i]; - code = taosThreadMutexInit(&(pThreadObj->mutex), NULL); - if (code < 0) { - tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno)); - break; - } - - pThreadObj->pEpoll = taosCreateEpoll(10); // size does not matter - if (pThreadObj->pEpoll == NULL) { - tError("%s failed to create TCP epoll", label); - code = -1; - break; - } - - code = taosThreadCreate(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj)); - if (code != 0) { - tError("%s failed to create TCP process data thread(%s)", label, strerror(errno)); - break; - } - - pThreadObj->threadId = i; - } - - pServerObj->pSocketServer = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port); - if (pServerObj->pSocketServer == NULL) code = -1; - - if (code == 0) { - code = taosThreadCreate(&pServerObj->thread, &thattr, taosAcceptTcpConnection, (void *)pServerObj); - if (code != 0) { - tError("%s failed to create TCP accept thread(%s)", label, strerror(code)); - } - } - - if (code != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - taosCleanUpTcpServer(pServerObj); - pServerObj = NULL; - } else { - tDebug("%s TCP server is initialized, ip:0x%x port:%hu numOfThreads:%d", label, ip, port, numOfThreads); - } - - taosThreadAttrDestroy(&thattr); - return (void *)pServerObj; -} - -static void taosStopTcpThread(SThreadObj *pThreadObj) { - if (pThreadObj == NULL) { - return; - } - // save thread into local variable and signal thread to stop - TdThread thread = pThreadObj->thread; - if (!taosCheckPthreadValid(thread)) { - return; - } - pThreadObj->stop = true; - if (taosComparePthread(thread, taosThreadSelf())) { - pthread_detach(taosThreadSelf()); - return; - } - taosThreadJoin(thread, NULL); -} - -void taosStopTcpServer(void *handle) { - SServerObj *pServerObj = handle; - - if (pServerObj == NULL) return; - pServerObj->stop = 1; - - if (pServerObj->pSocketServer != NULL) { - taosShutDownSocketServerRD(pServerObj->pSocketServer); - } - if (taosCheckPthreadValid(pServerObj->thread)) { - if (taosComparePthread(pServerObj->thread, taosThreadSelf())) { - pthread_detach(taosThreadSelf()); - } else { - taosThreadJoin(pServerObj->thread, NULL); - } - } - - tDebug("%s TCP server is stopped", pServerObj->label); -} - -void taosCleanUpTcpServer(void *handle) { - SServerObj *pServerObj = handle; - SThreadObj *pThreadObj; - if (pServerObj == NULL) return; - - for (int i = 0; i < pServerObj->numOfThreads; ++i) { - pThreadObj = pServerObj->pThreadObj[i]; - taosStopTcpThread(pThreadObj); - } - - tDebug("%s TCP server is cleaned up", pServerObj->label); - - taosMemoryFreeClear(pServerObj->pThreadObj); - taosMemoryFreeClear(pServerObj); -} - -static void *taosAcceptTcpConnection(void *arg) { - TdSocketPtr pSocket = NULL; - struct sockaddr_in caddr; - int threadId = 0; - SThreadObj * pThreadObj; - SServerObj * pServerObj; - - pServerObj = (SServerObj *)arg; - tDebug("%s TCP server is ready, ip:0x%x:%hu", pServerObj->label, pServerObj->ip, pServerObj->port); - setThreadName("acceptTcpConn"); - - while (1) { - socklen_t addrlen = sizeof(caddr); - pSocket = taosAcceptTcpConnectSocket(pServerObj->pSocketServer, (struct sockaddr *)&caddr, &addrlen); - if (pServerObj->stop) { - tDebug("%s TCP server stop accepting new connections", pServerObj->label); - break; - } - - if (pSocket == NULL) { - if (errno == EINVAL) { - tDebug("%s TCP server stop accepting new connections, exiting", pServerObj->label); - break; - } - - tError("%s TCP accept failure(%s)", pServerObj->label, strerror(errno)); - continue; - } - - taosKeepTcpAlive(pSocket); - struct timeval to = {5, 0}; - int32_t ret = taosSetSockOpt(pSocket, SOL_SOCKET, SO_RCVTIMEO, &to, sizeof(to)); - if (ret != 0) { - taosCloseSocket(&pSocket); - tError("%s failed to set recv timeout fd(%s)for connection from:%s:%hu", pServerObj->label, strerror(errno), - taosInetNtoa(caddr.sin_addr), htons(caddr.sin_port)); - continue; - } - - // pick up the thread to handle this connection - pThreadObj = pServerObj->pThreadObj[threadId]; - - SFdObj *pFdObj = taosMallocFdObj(pThreadObj, pSocket); - if (pFdObj) { - pFdObj->ip = caddr.sin_addr.s_addr; - pFdObj->port = htons(caddr.sin_port); - tDebug("%s new TCP connection from %s:%hu, FD:%p numOfFds:%d", pServerObj->label, - taosInetNtoa(caddr.sin_addr), pFdObj->port, pFdObj, pThreadObj->numOfFds); - } else { - taosCloseSocket(&pSocket); - tError("%s failed to malloc FdObj(%s) for connection from:%s:%hu", pServerObj->label, strerror(errno), - taosInetNtoa(caddr.sin_addr), htons(caddr.sin_port)); - } - - // pick up next thread for next connection - threadId++; - threadId = threadId % pServerObj->numOfThreads; - } - - taosCloseSocketServer(&pServerObj->pSocketServer); - return NULL; -} - -void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) { - SClientObj *pClientObj = (SClientObj *)taosMemoryCalloc(1, sizeof(SClientObj)); - if (pClientObj == NULL) { - tError("TCP:%s no enough memory", label); - terrno = TAOS_SYSTEM_ERROR(errno); - return NULL; - } - - tstrncpy(pClientObj->label, label, sizeof(pClientObj->label)); - pClientObj->numOfThreads = numOfThreads; - pClientObj->pThreadObj = (SThreadObj **)taosMemoryCalloc(numOfThreads, sizeof(SThreadObj *)); - if (pClientObj->pThreadObj == NULL) { - tError("TCP:%s no enough memory", label); - taosMemoryFreeClear(pClientObj); - terrno = TAOS_SYSTEM_ERROR(errno); - } - - int code = 0; - TdThreadAttr thattr; - taosThreadAttrInit(&thattr); - taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE); - - for (int i = 0; i < numOfThreads; ++i) { - SThreadObj *pThreadObj = (SThreadObj *)taosMemoryCalloc(1, sizeof(SThreadObj)); - if (pThreadObj == NULL) { - tError("TCP:%s no enough memory", label); - terrno = TAOS_SYSTEM_ERROR(errno); - for (int j = 0; j < i; ++j) taosMemoryFree(pClientObj->pThreadObj[j]); - taosMemoryFree(pClientObj); - taosThreadAttrDestroy(&thattr); - return NULL; - } - pClientObj->pThreadObj[i] = pThreadObj; - taosResetPthread(&pThreadObj->thread); - pThreadObj->ip = ip; - pThreadObj->stop = false; - tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label)); - pThreadObj->shandle = shandle; - pThreadObj->processData = fp; - } - - // initialize mutex, thread, fd which may fail - for (int i = 0; i < numOfThreads; ++i) { - SThreadObj *pThreadObj = pClientObj->pThreadObj[i]; - code = taosThreadMutexInit(&(pThreadObj->mutex), NULL); - if (code < 0) { - tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno)); - break; - } - - pThreadObj->pEpoll = taosCreateEpoll(10); // size does not matter - if (pThreadObj->pEpoll == NULL) { - tError("%s failed to create TCP epoll", label); - code = -1; - break; - } - - code = taosThreadCreate(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj)); - if (code != 0) { - tError("%s failed to create TCP process data thread(%s)", label, strerror(errno)); - break; - } - pThreadObj->threadId = i; - } - if (code != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - taosCleanUpTcpClient(pClientObj); - pClientObj = NULL; - } - return pClientObj; -} - -void taosStopTcpClient(void *chandle) { - SClientObj *pClientObj = chandle; - - if (pClientObj == NULL) return; - - tDebug("%s TCP client is stopped", pClientObj->label); -} - -void taosCleanUpTcpClient(void *chandle) { - SClientObj *pClientObj = chandle; - if (pClientObj == NULL) return; - for (int i = 0; i < pClientObj->numOfThreads; ++i) { - SThreadObj *pThreadObj = pClientObj->pThreadObj[i]; - taosStopTcpThread(pThreadObj); - } - - tDebug("%s TCP client is cleaned up", pClientObj->label); - taosMemoryFreeClear(pClientObj->pThreadObj); - taosMemoryFreeClear(pClientObj); -} - -void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) { - SClientObj *pClientObj = shandle; - int32_t index = atomic_load_32(&pClientObj->index) % pClientObj->numOfThreads; - atomic_store_32(&pClientObj->index, index + 1); - SThreadObj *pThreadObj = pClientObj->pThreadObj[index]; - - TdSocketPtr pSocket = taosOpenTcpClientSocket(ip, port, pThreadObj->ip); - if (pSocket == NULL) return NULL; - - struct sockaddr_in sin; - uint16_t localPort = 0; - unsigned int addrlen = sizeof(sin); - if (taosGetSocketName(pSocket, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET && addrlen == sizeof(sin)) { - localPort = (uint16_t)ntohs(sin.sin_port); - } - - SFdObj *pFdObj = taosMallocFdObj(pThreadObj, pSocket); - - if (pFdObj) { - pFdObj->thandle = thandle; - pFdObj->port = port; - pFdObj->ip = ip; - - char ipport[40] = {0}; - taosIpPort2String(ip, port, ipport); - tDebug("%s %p TCP connection to %s is created, localPort:%hu FD:%p numOfFds:%d", pThreadObj->label, thandle, - ipport, localPort, pFdObj, pThreadObj->numOfFds); - } else { - tError("%s failed to malloc client FdObj(%s)", pThreadObj->label, strerror(errno)); - taosCloseSocket(&pSocket); - } - - return pFdObj; -} - -void taosCloseTcpConnection(void *chandle) { - SFdObj *pFdObj = chandle; - if (pFdObj == NULL || pFdObj->signature != pFdObj) return; - - SThreadObj *pThreadObj = pFdObj->pThreadObj; - tDebug("%s %p TCP connection will be closed, FD:%p", pThreadObj->label, pFdObj->thandle, pFdObj); - - // pFdObj->thandle = NULL; - pFdObj->closedByApp = 1; - taosShutDownSocketWR(pFdObj->pSocket); -} - -int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) { - SFdObj *pFdObj = chandle; - if (pFdObj == NULL || pFdObj->signature != pFdObj) return -1; - SThreadObj *pThreadObj = pFdObj->pThreadObj; - - int ret = taosWriteMsg(pFdObj->pSocket, data, len); - tTrace("%s %p TCP data is sent, FD:%p bytes:%d", pThreadObj->label, pFdObj->thandle, pFdObj, ret); - - return ret; -} - -static void taosReportBrokenLink(SFdObj *pFdObj) { - SThreadObj *pThreadObj = pFdObj->pThreadObj; - - // notify the upper layer, so it will clean the associated context - if (pFdObj->closedByApp == 0) { - taosShutDownSocketWR(pFdObj->pSocket); - - SRecvInfo recvInfo; - recvInfo.msg = NULL; - recvInfo.msgLen = 0; - recvInfo.ip = 0; - recvInfo.port = 0; - recvInfo.shandle = pThreadObj->shandle; - recvInfo.thandle = pFdObj->thandle; - recvInfo.chandle = NULL; - recvInfo.connType = RPC_CONN_TCP; - (*(pThreadObj->processData))(&recvInfo); - } - - taosFreeFdObj(pFdObj); -} - -static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) { - SRpcHead rpcHead; - int32_t msgLen, leftLen, retLen, headLen; - char * buffer, *msg; - - SThreadObj *pThreadObj = pFdObj->pThreadObj; - - headLen = taosReadMsg(pFdObj->pSocket, &rpcHead, sizeof(SRpcHead)); - if (headLen != sizeof(SRpcHead)) { - tDebug("%s %p read error, FD:%p headLen:%d", pThreadObj->label, pFdObj->thandle, pFdObj, headLen); - return -1; - } - - msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen); - int32_t size = msgLen + tsRpcOverhead; - buffer = taosMemoryMalloc(size); - if (NULL == buffer) { - tError("%s %p TCP taosMemoryMalloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen); - return -1; - } else { - tTrace("%s %p read data, FD:%p TCP malloc mem:%p", pThreadObj->label, pFdObj->thandle, pFdObj, buffer); - } - - msg = buffer + tsRpcOverhead; - leftLen = msgLen - headLen; - retLen = taosReadMsg(pFdObj->pSocket, msg + headLen, leftLen); - - if (leftLen != retLen) { - tError("%s %p read error, leftLen:%d retLen:%d FD:%p", pThreadObj->label, pFdObj->thandle, leftLen, retLen, pFdObj); - taosMemoryFree(buffer); - return -1; - } - - memcpy(msg, &rpcHead, sizeof(SRpcHead)); - - pInfo->msg = msg; - pInfo->msgLen = msgLen; - pInfo->ip = pFdObj->ip; - pInfo->port = pFdObj->port; - pInfo->shandle = pThreadObj->shandle; - pInfo->thandle = pFdObj->thandle; - pInfo->chandle = pFdObj; - pInfo->connType = RPC_CONN_TCP; - - if (pFdObj->closedByApp) { - taosMemoryFree(buffer); - return -1; - } - - return 0; -} - -#define maxEvents 10 - -static void *taosProcessTcpData(void *param) { - SThreadObj * pThreadObj = param; - SFdObj * pFdObj; - struct epoll_event events[maxEvents]; - SRecvInfo recvInfo; - - char name[16] = {0}; - snprintf(name, tListLen(name), "%s-tcp", pThreadObj->label); - setThreadName(name); - - while (1) { - int fdNum = taosWaitEpoll(pThreadObj->pEpoll, events, maxEvents, TAOS_EPOLL_WAIT_TIME); - if (pThreadObj->stop) { - tDebug("%s TCP thread get stop event, exiting...", pThreadObj->label); - break; - } - if (fdNum < 0) continue; - - for (int i = 0; i < fdNum; ++i) { - pFdObj = events[i].data.ptr; - - if (events[i].events & EPOLLERR) { - tDebug("%s %p FD:%p epoll errors", pThreadObj->label, pFdObj->thandle, pFdObj); - taosReportBrokenLink(pFdObj); - continue; - } - - if (events[i].events & EPOLLRDHUP) { - tDebug("%s %p FD:%p RD hang up", pThreadObj->label, pFdObj->thandle, pFdObj); - taosReportBrokenLink(pFdObj); - continue; - } - - if (events[i].events & EPOLLHUP) { - tDebug("%s %p FD:%p hang up", pThreadObj->label, pFdObj->thandle, pFdObj); - taosReportBrokenLink(pFdObj); - continue; - } - - if (taosReadTcpData(pFdObj, &recvInfo) < 0) { - taosShutDownSocketWR(pFdObj->pSocket); - continue; - } - - pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo); - if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj); - } - - if (pThreadObj->stop) break; - } - - if (pThreadObj->pEpoll != NULL) { - taosCloseEpoll(&pThreadObj->pEpoll); - pThreadObj->pEpoll = NULL; - } - - while (pThreadObj->pHead) { - pFdObj = pThreadObj->pHead; - pThreadObj->pHead = pFdObj->next; - taosReportBrokenLink(pFdObj); - } - - taosThreadMutexDestroy(&(pThreadObj->mutex)); - tDebug("%s TCP thread exits ...", pThreadObj->label); - taosMemoryFreeClear(pThreadObj); - - return NULL; -} - -static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, TdSocketPtr pSocket) { - struct epoll_event event; - - SFdObj *pFdObj = (SFdObj *)taosMemoryCalloc(sizeof(SFdObj), 1); - if (pFdObj == NULL) { - return NULL; - } - - pFdObj->closedByApp = 0; - pFdObj->pSocket = pSocket; - pFdObj->pThreadObj = pThreadObj; - pFdObj->signature = pFdObj; - - event.events = EPOLLIN | EPOLLRDHUP; - event.data.ptr = pFdObj; - if (taosCtlEpoll(pThreadObj->pEpoll, EPOLL_CTL_ADD, pSocket, &event) < 0) { - taosMemoryFreeClear(pFdObj); - terrno = TAOS_SYSTEM_ERROR(errno); - return NULL; - } - - // notify the data process, add into the FdObj list - taosThreadMutexLock(&(pThreadObj->mutex)); - pFdObj->next = pThreadObj->pHead; - if (pThreadObj->pHead) (pThreadObj->pHead)->prev = pFdObj; - pThreadObj->pHead = pFdObj; - pThreadObj->numOfFds++; - taosThreadMutexUnlock(&(pThreadObj->mutex)); - - return pFdObj; -} - -static void taosFreeFdObj(SFdObj *pFdObj) { - if (pFdObj == NULL) return; - if (pFdObj->signature != pFdObj) return; - - SThreadObj *pThreadObj = pFdObj->pThreadObj; - taosThreadMutexLock(&pThreadObj->mutex); - - if (pFdObj->signature == NULL) { - taosThreadMutexUnlock(&pThreadObj->mutex); - return; - } - - pFdObj->signature = NULL; - taosCtlEpoll(pThreadObj->pEpoll, EPOLL_CTL_DEL, pFdObj->pSocket, NULL); - taosCloseSocket(&pFdObj->pSocket); - - pThreadObj->numOfFds--; - if (pThreadObj->numOfFds < 0) - tError("%s %p TCP thread:%d, number of FDs is negative!!!", pThreadObj->label, pFdObj->thandle, - pThreadObj->threadId); - - if (pFdObj->prev) { - (pFdObj->prev)->next = pFdObj->next; - } else { - pThreadObj->pHead = pFdObj->next; - } - - if (pFdObj->next) { - (pFdObj->next)->prev = pFdObj->prev; - } - - taosThreadMutexUnlock(&pThreadObj->mutex); - - tDebug("%s %p TCP connection is closed, FD:%p numOfFds:%d", pThreadObj->label, pFdObj->thandle, pFdObj, pThreadObj->numOfFds); - - taosMemoryFreeClear(pFdObj); -} -#endif \ No newline at end of file diff --git a/source/libs/transport/src/rpcUdp.c b/source/libs/transport/src/rpcUdp.c deleted file mode 100644 index 359a94011dc493493aa9c63660806787b3be4d11..0000000000000000000000000000000000000000 --- a/source/libs/transport/src/rpcUdp.c +++ /dev/null @@ -1,261 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "rpcUdp.h" -#include "os.h" -#include "rpcHead.h" -#include "rpcLog.h" -#include "taosdef.h" -#include "taoserror.h" -#include "ttimer.h" -#include "tutil.h" - -#ifndef USE_UV - -#define RPC_MAX_UDP_CONNS 256 -#define RPC_MAX_UDP_PKTS 1000 -#define RPC_UDP_BUF_TIME 5 // mseconds -#define RPC_MAX_UDP_SIZE 65480 - -typedef struct { - int index; - TdSocketPtr pSocket; - uint16_t port; // peer port - uint16_t localPort; // local port - char label[TSDB_LABEL_LEN]; // copy from udpConnSet; - TdThread thread; - void *hash; - void *shandle; // handle passed by upper layer during server initialization - void *pSet; - void *(*processData)(SRecvInfo *pRecv); - char *buffer; // buffer to receive data -} SUdpConn; - -typedef struct { - int index; - int server; - uint32_t ip; // local IP - uint16_t port; // local Port - void * shandle; // handle passed by upper layer during server initialization - int threads; - char label[TSDB_LABEL_LEN]; - void *(*fp)(SRecvInfo *pPacket); - SUdpConn udpConn[]; -} SUdpConnSet; - -static void *taosRecvUdpData(void *param); - -void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads, void *fp, void *shandle) { - SUdpConn * pConn; - SUdpConnSet *pSet; - - int size = (int)sizeof(SUdpConnSet) + threads * (int)sizeof(SUdpConn); - pSet = (SUdpConnSet *)taosMemoryMalloc((size_t)size); - if (pSet == NULL) { - tError("%s failed to allocate UdpConn", label); - terrno = TAOS_SYSTEM_ERROR(errno); - return NULL; - } - - memset(pSet, 0, (size_t)size); - pSet->ip = ip; - pSet->port = port; - pSet->shandle = shandle; - pSet->fp = fp; - pSet->threads = threads; - tstrncpy(pSet->label, label, sizeof(pSet->label)); - - TdThreadAttr thAttr; - taosThreadAttrInit(&thAttr); - taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); - - int i; - uint16_t ownPort; - for (i = 0; i < threads; ++i) { - pConn = pSet->udpConn + i; - ownPort = (port ? port + i : 0); - pConn->pSocket = taosOpenUdpSocket(ip, ownPort); - if (pConn->pSocket == NULL) { - tError("%s failed to open UDP socket %x:%hu", label, ip, port); - break; - } - - pConn->buffer = taosMemoryMalloc(RPC_MAX_UDP_SIZE); - if (NULL == pConn->buffer) { - tError("%s failed to malloc recv buffer", label); - break; - } - - struct sockaddr_in sin; - unsigned int addrlen = sizeof(sin); - if (taosGetSocketName(pConn->pSocket, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET && - addrlen == sizeof(sin)) { - pConn->localPort = (uint16_t)ntohs(sin.sin_port); - } - - tstrncpy(pConn->label, label, sizeof(pConn->label)); - pConn->shandle = shandle; - pConn->processData = fp; - pConn->index = i; - pConn->pSet = pSet; - - int code = taosThreadCreate(&pConn->thread, &thAttr, taosRecvUdpData, pConn); - if (code != 0) { - tError("%s failed to create thread to process UDP data(%s)", label, strerror(errno)); - break; - } - } - - taosThreadAttrDestroy(&thAttr); - - if (i != threads) { - terrno = TAOS_SYSTEM_ERROR(errno); - taosCleanUpUdpConnection(pSet); - return NULL; - } - - tDebug("%s UDP connection is initialized, ip:%x:%hu threads:%d", label, ip, port, threads); - return pSet; -} - -void taosStopUdpConnection(void *handle) { - SUdpConnSet *pSet = (SUdpConnSet *)handle; - SUdpConn * pConn; - - if (pSet == NULL) return; - - for (int i = 0; i < pSet->threads; ++i) { - pConn = pSet->udpConn + i; - if (pConn->pSocket != NULL) taosShutDownSocketRDWR(pConn->pSocket); - if (pConn->pSocket != NULL) taosCloseSocket(&pConn->pSocket); - pConn->pSocket = NULL; - } - - for (int i = 0; i < pSet->threads; ++i) { - pConn = pSet->udpConn + i; - if (taosCheckPthreadValid(pConn->thread)) { - taosThreadJoin(pConn->thread, NULL); - } - taosMemoryFreeClear(pConn->buffer); - // tTrace("%s UDP thread is closed, index:%d", pConn->label, i); - } - - tDebug("%s UDP is stopped", pSet->label); -} - -void taosCleanUpUdpConnection(void *handle) { - SUdpConnSet *pSet = (SUdpConnSet *)handle; - SUdpConn * pConn; - - if (pSet == NULL) return; - - for (int i = 0; i < pSet->threads; ++i) { - pConn = pSet->udpConn + i; - if (pConn->pSocket != NULL) taosCloseSocket(&pConn->pSocket); - } - - tDebug("%s UDP is cleaned up", pSet->label); - taosMemoryFreeClear(pSet); -} - -void *taosOpenUdpConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) { - SUdpConnSet *pSet = (SUdpConnSet *)shandle; - - pSet->index = (pSet->index + 1) % pSet->threads; - - SUdpConn *pConn = pSet->udpConn + pSet->index; - pConn->port = port; - - tDebug("%s UDP connection is setup, ip:%x:%hu localPort:%hu", pConn->label, ip, port, pConn->localPort); - - return pConn; -} - -static void *taosRecvUdpData(void *param) { - SUdpConn * pConn = param; - struct sockaddr_in sourceAdd; - ssize_t dataLen; - unsigned int addLen; - uint16_t port; - SRecvInfo recvInfo; - - memset(&sourceAdd, 0, sizeof(sourceAdd)); - addLen = sizeof(sourceAdd); - tDebug("%s UDP thread is created, index:%d", pConn->label, pConn->index); - char *msg = pConn->buffer; - - setThreadName("recvUdpData"); - - while (1) { - dataLen = taosReadFromSocket(pConn->pSocket, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen); - if (dataLen <= 0) { - tDebug("%s UDP socket was closed, exiting(%s), dataLen:%d", pConn->label, strerror(errno), (int32_t)dataLen); - - // for windows usage, remote shutdown also returns - 1 in windows client - if (pConn->pSocket == NULL) { - break; - } else { - continue; - } - } - - port = ntohs(sourceAdd.sin_port); - - if (dataLen < sizeof(SRpcHead)) { - tError("%s recvfrom failed(%s)", pConn->label, strerror(errno)); - continue; - } - - int32_t size = dataLen + tsRpcOverhead; - char * tmsg = taosMemoryMalloc(size); - if (NULL == tmsg) { - tError("%s failed to allocate memory, size:%" PRId64, pConn->label, (int64_t)dataLen); - continue; - } else { - tTrace("UDP malloc mem:%p size:%d", tmsg, size); - } - - tmsg += tsRpcOverhead; // overhead for SRpcReqContext - memcpy(tmsg, msg, dataLen); - recvInfo.msg = tmsg; - recvInfo.msgLen = dataLen; - recvInfo.ip = sourceAdd.sin_addr.s_addr; - recvInfo.port = port; - recvInfo.shandle = pConn->shandle; - recvInfo.thandle = NULL; - recvInfo.chandle = pConn; - recvInfo.connType = 0; - (*(pConn->processData))(&recvInfo); - } - - return NULL; -} - -int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *chandle) { - SUdpConn *pConn = (SUdpConn *)chandle; - - if (pConn == NULL) return -1; - - struct sockaddr_in destAdd; - memset(&destAdd, 0, sizeof(destAdd)); - destAdd.sin_family = AF_INET; - destAdd.sin_addr.s_addr = ip; - destAdd.sin_port = htons(port); - - int ret = taosSendto(pConn->pSocket, data, (size_t)dataLen, 0, (struct sockaddr *)&destAdd, sizeof(destAdd)); - - return ret; -} -#endif \ No newline at end of file diff --git a/source/libs/transport/test/pushServer.c b/source/libs/transport/test/pushServer.c index 2bf086b99bbdb5cc4709724ba4bba7d7e9dfcbba..8b1dcd46cfc0143c7d52d744317592b70eb5ec19 100644 --- a/source/libs/transport/test/pushServer.c +++ b/source/libs/transport/test/pushServer.c @@ -15,9 +15,9 @@ //#define _DEFAULT_SOURCE #include "os.h" -#include "rpcLog.h" #include "tglobal.h" #include "tqueue.h" +#include "transLog.h" #include "trpc.h" int msgSize = 128; diff --git a/source/libs/transport/test/rclient.c b/source/libs/transport/test/rclient.c index 5755e4a273bbc654b222ed23c20e8fc14e73b5a9..eea76096ffa6a96e0e8f4ce02e4cb6bf7b6eeb41 100644 --- a/source/libs/transport/test/rclient.c +++ b/source/libs/transport/test/rclient.c @@ -14,9 +14,9 @@ */ #include #include "os.h" -#include "rpcLog.h" #include "taoserror.h" #include "tglobal.h" +#include "transLog.h" #include "trpc.h" #include "tutil.h" diff --git a/source/libs/transport/test/rserver.c b/source/libs/transport/test/rserver.c index 42bebe5191801ad2e451c2bb4cea7b573aeeecb4..6262b3ae4843703fd301fb8d9675b477bb1e3128 100644 --- a/source/libs/transport/test/rserver.c +++ b/source/libs/transport/test/rserver.c @@ -15,9 +15,9 @@ //#define _DEFAULT_SOURCE #include "os.h" -#include "rpcLog.h" #include "tglobal.h" #include "tqueue.h" +#include "transLog.h" #include "trpc.h" int msgSize = 128; diff --git a/source/libs/transport/test/syncClient.c b/source/libs/transport/test/syncClient.c index 6fb7d81fcab1ae1e0db6814758f95046359b89e3..bc6461eaed1f5d2c7ca9d82f9ae47dbebf4fc02b 100644 --- a/source/libs/transport/test/syncClient.c +++ b/source/libs/transport/test/syncClient.c @@ -14,9 +14,9 @@ */ #include #include "os.h" -#include "rpcLog.h" #include "taoserror.h" #include "tglobal.h" +#include "transLog.h" #include "trpc.h" #include "tutil.h" diff --git a/source/libs/transport/test/transUT.cpp b/source/libs/transport/test/transUT.cpp index 4829f5aa397fd96dfb05caf23023b6a35d852e26..3f5ef1fb53e1b0a4235b3c851ef6790f09c6c89b 100644 --- a/source/libs/transport/test/transUT.cpp +++ b/source/libs/transport/test/transUT.cpp @@ -15,10 +15,10 @@ #include #include #include -#include "rpcLog.h" #include "tdatablock.h" #include "tglobal.h" #include "tlog.h" +#include "transLog.h" #include "trpc.h" using namespace std;