diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index d29213298a1de4bae3e50830d6fffad563d4dfa8..8d4951b89131bdbc40fd3276010dd0103672e57f 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -4,11 +4,11 @@ PROJECT(TDengine) ADD_SUBDIRECTORY(os) ADD_SUBDIRECTORY(util) ADD_SUBDIRECTORY(rpc) -ADD_SUBDIRECTORY(client) -ADD_SUBDIRECTORY(kit) -ADD_SUBDIRECTORY(plugins) -ADD_SUBDIRECTORY(sdb) -ADD_SUBDIRECTORY(mnode) -ADD_SUBDIRECTORY(dnode) -ADD_SUBDIRECTORY(vnode) -ADD_SUBDIRECTORY(connector/jdbc) +#ADD_SUBDIRECTORY(client) +#ADD_SUBDIRECTORY(kit) +#ADD_SUBDIRECTORY(plugins) +#ADD_SUBDIRECTORY(sdb) +#ADD_SUBDIRECTORY(mnode) +#ADD_SUBDIRECTORY(dnode) +#ADD_SUBDIRECTORY(vnode) +#ADD_SUBDIRECTORY(connector/jdbc) diff --git a/src/client/CMakeLists.txt b/src/client/CMakeLists.txt index 92d6b61eb2473c790c967a4a0091e233de84b8fa..55d6cb251caa839e85b023ea24193d1074262ba4 100644 --- a/src/client/CMakeLists.txt +++ b/src/client/CMakeLists.txt @@ -4,8 +4,10 @@ PROJECT(TDengine) INCLUDE_DIRECTORIES(inc) INCLUDE_DIRECTORIES(jni) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) +INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc) INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) -AUX_SOURCE_DIRECTORY(./src SRC) +AUX_SOURCE_DIRECTORY(src SRC) IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/jni/linux) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 3720a09459af4d0275a5c409c5eb5057dcfe3c81..ec839e25758793f7ada6a12684274903f6bfdfa0 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -30,6 +30,7 @@ extern "C" { #include "taosdef.h" #include "tsqlfunction.h" #include "tutil.h" +#include "trpc.h" #define TSC_GET_RESPTR_BASE(res, _queryinfo, col, ord) \ (res->data + tscFieldInfoGetOffset(_queryinfo, col) * res->numOfRows) @@ -324,6 +325,7 @@ typedef struct _sql_obj { int64_t stime; uint32_t queryId; void * thandle; + SRpcIpSet ipSet; void * pStream; void * pSubscription; char * sqlstr; @@ -371,12 +373,6 @@ typedef struct _sstream { struct _sstream *prev, *next; } SSqlStream; -typedef struct { - char numOfIps; - uint32_t ip[TSDB_MAX_MGMT_IPS]; - char ipstr[TSDB_MAX_MGMT_IPS][TSDB_IPv4ADDR_LEN]; -} SIpStrList; - // tscSql API int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion); @@ -461,7 +457,7 @@ extern void * tscQhandle; extern int tscKeepConn[]; extern int tsInsertHeadSize; extern int tscNumOfThreads; -extern SIpStrList tscMgmtIpList; +extern SRpcIpSet tscMgmtIpList; typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int numOfRows); diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 94ebaefd369975c6873e6dce7ff36c3b513ad91e..a70a314298ca870e927ae1f7ff3629d0b925747d 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -25,6 +25,7 @@ #include "tscSQLParser.h" #include "tutil.h" #include "tnote.h" +#include "tsched.h" static void tscProcessFetchRow(SSchedMsg *pMsg); static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows); diff --git a/src/client/src/tscProfile.c b/src/client/src/tscProfile.c index a7a774b3a8ce71a608d15ec9a71f931a7a59a06a..c6abfabf93370bac4995b6d8f6384b4fc98a6273 100644 --- a/src/client/src/tscProfile.c +++ b/src/client/src/tscProfile.c @@ -279,7 +279,7 @@ void tscKillConnection(STscObj *pObj) { SSqlObj *pSql = pObj->sqlList; while (pSql) { - taosStopRpcConn(pSql->thandle); + //taosStopRpcConn(pSql->thandle); pSql = pSql->next; } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 1fe4ba2979a507b7044c581450db1cb0fbdb01ed..fe385c30770742a45be4746b73b5c7f43c8aeea4 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -31,10 +31,14 @@ #define TSC_MGMT_VNODE 999 -SIpStrList tscMgmtIpList; +SRpcIpSet tscMgmtIpList; int tsMasterIndex = 0; int tsSlaveIndex = 1; +//temp +SRpcIpSet tscMgmtIpSet; +SRpcIpSet tscDnodeIpSet; + int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0}; int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql); @@ -53,7 +57,7 @@ void tscPrintMgmtIp() { tscError("invalid mgmt IP list:%d", tscMgmtIpList.numOfIps); } else { for (int i = 0; i < tscMgmtIpList.numOfIps; ++i) { - tscTrace("mgmt index:%d ip:%s", i, tscMgmtIpList.ipstr[i]); + tscTrace("mgmt index:%d ip:%s", i, tscMgmtIpList.ipStr[i]); } } } @@ -62,7 +66,7 @@ void tscSetMgmtIpListFromCluster(SIpList *pIpList) { tscMgmtIpList.numOfIps = pIpList->numOfIps; if (memcmp(tscMgmtIpList.ip, pIpList->ip, pIpList->numOfIps * 4) != 0) { for (int i = 0; i < pIpList->numOfIps; ++i) { - tinet_ntoa(tscMgmtIpList.ipstr[i], pIpList->ip[i]); + tinet_ntoa(tscMgmtIpList.ipStr[i], pIpList->ip[i]); tscMgmtIpList.ip[i] = pIpList->ip[i]; } tscTrace("cluster mgmt IP list:"); @@ -73,9 +77,9 @@ void tscSetMgmtIpListFromCluster(SIpList *pIpList) { void tscSetMgmtIpListFromEdge() { if (tscMgmtIpList.numOfIps != 2) { tscMgmtIpList.numOfIps = 2; - strcpy(tscMgmtIpList.ipstr[0], tsMasterIp); + strcpy(tscMgmtIpList.ipStr[0], tsMasterIp); tscMgmtIpList.ip[0] = inet_addr(tsMasterIp); - strcpy(tscMgmtIpList.ipstr[1], tsMasterIp); + strcpy(tscMgmtIpList.ipStr[1], tsMasterIp); tscMgmtIpList.ip[1] = inet_addr(tsMasterIp); tscTrace("edge mgmt IP list:"); tscPrintMgmtIp(); @@ -168,7 +172,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { if (tscShouldFreeHeatBeat(pObj->pHb)) { tscTrace("%p free HB object and release connection, pConn:%p", pObj, pObj->pHb->thandle); - taosCloseRpcConn(pObj->pHb->thandle); + //taosCloseRpcConn(pObj->pHb->thandle); tscFreeSqlObj(pObj->pHb); tscCloseTscObj(pObj); @@ -178,6 +182,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { tscProcessSql(pObj->pHb); } + void tscGetConnToMgmt(SSqlObj *pSql, uint8_t *pCode) { STscObj *pTscObj = pSql->pTscObj; if (pSql->retry < tscGetMgmtConnMaxRetryTimes()) { @@ -187,23 +192,24 @@ void tscGetConnToMgmt(SSqlObj *pSql, uint8_t *pCode) { if (pSql->cmd.command > TSDB_SQL_READ && pSql->index == 0) pSql->index = 1; void *thandle = taosGetConnFromCache(tscConnCache, tscMgmtIpList.ip[pSql->index], TSC_MGMT_VNODE, pTscObj->user); - if (thandle == NULL) { - SRpcConnInit connInit; - memset(&connInit, 0, sizeof(connInit)); - connInit.cid = 0; - connInit.sid = 0; - connInit.meterId = pSql->pTscObj->user; - connInit.peerId = 0; - connInit.shandle = pTscMgmtConn; - connInit.ahandle = pSql; - connInit.peerPort = tsMgmtShellPort; - connInit.spi = 1; - connInit.encrypt = 0; - connInit.secret = pSql->pTscObj->pass; - - connInit.peerIp = tscMgmtIpList.ipstr[pSql->index]; - thandle = taosOpenRpcConn(&connInit, pCode); - } + +// if (thandle == NULL) { +// SRpcConnInit connInit; +// memset(&connInit, 0, sizeof(connInit)); +// connInit.cid = 0; +// connInit.sid = 0; +// connInit.meterId = pSql->pTscObj->user; +// connInit.peerId = 0; +// connInit.shandle = pTscMgmtConn; +// connInit.ahandle = pSql; +// connInit.peerPort = tsMgmtShellPort; +// connInit.spi = 1; +// connInit.encrypt = 0; +// connInit.secret = pSql->pTscObj->pass; +// +// connInit.peerIp = tscMgmtIpList.ipstr[pSql->index]; +// thandle = taosOpenRpcConn(&connInit, pCode); +// } pSql->thandle = thandle; pSql->ip = tscMgmtIpList.ip[pSql->index]; @@ -267,23 +273,23 @@ void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) { void *thandle = taosGetConnFromCache(tscConnCache, pVPeersDesc[pSql->index].ip, pVPeersDesc[pSql->index].vnode, pTscObj->user); - if (thandle == NULL) { - SRpcConnInit connInit; - tinet_ntoa(ipstr, pVPeersDesc[pSql->index].ip); - memset(&connInit, 0, sizeof(connInit)); - connInit.cid = vidIndex; - connInit.sid = 0; - connInit.spi = 0; - connInit.encrypt = 0; - connInit.meterId = pSql->pTscObj->user; - connInit.peerId = htonl((pVPeersDesc[pSql->index].vnode << TSDB_SHELL_VNODE_BITS)); - connInit.shandle = pVnodeConn; - connInit.ahandle = pSql; - connInit.peerIp = ipstr; - connInit.peerPort = tsVnodeShellPort; - thandle = taosOpenRpcConn(&connInit, pCode); - vidIndex = (vidIndex + 1) % tscNumOfThreads; - } +// if (thandle == NULL) { +// SRpcConnInit connInit; +// tinet_ntoa(ipstr, pVPeersDesc[pSql->index].ip); +// memset(&connInit, 0, sizeof(connInit)); +// connInit.cid = vidIndex; +// connInit.sid = 0; +// connInit.spi = 0; +// connInit.encrypt = 0; +// connInit.meterId = pSql->pTscObj->user; +// connInit.peerId = htonl((pVPeersDesc[pSql->index].vnode << TSDB_SHELL_VNODE_BITS)); +// connInit.shandle = pVnodeConn; +// connInit.ahandle = pSql; +// connInit.peerIp = ipstr; +// connInit.peerPort = tsVnodeShellPort; +// thandle = taosOpenRpcConn(&connInit, pCode); +// vidIndex = (vidIndex + 1) % tscNumOfThreads; +// } pSql->thandle = thandle; pSql->ip = pVPeersDesc[pSql->index].ip; @@ -291,6 +297,8 @@ void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) { tscTrace("%p vnode:%d ip:%p index:%d is picked up, pConn:%p", pSql, pVPeersDesc[pSql->index].vnode, pVPeersDesc[pSql->index].ip, pSql->index, pSql->thandle); + //TODO fetch from vpeerdesc + pSql->ipSet = tscMgmtIpSet; break; } @@ -326,25 +334,29 @@ int tscSendMsgToServer(SSqlObj *pSql) { size_t totalLen = pSql->cmd.payloadLen + tsRpcHeadSize + sizeof(STaosDigest); // the memory will be released by taosProcessResponse, so no memory leak here - char *buf = malloc(totalLen); - if (NULL == buf) { + char *pStart = rpcMallocCont(pSql->cmd.payloadLen); + if (NULL == pStart) { tscError("%p msg:%s malloc fail", pSql, taosMsg[pSql->cmd.msgType]); return TSDB_CODE_CLI_OUT_OF_MEMORY; } - memcpy(buf, pSql->cmd.payload, totalLen); + memcpy(pStart, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen); tscTrace("%p msg:%s is sent to server", pSql, taosMsg[pSql->cmd.msgType]); - char *pStart = taosBuildReqHeader(pSql->thandle, pSql->cmd.msgType, buf); if (pStart) { /* * this SQL object may be released by other thread due to the completion of this query even before the log * is dumped to log file. So the signature needs to be kept in a local variable. */ uint64_t signature = (uint64_t)pSql->signature; - if (tscUpdateVnodeMsg[pSql->cmd.command]) (*tscUpdateVnodeMsg[pSql->cmd.command])(pSql, buf); + //if (tscUpdateVnodeMsg[pSql->cmd.command]) (*tscUpdateVnodeMsg[pSql->cmd.command])(pSql, pStart); + + int ret; + if (pSql->cmd.command < TSDB_SQL_MGMT) + ret = rpcSendRequest(pTscMgmtConn, pSql->cmd.msgType, pStart, pSql->cmd.payloadLen, pSql); + else + ret = rpcSendRequest(pVnodeConn, pSql->cmd.msgType, pStart, pSql->cmd.payloadLen, pSql); - int ret = taosSendMsgToPeerH(pSql->thandle, pStart, pSql->cmd.payloadLen, pSql); if (ret >= 0) { code = 0; } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 3b30c9ccb69189bc2ed5b736f01cae621dcf26a9..ddccbccb23a96e675e8c1ddbc7ba4afcee7357f8 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -64,15 +64,15 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const } if (ip && ip[0]) { - tscMgmtIpList.numOfIps = 4; - strcpy(tscMgmtIpList.ipstr[0], ip); + tscMgmtIpList.numOfIps = 3; + strcpy(tscMgmtIpList.ipStr[0], ip); tscMgmtIpList.ip[0] = inet_addr(ip); - strcpy(tscMgmtIpList.ipstr[1], ip); - tscMgmtIpList.ip[1] = inet_addr(ip); - strcpy(tscMgmtIpList.ipstr[2], tsMasterIp); - tscMgmtIpList.ip[2] = inet_addr(tsMasterIp); - strcpy(tscMgmtIpList.ipstr[3], tsSecondIp); - tscMgmtIpList.ip[3] = inet_addr(tsSecondIp); + strcpy(tscMgmtIpList.ipStr[1], tsMasterIp); + tscMgmtIpList.ip[1] = inet_addr(tsMasterIp); + strcpy(tscMgmtIpList.ipStr[2], tsSecondIp); + tscMgmtIpList.ip[2] = inet_addr(tsSecondIp); + tscMgmtIpList.index = 0; + tscMgmtIpList.port = tsMgmtShellPort; } pObj = (STscObj *)malloc(sizeof(STscObj)); diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 1b5b55352ebca20ec8d4496b76072bba32139568..79b524be0f8fd9b18847e03e9420941c6ec0c05c 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -19,7 +19,7 @@ #include "ttime.h" #include "ttimer.h" #include "tutil.h" - +#include "tsched.h" #include "taosmsg.h" #include "tscUtil.h" #include "tsclient.h" diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index e07f459cf4575a0b3c7ae0b2df6668cb2dd9c108..b9c0ae2018d5a32a0bfaf326b4e946caee0d597c 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -24,8 +24,9 @@ #include "ttime.h" #include "ttimer.h" #include "tutil.h" - +#include "tsched.h" #include "tsclient.h" + // global, not configurable void * pVnodeConn; void * pVMeterConn; @@ -94,18 +95,17 @@ void taos_init_imp() { if (tsTscEnableRecordSql != 0) { taosInitNote(tsNumOfLogLines / 10, 1, (char*)"tsc_note"); } - - tscMgmtIpList.numOfIps = 2; - strcpy(tscMgmtIpList.ipstr[0], tsMasterIp); - tscMgmtIpList.ip[0] = inet_addr(tsMasterIp); - strcpy(tscMgmtIpList.ipstr[1], tsMasterIp); - tscMgmtIpList.ip[1] = inet_addr(tsMasterIp); + tscMgmtIpList.index = 0; + tscMgmtIpList.port = tsMgmtShellPort; + tscMgmtIpList.numOfIps = 1; + strcpy(tscMgmtIpList.ipStr[0], tsMasterIp); + tscMgmtIpList.ip[0] = inet_addr(tsMasterIp); if (tsSecondIp[0]) { - tscMgmtIpList.numOfIps = 3; - strcpy(tscMgmtIpList.ipstr[2], tsSecondIp); - tscMgmtIpList.ip[2] = inet_addr(tsSecondIp); + tscMgmtIpList.numOfIps = 2; + strcpy(tscMgmtIpList.ipStr[1], tsSecondIp); + tscMgmtIpList.ip[1] = inet_addr(tsSecondIp); } tscInitMsgs(); @@ -132,42 +132,23 @@ void taos_init_imp() { rpcInit.label = "TSC-vnode"; rpcInit.numOfThreads = tscNumOfThreads; rpcInit.fp = tscProcessMsgFromServer; - rpcInit.bits = 20; - rpcInit.numOfChanns = tscNumOfThreads; - rpcInit.sessionsPerChann = tsMaxVnodeConnections / tscNumOfThreads; - rpcInit.idMgmt = TAOS_ID_FREE; - rpcInit.noFree = 0; + rpcInit.sessions = tsMaxVnodeConnections; rpcInit.connType = TAOS_CONN_SOCKET_TYPE_C(); - rpcInit.qhandle = tscQhandle; - pVnodeConn = taosOpenRpc(&rpcInit); + pVnodeConn = rpcOpen(&rpcInit); if (pVnodeConn == NULL) { tscError("failed to init connection to vnode"); return; } - for (int i = 0; i < tscNumOfThreads; ++i) { - int retVal = taosOpenRpcChann(pVnodeConn, i, rpcInit.sessionsPerChann); - if (0 != retVal) { - tError("TSC-vnode, failed to open rpc chann"); - taosCloseRpc(pVnodeConn); - return; - } - } - memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localIp = tsLocalIp; rpcInit.localPort = 0; rpcInit.label = "TSC-mgmt"; rpcInit.numOfThreads = 1; rpcInit.fp = tscProcessMsgFromServer; - rpcInit.bits = 20; - rpcInit.numOfChanns = 1; - rpcInit.sessionsPerChann = tsMaxMgmtConnections; - rpcInit.idMgmt = TAOS_ID_FREE; - rpcInit.noFree = 0; + rpcInit.sessions = tsMaxMgmtConnections; rpcInit.connType = TAOS_CONN_SOCKET_TYPE_C(); - rpcInit.qhandle = tscQhandle; - pTscMgmtConn = taosOpenRpc(&rpcInit); + pTscMgmtConn = rpcOpen(&rpcInit); if (pTscMgmtConn == NULL) { tscError("failed to init connection to mgmt"); return; @@ -183,7 +164,7 @@ void taos_init_imp() { if (tscCacheHandle == NULL) tscCacheHandle = taosInitDataCache(tsMaxMeterConnections / 2, tscTmr, refreshTime); - tscConnCache = taosOpenConnCache(tsMaxMeterConnections * 2, taosCloseRpcConn, tscTmr, tsShellActivityTimer * 1000); + tscConnCache = taosOpenConnCache(tsMaxMeterConnections * 2, NULL/*taosCloseRpcConn*/, tscTmr, tsShellActivityTimer * 1000); initialized = 1; tscTrace("client is initialized successfully"); diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index debb1a9a6f875a2739cc331f64608a8263346d2b..f153a9a5739ebbae5c84b4a36065776472aa8083 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -26,7 +26,7 @@ extern "C" { #ifdef TAOS_ERROR_C #define TAOS_DEFINE_ERROR(name, mod, code, msg) {.val = (0x80000000 | ((mod)<<16) | (code)), .str=(msg)}, #else -#define TAOS_DEFINE_ERROR(name, mod, code, msg) const int32_t name = (0x80000000 | ((mod)<<16) | (code)); +#define TAOS_DEFINE_ERROR(name, mod, code, msg) static const int32_t name = (0x80000000 | ((mod)<<16) | (code)); #endif #define TAOS_SYSTEM_ERROR(code) (0x80ff0000 | (code)) diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 7a37c54f81c62a3b8a172cc8be86e67b8187cbbb..0c4fa999b9b358d67d338681ea2be85f394fe7c4 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -688,6 +688,7 @@ typedef struct { typedef struct { int32_t dnode; //the ID of dnode int32_t vnode; //the index of vnode + uint32_t ip; } SVPeerDesc; typedef struct { diff --git a/src/inc/trpc.h b/src/inc/trpc.h index 86d57385add4fd2cd67e4163422220296eeaf941..cb5a8aeaa3ac4abb612a0ca5e5ae6d59f9f4dd88 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -21,6 +21,7 @@ extern "C" { #include #include +#include "taosdef.h" #define TAOS_CONN_UDPS 0 #define TAOS_CONN_UDPC 1 @@ -37,38 +38,46 @@ extern "C" { extern int tsRpcHeadSize; -typedef struct { - char *localIp; // local IP used - uint16_t localPort; // local port - char *label; // for debug purpose - int numOfThreads; // number of threads to handle connections - void *(*fp)(int8_t type, void *pCont, int32_t contLen, void *handle, int32_t index); // function to process the incoming msg - int sessions; // number of sessions allowed - int connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS - int idleTime; // milliseconds, 0 means idle timer is disabled - char *meterId; // meter ID - char spi; // security parameter index - char encrypt; // encrypt algorithm - char *secret; // key for authentication - char *ckey; // ciphering key - int (*afp) (char *meterId, char *spi, char *encrypt, uint8_t *secret, uint8_t *ckey); // call back to retrieve auth info -} SRpcInit; - typedef struct { int16_t index; int16_t numOfIps; uint16_t port; uint32_t ip[TSDB_MAX_MPEERS]; - char ipStr[TSDB_MAX_MPEERS][TSDB_IPv4ADDR_LEN]; } SRpcIpSet; +typedef struct { + char *localIp; // local IP used + uint16_t localPort; // local port + char *label; // for debug purpose + int numOfThreads; // number of threads to handle connections + int sessions; // number of sessions allowed + int connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS + int idleTime; // milliseconds, 0 means idle timer is disabled + + // the following is for client security only + char *meterId; // meter ID + char spi; // security parameter index + char encrypt; // encrypt algorithm + char *secret; // key for authentication + char *ckey; // ciphering key + + // call back to process incoming msg + void (*cfp)(char type, void *pCont, int contLen, void *ahandle, int32_t code); + + // call back to process notify the ipSet changes + void (*ufp)(void *ahandle, SRpcIpSet ipSet); + + // call back to retrieve the client auth info + int (*afp)(char *meterId, char *spi, char *encrypt, uint8_t *secret, uint8_t *ckey); +} SRpcInit; + void *rpcOpen(SRpcInit *pRpc); void rpcClose(void *); void *rpcMallocCont(int contLen); void rpcFreeCont(void *pCont); void rpcSendRequest(void *thandle, SRpcIpSet ipSet, char msgType, void *pCont, int contLen, void *ahandle); -void rpcSendResponse(void *pConn, void *pCont, int contLen); -void rpcSendSimpleRsp(void *pConn, int32_t code); +void rpcSendResponse(void *pConn, int32_t code, void *pCont, int contLen); +void rpcSendRedirectRsp(void *pConn, SRpcIpSet ipSet); #ifdef __cplusplus diff --git a/src/rpc/CMakeLists.txt b/src/rpc/CMakeLists.txt index 70511db53369fc0e69ba870bd8c3c151e580cbe7..e8768c10dd074915e940599f609e5fb0491e4365 100644 --- a/src/rpc/CMakeLists.txt +++ b/src/rpc/CMakeLists.txt @@ -23,3 +23,6 @@ ENDIF () ADD_LIBRARY(trpc ${SRC}) TARGET_LINK_LIBRARIES(trpc tutil) + +ADD_SUBDIRECTORY(test) + diff --git a/src/rpc/inc/thaship.h b/src/rpc/inc/thaship.h index 4acf8b3fbbab14702b3f4e98dc40b0f68fa3a3e2..9d4396ce6a29ec0bd5ccb14d9b0e0713e951cceb 100644 --- a/src/rpc/inc/thaship.h +++ b/src/rpc/inc/thaship.h @@ -16,10 +16,18 @@ #ifndef _rpc_hash_ip_header_ #define _rpc_hash_ip_header_ +#ifdef __cplusplus +extern "C" { +#endif + void *taosOpenIpHash(int maxSessions); void taosCloseIpHash(void *handle); void *taosAddIpHash(void *handle, void *pData, uint32_t ip, uint16_t port); void taosDeleteIpHash(void *handle, uint32_t ip, uint16_t port); void *taosGetIpHash(void *handle, uint32_t ip, uint16_t port); +#ifdef __cplusplus +} +#endif + #endif diff --git a/src/rpc/inc/ttcpclient.h b/src/rpc/inc/ttcpclient.h index 1246b2560ef4e135bdc92e9eb7e34e6a2676d450..952d1c4a0ecb0d071bb7b96140d8c58e865924e9 100644 --- a/src/rpc/inc/ttcpclient.h +++ b/src/rpc/inc/ttcpclient.h @@ -16,6 +16,10 @@ #ifndef _taos_tcp_client_header_ #define _taos_tcp_client_header_ +#ifdef __cplusplus +extern "C" { +#endif + #include "taosdef.h" void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, void *shandle); @@ -24,4 +28,8 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16 void taosCloseTcpClientConnection(void *chandle); int taosSendTcpClientData(uint32_t ip, uint16_t port, char *data, int len, void *chandle); +#ifdef __cplusplus +} +#endif + #endif diff --git a/src/rpc/inc/ttcpserver.h b/src/rpc/inc/ttcpserver.h index b62949e73e8ca38503eb1d2fb46e5cbdfd3042ef..299adb31694b14c77819d09fb5512c0c2b20e930 100644 --- a/src/rpc/inc/ttcpserver.h +++ b/src/rpc/inc/ttcpserver.h @@ -16,6 +16,10 @@ #ifndef _taos_tcp_server_header_ #define _taos_tcp_server_header_ +#ifdef __cplusplus +extern "C" { +#endif + #include "taosdef.h" void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle); @@ -23,4 +27,8 @@ void taosCleanUpTcpServer(void *param); void taosCloseTcpServerConnection(void *param); int taosSendTcpServerData(uint32_t ip, uint16_t port, char *data, int len, void *chandle); +#ifdef __cplusplus +} +#endif + #endif diff --git a/src/rpc/inc/tudp.h b/src/rpc/inc/tudp.h index 647d54badeed8ea08336fff113d56168df662f9e..cb2f8d2b1084ea25d9ea737f2549a2b206cbcd39 100644 --- a/src/rpc/inc/tudp.h +++ b/src/rpc/inc/tudp.h @@ -16,6 +16,10 @@ #ifndef _taos_udp_header_ #define _taos_udp_header_ +#ifdef __cplusplus +extern "C" { +#endif + #include "taosdef.h" void *taosInitUdpServer(char *ip, uint16_t port, char *label, int, void *fp, void *shandle); @@ -30,4 +34,8 @@ void taosSendMsgHdr(void *hdr, int fd); void taosInitMsgHdr(void **hdr, void *dest, int maxPkts); void taosSetMsgHdrData(void *hdr, char *data, int dataLen); +#ifdef __cplusplus +} +#endif + #endif diff --git a/src/rpc/src/trpc.c b/src/rpc/src/trpc.c index dd881f069278938b8fa539114548354a93d98300..e3db06d952972e4e4069e7feade2afec3ae8ef8c 100755 --- a/src/rpc/src/trpc.c +++ b/src/rpc/src/trpc.c @@ -30,18 +30,19 @@ #include "lz4.h" #include "tconncache.h" #include "trpc.h" +#include "taoserror.h" #define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHeader) + sizeof(SRpcDigest)) #define rpcHeaderFromCont(cont) ((SRpcHeader *) (cont - sizeof(SRpcHeader))) -#define rpcContFromHeader(msg) ( msg + sizeof(SRpcHeader)) -#define rpcMsgLenFromCont(contLen) ( contLen + sizeof(SRpcHeader)) +#define rpcContFromHeader(msg) (msg + sizeof(SRpcHeader)) +#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHeader)) #define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHeader)) #define rpcIsReq(type) (type & 1U) typedef struct { int sessions; int numOfThreads; - int idleTime; // milliseconds; + int idleTime; // milliseconds; char localIp[TSDB_IPv4ADDR_LEN]; uint16_t localPort; int connType; @@ -53,27 +54,29 @@ typedef struct { char *secret; // key for authentication char *ckey; // ciphering key - void *(*fp)(char type, void *pCont, int contLen, void *handle, int index); - int (*afp)(char *meterId, char *spi, char *encrypt, uint8_t *secret, uint8_t *ckey); // FP to retrieve auth info - struct _RpcConn *connList; - void *idPool; - void *tmrCtrl; - void *hash; + void (*cfp)(char type, void *pCont, int contLen, void *ahandle, int32_t code); + int (*afp)(char *meterId, char *spi, char *encrypt, uint8_t *secret, uint8_t *ckey); + void (*ufp)(void *ahandle, SRpcIpSet ipSet); + + void *idPool; // handle to ID pool + void *tmrCtrl; // handle to timer + void *hash; // handle returned by hash utility void *shandle; // returned handle from lower layer during initialization void *pCache; // connection cache - pthread_mutex_t mutex; + pthread_mutex_t mutex; + struct _RpcConn *connList; // connection list } SRpcInfo; typedef struct { SRpcIpSet ipSet; - void *ahandle; - SRpcInfo *pRpc; - char msgType; - char *pCont; - int contLen; - int numOfRetry; - int32_t code; - char msg[]; + void *ahandle; // handle provided by app + SRpcInfo *pRpc; // associated SRpcInfo + char msgType; // message type + char *pCont; // content provided by app + int contLen; // content length + int numOfRetry; // number of retry for different servers + int32_t code; // error code + char msg[0]; // RpcHeader starts from here } SRpcReqContext; typedef struct _RpcConn { @@ -124,6 +127,7 @@ typedef struct { char empty[1]; // reserved uint8_t msgType; // message type int32_t msgLen; // message length including the header iteslf + int32_t code; uint8_t content[0]; // message body starts from here } SRpcHeader; @@ -174,25 +178,25 @@ void (*taosCloseConn[])(void *chandle) = { }; static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort); -static void rpcCloseConn(void *thandle); -static SRpcConn *rpcSetConnToServer(void *shandle, SRpcIpSet ipSet); +static void rpcCloseConn(void *thandle); +static SRpcConn *rpcSetConnToServer(SRpcInfo *pRpc, SRpcIpSet ipSet); static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc); static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *meterId, char *hashstr); static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *meterId, char *hashstr); -static void rpcSendReqToServer(SRpcConn *pConn, SRpcReqContext *pContext); +static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext); static void rpcSendQuickRsp(SRpcConn *pConn, char code); static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint32_t ip, uint16_t port, void *chandle); -static void rpcSendDataToPeer(SRpcConn *pConn, void *data, int dataLen); +static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); -static void *rpcProcessDataFromPeer(void *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *chandle); +static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *chandle); static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader); static void rpcProcessConnError(void *param, void *id); static void rpcProcessRetryTimer(void *, void *); static void rpcProcessIdleTimer(void *param, void *tmrId); static void rpcProcessProgressTimer(void *param, void *tmrId); -static void rpcFreeMsg(void *msg); +static void rpcFreeOutMsg(void *msg); static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen); static SRpcHeader *rpcDecompressRpcMsg(SRpcHeader *pHeader); static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen); @@ -208,7 +212,6 @@ void *rpcOpen(SRpcInit *pInit) { if (pRpc == NULL) return NULL; strcpy(pRpc->label, pInit->label); - pRpc->fp = pInit->fp; pRpc->connType = pInit->connType; pRpc->idleTime = pInit->idleTime; pRpc->numOfThreads = pInit->numOfThreads; @@ -224,10 +227,12 @@ void *rpcOpen(SRpcInit *pInit) { pRpc->spi = pInit->spi; strcpy(pRpc->secret, pInit->secret); strcpy(pRpc->ckey, pInit->ckey); + pRpc->ufp = pInit->ufp; + pRpc->cfp = pInit->cfp; pRpc->afp = pInit->afp; pRpc->shandle = (*taosInitConn[pRpc->connType])(pRpc->localIp, pRpc->localPort, pRpc->label, - pRpc->numOfThreads, rpcProcessDataFromPeer, pRpc); + pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc); if (pRpc->shandle == NULL) { tError("%s failed to init network, %s:%d", pRpc->label, pRpc->localIp, pRpc->localPort); rpcClose(pRpc); @@ -318,7 +323,6 @@ void rpcFreeCont(void *cont) { void rpcSendRequest(void *shandle, SRpcIpSet ipSet, char type, void *pCont, int contLen, void *ahandle) { SRpcInfo *pRpc = (SRpcInfo *)shandle; - SRpcConn *pConn; SRpcReqContext *pContext; contLen = rpcCompressRpcMsg(pCont, contLen); @@ -330,22 +334,23 @@ void rpcSendRequest(void *shandle, SRpcIpSet ipSet, char type, void *pCont, int pContext->pCont = pCont; pContext->msgType = type; - pConn = rpcSetConnToServer(shandle, ipSet); - pContext->code = terrno; - if (pConn == NULL) taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl); - - rpcSendReqToServer(pConn, pContext); + rpcSendReqToServer(pRpc, pContext); return; } -void rpcSendResponse(void *handle, void *pCont, int contLen) { +void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) { int msgLen = 0; SRpcConn *pConn = (SRpcConn *)handle; SRpcInfo *pRpc = pConn->pRpc; SRpcHeader *pHeader = rpcHeaderFromCont(pCont); char *msg = (char *)pHeader; + if ( pCont == NULL ) { + pCont = rpcMallocCont(0); + contLen = 0; + } + contLen = rpcCompressRpcMsg(pCont, contLen); msgLen = rpcMsgLenFromCont(contLen); @@ -367,13 +372,14 @@ void rpcSendResponse(void *handle, void *pCont, int contLen) { pHeader->sourceId = pConn->ownId; pHeader->destId = pConn->peerId; pHeader->uid = 0; + pHeader->code = htonl(code); memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId)); // set pConn parameters pConn->inType = 0; // response message is released until new response is sent - rpcFreeMsg(pConn->pRspMsg); + rpcFreeOutMsg(pConn->pRspMsg); pConn->pRspMsg = msg; pConn->rspMsgLen = msgLen; if (pHeader->content[0] == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--; @@ -381,28 +387,21 @@ void rpcSendResponse(void *handle, void *pCont, int contLen) { pthread_mutex_unlock(&pRpc->mutex); taosTmrStopA(&pConn->pTimer); - rpcSendDataToPeer(pConn, msg, msgLen); + rpcSendMsgToPeer(pConn, msg, msgLen); return; } -void rpcSendSimpleRsp(void *thandle, int32_t code) { +void rpcSendRedirectRsp(void *thandle, SRpcIpSet ipSet) { char *pMsg; - STaosRsp *pRsp; - int msgLen = sizeof(STaosRsp); - - if (thandle == NULL) { - tError("connection is gone, response could not be sent"); - return; - } + int msgLen = sizeof(SRpcIpSet); pMsg = rpcMallocCont(msgLen); if (pMsg == NULL) return; - pRsp = (STaosRsp *)pMsg; - pRsp->code = htonl(code); + memcpy(pMsg, &ipSet, sizeof(ipSet)); - rpcSendResponse(thandle, pMsg, msgLen); + rpcSendResponse(thandle, TSDB_CODE_REDIRECT, pMsg, msgLen); return; } @@ -442,34 +441,33 @@ static void rpcCloseConn(void *thandle) { pthread_mutex_lock(&pRpc->mutex); - if (taosCloseConn[pRpc->connType]) (*taosCloseConn[pRpc->connType])(pConn->chandle); + if (pConn->meterId[0]) { + pConn->meterId[0] = 0; + if (taosCloseConn[pRpc->connType]) (*taosCloseConn[pRpc->connType])(pConn->chandle); + + taosTmrStopA(&pConn->pTimer); + taosTmrStopA(&pConn->pIdleTimer); + + if ( pRpc->connType == TAOS_CONN_UDPS || TAOS_CONN_TCPS) { + char hashstr[40] = {0}; + sprintf(hashstr, "%x:%x:%x", pConn->peerIp, pConn->peerUid, pConn->peerId); + taosDeleteStrHash(pRpc->hash, hashstr); + rpcFreeOutMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg + pConn->pRspMsg = NULL; + pConn->inType = 0; + pConn->inTranId = 0; + } else { + pConn->outType = 0; + pConn->outTranId = 0; + pConn->pReqMsg = NULL; + } - taosTmrStopA(&pConn->pTimer); - taosTmrStopA(&pConn->pIdleTimer); + taosFreeId(pRpc->idPool, pConn->sid); + pConn->pContext = NULL; - if ( pRpc->connType == TAOS_CONN_UDPS || TAOS_CONN_TCPS) { - char hashstr[40] = {0}; - sprintf(hashstr, "%x:%x:%x", pConn->peerIp, pConn->peerUid, pConn->peerId); - taosDeleteStrHash(pRpc->hash, hashstr); - rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg + tTrace("%s pConn:%p, rpc connection is closed", pRpc->label, pConn); } - taosFreeId(pRpc->idPool, pConn->sid); - - // reset the link parameters - pConn->meterId[0] = 0; - pConn->outType = 0; - pConn->inType = 0; - pConn->inTranId = 0; - pConn->outTranId = 0; - pConn->pReqMsg = NULL; - pConn->reqMsgLen = 0; - pConn->pRspMsg = NULL; - pConn->rspMsgLen = 0; - pConn->pContext = NULL; - - tTrace("%s pConn:%p, rpc connection is closed", pRpc->label, pConn); - pthread_mutex_unlock(&pRpc->mutex); } @@ -553,13 +551,14 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *meterId, char *has return pConn; } -SRpcConn *rpcSetConnToServer(void *shandle, SRpcIpSet ipSet) { - SRpcInfo *pRpc = (SRpcInfo *)shandle; - - SRpcConn *pConn = taosGetConnFromCache(pRpc->pCache, ipSet.ip[ipSet.index], ipSet.port, pRpc->meterId); +SRpcConn *rpcSetConnToServer(SRpcInfo *pRpc, SRpcIpSet ipSet) { + SRpcConn *pConn; + pConn = taosGetConnFromCache(pRpc->pCache, ipSet.ip[ipSet.index], ipSet.port, pRpc->meterId); if ( pConn == NULL ) { - pConn = rpcOpenConn(pRpc, ipSet.ipStr[ipSet.index], ipSet.port); + char ipstr[20] = {0}; + tinet_ntoa(ipstr, ipSet.ip[ipSet.index]); + pConn = rpcOpenConn(pRpc, ipstr, ipSet.port); } return pConn; @@ -585,7 +584,7 @@ static int rpcProcessReqHeader(SRpcConn *pConn, SRpcHeader *pHeader) { } else if (pConn->inType == 0) { tTrace("%s pConn:%p, %s is already processed, tranId:%d", pRpc->label, pConn, taosMsg[pHeader->msgType], pConn->inTranId); - rpcSendDataToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen); // resend the response + rpcSendMsgToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen); // resend the response } else { tTrace("%s pConn:%p, mismatched message %s and tranId", pRpc->label, pConn, taosMsg[pHeader->msgType]); } @@ -658,13 +657,14 @@ static int rpcProcessHeader(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int d SRpcHeader *pHeader = (SRpcHeader *)data; sid = htonl(pHeader->destId); + pHeader->code = htonl(pHeader->code); + pHeader->msgLen = (int32_t)htonl((uint32_t)pHeader->msgLen); if (pHeader->msgType >= TSDB_MSG_TYPE_MAX || pHeader->msgType <= 0) { tTrace("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHeader->msgType); return TSDB_CODE_INVALID_MSG_TYPE; } - pHeader->msgLen = (int32_t)htonl((uint32_t)pHeader->msgLen); if (dataLen != pHeader->msgLen) { tTrace("%s sid:%d, %s has invalid length, dataLen:%d, msgLen:%d", pRpc->label, sid, taosMsg[pHeader->msgType], dataLen, pHeader->msgLen); @@ -708,7 +708,7 @@ static int rpcProcessHeader(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int d return code; } -static void *rpcProcessDataFromPeer(void *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *chandle) { +static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *chandle) { SRpcHeader *pHeader = (SRpcHeader *)data; SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcConn *pConn = NULL; @@ -764,42 +764,33 @@ static void *rpcProcessDataFromPeer(void *data, int dataLen, uint32_t ip, uint16 static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader) { SRpcInfo *pRpc = pConn->pRpc; - int msgLen = rpcContFromHeader(pHeader->msgLen); - pHeader = rpcDecompressRpcMsg(pHeader); + int contLen = rpcContLenFromMsg(pHeader->msgLen); + uint8_t *pCont = pHeader->content; if ( rpcIsReq(pHeader->msgType) ) { taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer); - (*(pRpc->fp))(pHeader->msgType, pHeader->content, msgLen, pConn, 0); + (*(pRpc->cfp))(pHeader->msgType, pCont, contLen, pConn, 0); } else { // it's a response - STaosRsp *pRsp = (STaosRsp *)pHeader->content; - int32_t code = htonl(pRsp->code); - + int32_t code = pHeader->code; SRpcReqContext *pContext = pConn->pContext; pConn->pContext = NULL; - taosAddConnIntoCache(pRpc->pCache, pConn, pConn->peerIp, pConn->peerPort, pConn->meterId); - if (code == TSDB_CODE_NO_MASTER) { - pContext->code = code; - taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl); + if (code == TSDB_CODE_REDIRECT) { + memcpy(&pContext->ipSet, pHeader->content, sizeof(pContext->ipSet)); + rpcSendReqToServer(pRpc, pContext); } else { - rpcFreeMsg(rpcHeaderFromCont(pContext->pCont)); // free the request msg - (*(pRpc->fp))(pHeader->msgType, pHeader->content, msgLen, pContext->ahandle, pContext->ipSet.index); + rpcFreeOutMsg(rpcHeaderFromCont(pContext->pCont)); // free the request msg + (*(pRpc->cfp))(pHeader->msgType, pCont, contLen, pContext->ahandle, pContext->ipSet.index); } } } static void rpcSendQuickRsp(SRpcConn *pConn, char code) { - char msg[RPC_MSG_OVERHEAD + sizeof(STaosRsp)]; + char msg[RPC_MSG_OVERHEAD]; SRpcHeader *pHeader; - int msgLen; - STaosRsp *pRsp; - - pRsp = (STaosRsp *)rpcContFromHeader(msg); - pRsp->code = htonl(code); - msgLen = sizeof(STaosRsp); // set msg header memset(msg, 0, sizeof(SRpcHeader)); @@ -814,14 +805,14 @@ static void rpcSendQuickRsp(SRpcConn *pConn, char code) { pHeader->destId = pConn->peerId; pHeader->uid = 0; memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId)); + pHeader->code = htonl(code); - rpcSendDataToPeer(pConn, msg, msgLen); + rpcSendMsgToPeer(pConn, msg, 0); } static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint32_t ip, uint16_t port, void *chandle) { SRpcHeader *pRecvHeader, *pReplyHeader; - char msg[sizeof(SRpcHeader) + sizeof(SRpcDigest) + sizeof(STaosRsp)]; - STaosRsp *pRsp; + char msg[sizeof(SRpcHeader) + sizeof(SRpcDigest) + sizeof(uint32_t) ]; uint32_t timeStamp; int msgLen; @@ -839,13 +830,12 @@ static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint pReplyHeader->destId = pRecvHeader->sourceId; memcpy(pReplyHeader->meterId, pRecvHeader->meterId, tListLen(pReplyHeader->meterId)); - pRsp = (STaosRsp *)pReplyHeader->content; - pRsp->code = htonl(code); - msgLen = sizeof(STaosRsp); - char *pContent = pRsp->more; + pReplyHeader->code = htonl(code); + msgLen = sizeof(SRpcHeader); if (code == TSDB_CODE_INVALID_TIME_STAMP) { // include a time stamp if client's time is not synchronized well + uint8_t *pContent = pReplyHeader->content; timeStamp = taosGetTimestampSec(); memcpy(pContent, &timeStamp, sizeof(timeStamp)); msgLen += sizeof(timeStamp); @@ -857,13 +847,19 @@ static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint return; } -static void rpcSendReqToServer(SRpcConn *pConn, SRpcReqContext *pContext) { +static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { SRpcHeader *pHeader = rpcHeaderFromCont(pContext->pCont); - SRpcInfo *pRpc = pConn->pRpc; char *msg = (char *)pHeader; int msgLen = rpcMsgLenFromCont(pContext->contLen); char msgType = pContext->msgType; + SRpcConn *pConn = rpcSetConnToServer(pRpc, pContext->ipSet); + if (pConn == NULL) { + pContext->code = terrno; + taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl); + return; + } + pthread_mutex_lock(&pRpc->mutex); // set the message header @@ -889,37 +885,37 @@ static void rpcSendReqToServer(SRpcConn *pConn, SRpcReqContext *pContext) { pthread_mutex_unlock(&pRpc->mutex); - rpcSendDataToPeer(pConn, msg, msgLen); + rpcSendMsgToPeer(pConn, msg, msgLen); taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); } - -static void rpcSendDataToPeer(SRpcConn *pConn, void *data, int dataLen) { + +static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { int writtenLen = 0; SRpcInfo *pRpc = pConn->pRpc; - SRpcHeader *pHeader = (SRpcHeader *)data; + SRpcHeader *pHeader = (SRpcHeader *)msg; - dataLen = rpcAddAuthPart(pConn, data, dataLen); + msgLen = rpcAddAuthPart(pConn, msg, msgLen); if ( rpcIsReq(pHeader->msgType)) { if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) tTrace("%s pConn:%p, %s is sent to %s:%hu, len:%d source:0x%08x dest:0x%08x tranId:%d", pRpc->label, pConn, taosMsg[pHeader->msgType], pConn->peerIpstr, - pConn->peerPort, dataLen, pHeader->sourceId, pHeader->destId, pHeader->tranId); + pConn->peerPort, msgLen, pHeader->sourceId, pHeader->destId, pHeader->tranId); } else { if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) tTrace( "%s pConn:%p, %s is sent to %s:%hu, code:%u len:%d source:0x%08x dest:0x%08x tranId:%d", pRpc->label, pConn, taosMsg[pHeader->msgType], pConn->peerIpstr, pConn->peerPort, - (uint8_t)pHeader->content[0], dataLen, pHeader->sourceId, pHeader->destId, pHeader->tranId); + (uint8_t)pHeader->content[0], msgLen, pHeader->sourceId, pHeader->destId, pHeader->tranId); } - writtenLen = (*taosSendData[pRpc->connType])(pConn->peerIp, pConn->peerPort, (char *)pHeader, dataLen, pConn->chandle); + writtenLen = (*taosSendData[pRpc->connType])(pConn->peerIp, pConn->peerPort, (char *)pHeader, msgLen, pConn->chandle); - if (writtenLen != dataLen) { + if (writtenLen != msgLen) { tError("%s pConn:%p, failed to send, dataLen:%d writtenLen:%d, reason:%s", pRpc->label, pConn, - dataLen, writtenLen, strerror(errno)); + msgLen, writtenLen, strerror(errno)); } - tDump(data, dataLen); + tDump(msg, msgLen); } static void rpcProcessConnError(void *param, void *id) { @@ -927,34 +923,20 @@ static void rpcProcessConnError(void *param, void *id) { SRpcInfo *pRpc = pContext->pRpc; if ( pContext->numOfRetry >= pContext->ipSet.numOfIps ) { - rpcFreeMsg(rpcHeaderFromCont(pContext->pCont)); // free the request msg - char *rsp = calloc(1, RPC_MSG_OVERHEAD + sizeof(STaosRsp)); - if ( rsp ) { - STaosRsp *pRsp = (STaosRsp *)(rsp+sizeof(SRpcHeader)); - pRsp->code = pContext->code; - (*(pRpc->fp))(pContext->msgType+1, pRsp, sizeof(STaosRsp), pContext->ahandle, 0); - } else { - tError("%s failed to malloc RSP", pRpc->label); - } + rpcFreeOutMsg(rpcHeaderFromCont(pContext->pCont)); // free the request msg + (*(pRpc->cfp))(pContext->msgType+1, NULL, 0, pContext->ahandle, pContext->code); } else { // move to next IP pContext->ipSet.index++; pContext->ipSet.index = pContext->ipSet.index % pContext->ipSet.numOfIps; - - SRpcConn *pConn = rpcSetConnToServer(pContext->pRpc, pContext->ipSet); - pContext->code = terrno; - if (pConn == NULL) taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl); - - rpcSendReqToServer(pConn, pContext); + rpcSendReqToServer(pRpc, pContext); } } static void rpcProcessRetryTimer(void *param, void *tmrId) { SRpcConn *pConn = (SRpcConn *)param; - int reportDisc = 0; - SRpcInfo *pRpc = pConn->pRpc; - if (pRpc == NULL) return; // it means it is already released + int reportDisc = 0; pthread_mutex_lock(&pRpc->mutex); @@ -966,7 +948,7 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { if (pConn->retry < 4) { tTrace("%s pConn:%p, re-send msg:%s to %s:%hu", pRpc->label, taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort); - rpcSendDataToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); + rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); taosTmrReset(rpcProcessRetryTimer, tsRpcTimer<retry, pConn, pRpc->tmrCtrl, &pConn->pTimer); } else { // close the connection @@ -990,18 +972,13 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { static void rpcProcessIdleTimer(void *param, void *tmrId) { SRpcConn *pConn = (SRpcConn *)param; SRpcInfo *pRpc = pConn->pRpc; - assert(pRpc); - pthread_mutex_lock(&pRpc->mutex); - - if (pConn->inType == 0 && pConn->meterId[0]) { + if (pConn->meterId[0]) { tTrace("%s pConn:%p, close the connection since no activity", pRpc->label, pConn); rpcCloseConn(pConn); } else { tTrace("%s pConn:%p, idle timer:%p not processed", pRpc->label, pConn, tmrId); } - - pthread_mutex_unlock(&pRpc->mutex); } static void rpcProcessProgressTimer(void *param, void *tmrId) { @@ -1021,22 +998,27 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) { pthread_mutex_unlock(&pRpc->mutex); } -static void rpcFreeMsg(void *msg) { +static void rpcFreeOutMsg(void *msg) { if ( msg == NULL ) return; char *req = ((char *)msg) - sizeof(SRpcReqContext); free(req); } +typedef struct { + int32_t reserved; + int32_t contLen; +} SRpcComp; + static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { SRpcHeader *pHeader = rpcHeaderFromCont(pCont); - int32_t overhead = sizeof(int32_t) * 2; int32_t finalLen = 0; + int overhead = sizeof(SRpcComp); if (!NEEDTO_COMPRESSS_MSG(contLen)) { return contLen; } - char *buf = malloc (contLen + overhead+8); // 16 extra bytes + char *buf = malloc (contLen + overhead + 8); // 8 extra bytes if (buf == NULL) { tError("failed to allocate memory for rpc msg compression, contLen:%d, reason:%s", contLen, strerror(errno)); return contLen; @@ -1049,20 +1031,15 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message */ if (compLen < contLen - overhead) { - //tDump(pCont, contLen); - int32_t *pLen = (int32_t *)pCont; - - *pLen = 0; // first 4 bytes must be zero - pLen = (int32_t *)(pCont + sizeof(int32_t)); - - *pLen = htonl(contLen); // contLen is encoded in second 4 bytes + SRpcComp *pComp = (SRpcComp *)pCont; + pComp->reserved = 0; + pComp->contLen = htonl(contLen); memcpy(pCont + overhead, buf, compLen); pHeader->comp = 1; tTrace("compress rpc msg, before:%d, after:%d", contLen, compLen); finalLen = compLen + overhead; - //tDump(pCont, contLen); } else { finalLen = contLen; } @@ -1072,16 +1049,15 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { } static SRpcHeader *rpcDecompressRpcMsg(SRpcHeader *pHeader) { - int overhead = sizeof(int32_t) * 2; + int overhead = sizeof(SRpcComp); SRpcHeader *pNewHeader = NULL; uint8_t *pCont = pHeader->content; + SRpcComp *pComp = (SRpcComp *)pHeader->content; if (pHeader->comp) { // decompress the content - assert(GET_INT32_VAL(pHeader->content) == 0); - - // contLen is original message length before compression applied - int contLen = htonl(GET_INT32_VAL(pCont + sizeof(int32_t))); + assert(pComp->reserved == 0); + int contLen = htonl(pComp->contLen); // prepare the temporary buffer to decompress message char *buf = rpcMallocCont(contLen); diff --git a/src/rpc/test/CMakeLists.txt b/src/rpc/test/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..f1e7a1cd390b08893b8323aab08c47c7e7e9c305 --- /dev/null +++ b/src/rpc/test/CMakeLists.txt @@ -0,0 +1,15 @@ +CMAKE_MINIMUM_REQUIRED(VERSION 2.8) +PROJECT(TDengine) + +IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) + INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) + INCLUDE_DIRECTORIES(../inc) + AUX_SOURCE_DIRECTORY(./ TEST_SRC) + + ADD_EXECUTABLE(rpcTest ${TEST_SRC}) + TARGET_LINK_LIBRARIES(rpcTest trpc) +ENDIF () + + diff --git a/src/rpc/test/unittest.c b/src/rpc/test/unittest.c new file mode 100644 index 0000000000000000000000000000000000000000..262c93a62b95432ab81b68fa76d184446ea4cc8c --- /dev/null +++ b/src/rpc/test/unittest.c @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +//#define _DEFAULT_SOURCE +#include "os.h" +#include "tlog.h" +#include "trpc.h" +#include + +int32_t main(int32_t argc, char *argv[]) { + dPrint("unit test for rpc module"); + + SRpcInit rpcInit; + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.localIp = "0.0.0.0"; + rpcInit.localPort = 7000; + rpcInit.label = "unittest"; + rpcInit.numOfThreads = 1; + rpcInit.fp = NULL; + rpcInit.sessions = 1000; + rpcInit.connType = TAOS_CONN_SOCKET_TYPE_S(); + rpcInit.idleTime = 2000; + + void *pConn = rpcOpen(&rpcInit); + if (pConn != NULL) { + dPrint("conection is opened"); + } else { + dError("failed to initialize rpc"); + } + + return 0; +} + +