diff --git a/cmake/libuv_CMakeLists.txt.in b/cmake/libuv_CMakeLists.txt.in index ed406e089e0f641f0a87c1a80fd5a501acbaf490..6c7ab79ed29a39725771d8aa20ede47e50597ae5 100644 --- a/cmake/libuv_CMakeLists.txt.in +++ b/cmake/libuv_CMakeLists.txt.in @@ -4,9 +4,9 @@ ExternalProject_Add(libuv GIT_REPOSITORY https://github.com/libuv/libuv.git GIT_TAG v1.42.0 SOURCE_DIR "${CMAKE_CONTRIB_DIR}/libuv" - BINARY_DIR "" - CONFIGURE_COMMAND "" - BUILD_COMMAND "" + BINARY_DIR "${CMAKE_CONTRIB_DIR}/libuv" + CONFIGURE_COMMAND "" + BUILD_COMMAND "" INSTALL_COMMAND "" TEST_COMMAND "" - ) \ No newline at end of file + ) diff --git a/source/libs/transport/CMakeLists.txt b/source/libs/transport/CMakeLists.txt index 98a380dc8f79c11073d162b78d4817a567e42511..c4eeef5df20f23e8d2c557bc7e91a8ce2442032c 100644 --- a/source/libs/transport/CMakeLists.txt +++ b/source/libs/transport/CMakeLists.txt @@ -12,4 +12,19 @@ target_link_libraries( PUBLIC os PUBLIC util PUBLIC common -) \ No newline at end of file +) +if (${BUILD_WITH_UV}) + target_include_directories( + transport + PUBLIC "${CMAKE_SOURCE_DIR}/contrib/libuv/include" + ) + +#LINK_DIRECTORIES("${CMAKE_SOURCE_DIR}/debug/contrib/libuv") + target_link_libraries( + transport + PUBLIC uv_a + ) + add_definitions(-DUSE_UV) +endif(${BUILD_WITH_UV}) + + diff --git a/source/libs/transport/inc/rpcCache.h b/source/libs/transport/inc/rpcCache.h index 3a996aab7ce6a5f76d8a3f8fefbf6eefbb917183..5148f5e23c928026c32f623e67fb7e576abd5604 100644 --- a/source/libs/transport/inc/rpcCache.h +++ b/source/libs/transport/inc/rpcCache.h @@ -16,6 +16,8 @@ #ifndef TDENGINE_RPC_CACHE_H #define TDENGINE_RPC_CACHE_H +#include + #ifdef __cplusplus extern "C" { #endif diff --git a/source/libs/transport/inc/rpcHead.h b/source/libs/transport/inc/rpcHead.h index 6e98bbd563a7653007d8b664d75ac24cc275cd76..7317d84af1671c06452b6b3f4525675414e160a5 100644 --- a/source/libs/transport/inc/rpcHead.h +++ b/source/libs/transport/inc/rpcHead.h @@ -16,52 +16,57 @@ #ifndef TDENGINE_RPCHEAD_H #define TDENGINE_RPCHEAD_H +#include #ifdef __cplusplus extern "C" { #endif -#define RPC_CONN_TCP 2 +#ifdef USE_UV + +#else + +#define RPC_CONN_TCP 2 extern int tsRpcOverhead; typedef struct { - void *msg; + void* msg; int msgLen; - uint32_t ip; + uint32_t ip; uint16_t port; int connType; - void *shandle; - void *thandle; - void *chandle; + void* shandle; + void* thandle; + void* chandle; } SRecvInfo; #pragma pack(push, 1) typedef struct { - char version:4; // RPC version - char comp:4; // compression algorithm, 0:no compression 1:lz4 - char resflag:2; // reserved bits - char spi:3; // security parameter index - char encrypt:3; // encrypt algorithm, 0: no encryption - uint16_t tranId; // transcation ID - uint32_t linkUid; // for unique connection ID assigned by client - uint64_t ahandle; // ahandle assigned by client - uint32_t sourceId; // source ID, an index for connection list - uint32_t destId; // destination ID, an index for connection list - uint32_t destIp; // destination IP address, for NAT scenario - char user[TSDB_UNI_LEN]; // user ID - uint16_t port; // for UDP only, port may be changed - char empty[1]; // reserved - uint16_t msgType; // message type - int32_t msgLen; // message length including the header iteslf + char version : 4; // RPC version + char comp : 4; // compression algorithm, 0:no compression 1:lz4 + char resflag : 2; // reserved bits + char spi : 3; // security parameter index + char encrypt : 3; // encrypt algorithm, 0: no encryption + uint16_t tranId; // transcation ID + uint32_t linkUid; // for unique connection ID assigned by client + uint64_t ahandle; // ahandle assigned by client + uint32_t sourceId; // source ID, an index for connection list + uint32_t destId; // destination ID, an index for connection list + uint32_t destIp; // destination IP address, for NAT scenario + char user[TSDB_UNI_LEN]; // user ID + uint16_t port; // for UDP only, port may be changed + char empty[1]; // reserved + uint16_t msgType; // message type + int32_t msgLen; // message length including the header iteslf uint32_t msgVer; - int32_t code; // code in response message - uint8_t content[0]; // message body starts from here + int32_t code; // code in response message + uint8_t content[0]; // message body starts from here } SRpcHead; typedef struct { - int32_t reserved; - int32_t contLen; + int32_t reserved; + int32_t contLen; } SRpcComp; typedef struct { @@ -70,11 +75,10 @@ typedef struct { } SRpcDigest; #pragma pack(pop) - +#endif #ifdef __cplusplus } #endif #endif // TDENGINE_RPCHEAD_H - diff --git a/source/libs/transport/inc/rpcTcp.h b/source/libs/transport/inc/rpcTcp.h index 6ef8fc2d921a3379532bbc0efd2f226ef3389fc5..5e5c43a1db95a2886552db9c0e65df83115b2cd3 100644 --- a/source/libs/transport/inc/rpcTcp.h +++ b/source/libs/transport/inc/rpcTcp.h @@ -15,23 +15,28 @@ #ifndef _rpc_tcp_header_ #define _rpc_tcp_header_ +#include #ifdef __cplusplus extern "C" { #endif +#ifdef USE_UV +#else void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle); -void taosStopTcpServer(void *param); -void taosCleanUpTcpServer(void *param); +void taosStopTcpServer(void *param); +void taosCleanUpTcpServer(void *param); void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *fp, void *shandle); -void taosStopTcpClient(void *chandle); -void taosCleanUpTcpClient(void *chandle); +void taosStopTcpClient(void *chandle); +void taosCleanUpTcpClient(void *chandle); void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port); void taosCloseTcpConnection(void *chandle); int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle); +#endif + #ifdef __cplusplus } #endif diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 24a2e0ef89fcf32bc3c1950384480dbce9c7228c..9809f7ee1af24f3cbe426c7e67f62f7f0ebebb42 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -20,8 +20,13 @@ extern "C" { #endif +#ifdef USE_UV + +#else + +#endif #ifdef __cplusplus } #endif -#endif /*_TD_TRANSPORT_INT_H_*/ \ No newline at end of file +#endif /*_TD_TRANSPORT_INT_H_*/ diff --git a/source/libs/transport/src/rpcCache.c b/source/libs/transport/src/rpcCache.c index 7de7aa341b90ed753b63d20b29979f7f72b53734..40767d2ba53c27cc183c29305837d041673320d2 100644 --- a/source/libs/transport/src/rpcCache.c +++ b/source/libs/transport/src/rpcCache.c @@ -13,37 +13,40 @@ * along with this program. If not, see . */ +#include "rpcCache.h" #include "os.h" +#include "rpcLog.h" #include "taosdef.h" #include "tglobal.h" #include "tmempool.h" #include "ttimer.h" #include "tutil.h" -#include "rpcLog.h" -#include "rpcCache.h" +#ifdef USE_UV + +#else typedef struct SConnHash { char fqdn[TSDB_FQDN_LEN]; uint16_t port; char connType; struct SConnHash *prev; struct SConnHash *next; - void *data; + void * data; uint64_t time; } SConnHash; typedef struct { - SConnHash **connHashList; + SConnHash ** connHashList; mpool_h connHashMemPool; int maxSessions; int total; int * count; int64_t keepTimer; pthread_mutex_t mutex; - void (*cleanFp)(void *); - void *tmrCtrl; - void *pTimer; - int64_t *lockedBy; + void (*cleanFp)(void *); + void * tmrCtrl; + void * pTimer; + int64_t *lockedBy; } SConnCache; static int rpcHashConn(void *handle, char *fqdn, uint16_t port, int8_t connType); @@ -122,7 +125,7 @@ void rpcAddConnIntoCache(void *handle, void *data, char *fqdn, uint16_t port, in uint64_t time = taosGetTimestampMs(); pCache = (SConnCache *)handle; - assert(pCache); + assert(pCache); assert(data); hash = rpcHashConn(pCache, fqdn, port, connType); @@ -134,7 +137,7 @@ void rpcAddConnIntoCache(void *handle, void *data, char *fqdn, uint16_t port, in pNode->prev = NULL; pNode->time = time; - rpcLockCache(pCache->lockedBy+hash); + rpcLockCache(pCache->lockedBy + hash); pNode->next = pCache->connHashList[hash]; if (pCache->connHashList[hash] != NULL) (pCache->connHashList[hash])->prev = pNode; @@ -143,10 +146,11 @@ void rpcAddConnIntoCache(void *handle, void *data, char *fqdn, uint16_t port, in pCache->count[hash]++; rpcRemoveExpiredNodes(pCache, pNode->next, hash, time); - rpcUnlockCache(pCache->lockedBy+hash); + rpcUnlockCache(pCache->lockedBy + hash); pCache->total++; - // tTrace("%p %s:%hu:%d:%d:%p added into cache, connections:%d", data, fqdn, port, connType, hash, pNode, pCache->count[hash]); + // tTrace("%p %s:%hu:%d:%d:%p added into cache, connections:%d", data, fqdn, port, connType, hash, pNode, + // pCache->count[hash]); return; } @@ -158,12 +162,12 @@ void *rpcGetConnFromCache(void *handle, char *fqdn, uint16_t port, int8_t connTy void * pData = NULL; pCache = (SConnCache *)handle; - assert(pCache); + assert(pCache); uint64_t time = taosGetTimestampMs(); hash = rpcHashConn(pCache, fqdn, port, connType); - rpcLockCache(pCache->lockedBy+hash); + rpcLockCache(pCache->lockedBy + hash); pNode = pCache->connHashList[hash]; while (pNode) { @@ -197,12 +201,14 @@ void *rpcGetConnFromCache(void *handle, char *fqdn, uint16_t port, int8_t connTy pCache->count[hash]--; } - rpcUnlockCache(pCache->lockedBy+hash); + rpcUnlockCache(pCache->lockedBy + hash); if (pData) { - //tTrace("%p %s:%hu:%d:%d:%p retrieved from cache, connections:%d", pData, fqdn, port, connType, hash, pNode, pCache->count[hash]); + // tTrace("%p %s:%hu:%d:%d:%p retrieved from cache, connections:%d", pData, fqdn, port, connType, hash, pNode, + // pCache->count[hash]); } else { - //tTrace("%s:%hu:%d:%d failed to retrieve conn from cache, connections:%d", fqdn, port, connType, hash, pCache->count[hash]); + // tTrace("%s:%hu:%d:%d failed to retrieve conn from cache, connections:%d", fqdn, port, connType, hash, + // pCache->count[hash]); } return pData; @@ -221,10 +227,10 @@ static void rpcCleanConnCache(void *handle, void *tmrId) { uint64_t time = taosGetTimestampMs(); for (hash = 0; hash < pCache->maxSessions; ++hash) { - rpcLockCache(pCache->lockedBy+hash); + rpcLockCache(pCache->lockedBy + hash); pNode = pCache->connHashList[hash]; rpcRemoveExpiredNodes(pCache, pNode, hash, time); - rpcUnlockCache(pCache->lockedBy+hash); + rpcUnlockCache(pCache->lockedBy + hash); } // tTrace("timer, total connections in cache:%d", pCache->total); @@ -233,7 +239,7 @@ static void rpcCleanConnCache(void *handle, void *tmrId) { } static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash, uint64_t time) { - if (pNode == NULL || (time < pCache->keepTimer + pNode->time) ) return; + if (pNode == NULL || (time < pCache->keepTimer + pNode->time)) return; SConnHash *pPrev = pNode->prev, *pNext; @@ -242,7 +248,8 @@ static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash pNext = pNode->next; pCache->total--; pCache->count[hash]--; - //tTrace("%p %s:%hu:%d:%d:%p removed from cache, connections:%d", pNode->data, pNode->fqdn, pNode->port, pNode->connType, hash, pNode, + // tTrace("%p %s:%hu:%d:%d:%p removed from cache, connections:%d", pNode->data, pNode->fqdn, pNode->port, + // pNode->connType, hash, pNode, // pCache->count[hash]); taosMemPoolFree(pCache->connHashMemPool, (char *)pNode); pNode = pNext; @@ -257,7 +264,7 @@ static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash static int rpcHashConn(void *handle, char *fqdn, uint16_t port, int8_t connType) { SConnCache *pCache = (SConnCache *)handle; int hash = 0; - char *temp = fqdn; + char * temp = fqdn; while (*temp) { hash += *temp; @@ -288,4 +295,4 @@ static void rpcUnlockCache(int64_t *lockedBy) { assert(false); } } - +#endif diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index d58ea63c4fe44d000a14fd0d9fe9f9e16945ff37..9bba0dca0a634c46cf3170f0411ed8d7322cc30a 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -13,117 +13,265 @@ * along with this program. If not, see . */ +#include +#include "lz4.h" #include "os.h" +#include "rpcCache.h" +#include "rpcHead.h" +#include "rpcLog.h" +#include "rpcTcp.h" +#include "rpcUdp.h" +#include "taoserror.h" +#include "tglobal.h" +#include "thash.h" #include "tidpool.h" #include "tmd5.h" #include "tmempool.h" -#include "ttimer.h" -#include "tutil.h" -#include "lz4.h" -#include "tref.h" -#include "taoserror.h" -#include "tglobal.h" #include "tmsg.h" +#include "tref.h" #include "trpc.h" -#include "thash.h" -#include "rpcLog.h" -#include "rpcUdp.h" -#include "rpcCache.h" -#include "rpcTcp.h" -#include "rpcHead.h" +#include "ttimer.h" +#include "tutil.h" + +#ifdef USE_UV + +#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) + +int32_t rpcInit() { return -1; } +void rpcCleanup() { return; }; +void* rpcOpen(const SRpcInit* pRpc) { return NULL; } +void rpcClose(void* arg) { return; } +void* rpcMallocCont(int contLen) { return NULL; } +void rpcFreeCont(void* cont) { return; } +void* rpcReallocCont(void* ptr, int contLen) { return NULL; } + +void rpcSendRequest(void* thandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* rid) { return; } + +void rpcSendResponse(const SRpcMsg* pMsg) {} + +void rpcSendRedirectRsp(void* pConn, const SEpSet* pEpSet) {} +int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return -1; } +void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { return; } +int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; } +void rpcCancelRequest(int64_t rid) { return; } + +typedef struct SThreadObj { + pthread_t thread; + uv_pipe_t* pipe; + uv_loop_t* loop; + uv_async_t* workerAsync; // + int fd; +} SThreadObj; + +typedef struct SServerObj { + uv_tcp_t server; + uv_loop_t* loop; + int workerIdx; + int numOfThread; + SThreadObj** pThreadObj; + uv_pipe_t** pipe; +} SServerObj; + +typedef struct SConnCtx { + uv_tcp_t* pClient; + uv_timer_t* pTimer; + uv_async_t* pWorkerAsync; + int ref; +} SConnCtx; + +void allocBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { + buf->base = malloc(suggested_size); + buf->len = suggested_size; +} + +void onTimeout(uv_timer_t* handle) { + // opt + tDebug("time out"); +} +void onRead(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { + // opt + tDebug("data already was read on a stream"); +} + +void onWrite(uv_write_t* req, int status) { + // opt + if (req) tDebug("data already was written on stream"); +} -#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest)) -#define rpcHeadFromCont(cont) ((SRpcHead *) ((char*)cont - sizeof(SRpcHead))) +static void workerAsyncCB(uv_async_t* handle) { + // opt + SThreadObj* pObj = container_of(handle, SThreadObj, workerAsync); +} +void onAccept(uv_stream_t* stream, int status) { + if (status == -1) { + return; + } + SServerObj* pObj = container_of(stream, SServerObj, server); + tDebug("new conntion accepted by main server, dispatch to one worker thread"); + + uv_tcp_t* cli = (uv_tcp_t*)malloc(sizeof(uv_tcp_t)); + uv_tcp_init(pObj->loop, cli); + if (uv_accept(stream, (uv_stream_t*)cli) == 0) { + uv_write_t* wr = (uv_write_t*)malloc(sizeof(uv_write_t)); + + uv_buf_t buf = uv_buf_init("a", 1); + // despatch to worker thread + pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThread; + uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, onWrite); + } else { + uv_close((uv_handle_t*)cli, NULL); + } +} +void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { + if (nread < 0) { + if (nread != UV_EOF) { + tError("read error %s", uv_err_name(nread)); + } + // TODO(log other failure reason) + uv_close((uv_handle_t*)q, NULL); + return; + } + SThreadObj* pObj = (SThreadObj*)container_of(q, struct SThreadObj, pipe); + + uv_pipe_t* pipe = (uv_pipe_t*)q; + if (!uv_pipe_pending_count(pipe)) { + tError("No pending count"); + return; + } + uv_handle_type pending = uv_pipe_pending_type(pipe); + assert(pending == UV_TCP); + + SConnCtx* pConn = malloc(sizeof(SConnCtx)); + /* init conn timer*/ + pConn->pTimer = malloc(sizeof(uv_timer_t)); + uv_timer_init(pObj->loop, pConn->pTimer); + + pConn->pClient = (uv_tcp_t*)malloc(sizeof(uv_tcp_t)); + pConn->pWorkerAsync = pObj->workerAsync; // thread safty + uv_tcp_init(pObj->loop, pConn->pClient); + + if (uv_accept(q, (uv_stream_t*)(pConn->pClient)) == 0) { + uv_os_fd_t fd; + uv_fileno((const uv_handle_t*)pConn->pClient, &fd); + tDebug("new connection created: %d", fd); + uv_timer_start(pConn->pTimer, onTimeout, 10, 0); + uv_read_start((uv_stream_t*)(pConn->pClient), allocBuffer, onRead); + } else { + uv_timer_stop(pConn->pTimer); + free(pConn->pTimer); + uv_close((uv_handle_t*)pConn->pClient, NULL); + free(pConn->pClient); + free(pConn); + } +} + +void* workerThread(void* arg) { + SThreadObj* pObj = (SThreadObj*)arg; + int fd = pObj->fd; + pObj->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); + uv_loop_init(pObj->loop); + + uv_pipe_init(pObj->loop, pObj->pipe, 1); + uv_pipe_open(pObj->pipe, fd); + + pObj->workerAsync = malloc(sizeof(uv_async_t)); + uv_async_init(pObj->loop, pObj->workerAsync, workerAsyncCB); + uv_read_start((uv_stream_t*)pObj->pipe, allocBuffer, onConnection); +} +#else + +#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest)) +#define rpcHeadFromCont(cont) ((SRpcHead*)((char*)cont - sizeof(SRpcHead))) #define rpcContFromHead(msg) (msg + sizeof(SRpcHead)) #define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead)) #define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead)) #define rpcIsReq(type) (type & 1U) typedef struct { - int sessions; // number of sessions allowed - int numOfThreads; // number of threads to process incoming messages - int idleTime; // milliseconds; + int sessions; // number of sessions allowed + int numOfThreads; // number of threads to process incoming messages + int idleTime; // milliseconds; uint16_t localPort; int8_t connType; - int index; // for UDP server only, round robin for multiple threads + int index; // for UDP server only, round robin for multiple threads char label[TSDB_LABEL_LEN]; - char user[TSDB_UNI_LEN]; // meter ID - char spi; // security parameter index - char encrypt; // encrypt algorithm - char secret[TSDB_PASSWORD_LEN]; // secret for the link - char ckey[TSDB_PASSWORD_LEN]; // ciphering key - - void (*cfp)(void *parent, SRpcMsg *, SEpSet *); - int (*afp)(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey); - - int32_t refCount; - void *parent; - void *idPool; // handle to ID pool - void *tmrCtrl; // handle to timer - SHashObj *hash; // handle returned by hash utility - void *tcphandle;// returned handle from TCP initialization - void *udphandle;// returned handle from UDP initialization - void *pCache; // connection cache + char user[TSDB_UNI_LEN]; // meter ID + char spi; // security parameter index + char encrypt; // encrypt algorithm + char secret[TSDB_PASSWORD_LEN]; // secret for the link + char ckey[TSDB_PASSWORD_LEN]; // ciphering key + + void (*cfp)(void *parent, SRpcMsg *, SEpSet *); + int (*afp)(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey); + + int32_t refCount; + void * parent; + void * idPool; // handle to ID pool + void * tmrCtrl; // handle to timer + SHashObj * hash; // handle returned by hash utility + void * tcphandle; // returned handle from TCP initialization + void * udphandle; // returned handle from UDP initialization + void * pCache; // connection cache pthread_mutex_t mutex; struct SRpcConn *connList; // connection list } SRpcInfo; typedef struct { - SRpcInfo *pRpc; // associated SRpcInfo - SEpSet epSet; // ip list provided by app - void *ahandle; // handle provided by app - struct SRpcConn *pConn; // pConn allocated - tmsg_t msgType; // message type - uint8_t *pCont; // content provided by app - int32_t contLen; // content length - int32_t code; // error code - int16_t numOfTry; // number of try for different servers - int8_t oldInUse; // server EP inUse passed by app - int8_t redirect; // flag to indicate redirect - int8_t connType; // connection type - int64_t rid; // refId returned by taosAddRef - SRpcMsg *pRsp; // for synchronous API - tsem_t *pSem; // for synchronous API - SEpSet *pSet; // for synchronous API - char msg[0]; // RpcHead starts from here + SRpcInfo * pRpc; // associated SRpcInfo + SEpSet epSet; // ip list provided by app + void * ahandle; // handle provided by app + struct SRpcConn *pConn; // pConn allocated + tmsg_t msgType; // message type + uint8_t * pCont; // content provided by app + int32_t contLen; // content length + int32_t code; // error code + int16_t numOfTry; // number of try for different servers + int8_t oldInUse; // server EP inUse passed by app + int8_t redirect; // flag to indicate redirect + int8_t connType; // connection type + int64_t rid; // refId returned by taosAddRef + SRpcMsg * pRsp; // for synchronous API + tsem_t * pSem; // for synchronous API + SEpSet * pSet; // for synchronous API + char msg[0]; // RpcHead starts from here } SRpcReqContext; typedef struct SRpcConn { - char info[48];// debug info: label + pConn + ahandle - int sid; // session ID - uint32_t ownId; // own link ID - uint32_t peerId; // peer link ID - char user[TSDB_UNI_LEN]; // user ID for the link - char spi; // security parameter index - char encrypt; // encryption, 0:1 - char secret[TSDB_PASSWORD_LEN]; // secret for the link - char ckey[TSDB_PASSWORD_LEN]; // ciphering key - char secured; // if set to 1, no authentication - uint16_t localPort; // for UDP only - uint32_t linkUid; // connection unique ID assigned by client - uint32_t peerIp; // peer IP - uint16_t peerPort; // peer port - char peerFqdn[TSDB_FQDN_LEN]; // peer FQDN or ip string - uint16_t tranId; // outgoing transcation ID, for build message - uint16_t outTranId; // outgoing transcation ID - uint16_t inTranId; // transcation ID for incoming msg - tmsg_t outType; // message type for outgoing request - tmsg_t inType; // message type for incoming request - void *chandle; // handle passed by TCP/UDP connection layer - void *ahandle; // handle provided by upper app layter - int retry; // number of retry for sending request - int tretry; // total retry - void *pTimer; // retry timer to monitor the response - void *pIdleTimer; // idle timer - char *pRspMsg; // response message including header - int rspMsgLen; // response messag length - char *pReqMsg; // request message including header - int reqMsgLen; // request message length - SRpcInfo *pRpc; // the associated SRpcInfo - int8_t connType; // connection type - int64_t lockedBy; // lock for connection - SRpcReqContext *pContext; // request context + char info[48]; // debug info: label + pConn + ahandle + int sid; // session ID + uint32_t ownId; // own link ID + uint32_t peerId; // peer link ID + char user[TSDB_UNI_LEN]; // user ID for the link + char spi; // security parameter index + char encrypt; // encryption, 0:1 + char secret[TSDB_PASSWORD_LEN]; // secret for the link + char ckey[TSDB_PASSWORD_LEN]; // ciphering key + char secured; // if set to 1, no authentication + uint16_t localPort; // for UDP only + uint32_t linkUid; // connection unique ID assigned by client + uint32_t peerIp; // peer IP + uint16_t peerPort; // peer port + char peerFqdn[TSDB_FQDN_LEN]; // peer FQDN or ip string + uint16_t tranId; // outgoing transcation ID, for build message + uint16_t outTranId; // outgoing transcation ID + uint16_t inTranId; // transcation ID for incoming msg + tmsg_t outType; // message type for outgoing request + tmsg_t inType; // message type for incoming request + void * chandle; // handle passed by TCP/UDP connection layer + void * ahandle; // handle provided by upper app layter + int retry; // number of retry for sending request + int tretry; // total retry + void * pTimer; // retry timer to monitor the response + void * pIdleTimer; // idle timer + char * pRspMsg; // response message including header + int rspMsgLen; // response messag length + char * pReqMsg; // request message including header + int reqMsgLen; // request message length + SRpcInfo * pRpc; // the associated SRpcInfo + int8_t connType; // connection type + int64_t lockedBy; // lock for connection + SRpcReqContext *pContext; // request context } SRpcConn; static pthread_once_t tsRpcInitOnce = PTHREAD_ONCE_INIT; @@ -137,41 +285,29 @@ int tsRpcOverhead; static int tsRpcRefId = -1; static int32_t tsRpcNum = 0; -//static pthread_once_t tsRpcInit = PTHREAD_ONCE_INIT; +// static pthread_once_t tsRpcInit = PTHREAD_ONCE_INIT; // server:0 client:1 tcp:2 udp:0 -#define RPC_CONN_UDPS 0 -#define RPC_CONN_UDPC 1 -#define RPC_CONN_TCPS 2 -#define RPC_CONN_TCPC 3 +#define RPC_CONN_UDPS 0 +#define RPC_CONN_UDPC 1 +#define RPC_CONN_TCPS 2 +#define RPC_CONN_TCPC 3 void *(*taosInitConn[])(uint32_t ip, uint16_t port, char *label, int threads, void *fp, void *shandle) = { - taosInitUdpConnection, - taosInitUdpConnection, - taosInitTcpServer, - taosInitTcpClient -}; + taosInitUdpConnection, taosInitUdpConnection, taosInitTcpServer, taosInitTcpClient}; -void (*taosCleanUpConn[])(void *thandle) = { - taosCleanUpUdpConnection, - taosCleanUpUdpConnection, - taosCleanUpTcpServer, - taosCleanUpTcpClient -}; +void (*taosCleanUpConn[])(void *thandle) = {taosCleanUpUdpConnection, taosCleanUpUdpConnection, taosCleanUpTcpServer, + taosCleanUpTcpClient}; void (*taosStopConn[])(void *thandle) = { - taosStopUdpConnection, - taosStopUdpConnection, + taosStopUdpConnection, + taosStopUdpConnection, taosStopTcpServer, taosStopTcpClient, }; int (*taosSendData[])(uint32_t ip, uint16_t port, void *data, int len, void *chandle) = { - taosSendUdpData, - taosSendUdpData, - taosSendTcpData, - taosSendTcpData -}; + taosSendUdpData, taosSendUdpData, taosSendTcpData, taosSendTcpData}; void *(*taosOpenConn[])(void *shandle, void *thandle, uint32_t ip, uint16_t port) = { taosOpenUdpConnection, @@ -180,12 +316,7 @@ void *(*taosOpenConn[])(void *shandle, void *thandle, uint32_t ip, uint16_t port taosOpenTcpClientConnection, }; -void (*taosCloseConn[])(void *chandle) = { - NULL, - NULL, - taosCloseTcpConnection, - taosCloseTcpConnection -}; +void (*taosCloseConn[])(void *chandle) = {NULL, NULL, taosCloseTcpConnection, taosCloseTcpConnection}; static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, int8_t connType); static void rpcCloseConn(void *thandle); @@ -194,11 +325,11 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc); static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv); static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv); -static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext); -static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code); -static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code); -static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); -static void rpcSendReqHead(SRpcConn *pConn); +static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext); +static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code); +static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code); +static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); +static void rpcSendReqHead(SRpcConn *pConn); static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv); static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext); @@ -207,15 +338,15 @@ static void rpcProcessRetryTimer(void *, void *); static void rpcProcessIdleTimer(void *param, void *tmrId); static void rpcProcessProgressTimer(void *param, void *tmrId); -static void rpcFreeMsg(void *msg); -static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen); +static void rpcFreeMsg(void *msg); +static int32_t rpcCompressRpcMsg(char *pCont, int32_t contLen); static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead); -static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen); -static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen); -static void rpcLockConn(SRpcConn *pConn); -static void rpcUnlockConn(SRpcConn *pConn); -static void rpcAddRef(SRpcInfo *pRpc); -static void rpcDecRef(SRpcInfo *pRpc); +static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen); +static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen); +static void rpcLockConn(SRpcConn *pConn); +static void rpcUnlockConn(SRpcConn *pConn); +static void rpcAddRef(SRpcInfo *pRpc); +static void rpcDecRef(SRpcInfo *pRpc); static void rpcFree(void *p) { tTrace("free mem: %p", p); @@ -240,26 +371,26 @@ void rpcCleanup(void) { taosCloseRef(tsRpcRefId); tsRpcRefId = -1; } - + void *rpcOpen(const SRpcInit *pInit) { SRpcInfo *pRpc; - //pthread_once(&tsRpcInit, rpcInit); + // pthread_once(&tsRpcInit, rpcInit); pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo)); if (pRpc == NULL) return NULL; - if(pInit->label) tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label)); + if (pInit->label) tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label)); pRpc->connType = pInit->connType; if (pRpc->connType == TAOS_CONN_CLIENT) { pRpc->numOfThreads = pInit->numOfThreads; } else { - pRpc->numOfThreads = pInit->numOfThreads>TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS:pInit->numOfThreads; + pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; } pRpc->idleTime = pInit->idleTime; pRpc->localPort = pInit->localPort; pRpc->afp = pInit->afp; - pRpc->sessions = pInit->sessions+1; + pRpc->sessions = pInit->sessions + 1; if (pInit->user) tstrncpy(pRpc->user, pInit->user, sizeof(pRpc->user)); if (pInit->secret) memcpy(pRpc->secret, pInit->secret, sizeof(pRpc->secret)); if (pInit->ckey) tstrncpy(pRpc->ckey, pInit->ckey, sizeof(pRpc->ckey)); @@ -279,14 +410,14 @@ void *rpcOpen(const SRpcInit *pInit) { return NULL; } - pRpc->idPool = taosInitIdPool(pRpc->sessions-1); + pRpc->idPool = taosInitIdPool(pRpc->sessions - 1); if (pRpc->idPool == NULL) { tError("%s failed to init ID pool", pRpc->label); rpcClose(pRpc); return NULL; } - pRpc->tmrCtrl = taosTmrInit(pRpc->sessions*2 + 1, 50, 10000, pRpc->label); + pRpc->tmrCtrl = taosTmrInit(pRpc->sessions * 2 + 1, 50, 10000, pRpc->label); if (pRpc->tmrCtrl == NULL) { tError("%s failed to init timers", pRpc->label); rpcClose(pRpc); @@ -301,8 +432,8 @@ void *rpcOpen(const SRpcInit *pInit) { return NULL; } } else { - pRpc->pCache = rpcOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, pRpc->idleTime * 20); - if ( pRpc->pCache == NULL ) { + pRpc->pCache = rpcOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, pRpc->idleTime * 20); + if (pRpc->pCache == NULL) { tError("%s failed to init connection cache", pRpc->label); rpcClose(pRpc); return NULL; @@ -311,10 +442,10 @@ void *rpcOpen(const SRpcInit *pInit) { pthread_mutex_init(&pRpc->mutex, NULL); - pRpc->tcphandle = (*taosInitConn[pRpc->connType|RPC_CONN_TCP])(0, pRpc->localPort, pRpc->label, - pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc); - pRpc->udphandle = (*taosInitConn[pRpc->connType])(0, pRpc->localPort, pRpc->label, - pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc); + pRpc->tcphandle = (*taosInitConn[pRpc->connType | RPC_CONN_TCP])(0, pRpc->localPort, pRpc->label, pRpc->numOfThreads, + rpcProcessMsgFromPeer, pRpc); + pRpc->udphandle = + (*taosInitConn[pRpc->connType])(0, pRpc->localPort, pRpc->label, pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc); if (pRpc->tcphandle == NULL || pRpc->udphandle == NULL) { tError("%s failed to init network, port:%d", pRpc->label, pRpc->localPort); @@ -334,7 +465,7 @@ void rpcClose(void *param) { (*taosStopConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle); (*taosStopConn[pRpc->connType])(pRpc->udphandle); - // close all connections + // close all connections for (int i = 0; i < pRpc->sessions; ++i) { if (pRpc->connList && pRpc->connList[i].user[0]) { rpcCloseConn((void *)(pRpc->connList + i)); @@ -375,8 +506,8 @@ void *rpcReallocCont(void *ptr, int contLen) { if (ptr == NULL) return rpcMallocCont(contLen); char *start = ((char *)ptr) - sizeof(SRpcReqContext) - sizeof(SRpcHead); - if (contLen == 0 ) { - free(start); + if (contLen == 0) { + free(start); return NULL; } @@ -385,17 +516,17 @@ void *rpcReallocCont(void *ptr, int contLen) { if (start == NULL) { tError("failed to realloc cont, size:%d", size); return NULL; - } + } return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); } void rpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) { - SRpcInfo *pRpc = (SRpcInfo *)shandle; + SRpcInfo * pRpc = (SRpcInfo *)shandle; SRpcReqContext *pContext; int contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen); - pContext = (SRpcReqContext *) ((char*)pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext)); + pContext = (SRpcReqContext *)((char *)pMsg->pCont - sizeof(SRpcHead) - sizeof(SRpcReqContext)); pContext->ahandle = pMsg->ahandle; pContext->pRpc = (SRpcInfo *)shandle; pContext->epSet = *pEpSet; @@ -404,16 +535,15 @@ void rpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t pContext->msgType = pMsg->msgType; pContext->oldInUse = pEpSet->inUse; - pContext->connType = RPC_CONN_UDPC; - if (contLen > tsRpcMaxUdpSize || tsRpcForceTcp ) pContext->connType = RPC_CONN_TCPC; + pContext->connType = RPC_CONN_UDPC; + if (contLen > tsRpcMaxUdpSize || tsRpcForceTcp) pContext->connType = RPC_CONN_TCPC; - // connection type is application specific. + // connection type is application specific. // for TDengine, all the query, show commands shall have TCP connection tmsg_t type = pMsg->msgType; - if (type == TDMT_VND_QUERY || type == TDMT_MND_SHOW_RETRIEVE - || type == TDMT_VND_FETCH || type == TDMT_MND_VGROUP_LIST - || type == TDMT_VND_TABLES_META || type == TDMT_VND_TABLE_META - || type == TDMT_MND_SHOW || type == TDMT_MND_STATUS || type == TDMT_VND_ALTER_TABLE) + if (type == TDMT_VND_QUERY || type == TDMT_MND_SHOW_RETRIEVE || type == TDMT_VND_FETCH || + type == TDMT_MND_VGROUP_LIST || type == TDMT_VND_TABLES_META || type == TDMT_VND_TABLE_META || + type == TDMT_MND_SHOW || type == TDMT_MND_STATUS || type == TDMT_VND_ALTER_TABLE) pContext->connType = RPC_CONN_TCPC; pContext->rid = taosAddRef(tsRpcRefId, pContext); @@ -425,26 +555,26 @@ void rpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t void rpcSendResponse(const SRpcMsg *pRsp) { if (pRsp->handle == NULL) return; - int msgLen = 0; - SRpcConn *pConn = (SRpcConn *)pRsp->handle; - SRpcMsg rpcMsg = *pRsp; - SRpcMsg *pMsg = &rpcMsg; - SRpcInfo *pRpc = pConn->pRpc; + int msgLen = 0; + SRpcConn *pConn = (SRpcConn *)pRsp->handle; + SRpcMsg rpcMsg = *pRsp; + SRpcMsg * pMsg = &rpcMsg; + SRpcInfo *pRpc = pConn->pRpc; - if ( pMsg->pCont == NULL ) { + if (pMsg->pCont == NULL) { pMsg->pCont = rpcMallocCont(0); pMsg->contLen = 0; } - SRpcHead *pHead = rpcHeadFromCont(pMsg->pCont); - char *msg = (char *)pHead; + SRpcHead *pHead = rpcHeadFromCont(pMsg->pCont); + char * msg = (char *)pHead; pMsg->contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen); msgLen = rpcMsgLenFromCont(pMsg->contLen); rpcLockConn(pConn); - if ( pConn->inType == 0 || pConn->user[0] == 0 ) { + if (pConn->inType == 0 || pConn->user[0] == 0) { tError("%s, connection is already released, rsp wont be sent", pConn->info); rpcUnlockConn(pConn); rpcFreeCont(pMsg->pCont); @@ -454,7 +584,7 @@ void rpcSendResponse(const SRpcMsg *pRsp) { // set msg header pHead->version = 1; - pHead->msgType = pConn->inType+1; + pHead->msgType = pConn->inType + 1; pHead->spi = pConn->spi; pHead->encrypt = pConn->encrypt; pHead->tranId = pConn->inTranId; @@ -463,13 +593,13 @@ void rpcSendResponse(const SRpcMsg *pRsp) { pHead->linkUid = pConn->linkUid; pHead->port = htons(pConn->localPort); pHead->code = htonl(pMsg->code); - pHead->ahandle = (uint64_t) pConn->ahandle; - + pHead->ahandle = (uint64_t)pConn->ahandle; + // set pConn parameters pConn->inType = 0; // response message is released until new response is sent - rpcFreeMsg(pConn->pRspMsg); + rpcFreeMsg(pConn->pRspMsg); pConn->pRspMsg = msg; pConn->rspMsgLen = msgLen; if (pMsg->code == TSDB_CODE_RPC_ACTION_IN_PROGRESS) pConn->inTranId--; @@ -482,23 +612,22 @@ void rpcSendResponse(const SRpcMsg *pRsp) { rpcSendMsgToPeer(pConn, msg, msgLen); // if not set to secured, set it expcet NOT_READY case, since client wont treat it as secured - if (pConn->secured == 0 && pMsg->code != TSDB_CODE_RPC_NOT_READY) - pConn->secured = 1; // connection shall be secured + if (pConn->secured == 0 && pMsg->code != TSDB_CODE_RPC_NOT_READY) pConn->secured = 1; // connection shall be secured if (pConn->pReqMsg) rpcFreeCont(pConn->pReqMsg); pConn->pReqMsg = NULL; pConn->reqMsgLen = 0; rpcUnlockConn(pConn); - rpcDecRef(pRpc); // decrease the referene count + rpcDecRef(pRpc); // decrease the referene count return; } void rpcSendRedirectRsp(void *thandle, const SEpSet *pEpSet) { - SRpcMsg rpcMsg; + SRpcMsg rpcMsg; memset(&rpcMsg, 0, sizeof(rpcMsg)); - + rpcMsg.contLen = sizeof(SEpSet); rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); if (rpcMsg.pCont == NULL) return; @@ -514,24 +643,24 @@ void rpcSendRedirectRsp(void *thandle, const SEpSet *pEpSet) { } int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { - SRpcConn *pConn = (SRpcConn *)thandle; + SRpcConn *pConn = (SRpcConn *)thandle; if (pConn->user[0] == 0) return -1; pInfo->clientIp = pConn->peerIp; pInfo->clientPort = pConn->peerPort; // pInfo->serverIp = pConn->destIp; - + tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user)); return 0; } void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { SRpcReqContext *pContext; - pContext = (SRpcReqContext *) ((char*)pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext)); + pContext = (SRpcReqContext *)((char *)pMsg->pCont - sizeof(SRpcHead) - sizeof(SRpcReqContext)); memset(pRsp, 0, sizeof(SRpcMsg)); - - tsem_t sem; + + tsem_t sem; tsem_init(&sem, 0, 0); pContext->pSem = &sem; pContext->pRsp = pRsp; @@ -548,13 +677,13 @@ void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { // this API is used by server app to keep an APP context in case connection is broken int rpcReportProgress(void *handle, char *pCont, int contLen) { SRpcConn *pConn = (SRpcConn *)handle; - int code = 0; + int code = 0; rpcLockConn(pConn); if (pConn->user[0]) { // pReqMsg and reqMsgLen is re-used to store the context from app server - pConn->pReqMsg = pCont; + pConn->pReqMsg = pCont; pConn->reqMsgLen = contLen; } else { tDebug("%s, rpc connection is already released", pConn->info); @@ -567,7 +696,6 @@ int rpcReportProgress(void *handle, char *pCont, int contLen) { } void rpcCancelRequest(int64_t rid) { - SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rid); if (pContext == NULL) return; @@ -577,7 +705,7 @@ void rpcCancelRequest(int64_t rid) { } static void rpcFreeMsg(void *msg) { - if ( msg ) { + if (msg) { char *temp = (char *)msg - sizeof(SRpcReqContext); free(temp); tTrace("free mem: %p", temp); @@ -589,14 +717,14 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, uint32_t peerIp = taosGetIpv4FromFqdn(peerFqdn); if (peerIp == 0xFFFFFFFF) { - tError("%s, failed to resolve FQDN:%s", pRpc->label, peerFqdn); - terrno = TSDB_CODE_RPC_FQDN_ERROR; + tError("%s, failed to resolve FQDN:%s", pRpc->label, peerFqdn); + terrno = TSDB_CODE_RPC_FQDN_ERROR; return NULL; } - pConn = rpcAllocateClientConn(pRpc); + pConn = rpcAllocateClientConn(pRpc); - if (pConn) { + if (pConn) { tstrncpy(pConn->peerFqdn, peerFqdn, sizeof(pConn->peerFqdn)); pConn->peerIp = peerIp; pConn->peerPort = peerPort; @@ -604,7 +732,7 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, pConn->connType = connType; if (taosOpenConn[connType]) { - void *shandle = (connType & RPC_CONN_TCP)? pRpc->tcphandle:pRpc->udphandle; + void *shandle = (connType & RPC_CONN_TCP) ? pRpc->tcphandle : pRpc->udphandle; pConn->chandle = (*taosOpenConn[connType])(shandle, pConn, pConn->peerIp, pConn->peerPort); if (pConn->chandle == NULL) { tError("failed to connect to:%s:%d", taosIpStr(pConn->peerIp), pConn->peerPort); @@ -629,13 +757,14 @@ static void rpcReleaseConn(SRpcConn *pConn) { taosTmrStopA(&pConn->pTimer); taosTmrStopA(&pConn->pIdleTimer); - if ( pRpc->connType == TAOS_CONN_SERVER) { - char hashstr[40] = {0}; - size_t size = snprintf(hashstr, sizeof(hashstr), "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId, pConn->connType); + if (pRpc->connType == TAOS_CONN_SERVER) { + char hashstr[40] = {0}; + size_t size = snprintf(hashstr, sizeof(hashstr), "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId, + pConn->connType); taosHashRemove(pRpc->hash, hashstr, size); - rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg + rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg pConn->pRspMsg = NULL; - + // if server has ever reported progress, free content if (pConn->pReqMsg) rpcFreeCont(pConn->pReqMsg); // do not use rpcFreeMsg } else { @@ -643,17 +772,17 @@ static void rpcReleaseConn(SRpcConn *pConn) { if (pConn->outType && pConn->pReqMsg) { SRpcReqContext *pContext = pConn->pContext; if (pContext) { - if (pContext->pRsp) { - // for synchronous API, post semaphore to unblock app + if (pContext->pRsp) { + // for synchronous API, post semaphore to unblock app pContext->pRsp->code = TSDB_CODE_RPC_APP_ERROR; pContext->pRsp->pCont = NULL; pContext->pRsp->contLen = 0; tsem_post(pContext->pSem); } - pContext->pConn = NULL; + pContext->pConn = NULL; taosRemoveRef(tsRpcRefId, pContext->rid); } else { - assert(0); + assert(0); } } } @@ -682,8 +811,7 @@ static void rpcCloseConn(void *thandle) { rpcLockConn(pConn); - if (pConn->user[0]) - rpcReleaseConn(pConn); + if (pConn->user[0]) rpcReleaseConn(pConn); rpcUnlockConn(pConn); } @@ -717,8 +845,9 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { char hashstr[40] = {0}; SRpcHead *pHead = (SRpcHead *)pRecv->msg; - size_t size = snprintf(hashstr, sizeof(hashstr), "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType); - + size_t size = + snprintf(hashstr, sizeof(hashstr), "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType); + // check if it is already allocated SRpcConn **ppConn = (SRpcConn **)(taosHashGet(pRpc->hash, hashstr, size)); if (ppConn) pConn = *ppConn; @@ -767,22 +896,23 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { } taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES); - tDebug("%s %p server connection is allocated, uid:0x%x sid:%d key:%s", pRpc->label, pConn, pConn->linkUid, sid, hashstr); + tDebug("%s %p server connection is allocated, uid:0x%x sid:%d key:%s", pRpc->label, pConn, pConn->linkUid, sid, + hashstr); } return pConn; } static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) { - SRpcConn *pConn = NULL; + SRpcConn *pConn = NULL; SRpcHead *pHead = (SRpcHead *)pRecv->msg; if (sid) { pConn = pRpc->connList + sid; if (pConn->user[0] == 0) pConn = NULL; - } + } - if (pConn == NULL) { + if (pConn == NULL) { if (pRpc->connType == TAOS_CONN_SERVER) { pConn = rpcAllocateServerConn(pRpc, pRecv); } else { @@ -805,12 +935,13 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) { static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) { SRpcConn *pConn; SRpcInfo *pRpc = pContext->pRpc; - SEpSet *pEpSet = &pContext->epSet; + SEpSet * pEpSet = &pContext->epSet; - pConn = rpcGetConnFromCache(pRpc->pCache, pEpSet->fqdn[pEpSet->inUse], pEpSet->port[pEpSet->inUse], pContext->connType); - if ( pConn == NULL || pConn->user[0] == 0) { + pConn = + rpcGetConnFromCache(pRpc->pCache, pEpSet->fqdn[pEpSet->inUse], pEpSet->port[pEpSet->inUse], pContext->connType); + if (pConn == NULL || pConn->user[0] == 0) { pConn = rpcOpenConn(pRpc, pEpSet->fqdn[pEpSet->inUse], pEpSet->port[pEpSet->inUse], pContext->connType); - } + } if (pConn) { pConn->tretry = 0; @@ -825,55 +956,52 @@ static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) { } static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) { - - if (pConn->peerId == 0) { - pConn->peerId = pHead->sourceId; - } else { - if (pConn->peerId != pHead->sourceId) { - tDebug("%s, source Id is changed, old:0x%08x new:0x%08x", pConn->info, - pConn->peerId, pHead->sourceId); - return TSDB_CODE_RPC_INVALID_VALUE; - } + if (pConn->peerId == 0) { + pConn->peerId = pHead->sourceId; + } else { + if (pConn->peerId != pHead->sourceId) { + tDebug("%s, source Id is changed, old:0x%08x new:0x%08x", pConn->info, pConn->peerId, pHead->sourceId); + return TSDB_CODE_RPC_INVALID_VALUE; } + } - if (pConn->inTranId == pHead->tranId) { - if (pConn->inType == pHead->msgType) { - if (pHead->code == 0) { - tDebug("%s, %s is retransmitted", pConn->info, TMSG_INFO(pHead->msgType)); - rpcSendQuickRsp(pConn, TSDB_CODE_RPC_ACTION_IN_PROGRESS); - } else { - // do nothing, it is heart beat from client - } - } else if (pConn->inType == 0) { - tDebug("%s, %s is already processed, tranId:%d", pConn->info, TMSG_INFO(pHead->msgType), pConn->inTranId); - rpcSendMsgToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen); // resend the response + if (pConn->inTranId == pHead->tranId) { + if (pConn->inType == pHead->msgType) { + if (pHead->code == 0) { + tDebug("%s, %s is retransmitted", pConn->info, TMSG_INFO(pHead->msgType)); + rpcSendQuickRsp(pConn, TSDB_CODE_RPC_ACTION_IN_PROGRESS); } else { - tDebug("%s, mismatched message %s and tranId", pConn->info, TMSG_INFO(pHead->msgType)); + // do nothing, it is heart beat from client } - - // do not reply any message - return TSDB_CODE_RPC_ALREADY_PROCESSED; + } else if (pConn->inType == 0) { + tDebug("%s, %s is already processed, tranId:%d", pConn->info, TMSG_INFO(pHead->msgType), pConn->inTranId); + rpcSendMsgToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen); // resend the response + } else { + tDebug("%s, mismatched message %s and tranId", pConn->info, TMSG_INFO(pHead->msgType)); } - if (pConn->inType != 0) { - tDebug("%s, last session is not finished, inTranId:%d tranId:%d", pConn->info, - pConn->inTranId, pHead->tranId); - return TSDB_CODE_RPC_LAST_SESSION_NOT_FINISHED; - } + // do not reply any message + return TSDB_CODE_RPC_ALREADY_PROCESSED; + } - if (rpcContLenFromMsg(pHead->msgLen) <= 0) { - tDebug("%s, message body is empty, ignore", pConn->info); - return TSDB_CODE_RPC_APP_ERROR; - } + if (pConn->inType != 0) { + tDebug("%s, last session is not finished, inTranId:%d tranId:%d", pConn->info, pConn->inTranId, pHead->tranId); + return TSDB_CODE_RPC_LAST_SESSION_NOT_FINISHED; + } - pConn->inTranId = pHead->tranId; - pConn->inType = pHead->msgType; + if (rpcContLenFromMsg(pHead->msgLen) <= 0) { + tDebug("%s, message body is empty, ignore", pConn->info); + return TSDB_CODE_RPC_APP_ERROR; + } - // start the progress timer to monitor the response from server app - if (pConn->connType != RPC_CONN_TCPS) - pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pConn->pRpc->tmrCtrl); - - return 0; + pConn->inTranId = pHead->tranId; + pConn->inType = pHead->msgType; + + // start the progress timer to monitor the response from server app + if (pConn->connType != RPC_CONN_TCPS) + pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pConn->pRpc->tmrCtrl); + + return 0; } static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { @@ -898,7 +1026,7 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { if (pHead->code == TSDB_CODE_RPC_AUTH_REQUIRED && pRpc->spi) { tDebug("%s, authentication shall be restarted", pConn->info); pConn->secured = 0; - rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); + rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); if (pConn->connType != RPC_CONN_TCPC) pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl); return TSDB_CODE_RPC_ALREADY_PROCESSED; @@ -908,7 +1036,7 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { tDebug("%s, mismatched linkUid, link shall be restarted", pConn->info); pConn->secured = 0; ((SRpcHead *)pConn->pReqMsg)->destId = 0; - rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); + rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); if (pConn->connType != RPC_CONN_TCPC) pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl); return TSDB_CODE_RPC_ALREADY_PROCESSED; @@ -934,25 +1062,25 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { pConn->reqMsgLen = 0; SRpcReqContext *pContext = pConn->pContext; - if (pHead->code == TSDB_CODE_RPC_REDIRECT) { + if (pHead->code == TSDB_CODE_RPC_REDIRECT) { if (rpcContLenFromMsg(pHead->msgLen) < sizeof(SEpSet)) { // if EpSet is not included in the msg, treat it as NOT_READY - pHead->code = TSDB_CODE_RPC_NOT_READY; + pHead->code = TSDB_CODE_RPC_NOT_READY; } else { pContext->redirect++; if (pContext->redirect > TSDB_MAX_REPLICA) { - pHead->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; + pHead->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; tWarn("%s, too many redirects, quit", pConn->info); } } - } + } return TSDB_CODE_SUCCESS; } static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqContext **ppContext) { - int32_t sid; - SRpcConn *pConn = NULL; + int32_t sid; + SRpcConn *pConn = NULL; SRpcHead *pHead = (SRpcHead *)pRecv->msg; @@ -961,25 +1089,29 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont if (TMSG_INDEX(pHead->msgType) >= TDMT_MAX || TMSG_INDEX(pHead->msgType) <= 0) { tDebug("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType); - terrno = TSDB_CODE_RPC_INVALID_MSG_TYPE; return NULL; + terrno = TSDB_CODE_RPC_INVALID_MSG_TYPE; + return NULL; } if (sid < 0 || sid >= pRpc->sessions) { - tDebug("%s sid:%d, sid is out of range, max sid:%d, %s discarded", pRpc->label, sid, - pRpc->sessions, TMSG_INFO(pHead->msgType)); - terrno = TSDB_CODE_RPC_INVALID_SESSION_ID; return NULL; + tDebug("%s sid:%d, sid is out of range, max sid:%d, %s discarded", pRpc->label, sid, pRpc->sessions, + TMSG_INFO(pHead->msgType)); + terrno = TSDB_CODE_RPC_INVALID_SESSION_ID; + return NULL; } if (rpcIsReq(pHead->msgType) && htonl(pHead->msgVer) != tsVersion >> 8) { - tDebug("%s sid:%d, invalid client version:%x/%x %s", pRpc->label, sid, htonl(pHead->msgVer), tsVersion, TMSG_INFO(pHead->msgType)); - terrno = TSDB_CODE_RPC_INVALID_VERSION; return NULL; + tDebug("%s sid:%d, invalid client version:%x/%x %s", pRpc->label, sid, htonl(pHead->msgVer), tsVersion, + TMSG_INFO(pHead->msgType)); + terrno = TSDB_CODE_RPC_INVALID_VERSION; + return NULL; } pConn = rpcGetConnObj(pRpc, sid, pRecv); if (pConn == NULL) { - tDebug("%s %p, failed to get connection obj(%s)", pRpc->label, (void *)pHead->ahandle, tstrerror(terrno)); + tDebug("%s %p, failed to get connection obj(%s)", pRpc->label, (void *)pHead->ahandle, tstrerror(terrno)); return NULL; - } + } rpcLockConn(pConn); @@ -990,9 +1122,9 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont sid = pConn->sid; if (pConn->chandle == NULL) pConn->chandle = pRecv->chandle; - pConn->peerIp = pRecv->ip; + pConn->peerIp = pRecv->ip; pConn->peerPort = pRecv->port; - if (pHead->port) pConn->peerPort = htons(pHead->port); + if (pHead->port) pConn->peerPort = htons(pHead->port); terrno = rpcCheckAuthentication(pConn, (char *)pHead, pRecv->msgLen); @@ -1004,16 +1136,16 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont // decrypt here } - if ( rpcIsReq(pHead->msgType) ) { + if (rpcIsReq(pHead->msgType)) { pConn->connType = pRecv->connType; terrno = rpcProcessReqHead(pConn, pHead); // stop idle timer - taosTmrStopA(&pConn->pIdleTimer); + taosTmrStopA(&pConn->pIdleTimer); - // client shall send the request within tsRpcTime again for UDP, double it + // client shall send the request within tsRpcTime again for UDP, double it if (pConn->connType != RPC_CONN_TCPS) - pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer*2, pConn, pRpc->tmrCtrl); + pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer * 2, pConn, pRpc->tmrCtrl); } else { terrno = rpcProcessRspHead(pConn, pHead); *ppContext = pConn->pContext; @@ -1026,9 +1158,9 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont } static void doRpcReportBrokenLinkToServer(void *param, void *id) { - SRpcMsg *pRpcMsg = (SRpcMsg *)(param); - SRpcConn *pConn = (SRpcConn *)(pRpcMsg->handle); - SRpcInfo *pRpc = pConn->pRpc; + SRpcMsg * pRpcMsg = (SRpcMsg *)(param); + SRpcConn *pConn = (SRpcConn *)(pRpcMsg->handle); + SRpcInfo *pRpc = pConn->pRpc; (*(pRpc->cfp))(pRpc->parent, pRpcMsg, NULL); free(pRpcMsg); } @@ -1041,12 +1173,12 @@ static void rpcReportBrokenLinkToServer(SRpcConn *pConn) { tDebug("%s, notify the server app, connection is gone", pConn->info); SRpcMsg *rpcMsg = malloc(sizeof(SRpcMsg)); - rpcMsg->pCont = pConn->pReqMsg; // pReqMsg is re-used to store the APP context from server - rpcMsg->contLen = pConn->reqMsgLen; // reqMsgLen is re-used to store the APP context length + rpcMsg->pCont = pConn->pReqMsg; // pReqMsg is re-used to store the APP context from server + rpcMsg->contLen = pConn->reqMsgLen; // reqMsgLen is re-used to store the APP context length rpcMsg->ahandle = pConn->ahandle; rpcMsg->handle = pConn; rpcMsg->msgType = pConn->inType; - rpcMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; + rpcMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; pConn->pReqMsg = NULL; pConn->reqMsgLen = 0; if (pRpc->cfp) { @@ -1070,22 +1202,22 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { pConn->pReqMsg = NULL; taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl); } - - if (pConn->inType) rpcReportBrokenLinkToServer(pConn); + + if (pConn->inType) rpcReportBrokenLinkToServer(pConn); rpcReleaseConn(pConn); rpcUnlockConn(pConn); } static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { - SRpcHead *pHead = (SRpcHead *)pRecv->msg; - SRpcInfo *pRpc = (SRpcInfo *)pRecv->shandle; - SRpcConn *pConn = (SRpcConn *)pRecv->thandle; + SRpcHead *pHead = (SRpcHead *)pRecv->msg; + SRpcInfo *pRpc = (SRpcInfo *)pRecv->shandle; + SRpcConn *pConn = (SRpcConn *)pRecv->thandle; tDump(pRecv->msg, pRecv->msgLen); // underlying UDP layer does not know it is server or client - pRecv->connType = pRecv->connType | pRpc->connType; + pRecv->connType = pRecv->connType | pRpc->connType; if (pRecv->msg == NULL) { rpcProcessBrokenLink(pConn); @@ -1100,62 +1232,62 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { taosIpPort2String(pRecv->ip, pRecv->port, ipstr); if (TMSG_INDEX(pHead->msgType) >= 1 && TMSG_INDEX(pHead->msgType) < TDMT_MAX) { - tDebug("%s %p %p, %s received from %s, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, - pConn, (void *)pHead->ahandle, TMSG_INFO(pHead->msgType), ipstr, terrno, pRecv->msgLen, - pHead->sourceId, pHead->destId, pHead->tranId, pHead->code); + tDebug("%s %p %p, %s received from %s, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, pConn, + (void *)pHead->ahandle, TMSG_INFO(pHead->msgType), ipstr, terrno, pRecv->msgLen, pHead->sourceId, + pHead->destId, pHead->tranId, pHead->code); } else { - tDebug("%s %p %p, %d received from %s, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, - pConn, (void *)pHead->ahandle, pHead->msgType, ipstr, terrno, pRecv->msgLen, - pHead->sourceId, pHead->destId, pHead->tranId, pHead->code); + tDebug("%s %p %p, %d received from %s, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, pConn, + (void *)pHead->ahandle, pHead->msgType, ipstr, terrno, pRecv->msgLen, pHead->sourceId, pHead->destId, + pHead->tranId, pHead->code); } int32_t code = terrno; if (code != TSDB_CODE_RPC_ALREADY_PROCESSED) { - if (code != 0) { // parsing error + if (code != 0) { // parsing error if (rpcIsReq(pHead->msgType)) { rpcSendErrorMsgToPeer(pRecv, code); if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE) { rpcCloseConn(pConn); } if (TMSG_INDEX(pHead->msgType) + 1 > 1 && TMSG_INDEX(pHead->msgType) + 1 < TDMT_MAX) { - tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, TMSG_INFO(pHead->msgType+1), code); + tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, + TMSG_INFO(pHead->msgType + 1), code); } else { - tError("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, TMSG_INFO(pHead->msgType), code); + tError("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, + TMSG_INFO(pHead->msgType), code); } - } - } else { // msg is passed to app only parsing is ok + } + } else { // msg is passed to app only parsing is ok rpcProcessIncomingMsg(pConn, pHead, pContext); } } - if (code) rpcFreeMsg(pRecv->msg); // parsing failed, msg shall be freed + if (code) rpcFreeMsg(pRecv->msg); // parsing failed, msg shall be freed return pConn; } static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { - SRpcInfo *pRpc = pContext->pRpc; + SRpcInfo *pRpc = pContext->pRpc; pContext->pConn = NULL; - if (pContext->pRsp) { + if (pContext->pRsp) { // for synchronous API memcpy(pContext->pSet, &pContext->epSet, sizeof(SEpSet)); memcpy(pContext->pRsp, pMsg, sizeof(SRpcMsg)); tsem_post(pContext->pSem); } else { - // for asynchronous API + // for asynchronous API SEpSet *pEpSet = NULL; - if (pContext->epSet.inUse != pContext->oldInUse || pContext->redirect) - pEpSet = &pContext->epSet; + if (pContext->epSet.inUse != pContext->oldInUse || pContext->redirect) pEpSet = &pContext->epSet; (*pRpc->cfp)(pRpc->parent, pMsg, pEpSet); } // free the request message - taosRemoveRef(tsRpcRefId, pContext->rid); + taosRemoveRef(tsRpcRefId, pContext->rid); } static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) { - SRpcInfo *pRpc = pConn->pRpc; SRpcMsg rpcMsg; @@ -1180,14 +1312,15 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte // for UDP, port may be changed by server, the port in epSet shall be used for cache if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) { - rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse], pConn->connType); + rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse], + pConn->connType); } else { rpcCloseConn(pConn); } if (pHead->code == TSDB_CODE_RPC_REDIRECT) { pContext->numOfTry = 0; - SEpSet *pEpSet = (SEpSet*)pHead->content; + SEpSet *pEpSet = (SEpSet *)pHead->content; if (pEpSet->numOfEps > 0) { memcpy(&pContext->epSet, pHead->content, sizeof(pContext->epSet)); tDebug("%s, redirect is received, numOfEps:%d inUse:%d", pConn->info, pContext->epSet.numOfEps, @@ -1200,7 +1333,8 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte } rpcSendReqToServer(pRpc, pContext); rpcFreeCont(rpcMsg.pCont); - } else if (pHead->code == TSDB_CODE_RPC_NOT_READY || pHead->code == TSDB_CODE_APP_NOT_READY || pHead->code == TSDB_CODE_DND_OFFLINE) { + } else if (pHead->code == TSDB_CODE_RPC_NOT_READY || pHead->code == TSDB_CODE_APP_NOT_READY || + pHead->code == TSDB_CODE_DND_OFFLINE) { pContext->code = pHead->code; rpcProcessConnError(pContext, NULL); rpcFreeCont(rpcMsg.pCont); @@ -1211,14 +1345,14 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte } static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) { - char msg[RPC_MSG_OVERHEAD]; - SRpcHead *pHead; + char msg[RPC_MSG_OVERHEAD]; + SRpcHead *pHead; // set msg header memset(msg, 0, sizeof(SRpcHead)); pHead = (SRpcHead *)msg; pHead->version = 1; - pHead->msgType = pConn->inType+1; + pHead->msgType = pConn->inType + 1; pHead->spi = pConn->spi; pHead->encrypt = 0; pHead->tranId = pConn->inTranId; @@ -1230,12 +1364,12 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) { pHead->code = htonl(code); rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead)); - pConn->secured = 1; // connection shall be secured + pConn->secured = 1; // connection shall be secured } static void rpcSendReqHead(SRpcConn *pConn) { - char msg[RPC_MSG_OVERHEAD]; - SRpcHead *pHead; + char msg[RPC_MSG_OVERHEAD]; + SRpcHead *pHead; // set msg header memset(msg, 0, sizeof(SRpcHead)); @@ -1257,10 +1391,10 @@ static void rpcSendReqHead(SRpcConn *pConn) { } static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) { - SRpcHead *pRecvHead, *pReplyHead; - char msg[sizeof(SRpcHead) + sizeof(SRpcDigest) + sizeof(uint32_t) ]; - uint32_t timeStamp; - int msgLen; + SRpcHead *pRecvHead, *pReplyHead; + char msg[sizeof(SRpcHead) + sizeof(SRpcDigest) + sizeof(uint32_t)]; + uint32_t timeStamp; + int msgLen; pRecvHead = (SRpcHead *)pRecv->msg; pReplyHead = (SRpcHead *)msg; @@ -1290,14 +1424,14 @@ static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) { pReplyHead->msgLen = (int32_t)htonl((uint32_t)msgLen); (*taosSendData[pRecv->connType])(pRecv->ip, pRecv->port, msg, msgLen, pRecv->chandle); - return; + return; } static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { - SRpcHead *pHead = rpcHeadFromCont(pContext->pCont); - char *msg = (char *)pHead; - int msgLen = rpcMsgLenFromCont(pContext->contLen); - tmsg_t msgType = pContext->msgType; + SRpcHead *pHead = rpcHeadFromCont(pContext->pCont); + char * msg = (char *)pHead; + int msgLen = rpcMsgLenFromCont(pContext->contLen); + tmsg_t msgType = pContext->msgType; pContext->numOfTry++; SRpcConn *pConn = rpcSetupConnToServer(pContext); @@ -1311,13 +1445,13 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { pConn->ahandle = pContext->ahandle; rpcLockConn(pConn); - // set the message header + // set the message header pHead->version = 1; pHead->msgVer = htonl(tsVersion >> 8); pHead->msgType = msgType; pHead->encrypt = 0; pConn->tranId++; - if ( pConn->tranId == 0 ) pConn->tranId++; + if (pConn->tranId == 0) pConn->tranId++; pHead->tranId = pConn->tranId; pHead->sourceId = pConn->ownId; pHead->destId = pConn->peerId; @@ -1341,45 +1475,43 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { } static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { - int writtenLen = 0; - SRpcHead *pHead = (SRpcHead *)msg; + int writtenLen = 0; + SRpcHead *pHead = (SRpcHead *)msg; msgLen = rpcAddAuthPart(pConn, msg, msgLen); - if ( rpcIsReq(pHead->msgType)) { - tDebug("%s, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d", - pConn->info, TMSG_INFO(pHead->msgType), pConn->peerFqdn, pConn->peerPort, - msgLen, pHead->sourceId, pHead->destId, pHead->tranId); + if (rpcIsReq(pHead->msgType)) { + tDebug("%s, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d", pConn->info, TMSG_INFO(pHead->msgType), + pConn->peerFqdn, pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId); } else { - if (pHead->code == 0) pConn->secured = 1; // for success response, set link as secured - tDebug("%s, %s is sent to 0x%x:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d", - pConn->info, TMSG_INFO(pHead->msgType), pConn->peerIp, pConn->peerPort, - htonl(pHead->code), msgLen, pHead->sourceId, pHead->destId, pHead->tranId); + if (pHead->code == 0) pConn->secured = 1; // for success response, set link as secured + tDebug("%s, %s is sent to 0x%x:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d", pConn->info, TMSG_INFO(pHead->msgType), + pConn->peerIp, pConn->peerPort, htonl(pHead->code), msgLen, pHead->sourceId, pHead->destId, pHead->tranId); } - //tTrace("connection type is: %d", pConn->connType); + // tTrace("connection type is: %d", pConn->connType); writtenLen = (*taosSendData[pConn->connType])(pConn->peerIp, pConn->peerPort, pHead, msgLen, pConn->chandle); if (writtenLen != msgLen) { tError("%s, failed to send, msgLen:%d written:%d, reason:%s", pConn->info, msgLen, writtenLen, strerror(errno)); } - + tDump(msg, msgLen); } static void rpcProcessConnError(void *param, void *id) { SRpcReqContext *pContext = (SRpcReqContext *)param; - SRpcInfo *pRpc = pContext->pRpc; + SRpcInfo * pRpc = pContext->pRpc; SRpcMsg rpcMsg; - + if (pRpc == NULL) { return; } - + tDebug("%s %p, connection error happens", pRpc->label, pContext->ahandle); if (pContext->numOfTry >= pContext->epSet.numOfEps || pContext->msgType == TDMT_VND_FETCH) { - rpcMsg.msgType = pContext->msgType+1; + rpcMsg.msgType = pContext->msgType + 1; rpcMsg.ahandle = pContext->ahandle; rpcMsg.code = pContext->code; rpcMsg.pCont = NULL; @@ -1387,7 +1519,7 @@ static void rpcProcessConnError(void *param, void *id) { rpcNotifyClient(pContext, &rpcMsg); } else { - // move to next IP + // move to next IP pContext->epSet.inUse++; pContext->epSet.inUse = pContext->epSet.inUse % pContext->epSet.numOfEps; rpcSendReqToServer(pRpc, pContext); @@ -1407,11 +1539,12 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { if (pConn->retry < 4) { tDebug("%s, re-send msg:%s to %s:%hu", pConn->info, TMSG_INFO(pConn->outType), pConn->peerFqdn, pConn->peerPort); - rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); + rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl); } else { // close the connection - tDebug("%s, failed to send msg:%s to %s:%hu", pConn->info, TMSG_INFO(pConn->outType), pConn->peerFqdn, pConn->peerPort); + tDebug("%s, failed to send msg:%s to %s:%hu", pConn->info, TMSG_INFO(pConn->outType), pConn->peerFqdn, + pConn->peerPort); if (pConn->pContext) { pConn->pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; pConn->pContext->pConn = NULL; @@ -1434,7 +1567,7 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) { if (pConn->user[0]) { tDebug("%s, close the connection since no activity", pConn->info); - if (pConn->inType) rpcReportBrokenLinkToServer(pConn); + if (pConn->inType) rpcReportBrokenLinkToServer(pConn); rpcReleaseConn(pConn); } else { tDebug("%s, idle timer:%p not processed", pConn->info, tmrId); @@ -1460,34 +1593,34 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) { rpcUnlockConn(pConn); } -static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { - SRpcHead *pHead = rpcHeadFromCont(pCont); - int32_t finalLen = 0; - int overhead = sizeof(SRpcComp); - +static int32_t rpcCompressRpcMsg(char *pCont, int32_t contLen) { + SRpcHead *pHead = rpcHeadFromCont(pCont); + int32_t finalLen = 0; + int overhead = sizeof(SRpcComp); + if (!NEEDTO_COMPRESSS_MSG(contLen)) { return contLen; } - - char *buf = malloc (contLen + overhead + 8); // 8 extra bytes + + char *buf = malloc(contLen + overhead + 8); // 8 extra bytes if (buf == NULL) { tError("failed to allocate memory for rpc msg compression, contLen:%d", contLen); return contLen; } - + int32_t compLen = LZ4_compress_default(pCont, buf, contLen, contLen + overhead); tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", contLen, compLen, overhead); - + /* * only the compressed size is less than the value of contLen - overhead, the compression is applied * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message */ if (compLen > 0 && compLen < contLen - overhead) { SRpcComp *pComp = (SRpcComp *)pCont; - pComp->reserved = 0; - pComp->contLen = htonl(contLen); + pComp->reserved = 0; + pComp->contLen = htonl(contLen); memcpy(pCont + overhead, buf, compLen); - + pHead->comp = 1; tDebug("compress rpc msg, before:%d, after:%d", contLen, compLen); finalLen = compLen + overhead; @@ -1500,29 +1633,29 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { } static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) { - int overhead = sizeof(SRpcComp); - SRpcHead *pNewHead = NULL; - uint8_t *pCont = pHead->content; - SRpcComp *pComp = (SRpcComp *)pHead->content; + int overhead = sizeof(SRpcComp); + SRpcHead *pNewHead = NULL; + uint8_t * pCont = pHead->content; + SRpcComp *pComp = (SRpcComp *)pHead->content; if (pHead->comp) { // decompress the content assert(pComp->reserved == 0); int contLen = htonl(pComp->contLen); - + // prepare the temporary buffer to decompress message char *temp = (char *)malloc(contLen + RPC_MSG_OVERHEAD); - pNewHead = (SRpcHead *)(temp + sizeof(SRpcReqContext)); // reserve SRpcReqContext - + pNewHead = (SRpcHead *)(temp + sizeof(SRpcReqContext)); // reserve SRpcReqContext + if (pNewHead) { int compLen = rpcContLenFromMsg(pHead->msgLen) - overhead; - int origLen = LZ4_decompress_safe((char*)(pCont + overhead), (char *)pNewHead->content, compLen, contLen); + int origLen = LZ4_decompress_safe((char *)(pCont + overhead), (char *)pNewHead->content, compLen, contLen); assert(origLen == contLen); - + memcpy(pNewHead, pHead, sizeof(SRpcHead)); pNewHead->msgLen = rpcMsgLenFromCont(origLen); - rpcFreeMsg(pHead); // free the compressed message buffer - pHead = pNewHead; + rpcFreeMsg(pHead); // free the compressed message buffer + pHead = pNewHead; tTrace("decomp malloc mem:%p", temp); } else { tError("failed to allocate memory to decompress msg, contLen:%d", contLen); @@ -1534,7 +1667,7 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) { static int rpcAuthenticateMsg(void *pMsg, int msgLen, void *pAuth, void *pKey) { T_MD5_CTX context; - int ret = -1; + int ret = -1; tMD5Init(&context); tMD5Update(&context, (uint8_t *)pKey, TSDB_PASSWORD_LEN); @@ -1582,25 +1715,25 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { SRpcHead *pHead = (SRpcHead *)msg; int code = 0; - if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)){ - // secured link, or no authentication + if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)) { + // secured link, or no authentication pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); // tTrace("%s, secured link, no auth is required", pConn->info); return 0; } - if ( !rpcIsReq(pHead->msgType) ) { + if (!rpcIsReq(pHead->msgType)) { // for response, if code is auth failure, it shall bypass the auth process code = htonl(pHead->code); if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE || - code == TSDB_CODE_RPC_INVALID_VERSION || - code == TSDB_CODE_RPC_AUTH_REQUIRED || code == TSDB_CODE_MND_USER_NOT_EXIST || code == TSDB_CODE_RPC_NOT_READY) { + code == TSDB_CODE_RPC_INVALID_VERSION || code == TSDB_CODE_RPC_AUTH_REQUIRED || + code == TSDB_CODE_MND_USER_NOT_EXIST || code == TSDB_CODE_RPC_NOT_READY) { pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); // tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code); return 0; } } - + code = 0; if (pHead->spi == pConn->spi) { // authentication @@ -1613,12 +1746,12 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { tWarn("%s, time diff:%d is too big, msg discarded", pConn->info, delta); code = TSDB_CODE_RPC_INVALID_TIME_STAMP; } else { - if (rpcAuthenticateMsg(pHead, msgLen-TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) { + if (rpcAuthenticateMsg(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) { tDebug("%s, authentication failed, msg discarded", pConn->info); code = TSDB_CODE_RPC_AUTH_FAILURE; } else { pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest); - if ( !rpcIsReq(pHead->msgType) ) pConn->secured = 1; // link is secured for client + if (!rpcIsReq(pHead->msgType)) pConn->secured = 1; // link is secured for client // tTrace("%s, message is authenticated", pConn->info); } } @@ -1647,13 +1780,9 @@ static void rpcUnlockConn(SRpcConn *pConn) { } } -static void rpcAddRef(SRpcInfo *pRpc) -{ - atomic_add_fetch_32(&pRpc->refCount, 1); -} +static void rpcAddRef(SRpcInfo *pRpc) { atomic_add_fetch_32(&pRpc->refCount, 1); } -static void rpcDecRef(SRpcInfo *pRpc) -{ +static void rpcDecRef(SRpcInfo *pRpc) { if (atomic_sub_fetch_32(&pRpc->refCount, 1) == 0) { rpcCloseConnCache(pRpc->pCache); taosHashCleanup(pRpc->hash); @@ -1668,4 +1797,4 @@ static void rpcDecRef(SRpcInfo *pRpc) atomic_sub_fetch_32(&tsRpcNum, 1); } } - +#endif diff --git a/source/libs/transport/src/rpcTcp.c b/source/libs/transport/src/rpcTcp.c index d0710c883fe3f80bf213bcdb3ae5ce7f81f3e7bf..a3e1f2434b5e8b4394b3367e4fd1fbd8387d69a1 100644 --- a/source/libs/transport/src/rpcTcp.c +++ b/source/libs/transport/src/rpcTcp.c @@ -13,24 +13,28 @@ * along with this program. If not, see . */ +#include "rpcTcp.h" +#include #include "os.h" -#include "tutil.h" +#include "rpcHead.h" +#include "rpcLog.h" #include "taosdef.h" #include "taoserror.h" -#include "rpcLog.h" -#include "rpcHead.h" -#include "rpcTcp.h" +#include "tutil.h" +#ifdef USE_UV + +#else typedef struct SFdObj { - void *signature; - SOCKET fd; // TCP socket FD - void *thandle; // handle from upper layer, like TAOS + void * signature; + SOCKET fd; // TCP socket FD + void * thandle; // handle from upper layer, like TAOS uint32_t ip; uint16_t port; - int16_t closedByApp; // 1: already closed by App + int16_t closedByApp; // 1: already closed by App struct SThreadObj *pThreadObj; - struct SFdObj *prev; - struct SFdObj *next; + struct SFdObj * prev; + struct SFdObj * next; } SFdObj; typedef struct SThreadObj { @@ -43,35 +47,35 @@ typedef struct SThreadObj { int numOfFds; int threadId; char label[TSDB_LABEL_LEN]; - void *shandle; // handle passed by upper layer during server initialization - void *(*processData)(SRecvInfo *pPacket); + void * shandle; // handle passed by upper layer during server initialization + void *(*processData)(SRecvInfo *pPacket); } SThreadObj; typedef struct { - char label[TSDB_LABEL_LEN]; - int32_t index; - int numOfThreads; + char label[TSDB_LABEL_LEN]; + int32_t index; + int numOfThreads; SThreadObj **pThreadObj; } SClientObj; typedef struct { - SOCKET fd; - uint32_t ip; - uint16_t port; - int8_t stop; - int8_t reserve; - char label[TSDB_LABEL_LEN]; - int numOfThreads; - void * shandle; + SOCKET fd; + uint32_t ip; + uint16_t port; + int8_t stop; + int8_t reserve; + char label[TSDB_LABEL_LEN]; + int numOfThreads; + void * shandle; SThreadObj **pThreadObj; - pthread_t thread; + pthread_t thread; } SServerObj; -static void *taosProcessTcpData(void *param); +static void * taosProcessTcpData(void *param); static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd); static void taosFreeFdObj(SFdObj *pFdObj); static void taosReportBrokenLink(SFdObj *pFdObj); -static void *taosAcceptTcpConnection(void *arg); +static void * taosAcceptTcpConnection(void *arg); void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) { SServerObj *pServerObj; @@ -99,7 +103,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread return NULL; } - int code = 0; + int code = 0; pthread_attr_t thattr; pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); @@ -110,7 +114,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread if (pThreadObj == NULL) { tError("TCP:%s no enough memory", label); terrno = TAOS_SYSTEM_ERROR(errno); - for (int j=0; jpThreadObj[j]); + for (int j = 0; j < i; ++j) free(pServerObj->pThreadObj[j]); free(pServerObj->pThreadObj); free(pServerObj); return NULL; @@ -172,8 +176,10 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread return (void *)pServerObj; } -static void taosStopTcpThread(SThreadObj* pThreadObj) { - if (pThreadObj == NULL) { return;} +static void taosStopTcpThread(SThreadObj *pThreadObj) { + if (pThreadObj == NULL) { + return; + } // save thread into local variable and signal thread to stop pthread_t thread = pThreadObj->thread; if (!taosCheckPthreadValid(thread)) { @@ -194,7 +200,7 @@ void taosStopTcpServer(void *handle) { pServerObj->stop = 1; if (pServerObj->fd >= 0) { - taosShutDownSocketRD(pServerObj->fd); + taosShutDownSocketRD(pServerObj->fd); } if (taosCheckPthreadValid(pServerObj->thread)) { if (taosComparePthread(pServerObj->thread, pthread_self())) { @@ -227,8 +233,8 @@ static void *taosAcceptTcpConnection(void *arg) { SOCKET connFd = -1; struct sockaddr_in caddr; int threadId = 0; - SThreadObj *pThreadObj; - SServerObj *pServerObj; + SThreadObj * pThreadObj; + SServerObj * pServerObj; pServerObj = (SServerObj *)arg; tDebug("%s TCP server is ready, ip:0x%x:%hu", pServerObj->label, pServerObj->ip, pServerObj->port); @@ -253,8 +259,8 @@ static void *taosAcceptTcpConnection(void *arg) { } taosKeepTcpAlive(connFd); - struct timeval to={5, 0}; - int32_t ret = taosSetSockOpt(connFd, SOL_SOCKET, SO_RCVTIMEO, &to, sizeof(to)); + struct timeval to = {5, 0}; + int32_t ret = taosSetSockOpt(connFd, SOL_SOCKET, SO_RCVTIMEO, &to, sizeof(to)); if (ret != 0) { taosCloseSocket(connFd); tError("%s failed to set recv timeout fd(%s)for connection from:%s:%hu", pServerObj->label, strerror(errno), @@ -262,7 +268,6 @@ static void *taosAcceptTcpConnection(void *arg) { continue; } - // pick up the thread to handle this connection pThreadObj = pServerObj->pThreadObj[threadId]; @@ -271,7 +276,7 @@ static void *taosAcceptTcpConnection(void *arg) { pFdObj->ip = caddr.sin_addr.s_addr; pFdObj->port = htons(caddr.sin_port); tDebug("%s new TCP connection from %s:%hu, fd:%d FD:%p numOfFds:%d", pServerObj->label, - taosInetNtoa(caddr.sin_addr), pFdObj->port, connFd, pFdObj, pThreadObj->numOfFds); + taosInetNtoa(caddr.sin_addr), pFdObj->port, connFd, pFdObj, pThreadObj->numOfFds); } else { taosCloseSocket(connFd); tError("%s failed to malloc FdObj(%s) for connection from:%s:%hu", pServerObj->label, strerror(errno), @@ -297,14 +302,14 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int numOfThread tstrncpy(pClientObj->label, label, sizeof(pClientObj->label)); pClientObj->numOfThreads = numOfThreads; - pClientObj->pThreadObj = (SThreadObj **)calloc(numOfThreads, sizeof(SThreadObj*)); + pClientObj->pThreadObj = (SThreadObj **)calloc(numOfThreads, sizeof(SThreadObj *)); if (pClientObj->pThreadObj == NULL) { tError("TCP:%s no enough memory", label); tfree(pClientObj); terrno = TAOS_SYSTEM_ERROR(errno); } - int code = 0; + int code = 0; pthread_attr_t thattr; pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); @@ -314,15 +319,15 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int numOfThread if (pThreadObj == NULL) { tError("TCP:%s no enough memory", label); terrno = TAOS_SYSTEM_ERROR(errno); - for (int j=0; jpThreadObj[j]); + for (int j = 0; j < i; ++j) free(pClientObj->pThreadObj[j]); free(pClientObj); pthread_attr_destroy(&thattr); return NULL; } pClientObj->pThreadObj[i] = pThreadObj; taosResetPthread(&pThreadObj->thread); - pThreadObj->ip = ip; - pThreadObj->stop = false; + pThreadObj->ip = ip; + pThreadObj->stop = false; tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label)); pThreadObj->shandle = shandle; pThreadObj->processData = fp; @@ -364,14 +369,14 @@ void taosStopTcpClient(void *chandle) { if (pClientObj == NULL) return; - tDebug ("%s TCP client is stopped", pClientObj->label); + tDebug("%s TCP client is stopped", pClientObj->label); } void taosCleanUpTcpClient(void *chandle) { SClientObj *pClientObj = chandle; if (pClientObj == NULL) return; for (int i = 0; i < pClientObj->numOfThreads; ++i) { - SThreadObj *pThreadObj= pClientObj->pThreadObj[i]; + SThreadObj *pThreadObj = pClientObj->pThreadObj[i]; taosStopTcpThread(pThreadObj); } @@ -381,9 +386,9 @@ void taosCleanUpTcpClient(void *chandle) { } void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) { - SClientObj * pClientObj = shandle; - int32_t index = atomic_load_32(&pClientObj->index) % pClientObj->numOfThreads; - atomic_store_32(&pClientObj->index, index + 1); + SClientObj *pClientObj = shandle; + int32_t index = atomic_load_32(&pClientObj->index) % pClientObj->numOfThreads; + atomic_store_32(&pClientObj->index, index + 1); SThreadObj *pThreadObj = pClientObj->pThreadObj[index]; SOCKET fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ip); @@ -394,10 +399,9 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin #endif struct sockaddr_in sin; - uint16_t localPort = 0; - unsigned int addrlen = sizeof(sin); - if (getsockname(fd, (struct sockaddr *)&sin, &addrlen) == 0 && - sin.sin_family == AF_INET && addrlen == sizeof(sin)) { + uint16_t localPort = 0; + unsigned int addrlen = sizeof(sin); + if (getsockname(fd, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET && addrlen == sizeof(sin)) { localPort = (uint16_t)ntohs(sin.sin_port); } @@ -407,8 +411,8 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin pFdObj->thandle = thandle; pFdObj->port = port; pFdObj->ip = ip; - tDebug("%s %p TCP connection to 0x%x:%hu is created, localPort:%hu FD:%p numOfFds:%d", - pThreadObj->label, thandle, ip, port, localPort, pFdObj, pThreadObj->numOfFds); + tDebug("%s %p TCP connection to 0x%x:%hu is created, localPort:%hu FD:%p numOfFds:%d", pThreadObj->label, thandle, + ip, port, localPort, pFdObj, pThreadObj->numOfFds); } else { tError("%s failed to malloc client FdObj(%s)", pThreadObj->label, strerror(errno)); taosCloseSocket(fd); @@ -441,7 +445,6 @@ int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chand } static void taosReportBrokenLink(SFdObj *pFdObj) { - SThreadObj *pThreadObj = pFdObj->pThreadObj; // notify the upper layer, so it will clean the associated context @@ -464,9 +467,9 @@ static void taosReportBrokenLink(SFdObj *pFdObj) { } static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) { - SRpcHead rpcHead; - int32_t msgLen, leftLen, retLen, headLen; - char *buffer, *msg; + SRpcHead rpcHead; + int32_t msgLen, leftLen, retLen, headLen; + char * buffer, *msg; SThreadObj *pThreadObj = pFdObj->pThreadObj; @@ -483,7 +486,8 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) { tError("%s %p TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen); return -1; } else { - tTrace("%s %p read data, FD:%p fd:%d TCP malloc mem:%p", pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd, buffer); + tTrace("%s %p read data, FD:%p fd:%d TCP malloc mem:%p", pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd, + buffer); } msg = buffer + tsRpcOverhead; @@ -491,8 +495,7 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) { retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen); if (leftLen != retLen) { - tError("%s %p read error, leftLen:%d retLen:%d FD:%p", - pThreadObj->label, pFdObj->thandle, leftLen, retLen, pFdObj); + tError("%s %p read error, leftLen:%d retLen:%d FD:%p", pThreadObj->label, pFdObj->thandle, leftLen, retLen, pFdObj); free(buffer); return -1; } @@ -519,8 +522,8 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) { #define maxEvents 10 static void *taosProcessTcpData(void *param) { - SThreadObj *pThreadObj = param; - SFdObj *pFdObj; + SThreadObj * pThreadObj = param; + SFdObj * pFdObj; struct epoll_event events[maxEvents]; SRecvInfo recvInfo; @@ -569,7 +572,7 @@ static void *taosProcessTcpData(void *param) { if (pThreadObj->stop) break; } - if (pThreadObj->pollFd >=0) { + if (pThreadObj->pollFd >= 0) { EpollClose(pThreadObj->pollFd); pThreadObj->pollFd = -1; } @@ -620,7 +623,6 @@ static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd) { } static void taosFreeFdObj(SFdObj *pFdObj) { - if (pFdObj == NULL) return; if (pFdObj->signature != pFdObj) return; @@ -638,8 +640,8 @@ static void taosFreeFdObj(SFdObj *pFdObj) { pThreadObj->numOfFds--; if (pThreadObj->numOfFds < 0) - tError("%s %p TCP thread:%d, number of FDs is negative!!!", - pThreadObj->label, pFdObj->thandle, pThreadObj->threadId); + tError("%s %p TCP thread:%d, number of FDs is negative!!!", pThreadObj->label, pFdObj->thandle, + pThreadObj->threadId); if (pFdObj->prev) { (pFdObj->prev)->next = pFdObj->next; @@ -653,8 +655,10 @@ static void taosFreeFdObj(SFdObj *pFdObj) { pthread_mutex_unlock(&pThreadObj->mutex); - tDebug("%s %p TCP connection is closed, FD:%p fd:%d numOfFds:%d", - pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd, pThreadObj->numOfFds); + tDebug("%s %p TCP connection is closed, FD:%p fd:%d numOfFds:%d", pThreadObj->label, pFdObj->thandle, pFdObj, + pFdObj->fd, pThreadObj->numOfFds); tfree(pFdObj); } + +#endif diff --git a/source/libs/transport/src/rpcUdp.c b/source/libs/transport/src/rpcUdp.c index 5bc31c189a0eef9dbd145e23aea5be4e0b9b0f1d..79956cc98db038c6e6e06a06fdbd7cb947c4ecef 100644 --- a/source/libs/transport/src/rpcUdp.c +++ b/source/libs/transport/src/rpcUdp.c @@ -13,50 +13,53 @@ * along with this program. If not, see . */ +#include "rpcUdp.h" #include "os.h" -#include "ttimer.h" -#include "tutil.h" +#include "rpcHead.h" +#include "rpcLog.h" #include "taosdef.h" #include "taoserror.h" -#include "rpcLog.h" -#include "rpcUdp.h" -#include "rpcHead.h" +#include "ttimer.h" +#include "tutil.h" +#ifdef USE_UV +// no support upd currently +#else #define RPC_MAX_UDP_CONNS 256 #define RPC_MAX_UDP_PKTS 1000 #define RPC_UDP_BUF_TIME 5 // mseconds #define RPC_MAX_UDP_SIZE 65480 typedef struct { - int index; - SOCKET fd; - uint16_t port; // peer port - uint16_t localPort; // local port - char label[TSDB_LABEL_LEN]; // copy from udpConnSet; - pthread_t thread; - void *hash; - void *shandle; // handle passed by upper layer during server initialization - void *pSet; - void *(*processData)(SRecvInfo *pRecv); - char *buffer; // buffer to receive data + int index; + SOCKET fd; + uint16_t port; // peer port + uint16_t localPort; // local port + char label[TSDB_LABEL_LEN]; // copy from udpConnSet; + pthread_t thread; + void * hash; + void * shandle; // handle passed by upper layer during server initialization + void * pSet; + void *(*processData)(SRecvInfo *pRecv); + char *buffer; // buffer to receive data } SUdpConn; typedef struct { - int index; - int server; - uint32_t ip; // local IP - uint16_t port; // local Port - void *shandle; // handle passed by upper layer during server initialization - int threads; - char label[TSDB_LABEL_LEN]; - void *(*fp)(SRecvInfo *pPacket); - SUdpConn udpConn[]; + int index; + int server; + uint32_t ip; // local IP + uint16_t port; // local Port + void * shandle; // handle passed by upper layer during server initialization + int threads; + char label[TSDB_LABEL_LEN]; + void *(*fp)(SRecvInfo *pPacket); + SUdpConn udpConn[]; } SUdpConnSet; static void *taosRecvUdpData(void *param); void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads, void *fp, void *shandle) { - SUdpConn *pConn; + SUdpConn * pConn; SUdpConnSet *pSet; int size = (int)sizeof(SUdpConnSet) + threads * (int)sizeof(SUdpConn); @@ -79,7 +82,7 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads pthread_attr_init(&thAttr); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); - int i; + int i; uint16_t ownPort; for (i = 0; i < threads; ++i) { pConn = pSet->udpConn + i; @@ -97,9 +100,9 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads } struct sockaddr_in sin; - unsigned int addrlen = sizeof(sin); - if (getsockname(pConn->fd, (struct sockaddr *)&sin, &addrlen) == 0 && - sin.sin_family == AF_INET && addrlen == sizeof(sin)) { + unsigned int addrlen = sizeof(sin); + if (getsockname(pConn->fd, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET && + addrlen == sizeof(sin)) { pConn->localPort = (uint16_t)ntohs(sin.sin_port); } @@ -118,7 +121,7 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads pthread_attr_destroy(&thAttr); - if (i != threads) { + if (i != threads) { terrno = TAOS_SYSTEM_ERROR(errno); taosCleanUpUdpConnection(pSet); return NULL; @@ -130,14 +133,14 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads void taosStopUdpConnection(void *handle) { SUdpConnSet *pSet = (SUdpConnSet *)handle; - SUdpConn *pConn; + SUdpConn * pConn; if (pSet == NULL) return; for (int i = 0; i < pSet->threads; ++i) { pConn = pSet->udpConn + i; - if (pConn->fd >=0) shutdown(pConn->fd, SHUT_RDWR); - if (pConn->fd >=0) taosCloseSocket(pConn->fd); + if (pConn->fd >= 0) shutdown(pConn->fd, SHUT_RDWR); + if (pConn->fd >= 0) taosCloseSocket(pConn->fd); pConn->fd = -1; } @@ -155,13 +158,13 @@ void taosStopUdpConnection(void *handle) { void taosCleanUpUdpConnection(void *handle) { SUdpConnSet *pSet = (SUdpConnSet *)handle; - SUdpConn *pConn; + SUdpConn * pConn; if (pSet == NULL) return; for (int i = 0; i < pSet->threads; ++i) { pConn = pSet->udpConn + i; - if (pConn->fd >=0) taosCloseSocket(pConn->fd); + if (pConn->fd >= 0) taosCloseSocket(pConn->fd); } tDebug("%s UDP is cleaned up", pSet->label); @@ -182,7 +185,7 @@ void *taosOpenUdpConnection(void *shandle, void *thandle, uint32_t ip, uint16_t } static void *taosRecvUdpData(void *param) { - SUdpConn *pConn = param; + SUdpConn * pConn = param; struct sockaddr_in sourceAdd; ssize_t dataLen; unsigned int addLen; @@ -218,7 +221,7 @@ static void *taosRecvUdpData(void *param) { } int32_t size = dataLen + tsRpcOverhead; - char *tmsg = malloc(size); + char * tmsg = malloc(size); if (NULL == tmsg) { tError("%s failed to allocate memory, size:%" PRId64, pConn->label, (int64_t)dataLen); continue; @@ -257,4 +260,4 @@ int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *c return ret; } - +#endif diff --git a/source/libs/transport/src/transport.c b/source/libs/transport/src/transport.c index 6dea4a4e57392be988126c579648f39a8270b9bf..f2f48bbc8a69a022d0fc6b8a88c5a9a55d0b4ad6 100644 --- a/source/libs/transport/src/transport.c +++ b/source/libs/transport/src/transport.c @@ -11,4 +11,4 @@ * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . - */ \ No newline at end of file + */