diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 0c4fa999b9b358d67d338681ea2be85f394fe7c4..ea8045c8515432aa0a9d6e56c259b6eada7757f3 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -199,37 +199,6 @@ typedef struct { uint32_t ip[TSDB_MAX_MGMT_IPS]; } SMgmtIpList; -typedef struct { - char version : 4; - char comp : 4; - char tcp : 2; - char spi : 3; - char encrypt : 3; - uint16_t tranId; - uint32_t uid; // for unique ID inside a client - uint32_t sourceId; - - // internal part - uint32_t destId; - uint32_t destIp; - char meterId[TSDB_UNI_LEN]; - uint16_t port; // for UDP only - char empty[1]; - uint8_t msgType; - int32_t msgLen; - uint8_t content[0]; -} STaosHeader; - -typedef struct { - uint32_t timeStamp; - uint8_t auth[TSDB_AUTH_LEN]; -} STaosDigest; - -typedef struct { - unsigned char code; - char more[]; -} STaosRsp, SMsgReply; - typedef struct { uint32_t customerId; uint32_t osId; diff --git a/src/inc/trpc.h b/src/inc/trpc.h index cb5a8aeaa3ac4abb612a0ca5e5ae6d59f9f4dd88..48932154478df3f5ffb2db4398785a8a1a51108d 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -68,7 +68,7 @@ typedef struct { void (*ufp)(void *ahandle, SRpcIpSet ipSet); // call back to retrieve the client auth info - int (*afp)(char *meterId, char *spi, char *encrypt, uint8_t *secret, uint8_t *ckey); + int (*afp)(char *meterId, char *spi, char *encrypt, char *secret, char *ckey); } SRpcInit; void *rpcOpen(SRpcInit *pRpc); diff --git a/src/rpc/inc/tconncache.h b/src/rpc/inc/rpcCache.h similarity index 91% rename from src/rpc/inc/tconncache.h rename to src/rpc/inc/rpcCache.h index e09d29774ebec78ca275fb9d004ccd2065989595..5fc7992e43f2dfdb887f46c334799be09d4b1990 100644 --- a/src/rpc/inc/tconncache.h +++ b/src/rpc/inc/rpcCache.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef TDENGINE_CONN_CACHE_H -#define TDENGINE_CONN_CACHE_H +#ifndef TDENGINE_RPC_CACHE_H +#define TDENGINE_RPC_CACHE_H #ifdef __cplusplus extern "C" { @@ -29,4 +29,4 @@ void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user); } #endif -#endif // TDENGINE_CONN_CACHE_H +#endif // TDENGINE_RPC_CACHE_H diff --git a/src/rpc/inc/ttcpclient.h b/src/rpc/inc/rpcClient.h similarity index 94% rename from src/rpc/inc/ttcpclient.h rename to src/rpc/inc/rpcClient.h index 952d1c4a0ecb0d071bb7b96140d8c58e865924e9..dc5e9f744a9b8bd8c1cc359af30884df4459ffdc 100644 --- a/src/rpc/inc/ttcpclient.h +++ b/src/rpc/inc/rpcClient.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef _taos_tcp_client_header_ -#define _taos_tcp_client_header_ +#ifndef _rpc_client_header_ +#define _rpc_client_header_ #ifdef __cplusplus extern "C" { diff --git a/src/rpc/inc/thaship.h b/src/rpc/inc/rpcHaship.h similarity index 73% rename from src/rpc/inc/thaship.h rename to src/rpc/inc/rpcHaship.h index 9d4396ce6a29ec0bd5ccb14d9b0e0713e951cceb..d3ed48997a0b2798e8d89942d8a16f4b3de81f31 100644 --- a/src/rpc/inc/thaship.h +++ b/src/rpc/inc/rpcHaship.h @@ -20,11 +20,11 @@ extern "C" { #endif -void *taosOpenIpHash(int maxSessions); -void taosCloseIpHash(void *handle); -void *taosAddIpHash(void *handle, void *pData, uint32_t ip, uint16_t port); -void taosDeleteIpHash(void *handle, uint32_t ip, uint16_t port); -void *taosGetIpHash(void *handle, uint32_t ip, uint16_t port); +void *rpcOpenIpHash(int maxSessions); +void rpcCloseIpHash(void *handle); +void *rpcAddIpHash(void *handle, void *pData, uint32_t ip, uint16_t port); +void rpcDeleteIpHash(void *handle, uint32_t ip, uint16_t port); +void *rpcGetIpHash(void *handle, uint32_t ip, uint16_t port); #ifdef __cplusplus } diff --git a/src/rpc/inc/rpcHead.h b/src/rpc/inc/rpcHead.h new file mode 100644 index 0000000000000000000000000000000000000000..bfdcfb0beeae011215416ec08075d832dcd4880a --- /dev/null +++ b/src/rpc/inc/rpcHead.h @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_RPCHEAD_H +#define TDENGINE_RPCHEAD_H + +#ifdef __cplusplus +extern "C" { +#endif + +#pragma pack(push, 1) + +typedef struct { + char version:4; // RPC version + char comp:4; // compression algorithm, 0:no compression 1:lz4 + char tcp:2; // tcp flag + char spi:3; // security parameter index + char encrypt:3; // encrypt algorithm, 0: no encryption + uint16_t tranId; // transcation ID + uint32_t uid; // for unique ID inside a client + uint32_t sourceId; // source ID, an index for connection list + uint32_t destId; // destination ID, an index for connection list + char meterId[TSDB_UNI_LEN]; + uint16_t port; // for UDP only, port may be changed + char empty[1]; // reserved + uint8_t msgType; // message type + int32_t msgLen; // message length including the header iteslf + int32_t code; + uint8_t content[0]; // message body starts from here +} SRpcHead; + +typedef struct { + int32_t reserved; + int32_t contLen; +} SRpcComp; + +typedef struct { + uint32_t timeStamp; + uint8_t auth[TSDB_AUTH_LEN]; +} SRpcDigest; + +#pragma pack(pop) + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_RPCHEAD_H + diff --git a/src/rpc/inc/ttcpserver.h b/src/rpc/inc/rpcServer.h similarity index 93% rename from src/rpc/inc/ttcpserver.h rename to src/rpc/inc/rpcServer.h index 299adb31694b14c77819d09fb5512c0c2b20e930..eccbd7271affe18e590e9b9da5cba57996df7eef 100644 --- a/src/rpc/inc/ttcpserver.h +++ b/src/rpc/inc/rpcServer.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef _taos_tcp_server_header_ -#define _taos_tcp_server_header_ +#ifndef _rpc_server_header_ +#define _rpc_server_header_ #ifdef __cplusplus extern "C" { diff --git a/src/rpc/inc/tudp.h b/src/rpc/inc/rpcUdp.h similarity index 70% rename from src/rpc/inc/tudp.h rename to src/rpc/inc/rpcUdp.h index cb2f8d2b1084ea25d9ea737f2549a2b206cbcd39..03498ac69dc283e25131e50621aa76f52f781b1c 100644 --- a/src/rpc/inc/tudp.h +++ b/src/rpc/inc/rpcUdp.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef _taos_udp_header_ -#define _taos_udp_header_ +#ifndef _rpc_udp_header_ +#define _rpc_udp_header_ #ifdef __cplusplus extern "C" { @@ -24,15 +24,15 @@ extern "C" { void *taosInitUdpServer(char *ip, uint16_t port, char *label, int, void *fp, void *shandle); void *taosInitUdpClient(char *ip, uint16_t port, char *label, int, void *fp, void *shandle); -void taosCleanUpUdpConnection(void *handle); -int taosSendUdpData(uint32_t ip, uint16_t port, char *data, int dataLen, void *chandle); +void taosCleanUpUdpConnection(void *handle); +int taosSendUdpData(uint32_t ip, uint16_t port, char *data, int dataLen, void *chandle); void *taosOpenUdpConnection(void *shandle, void *thandle, char *ip, uint16_t port); -void taosFreeMsgHdr(void *hdr); -int taosMsgHdrSize(void *hdr); -void taosSendMsgHdr(void *hdr, int fd); -void taosInitMsgHdr(void **hdr, void *dest, int maxPkts); -void taosSetMsgHdrData(void *hdr, char *data, int dataLen); +void taosFreeMsgHdr(void *hdr); +int taosMsgHdrSize(void *hdr); +void taosSendMsgHdr(void *hdr, int fd); +void taosInitMsgHdr(void **hdr, void *dest, int maxPkts); +void taosSetMsgHdrData(void *hdr, char *data, int dataLen); #ifdef __cplusplus } diff --git a/src/rpc/src/tconncache.c b/src/rpc/src/rpcCache.c similarity index 99% rename from src/rpc/src/tconncache.c rename to src/rpc/src/rpcCache.c index 1dfdc28a444d5d4f1b9ceb866beca15e38f77f6d..2f8979a15d0ce4cc217845bd154c3b531353788f 100644 --- a/src/rpc/src/tconncache.c +++ b/src/rpc/src/rpcCache.c @@ -21,7 +21,7 @@ #include "ttime.h" #include "ttimer.h" #include "tutil.h" -#include "tconncache.h" +#include "rpcCache.h" typedef struct _c_hash_t { uint32_t ip; diff --git a/src/rpc/src/ttcpclient.c b/src/rpc/src/rpcClient.c similarity index 97% rename from src/rpc/src/ttcpclient.c rename to src/rpc/src/rpcClient.c index 3d39be92fe4fd1c4476e35d56b88ed2fa5c9e474..f600004266a1d5edc10695d7b6aec6f9d54908fd 100644 --- a/src/rpc/src/ttcpclient.c +++ b/src/rpc/src/rpcClient.c @@ -17,8 +17,9 @@ #include "taosmsg.h" #include "tlog.h" #include "tsocket.h" -#include "ttcpclient.h" #include "tutil.h" +#include "rpcClient.h" +#include "rpcHead.h" #ifndef EPOLLWAKEUP #define EPOLLWAKEUP (1u << 29) @@ -152,15 +153,15 @@ static void *taosReadTcpData(void *param) { continue; } - int headLen = taosReadMsg(pFdObj->fd, buffer, sizeof(STaosHeader)); - if (headLen != sizeof(STaosHeader)) { + int headLen = taosReadMsg(pFdObj->fd, buffer, sizeof(SRpcHead)); + if (headLen != sizeof(SRpcHead)) { tError("%s read error, headLen:%d", pTcp->label, headLen); tfree(buffer); taosCleanUpTcpFdObj(pFdObj); continue; } - int dataLen = (int32_t)htonl((uint32_t)((STaosHeader *)buffer)->msgLen); + int dataLen = (int32_t)htonl((uint32_t)((SRpcHead *)buffer)->msgLen); if (dataLen > 1024) { void *b = realloc(buffer, (size_t)dataLen); if (NULL == b) { diff --git a/src/rpc/src/thaship.c b/src/rpc/src/rpcHaship.c similarity index 88% rename from src/rpc/src/thaship.c rename to src/rpc/src/rpcHaship.c index 6b76c4b59e236418c408fabaac5c717cbbafcc55..c904dba4a1de493c0889da9e558f8db4da502053 100644 --- a/src/rpc/src/thaship.c +++ b/src/rpc/src/rpcHaship.c @@ -32,7 +32,7 @@ typedef struct { int maxSessions; } SHashObj; -int taosHashIp(void *handle, uint32_t ip, uint16_t port) { +int rpcHashIp(void *handle, uint32_t ip, uint16_t port) { SHashObj *pObj = (SHashObj *)handle; int hash = 0; @@ -45,7 +45,7 @@ int taosHashIp(void *handle, uint32_t ip, uint16_t port) { return hash; } -void *taosAddIpHash(void *handle, void *data, uint32_t ip, uint16_t port) { +void *rpcAddIpHash(void *handle, void *data, uint32_t ip, uint16_t port) { int hash; SIpHash * pNode; SHashObj *pObj; @@ -53,7 +53,7 @@ void *taosAddIpHash(void *handle, void *data, uint32_t ip, uint16_t port) { pObj = (SHashObj *)handle; if (pObj == NULL || pObj->maxSessions == 0) return NULL; - hash = taosHashIp(pObj, ip, port); + hash = rpcHashIp(pObj, ip, port); pNode = (SIpHash *)taosMemPoolMalloc(pObj->ipHashMemPool); pNode->ip = ip; pNode->port = port; @@ -68,7 +68,7 @@ void *taosAddIpHash(void *handle, void *data, uint32_t ip, uint16_t port) { return pObj; } -void taosDeleteIpHash(void *handle, uint32_t ip, uint16_t port) { +void rpcDeleteIpHash(void *handle, uint32_t ip, uint16_t port) { int hash; SIpHash * pNode; SHashObj *pObj; @@ -76,7 +76,7 @@ void taosDeleteIpHash(void *handle, uint32_t ip, uint16_t port) { pObj = (SHashObj *)handle; if (pObj == NULL || pObj->maxSessions == 0) return; - hash = taosHashIp(pObj, ip, port); + hash = rpcHashIp(pObj, ip, port); pNode = pObj->ipHashList[hash]; while (pNode) { @@ -100,7 +100,7 @@ void taosDeleteIpHash(void *handle, uint32_t ip, uint16_t port) { } } -void *taosGetIpHash(void *handle, uint32_t ip, uint16_t port) { +void *rpcGetIpHash(void *handle, uint32_t ip, uint16_t port) { int hash; SIpHash * pNode; SHashObj *pObj; @@ -108,7 +108,7 @@ void *taosGetIpHash(void *handle, uint32_t ip, uint16_t port) { pObj = (SHashObj *)handle; if (pObj == NULL || pObj->maxSessions == 0) return NULL; - hash = taosHashIp(pObj, ip, port); + hash = rpcHashIp(pObj, ip, port); pNode = pObj->ipHashList[hash]; while (pNode) { @@ -124,7 +124,7 @@ void *taosGetIpHash(void *handle, uint32_t ip, uint16_t port) { return NULL; } -void *taosOpenIpHash(int maxSessions) { +void *rpcOpenIpHash(int maxSessions) { SIpHash **ipHashList; mpool_h ipHashMemPool; SHashObj *pObj; @@ -152,7 +152,7 @@ void *taosOpenIpHash(int maxSessions) { return pObj; } -void taosCloseIpHash(void *handle) { +void rpcCloseIpHash(void *handle) { SHashObj *pObj; pObj = (SHashObj *)handle; diff --git a/src/rpc/src/trpc.c b/src/rpc/src/rpcMain.c similarity index 94% rename from src/rpc/src/trpc.c rename to src/rpc/src/rpcMain.c index bd72ddd3cc7d2d26059c012bbf570b18cf89cce5..3dbf666a11039eaccbb516e9bd40a218a99c5a46 100755 --- a/src/rpc/src/trpc.c +++ b/src/rpc/src/rpcMain.c @@ -14,23 +14,24 @@ */ #include "os.h" -#include "shash.h" -#include "taosmsg.h" #include "tidpool.h" #include "tlog.h" #include "tmd5.h" #include "tmempool.h" -#include "tsocket.h" -#include "ttcpclient.h" -#include "ttcpserver.h" #include "ttime.h" #include "ttimer.h" -#include "tudp.h" #include "tutil.h" #include "lz4.h" -#include "tconncache.h" -#include "trpc.h" #include "taoserror.h" +#include "tsocket.h" +#include "shash.h" +#include "taosmsg.h" +#include "rpcUdp.h" +#include "rpcCache.h" +#include "rpcClient.h" +#include "rpcServer.h" +#include "rpcHead.h" +#include "trpc.h" #define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest)) #define rpcHeadFromCont(cont) ((SRpcHead *) (cont - sizeof(SRpcHead))) @@ -51,11 +52,11 @@ typedef struct { char meterId[TSDB_UNI_LEN]; // meter ID char spi; // security parameter index char encrypt; // encrypt algorithm - uint8_t secret[TSDB_KEY_LEN]; // secret for the link - uint8_t ckey[TSDB_KEY_LEN]; // ciphering key + char secret[TSDB_KEY_LEN]; // secret for the link + char ckey[TSDB_KEY_LEN]; // ciphering key void (*cfp)(char type, void *pCont, int contLen, void *ahandle, int32_t code); - int (*afp)(char *meterId, char *spi, char *encrypt, uint8_t *secret, uint8_t *ckey); + int (*afp)(char *meterId, char *spi, char *encrypt, char *secret, char *ckey); void (*ufp)(void *ahandle, SRpcIpSet ipSet); void *idPool; // handle to ID pool @@ -88,8 +89,8 @@ typedef struct _RpcConn { char meterId[TSDB_UNI_LEN]; // user ID for the link char spi; // security parameter index char encrypt; // encryption, 0:1 - uint8_t secret[TSDB_KEY_LEN]; // secret for the link - uint8_t ckey[TSDB_KEY_LEN]; // ciphering key + char secret[TSDB_KEY_LEN]; // secret for the link + char ckey[TSDB_KEY_LEN]; // ciphering key uint16_t localPort; // for UDP only uint32_t peerUid; // peer UID uint32_t peerIp; // peer IP @@ -114,39 +115,6 @@ typedef struct _RpcConn { SRpcReqContext *pContext; // request context } SRpcConn; -#pragma pack(push, 1) - -typedef struct { - char version:4; // RPC version - char comp:4; // compression algorithm, 0:no compression 1:lz4 - char tcp:2; // tcp flag - char spi:3; // security parameter index - char encrypt:3; // encrypt algorithm, 0: no encryption - uint16_t tranId; // transcation ID - uint32_t uid; // for unique ID inside a client - uint32_t sourceId; // source ID, an index for connection list - uint32_t destId; // destination ID, an index for connection list - char meterId[TSDB_UNI_LEN]; - uint16_t port; // for UDP only, port may be changed - char empty[1]; // reserved - uint8_t msgType; // message type - int32_t msgLen; // message length including the header iteslf - int32_t code; - uint8_t content[0]; // message body starts from here -} SRpcHead; - -typedef struct { - int32_t reserved; - int32_t contLen; -} SRpcComp; - -typedef struct { - uint32_t timeStamp; - uint8_t auth[TSDB_AUTH_LEN]; -} SRpcDigest; - -#pragma pack(pop) - int tsRpcProgressTime = 10; // milliseocnds // not configurable @@ -196,7 +164,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *meterId, char *hash static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *meterId, char *hashstr); static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext); -static void rpcSendQuickRsp(SRpcConn *pConn, char code); +static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code); static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint32_t ip, uint16_t port, void *chandle); static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); @@ -509,10 +477,11 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) { } static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *meterId, char *hashstr) { - SRpcConn *pConn; + SRpcConn *pConn = NULL; // check if it is already allocated - pConn = *(SRpcConn **)(taosGetStrHashData(pRpc->hash, hashstr)); + SRpcConn **ppConn = (SRpcConn **)(taosGetStrHashData(pRpc->hash, hashstr)); + if (ppConn) pConn = *ppConn; if (pConn) return pConn; int sid = taosAllocateId(pRpc->idPool); @@ -537,7 +506,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *meterId, char *hash if (pConn) { taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn); - tTrace("%s pConn:%p, rpc connection is allocated, sid:%d id:%s", pRpc->label, pConn, sid); + tTrace("%s pConn:%p, rpc connection is allocated, sid:%d id:%s", pRpc->label, pConn, sid, pConn->meterId); } return pConn; @@ -660,7 +629,7 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { return TSDB_CODE_SUCCESS; } -static int rpcProcessHead(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int dataLen, uint32_t ip) { +static int32_t rpcProcessHead(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int dataLen, uint32_t ip) { int32_t sid, code = 0; SRpcConn * pConn = NULL; char hashstr[40] = {0}; @@ -724,7 +693,7 @@ static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_ SRpcHead *pHead = (SRpcHead *)data; SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcConn *pConn = NULL; - uint8_t code = 0; + int32_t code = 0; tDump(data, dataLen); @@ -750,7 +719,7 @@ static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_ pthread_mutex_unlock(&pRpc->mutex); if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) { - tTrace("%s pConn:%p, %s received from 0x%x:%hu, parse code:%u len:%d source:0x%08x dest:0x%08x tranId:%d", + tTrace("%s pConn:%p, %s received from 0x%x:%hu, parse code:%x len:%d source:0x%08x dest:0x%08x tranId:%d", pRpc->label, pConn, taosMsg[pHead->msgType], ip, port, code, dataLen, pHead->sourceId, pHead->destId, pHead->tranId); } @@ -763,7 +732,7 @@ static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_ if (code != 0) { // parsing error if ( rpcIsReq(pHead->msgType) ) { rpcSendErrorMsgToPeer(pRpc, data, code, ip, port, chandle); - tTrace("%s pConn:%p, %s is sent with error code:%u", pRpc->label, pConn, taosMsg[pHead->msgType+1], code); + tTrace("%s pConn:%p, %s is sent with error code:%x", pRpc->label, pConn, taosMsg[pHead->msgType+1], code); } } else { // parsing OK rpcProcessIncomingMsg(pConn, pHead); @@ -804,7 +773,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { } } -static void rpcSendQuickRsp(SRpcConn *pConn, char code) { +static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) { char msg[RPC_MSG_OVERHEAD]; SRpcHead *pHead; @@ -1131,7 +1100,7 @@ static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) { pDigest->timeStamp = htonl(taosGetTimestampSec()); msgLen += sizeof(SRpcDigest); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); - rpcBuildAuthHead((uint8_t *)pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret); + rpcBuildAuthHead((uint8_t *)pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, (uint8_t *)pConn->secret); } else { pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); } @@ -1142,7 +1111,7 @@ static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) { static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { SRpcHead *pHead = (SRpcHead *)msg; SRpcInfo *pRpc = pConn->pRpc; - int code = 0; + int32_t code = 0; if (pConn->spi == 0 ) return 0; @@ -1158,7 +1127,7 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { delta, htonl(pDigest->timeStamp)); code = TSDB_CODE_INVALID_TIME_STAMP; } else { - if (rpcAuthenticateMsg((uint8_t *)pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) { + if (rpcAuthenticateMsg((uint8_t *)pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, (uint8_t *)pConn->secret) < 0) { tError("%s pConn:%p, authentication failed, msg discarded", pRpc->label, pConn); code = TSDB_CODE_AUTH_FAILURE; } else { diff --git a/src/rpc/src/ttcpserver.c b/src/rpc/src/rpcServer.c similarity index 98% rename from src/rpc/src/ttcpserver.c rename to src/rpc/src/rpcServer.c index 663bfcdf8ee86008b0cfa4803725af1866a950bf..49992e5931a211f4ae075dcc2645ab9fc8f20883 100644 --- a/src/rpc/src/ttcpserver.c +++ b/src/rpc/src/rpcServer.c @@ -14,12 +14,12 @@ */ #include "os.h" -#include "taosmsg.h" #include "tlog.h" #include "tlog.h" #include "tsocket.h" -#include "ttcpserver.h" #include "tutil.h" +#include "rpcServer.h" +#include "rpcHead.h" #define TAOS_IPv4ADDR_LEN 16 #ifndef EPOLLWAKEUP @@ -184,16 +184,16 @@ static void taosProcessTcpData(void *param) { } void *buffer = malloc(1024); - int headLen = taosReadMsg(pFdObj->fd, buffer, sizeof(STaosHeader)); + int headLen = taosReadMsg(pFdObj->fd, buffer, sizeof(SRpcHead)); - if (headLen != sizeof(STaosHeader)) { + if (headLen != sizeof(SRpcHead)) { tError("%s read error, headLen:%d, errno:%d", pThreadObj->label, headLen, errno); taosCleanUpFdObj(pFdObj); tfree(buffer); continue; } - int dataLen = (int32_t)htonl((uint32_t)((STaosHeader *)buffer)->msgLen); + int dataLen = (int32_t)htonl((uint32_t)((SRpcHead *)buffer)->msgLen); if (dataLen > 1024) buffer = realloc(buffer, (size_t)dataLen); int leftLen = dataLen - headLen; diff --git a/src/rpc/src/tudp.c b/src/rpc/src/rpcUdp.c similarity index 88% rename from src/rpc/src/tudp.c rename to src/rpc/src/rpcUdp.c index a605abea5f6bdbb4470b80892910d1e49d6ee951..46cb768995f31b7e806b880e00291ef05f1929c6 100644 --- a/src/rpc/src/tudp.c +++ b/src/rpc/src/rpcUdp.c @@ -14,15 +14,15 @@ */ #include "os.h" -#include "taosmsg.h" -#include "thash.h" -#include "thaship.h" #include "tlog.h" #include "tsocket.h" #include "tsystem.h" #include "ttimer.h" -#include "tudp.h" #include "tutil.h" +#include "thash.h" +#include "rpcHaship.h" +#include "rpcUdp.h" +#include "rpcHead.h" #define RPC_MAX_UDP_CONNS 256 #define RPC_MAX_UDP_PKTS 1000 @@ -104,6 +104,41 @@ typedef struct { uint64_t hash; } SHandleViaTcp; +void taosFreeMsgHdr(void *hdr) { + struct msghdr *msgHdr = (struct msghdr *)hdr; + free(msgHdr->msg_iov); +} + +int taosMsgHdrSize(void *hdr) { + struct msghdr *msgHdr = (struct msghdr *)hdr; + return (int)msgHdr->msg_iovlen; +} + +void taosSendMsgHdr(void *hdr, int fd) { + struct msghdr *msgHdr = (struct msghdr *)hdr; + sendmsg(fd, msgHdr, 0); + msgHdr->msg_iovlen = 0; +} + +void taosInitMsgHdr(void **hdr, void *dest, int maxPkts) { + struct msghdr *msgHdr = (struct msghdr *)malloc(sizeof(struct msghdr)); + memset(msgHdr, 0, sizeof(struct msghdr)); + *hdr = msgHdr; + struct sockaddr_in *destAdd = (struct sockaddr_in *)dest; + + msgHdr->msg_name = destAdd; + msgHdr->msg_namelen = sizeof(struct sockaddr_in); + int size = (int)sizeof(struct iovec) * maxPkts; + msgHdr->msg_iov = (struct iovec *)malloc((size_t)size); + memset(msgHdr->msg_iov, 0, (size_t)size); +} + +void taosSetMsgHdrData(void *hdr, char *data, int dataLen) { + struct msghdr *msgHdr = (struct msghdr *)hdr; + msgHdr->msg_iov[msgHdr->msg_iovlen].iov_base = data; + msgHdr->msg_iov[msgHdr->msg_iovlen].iov_len = (size_t)dataLen; + msgHdr->msg_iovlen++; +} bool taosCheckHandleViaTcpValid(SHandleViaTcp *handleViaTcp) { return handleViaTcp->hash == taosHashUInt64(handleViaTcp->handle); } @@ -134,8 +169,8 @@ void taosProcessMonitorTimer(void *param, void *tmrId) { } void *taosReadTcpData(void *argv) { - SMonitor * pMonitor = (SMonitor *)argv; - STaosHeader *pHead = (STaosHeader *)pMonitor->data; + SMonitor *pMonitor = (SMonitor *)argv; + SRpcHead *pHead = (SRpcHead *)pMonitor->data; SPacketInfo *pInfo = (SPacketInfo *)pHead->content; SUdpConnSet *pSet = pMonitor->pSet; int retLen, fd; @@ -189,7 +224,7 @@ void *taosReadTcpData(void *argv) { return NULL; } -int taosReceivePacketViaTcp(uint32_t ip, STaosHeader *pHead, SUdpConn *pConn) { +int taosReceivePacketViaTcp(uint32_t ip, SRpcHead *pHead, SUdpConn *pConn) { SUdpConnSet * pSet = pConn->pSet; SPacketInfo * pInfo = (SPacketInfo *)pHead->content; int code = 0; @@ -200,7 +235,7 @@ int taosReceivePacketViaTcp(uint32_t ip, STaosHeader *pHead, SUdpConn *pConn) { pHead->sourceId, pHead->destId, pHead->tranId); SMonitor *pMonitor = (SMonitor *)calloc(1, sizeof(SMonitor)); - pMonitor->dataLen = sizeof(STaosHeader) + sizeof(SPacketInfo); + pMonitor->dataLen = sizeof(SRpcHead) + sizeof(SPacketInfo); memcpy(pMonitor->data, pHead, (size_t)pMonitor->dataLen); pMonitor->pSet = pSet; pMonitor->ip = ip; @@ -225,7 +260,7 @@ void *taosRecvUdpData(void *param) { unsigned int addLen, dataLen; SUdpConn * pConn = (SUdpConn *)param; uint16_t port; - int minSize = sizeof(STaosHeader); + int minSize = sizeof(SRpcHead); memset(&sourceAdd, 0, sizeof(sourceAdd)); addLen = sizeof(sourceAdd); @@ -237,7 +272,7 @@ void *taosRecvUdpData(void *param) { tTrace("%s msg is recv from 0x%x:%hu len:%d", pConn->label, sourceAdd.sin_addr.s_addr, ntohs(sourceAdd.sin_port), dataLen); - if (dataLen < sizeof(STaosHeader)) { + if (dataLen < sizeof(SRpcHead)) { tError("%s recvfrom failed, reason:%s\n", pConn->label, strerror(errno)); continue; } @@ -250,7 +285,7 @@ void *taosRecvUdpData(void *param) { char *msg = pConn->buffer; while (processedLen < (int)dataLen) { leftLen = dataLen - processedLen; - STaosHeader *pHead = (STaosHeader *)msg; + SRpcHead *pHead = (SRpcHead *)msg; msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); if (leftLen < minSize || msgLen > leftLen || msgLen < minSize) { tError("%s msg is messed up, dataLen:%d processedLen:%d count:%d msgLen:%d", pConn->label, dataLen, @@ -259,7 +294,7 @@ void *taosRecvUdpData(void *param) { } if (pHead->tcp == 1) { - taosReceivePacketViaTcp(sourceAdd.sin_addr.s_addr, (STaosHeader *)msg, pConn); + taosReceivePacketViaTcp(sourceAdd.sin_addr.s_addr, (SRpcHead *)msg, pConn); } else { char *data = malloc((size_t)msgLen); memcpy(data, msg, (size_t)msgLen); @@ -282,7 +317,7 @@ void *taosTransferDataViaTcp(void *argv) { int connFd = pTransfer->fd; int msgLen, retLen, leftLen; uint64_t handle; - STaosHeader *pHeader = NULL, head; + SRpcHead *pHead = NULL, head; SUdpConnSet *pSet = pTransfer->pSet; SHandleViaTcp handleViaTcp; @@ -308,8 +343,8 @@ void *taosTransferDataViaTcp(void *argv) { if (handle == 0) { // receive a packet from client tTrace("%s data will be received via TCP from 0x%x:%hu", pSet->label, pTransfer->ip, pTransfer->port); - retLen = taosReadMsg(connFd, &head, sizeof(STaosHeader)); - if (retLen != (int)sizeof(STaosHeader)) { + retLen = taosReadMsg(connFd, &head, sizeof(SRpcHead)); + if (retLen != (int)sizeof(SRpcHead)) { tError("%s failed to read msg header, retLen:%d", pSet->label, retLen); } else { SMonitor *pMonitor = (SMonitor *)calloc(1, sizeof(SMonitor)); @@ -319,10 +354,10 @@ void *taosTransferDataViaTcp(void *argv) { free(pTransfer); return NULL; } - pMonitor->dataLen = sizeof(STaosHeader); + pMonitor->dataLen = sizeof(SRpcHead); memcpy(pMonitor->data, &head, (size_t)pMonitor->dataLen); - ((STaosHeader *)pMonitor->data)->msgLen = (int32_t)htonl(sizeof(STaosHeader)); - ((STaosHeader *)pMonitor->data)->tcp = 1; + ((SRpcHead *)pMonitor->data)->msgLen = (int32_t)htonl(sizeof(SRpcHead)); + ((SRpcHead *)pMonitor->data)->tcp = 1; pMonitor->ip = pTransfer->ip; pMonitor->port = head.port; pMonitor->pSet = pSet; @@ -337,8 +372,8 @@ void *taosTransferDataViaTcp(void *argv) { return NULL; } - leftLen = msgLen - (int)sizeof(STaosHeader); - retLen = taosReadMsg(connFd, buffer + sizeof(STaosHeader), leftLen); + leftLen = msgLen - (int)sizeof(SRpcHead); + retLen = taosReadMsg(connFd, buffer + sizeof(SRpcHead), leftLen); pMonitor->pSet = NULL; if (retLen != leftLen) { @@ -349,7 +384,7 @@ void *taosTransferDataViaTcp(void *argv) { pTransfer->port, msgLen); pSet->index = (pSet->index + 1) % pSet->threads; SUdpConn *pConn = pSet->udpConn + pSet->index; - memcpy(buffer, &head, sizeof(STaosHeader)); + memcpy(buffer, &head, sizeof(SRpcHead)); (*pSet->fp)(buffer, msgLen, pTransfer->ip, head.port, pSet->shandle, NULL, pConn); } @@ -358,11 +393,11 @@ void *taosTransferDataViaTcp(void *argv) { } else { // send a packet to client tTrace("%s send packet to client via TCP, handle:0x%x", pSet->label, handle); - pHeader = (STaosHeader *)handle; - msgLen = (int32_t)htonl((uint32_t)pHeader->msgLen); + pHead = (SRpcHead *)handle; + msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); - if (pHeader->tcp != 0 || msgLen < 1024) { - tError("%s invalid handle:%p, connection shall be closed", pSet->label, pHeader); + if (pHead->tcp != 0 || msgLen < 1024) { + tError("%s invalid handle:%p, connection shall be closed", pSet->label, pHead); } else { SMonitor *pMonitor = (SMonitor *)calloc(1, sizeof(SMonitor)); if (NULL == pMonitor) { @@ -371,12 +406,12 @@ void *taosTransferDataViaTcp(void *argv) { free(pTransfer); return NULL; } - pMonitor->dataLen = sizeof(STaosHeader); + pMonitor->dataLen = sizeof(SRpcHead); memcpy(pMonitor->data, (void *)handle, (size_t)pMonitor->dataLen); - STaosHeader *pThead = (STaosHeader *)pMonitor->data; + SRpcHead *pThead = (SRpcHead *)pMonitor->data; pThead->tcp = 1; - pThead->msgType = (char)(pHeader->msgType - 1); - pThead->msgLen = (int32_t)htonl(sizeof(STaosHeader)); + pThead->msgType = (char)(pHead->msgType - 1); + pThead->msgLen = (int32_t)htonl(sizeof(SRpcHead)); uint32_t id = pThead->sourceId; pThead->sourceId = pThead->destId; pThead->destId = id; pMonitor->ip = pTransfer->ip; pMonitor->port = pTransfer->port; @@ -522,7 +557,7 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v pConn->pSet = pSet; pConn->signature = pConn; if (tsUdpDelay) { - pConn->hash = taosOpenIpHash(RPC_MAX_UDP_CONNS); + pConn->hash = rpcOpenIpHash(RPC_MAX_UDP_CONNS); pthread_mutex_init(&pConn->mutex, NULL); pConn->tmrCtrl = pSet->tmrCtrl; } @@ -575,7 +610,7 @@ void taosCleanUpUdpConnection(void *handle) { pthread_cancel(pConn->thread); taosCloseSocket(pConn->fd); if (pConn->hash) { - taosCloseIpHash(pConn->hash); + rpcCloseIpHash(pConn->hash); pthread_mutex_destroy(&pConn->mutex); } } @@ -608,7 +643,7 @@ void *taosOpenUdpConnection(void *shandle, void *thandle, char *ip, uint16_t por void taosRemoveUdpBuf(SUdpBuf *pBuf) { taosTmrStopA(&pBuf->timer); - taosDeleteIpHash(pBuf->pConn->hash, pBuf->ip, pBuf->port); + rpcDeleteIpHash(pBuf->pConn->hash, pBuf->ip, pBuf->port); // tTrace("%s UDP buffer to:0x%lld:%d is removed", pBuf->pConn->label, // pBuf->ip, pBuf->port); @@ -671,13 +706,13 @@ int taosSendPacketViaTcp(uint32_t ip, uint16_t port, char *data, int dataLen, vo int code = -1, retLen, msgLen; char ipstr[64]; char buffer[128]; - STaosHeader *pHead; + SRpcHead *pHead; if (pSet->server) { // send from server - pHead = (STaosHeader *)buffer; - memcpy(pHead, data, sizeof(STaosHeader)); + pHead = (SRpcHead *)buffer; + memcpy(pHead, data, sizeof(SRpcHead)); pHead->tcp = 1; SPacketInfo *pInfo = (SPacketInfo *)pHead->content; @@ -685,7 +720,7 @@ int taosSendPacketViaTcp(uint32_t ip, uint16_t port, char *data, int dataLen, vo pInfo->port = pSet->port; pInfo->msgLen = pHead->msgLen; - msgLen = sizeof(STaosHeader) + sizeof(SPacketInfo); + msgLen = sizeof(SRpcHead) + sizeof(SPacketInfo); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); code = taosSendUdpData(ip, port, buffer, msgLen, chandle); tTrace("%s data from server will be sent via TCP:%hu, msgType:%d, length:%d, handle:0x%x", pSet->label, pInfo->port, @@ -696,16 +731,16 @@ int taosSendPacketViaTcp(uint32_t ip, uint16_t port, char *data, int dataLen, vo tTrace("%s data will be sent via TCP from client", pSet->label); // send a UDP header first to set up the connection - pHead = (STaosHeader *)buffer; - memcpy(pHead, data, sizeof(STaosHeader)); + pHead = (SRpcHead *)buffer; + memcpy(pHead, data, sizeof(SRpcHead)); pHead->tcp = 2; - msgLen = sizeof(STaosHeader); + msgLen = sizeof(SRpcHead); pHead->msgLen = (int32_t)htonl(msgLen); code = taosSendUdpData(ip, port, buffer, msgLen, chandle); - //pHead = (STaosHeader *)data; + //pHead = (SRpcHead *)data; tinet_ntoa(ipstr, ip); int fd = taosOpenTcpClientSocket(ipstr, pConn->port, tsLocalIp); @@ -762,10 +797,10 @@ int taosSendUdpData(uint32_t ip, uint16_t port, char *data, int dataLen, void *c pthread_mutex_lock(&pConn->mutex); - pBuf = (SUdpBuf *)taosGetIpHash(pConn->hash, ip, port); + pBuf = (SUdpBuf *)rpcGetIpHash(pConn->hash, ip, port); if (pBuf == NULL) { pBuf = taosCreateUdpBuf(pConn, ip, port); - taosAddIpHash(pConn->hash, pBuf, ip, port); + rpcAddIpHash(pConn->hash, pBuf, ip, port); } if ((pBuf->totalLen + dataLen > RPC_MAX_UDP_SIZE) || (taosMsgHdrSize(pBuf->msgHdr) >= RPC_MAX_UDP_PKTS)) { diff --git a/src/rpc/src/tmsghdr.c b/src/rpc/src/tmsghdr.c deleted file mode 100644 index a46f182b1d21b31e08c040b4990e74a27971b46e..0000000000000000000000000000000000000000 --- a/src/rpc/src/tmsghdr.c +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "os.h" - -void taosFreeMsgHdr(void *hdr) { - struct msghdr *msgHdr = (struct msghdr *)hdr; - free(msgHdr->msg_iov); -} - -int taosMsgHdrSize(void *hdr) { - struct msghdr *msgHdr = (struct msghdr *)hdr; - return (int)msgHdr->msg_iovlen; -} - -void taosSendMsgHdr(void *hdr, int fd) { - struct msghdr *msgHdr = (struct msghdr *)hdr; - sendmsg(fd, msgHdr, 0); - msgHdr->msg_iovlen = 0; -} - -void taosInitMsgHdr(void **hdr, void *dest, int maxPkts) { - struct msghdr *msgHdr = (struct msghdr *)malloc(sizeof(struct msghdr)); - memset(msgHdr, 0, sizeof(struct msghdr)); - *hdr = msgHdr; - struct sockaddr_in *destAdd = (struct sockaddr_in *)dest; - - msgHdr->msg_name = destAdd; - msgHdr->msg_namelen = sizeof(struct sockaddr_in); - int size = (int)sizeof(struct iovec) * maxPkts; - msgHdr->msg_iov = (struct iovec *)malloc((size_t)size); - memset(msgHdr->msg_iov, 0, (size_t)size); -} - -void taosSetMsgHdrData(void *hdr, char *data, int dataLen) { - struct msghdr *msgHdr = (struct msghdr *)hdr; - msgHdr->msg_iov[msgHdr->msg_iovlen].iov_base = data; - msgHdr->msg_iov[msgHdr->msg_iovlen].iov_len = (size_t)dataLen; - msgHdr->msg_iovlen++; -} diff --git a/src/rpc/src/tstring.c b/src/util/src/tstring.c similarity index 100% rename from src/rpc/src/tstring.c rename to src/util/src/tstring.c