From 32e7e0e7d8c6adb6447a106a7a8456ec67fdfebb Mon Sep 17 00:00:00 2001 From: slguan Date: Wed, 19 Feb 2020 17:03:06 +0800 Subject: [PATCH] cmake file for rpc unittest --- src/CMakeLists.txt | 16 +++--- src/client/CMakeLists.txt | 4 +- src/client/inc/tsclient.h | 10 ++-- src/client/src/tscAsync.c | 1 + src/client/src/tscProfile.c | 2 +- src/client/src/tscServer.c | 104 ++++++++++++++++++++---------------- src/client/src/tscSql.c | 16 +++--- src/client/src/tscStream.c | 2 +- src/client/src/tscSystem.c | 49 ++++++----------- src/inc/taosmsg.h | 1 + src/inc/trpc.h | 1 + src/rpc/CMakeLists.txt | 3 ++ src/rpc/inc/thaship.h | 8 +++ src/rpc/inc/ttcpclient.h | 8 +++ src/rpc/inc/ttcpserver.h | 8 +++ src/rpc/inc/tudp.h | 8 +++ src/rpc/src/trpc.c | 1 + src/rpc/test/CMakeLists.txt | 15 ++++++ src/rpc/test/unittest.c | 46 ++++++++++++++++ 19 files changed, 197 insertions(+), 106 deletions(-) create mode 100644 src/rpc/test/CMakeLists.txt create mode 100644 src/rpc/test/unittest.c diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index d29213298a..8d4951b891 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -4,11 +4,11 @@ PROJECT(TDengine) ADD_SUBDIRECTORY(os) ADD_SUBDIRECTORY(util) ADD_SUBDIRECTORY(rpc) -ADD_SUBDIRECTORY(client) -ADD_SUBDIRECTORY(kit) -ADD_SUBDIRECTORY(plugins) -ADD_SUBDIRECTORY(sdb) -ADD_SUBDIRECTORY(mnode) -ADD_SUBDIRECTORY(dnode) -ADD_SUBDIRECTORY(vnode) -ADD_SUBDIRECTORY(connector/jdbc) +#ADD_SUBDIRECTORY(client) +#ADD_SUBDIRECTORY(kit) +#ADD_SUBDIRECTORY(plugins) +#ADD_SUBDIRECTORY(sdb) +#ADD_SUBDIRECTORY(mnode) +#ADD_SUBDIRECTORY(dnode) +#ADD_SUBDIRECTORY(vnode) +#ADD_SUBDIRECTORY(connector/jdbc) diff --git a/src/client/CMakeLists.txt b/src/client/CMakeLists.txt index 92d6b61eb2..55d6cb251c 100644 --- a/src/client/CMakeLists.txt +++ b/src/client/CMakeLists.txt @@ -4,8 +4,10 @@ PROJECT(TDengine) INCLUDE_DIRECTORIES(inc) INCLUDE_DIRECTORIES(jni) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) +INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc) INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) -AUX_SOURCE_DIRECTORY(./src SRC) +AUX_SOURCE_DIRECTORY(src SRC) IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/jni/linux) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 3720a09459..ec839e2575 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -30,6 +30,7 @@ extern "C" { #include "taosdef.h" #include "tsqlfunction.h" #include "tutil.h" +#include "trpc.h" #define TSC_GET_RESPTR_BASE(res, _queryinfo, col, ord) \ (res->data + tscFieldInfoGetOffset(_queryinfo, col) * res->numOfRows) @@ -324,6 +325,7 @@ typedef struct _sql_obj { int64_t stime; uint32_t queryId; void * thandle; + SRpcIpSet ipSet; void * pStream; void * pSubscription; char * sqlstr; @@ -371,12 +373,6 @@ typedef struct _sstream { struct _sstream *prev, *next; } SSqlStream; -typedef struct { - char numOfIps; - uint32_t ip[TSDB_MAX_MGMT_IPS]; - char ipstr[TSDB_MAX_MGMT_IPS][TSDB_IPv4ADDR_LEN]; -} SIpStrList; - // tscSql API int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion); @@ -461,7 +457,7 @@ extern void * tscQhandle; extern int tscKeepConn[]; extern int tsInsertHeadSize; extern int tscNumOfThreads; -extern SIpStrList tscMgmtIpList; +extern SRpcIpSet tscMgmtIpList; typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int numOfRows); diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 94ebaefd36..a70a314298 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -25,6 +25,7 @@ #include "tscSQLParser.h" #include "tutil.h" #include "tnote.h" +#include "tsched.h" static void tscProcessFetchRow(SSchedMsg *pMsg); static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows); diff --git a/src/client/src/tscProfile.c b/src/client/src/tscProfile.c index a7a774b3a8..c6abfabf93 100644 --- a/src/client/src/tscProfile.c +++ b/src/client/src/tscProfile.c @@ -279,7 +279,7 @@ void tscKillConnection(STscObj *pObj) { SSqlObj *pSql = pObj->sqlList; while (pSql) { - taosStopRpcConn(pSql->thandle); + //taosStopRpcConn(pSql->thandle); pSql = pSql->next; } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 1fe4ba2979..fe385c3077 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -31,10 +31,14 @@ #define TSC_MGMT_VNODE 999 -SIpStrList tscMgmtIpList; +SRpcIpSet tscMgmtIpList; int tsMasterIndex = 0; int tsSlaveIndex = 1; +//temp +SRpcIpSet tscMgmtIpSet; +SRpcIpSet tscDnodeIpSet; + int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0}; int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql); @@ -53,7 +57,7 @@ void tscPrintMgmtIp() { tscError("invalid mgmt IP list:%d", tscMgmtIpList.numOfIps); } else { for (int i = 0; i < tscMgmtIpList.numOfIps; ++i) { - tscTrace("mgmt index:%d ip:%s", i, tscMgmtIpList.ipstr[i]); + tscTrace("mgmt index:%d ip:%s", i, tscMgmtIpList.ipStr[i]); } } } @@ -62,7 +66,7 @@ void tscSetMgmtIpListFromCluster(SIpList *pIpList) { tscMgmtIpList.numOfIps = pIpList->numOfIps; if (memcmp(tscMgmtIpList.ip, pIpList->ip, pIpList->numOfIps * 4) != 0) { for (int i = 0; i < pIpList->numOfIps; ++i) { - tinet_ntoa(tscMgmtIpList.ipstr[i], pIpList->ip[i]); + tinet_ntoa(tscMgmtIpList.ipStr[i], pIpList->ip[i]); tscMgmtIpList.ip[i] = pIpList->ip[i]; } tscTrace("cluster mgmt IP list:"); @@ -73,9 +77,9 @@ void tscSetMgmtIpListFromCluster(SIpList *pIpList) { void tscSetMgmtIpListFromEdge() { if (tscMgmtIpList.numOfIps != 2) { tscMgmtIpList.numOfIps = 2; - strcpy(tscMgmtIpList.ipstr[0], tsMasterIp); + strcpy(tscMgmtIpList.ipStr[0], tsMasterIp); tscMgmtIpList.ip[0] = inet_addr(tsMasterIp); - strcpy(tscMgmtIpList.ipstr[1], tsMasterIp); + strcpy(tscMgmtIpList.ipStr[1], tsMasterIp); tscMgmtIpList.ip[1] = inet_addr(tsMasterIp); tscTrace("edge mgmt IP list:"); tscPrintMgmtIp(); @@ -168,7 +172,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { if (tscShouldFreeHeatBeat(pObj->pHb)) { tscTrace("%p free HB object and release connection, pConn:%p", pObj, pObj->pHb->thandle); - taosCloseRpcConn(pObj->pHb->thandle); + //taosCloseRpcConn(pObj->pHb->thandle); tscFreeSqlObj(pObj->pHb); tscCloseTscObj(pObj); @@ -178,6 +182,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { tscProcessSql(pObj->pHb); } + void tscGetConnToMgmt(SSqlObj *pSql, uint8_t *pCode) { STscObj *pTscObj = pSql->pTscObj; if (pSql->retry < tscGetMgmtConnMaxRetryTimes()) { @@ -187,23 +192,24 @@ void tscGetConnToMgmt(SSqlObj *pSql, uint8_t *pCode) { if (pSql->cmd.command > TSDB_SQL_READ && pSql->index == 0) pSql->index = 1; void *thandle = taosGetConnFromCache(tscConnCache, tscMgmtIpList.ip[pSql->index], TSC_MGMT_VNODE, pTscObj->user); - if (thandle == NULL) { - SRpcConnInit connInit; - memset(&connInit, 0, sizeof(connInit)); - connInit.cid = 0; - connInit.sid = 0; - connInit.meterId = pSql->pTscObj->user; - connInit.peerId = 0; - connInit.shandle = pTscMgmtConn; - connInit.ahandle = pSql; - connInit.peerPort = tsMgmtShellPort; - connInit.spi = 1; - connInit.encrypt = 0; - connInit.secret = pSql->pTscObj->pass; - - connInit.peerIp = tscMgmtIpList.ipstr[pSql->index]; - thandle = taosOpenRpcConn(&connInit, pCode); - } + +// if (thandle == NULL) { +// SRpcConnInit connInit; +// memset(&connInit, 0, sizeof(connInit)); +// connInit.cid = 0; +// connInit.sid = 0; +// connInit.meterId = pSql->pTscObj->user; +// connInit.peerId = 0; +// connInit.shandle = pTscMgmtConn; +// connInit.ahandle = pSql; +// connInit.peerPort = tsMgmtShellPort; +// connInit.spi = 1; +// connInit.encrypt = 0; +// connInit.secret = pSql->pTscObj->pass; +// +// connInit.peerIp = tscMgmtIpList.ipstr[pSql->index]; +// thandle = taosOpenRpcConn(&connInit, pCode); +// } pSql->thandle = thandle; pSql->ip = tscMgmtIpList.ip[pSql->index]; @@ -267,23 +273,23 @@ void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) { void *thandle = taosGetConnFromCache(tscConnCache, pVPeersDesc[pSql->index].ip, pVPeersDesc[pSql->index].vnode, pTscObj->user); - if (thandle == NULL) { - SRpcConnInit connInit; - tinet_ntoa(ipstr, pVPeersDesc[pSql->index].ip); - memset(&connInit, 0, sizeof(connInit)); - connInit.cid = vidIndex; - connInit.sid = 0; - connInit.spi = 0; - connInit.encrypt = 0; - connInit.meterId = pSql->pTscObj->user; - connInit.peerId = htonl((pVPeersDesc[pSql->index].vnode << TSDB_SHELL_VNODE_BITS)); - connInit.shandle = pVnodeConn; - connInit.ahandle = pSql; - connInit.peerIp = ipstr; - connInit.peerPort = tsVnodeShellPort; - thandle = taosOpenRpcConn(&connInit, pCode); - vidIndex = (vidIndex + 1) % tscNumOfThreads; - } +// if (thandle == NULL) { +// SRpcConnInit connInit; +// tinet_ntoa(ipstr, pVPeersDesc[pSql->index].ip); +// memset(&connInit, 0, sizeof(connInit)); +// connInit.cid = vidIndex; +// connInit.sid = 0; +// connInit.spi = 0; +// connInit.encrypt = 0; +// connInit.meterId = pSql->pTscObj->user; +// connInit.peerId = htonl((pVPeersDesc[pSql->index].vnode << TSDB_SHELL_VNODE_BITS)); +// connInit.shandle = pVnodeConn; +// connInit.ahandle = pSql; +// connInit.peerIp = ipstr; +// connInit.peerPort = tsVnodeShellPort; +// thandle = taosOpenRpcConn(&connInit, pCode); +// vidIndex = (vidIndex + 1) % tscNumOfThreads; +// } pSql->thandle = thandle; pSql->ip = pVPeersDesc[pSql->index].ip; @@ -291,6 +297,8 @@ void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) { tscTrace("%p vnode:%d ip:%p index:%d is picked up, pConn:%p", pSql, pVPeersDesc[pSql->index].vnode, pVPeersDesc[pSql->index].ip, pSql->index, pSql->thandle); + //TODO fetch from vpeerdesc + pSql->ipSet = tscMgmtIpSet; break; } @@ -326,25 +334,29 @@ int tscSendMsgToServer(SSqlObj *pSql) { size_t totalLen = pSql->cmd.payloadLen + tsRpcHeadSize + sizeof(STaosDigest); // the memory will be released by taosProcessResponse, so no memory leak here - char *buf = malloc(totalLen); - if (NULL == buf) { + char *pStart = rpcMallocCont(pSql->cmd.payloadLen); + if (NULL == pStart) { tscError("%p msg:%s malloc fail", pSql, taosMsg[pSql->cmd.msgType]); return TSDB_CODE_CLI_OUT_OF_MEMORY; } - memcpy(buf, pSql->cmd.payload, totalLen); + memcpy(pStart, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen); tscTrace("%p msg:%s is sent to server", pSql, taosMsg[pSql->cmd.msgType]); - char *pStart = taosBuildReqHeader(pSql->thandle, pSql->cmd.msgType, buf); if (pStart) { /* * this SQL object may be released by other thread due to the completion of this query even before the log * is dumped to log file. So the signature needs to be kept in a local variable. */ uint64_t signature = (uint64_t)pSql->signature; - if (tscUpdateVnodeMsg[pSql->cmd.command]) (*tscUpdateVnodeMsg[pSql->cmd.command])(pSql, buf); + //if (tscUpdateVnodeMsg[pSql->cmd.command]) (*tscUpdateVnodeMsg[pSql->cmd.command])(pSql, pStart); + + int ret; + if (pSql->cmd.command < TSDB_SQL_MGMT) + ret = rpcSendRequest(pTscMgmtConn, pSql->cmd.msgType, pStart, pSql->cmd.payloadLen, pSql); + else + ret = rpcSendRequest(pVnodeConn, pSql->cmd.msgType, pStart, pSql->cmd.payloadLen, pSql); - int ret = taosSendMsgToPeerH(pSql->thandle, pStart, pSql->cmd.payloadLen, pSql); if (ret >= 0) { code = 0; } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 3b30c9ccb6..ddccbccb23 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -64,15 +64,15 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const } if (ip && ip[0]) { - tscMgmtIpList.numOfIps = 4; - strcpy(tscMgmtIpList.ipstr[0], ip); + tscMgmtIpList.numOfIps = 3; + strcpy(tscMgmtIpList.ipStr[0], ip); tscMgmtIpList.ip[0] = inet_addr(ip); - strcpy(tscMgmtIpList.ipstr[1], ip); - tscMgmtIpList.ip[1] = inet_addr(ip); - strcpy(tscMgmtIpList.ipstr[2], tsMasterIp); - tscMgmtIpList.ip[2] = inet_addr(tsMasterIp); - strcpy(tscMgmtIpList.ipstr[3], tsSecondIp); - tscMgmtIpList.ip[3] = inet_addr(tsSecondIp); + strcpy(tscMgmtIpList.ipStr[1], tsMasterIp); + tscMgmtIpList.ip[1] = inet_addr(tsMasterIp); + strcpy(tscMgmtIpList.ipStr[2], tsSecondIp); + tscMgmtIpList.ip[2] = inet_addr(tsSecondIp); + tscMgmtIpList.index = 0; + tscMgmtIpList.port = tsMgmtShellPort; } pObj = (STscObj *)malloc(sizeof(STscObj)); diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 1b5b55352e..79b524be0f 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -19,7 +19,7 @@ #include "ttime.h" #include "ttimer.h" #include "tutil.h" - +#include "tsched.h" #include "taosmsg.h" #include "tscUtil.h" #include "tsclient.h" diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index e07f459cf4..b9c0ae2018 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -24,8 +24,9 @@ #include "ttime.h" #include "ttimer.h" #include "tutil.h" - +#include "tsched.h" #include "tsclient.h" + // global, not configurable void * pVnodeConn; void * pVMeterConn; @@ -94,18 +95,17 @@ void taos_init_imp() { if (tsTscEnableRecordSql != 0) { taosInitNote(tsNumOfLogLines / 10, 1, (char*)"tsc_note"); } - - tscMgmtIpList.numOfIps = 2; - strcpy(tscMgmtIpList.ipstr[0], tsMasterIp); - tscMgmtIpList.ip[0] = inet_addr(tsMasterIp); - strcpy(tscMgmtIpList.ipstr[1], tsMasterIp); - tscMgmtIpList.ip[1] = inet_addr(tsMasterIp); + tscMgmtIpList.index = 0; + tscMgmtIpList.port = tsMgmtShellPort; + tscMgmtIpList.numOfIps = 1; + strcpy(tscMgmtIpList.ipStr[0], tsMasterIp); + tscMgmtIpList.ip[0] = inet_addr(tsMasterIp); if (tsSecondIp[0]) { - tscMgmtIpList.numOfIps = 3; - strcpy(tscMgmtIpList.ipstr[2], tsSecondIp); - tscMgmtIpList.ip[2] = inet_addr(tsSecondIp); + tscMgmtIpList.numOfIps = 2; + strcpy(tscMgmtIpList.ipStr[1], tsSecondIp); + tscMgmtIpList.ip[1] = inet_addr(tsSecondIp); } tscInitMsgs(); @@ -132,42 +132,23 @@ void taos_init_imp() { rpcInit.label = "TSC-vnode"; rpcInit.numOfThreads = tscNumOfThreads; rpcInit.fp = tscProcessMsgFromServer; - rpcInit.bits = 20; - rpcInit.numOfChanns = tscNumOfThreads; - rpcInit.sessionsPerChann = tsMaxVnodeConnections / tscNumOfThreads; - rpcInit.idMgmt = TAOS_ID_FREE; - rpcInit.noFree = 0; + rpcInit.sessions = tsMaxVnodeConnections; rpcInit.connType = TAOS_CONN_SOCKET_TYPE_C(); - rpcInit.qhandle = tscQhandle; - pVnodeConn = taosOpenRpc(&rpcInit); + pVnodeConn = rpcOpen(&rpcInit); if (pVnodeConn == NULL) { tscError("failed to init connection to vnode"); return; } - for (int i = 0; i < tscNumOfThreads; ++i) { - int retVal = taosOpenRpcChann(pVnodeConn, i, rpcInit.sessionsPerChann); - if (0 != retVal) { - tError("TSC-vnode, failed to open rpc chann"); - taosCloseRpc(pVnodeConn); - return; - } - } - memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localIp = tsLocalIp; rpcInit.localPort = 0; rpcInit.label = "TSC-mgmt"; rpcInit.numOfThreads = 1; rpcInit.fp = tscProcessMsgFromServer; - rpcInit.bits = 20; - rpcInit.numOfChanns = 1; - rpcInit.sessionsPerChann = tsMaxMgmtConnections; - rpcInit.idMgmt = TAOS_ID_FREE; - rpcInit.noFree = 0; + rpcInit.sessions = tsMaxMgmtConnections; rpcInit.connType = TAOS_CONN_SOCKET_TYPE_C(); - rpcInit.qhandle = tscQhandle; - pTscMgmtConn = taosOpenRpc(&rpcInit); + pTscMgmtConn = rpcOpen(&rpcInit); if (pTscMgmtConn == NULL) { tscError("failed to init connection to mgmt"); return; @@ -183,7 +164,7 @@ void taos_init_imp() { if (tscCacheHandle == NULL) tscCacheHandle = taosInitDataCache(tsMaxMeterConnections / 2, tscTmr, refreshTime); - tscConnCache = taosOpenConnCache(tsMaxMeterConnections * 2, taosCloseRpcConn, tscTmr, tsShellActivityTimer * 1000); + tscConnCache = taosOpenConnCache(tsMaxMeterConnections * 2, NULL/*taosCloseRpcConn*/, tscTmr, tsShellActivityTimer * 1000); initialized = 1; tscTrace("client is initialized successfully"); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 7a37c54f81..0c4fa999b9 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -688,6 +688,7 @@ typedef struct { typedef struct { int32_t dnode; //the ID of dnode int32_t vnode; //the index of vnode + uint32_t ip; } SVPeerDesc; typedef struct { diff --git a/src/inc/trpc.h b/src/inc/trpc.h index 86d57385ad..666347a8c9 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -21,6 +21,7 @@ extern "C" { #include #include +#include "taosdef.h" #define TAOS_CONN_UDPS 0 #define TAOS_CONN_UDPC 1 diff --git a/src/rpc/CMakeLists.txt b/src/rpc/CMakeLists.txt index 70511db533..e8768c10dd 100644 --- a/src/rpc/CMakeLists.txt +++ b/src/rpc/CMakeLists.txt @@ -23,3 +23,6 @@ ENDIF () ADD_LIBRARY(trpc ${SRC}) TARGET_LINK_LIBRARIES(trpc tutil) + +ADD_SUBDIRECTORY(test) + diff --git a/src/rpc/inc/thaship.h b/src/rpc/inc/thaship.h index 4acf8b3fbb..9d4396ce6a 100644 --- a/src/rpc/inc/thaship.h +++ b/src/rpc/inc/thaship.h @@ -16,10 +16,18 @@ #ifndef _rpc_hash_ip_header_ #define _rpc_hash_ip_header_ +#ifdef __cplusplus +extern "C" { +#endif + void *taosOpenIpHash(int maxSessions); void taosCloseIpHash(void *handle); void *taosAddIpHash(void *handle, void *pData, uint32_t ip, uint16_t port); void taosDeleteIpHash(void *handle, uint32_t ip, uint16_t port); void *taosGetIpHash(void *handle, uint32_t ip, uint16_t port); +#ifdef __cplusplus +} +#endif + #endif diff --git a/src/rpc/inc/ttcpclient.h b/src/rpc/inc/ttcpclient.h index 1246b2560e..952d1c4a0e 100644 --- a/src/rpc/inc/ttcpclient.h +++ b/src/rpc/inc/ttcpclient.h @@ -16,6 +16,10 @@ #ifndef _taos_tcp_client_header_ #define _taos_tcp_client_header_ +#ifdef __cplusplus +extern "C" { +#endif + #include "taosdef.h" void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, void *shandle); @@ -24,4 +28,8 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16 void taosCloseTcpClientConnection(void *chandle); int taosSendTcpClientData(uint32_t ip, uint16_t port, char *data, int len, void *chandle); +#ifdef __cplusplus +} +#endif + #endif diff --git a/src/rpc/inc/ttcpserver.h b/src/rpc/inc/ttcpserver.h index b62949e73e..299adb3169 100644 --- a/src/rpc/inc/ttcpserver.h +++ b/src/rpc/inc/ttcpserver.h @@ -16,6 +16,10 @@ #ifndef _taos_tcp_server_header_ #define _taos_tcp_server_header_ +#ifdef __cplusplus +extern "C" { +#endif + #include "taosdef.h" void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle); @@ -23,4 +27,8 @@ void taosCleanUpTcpServer(void *param); void taosCloseTcpServerConnection(void *param); int taosSendTcpServerData(uint32_t ip, uint16_t port, char *data, int len, void *chandle); +#ifdef __cplusplus +} +#endif + #endif diff --git a/src/rpc/inc/tudp.h b/src/rpc/inc/tudp.h index 647d54bade..cb2f8d2b10 100644 --- a/src/rpc/inc/tudp.h +++ b/src/rpc/inc/tudp.h @@ -16,6 +16,10 @@ #ifndef _taos_udp_header_ #define _taos_udp_header_ +#ifdef __cplusplus +extern "C" { +#endif + #include "taosdef.h" void *taosInitUdpServer(char *ip, uint16_t port, char *label, int, void *fp, void *shandle); @@ -30,4 +34,8 @@ void taosSendMsgHdr(void *hdr, int fd); void taosInitMsgHdr(void **hdr, void *dest, int maxPkts); void taosSetMsgHdrData(void *hdr, char *data, int dataLen); +#ifdef __cplusplus +} +#endif + #endif diff --git a/src/rpc/src/trpc.c b/src/rpc/src/trpc.c index dd881f0692..345e66732b 100755 --- a/src/rpc/src/trpc.c +++ b/src/rpc/src/trpc.c @@ -30,6 +30,7 @@ #include "lz4.h" #include "tconncache.h" #include "trpc.h" +#include "taoserror.h" #define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHeader) + sizeof(SRpcDigest)) #define rpcHeaderFromCont(cont) ((SRpcHeader *) (cont - sizeof(SRpcHeader))) diff --git a/src/rpc/test/CMakeLists.txt b/src/rpc/test/CMakeLists.txt new file mode 100644 index 0000000000..f1e7a1cd39 --- /dev/null +++ b/src/rpc/test/CMakeLists.txt @@ -0,0 +1,15 @@ +CMAKE_MINIMUM_REQUIRED(VERSION 2.8) +PROJECT(TDengine) + +IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) + INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) + INCLUDE_DIRECTORIES(../inc) + AUX_SOURCE_DIRECTORY(./ TEST_SRC) + + ADD_EXECUTABLE(rpcTest ${TEST_SRC}) + TARGET_LINK_LIBRARIES(rpcTest trpc) +ENDIF () + + diff --git a/src/rpc/test/unittest.c b/src/rpc/test/unittest.c new file mode 100644 index 0000000000..262c93a62b --- /dev/null +++ b/src/rpc/test/unittest.c @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +//#define _DEFAULT_SOURCE +#include "os.h" +#include "tlog.h" +#include "trpc.h" +#include + +int32_t main(int32_t argc, char *argv[]) { + dPrint("unit test for rpc module"); + + SRpcInit rpcInit; + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.localIp = "0.0.0.0"; + rpcInit.localPort = 7000; + rpcInit.label = "unittest"; + rpcInit.numOfThreads = 1; + rpcInit.fp = NULL; + rpcInit.sessions = 1000; + rpcInit.connType = TAOS_CONN_SOCKET_TYPE_S(); + rpcInit.idleTime = 2000; + + void *pConn = rpcOpen(&rpcInit); + if (pConn != NULL) { + dPrint("conection is opened"); + } else { + dError("failed to initialize rpc"); + } + + return 0; +} + + -- GitLab