From 92fa341fc56edeb94bf0d679ade08d493fed3683 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Sat, 28 Mar 2020 15:45:17 +0800 Subject: [PATCH] add the code for synchronous client API --- src/inc/trpc.h | 1 + src/rpc/src/rpcMain.c | 52 ++++++++- src/rpc/test/CMakeLists.txt | 4 + src/rpc/test/rclient.c | 8 +- src/rpc/test/rsclient.c | 212 ++++++++++++++++++++++++++++++++++++ src/rpc/test/rserver.c | 2 +- 6 files changed, 268 insertions(+), 11 deletions(-) create mode 100644 src/rpc/test/rsclient.c diff --git a/src/inc/trpc.h b/src/inc/trpc.h index e545abfed3..a34d107474 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -85,6 +85,7 @@ void rpcSendRequest(void *thandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg); void rpcSendResponse(SRpcMsg *pMsg); void rpcSendRedirectRsp(void *pConn, SRpcIpSet *pIpSet); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); +void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pOut, SRpcMsg *pRsp); #ifdef __cplusplus } diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 9748f5d730..e0d2191888 100755 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -82,6 +82,9 @@ typedef struct { int8_t oldInUse; // server IP inUse passed by app int8_t redirect; // flag to indicate redirect int8_t connType; // connection type + SRpcMsg *pRsp; // for synchronous API + tsem_t *pSem; // for synchronous API + SRpcIpSet *pSet; // for synchronous API char msg[0]; // RpcHead starts from here } SRpcReqContext; @@ -454,6 +457,26 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { return 0; } +void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { + SRpcReqContext *pContext; + pContext = (SRpcReqContext *) (pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext)); + + memset(pRsp, 0, sizeof(SRpcMsg)); + + tsem_t sem; + tsem_init(&sem, 0, 0); + pContext->pSem = &sem; + pContext->pRsp = pRsp; + pContext->pSet = pIpSet; + + rpcSendRequest(shandle, pIpSet, pMsg); + + tsem_wait(&sem); + tsem_destroy(&sem); + + return; +} + static void rpcFreeMsg(void *msg) { if ( msg ) { char *temp = (char *)msg - sizeof(SRpcReqContext); @@ -855,6 +878,26 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { return pConn; } +static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { + SRpcInfo *pRpc = pContext->pRpc; + + if (pContext->pRsp) { + // for synchronous API + tsem_post(pContext->pSem); + memcpy(pContext->pSet, &pContext->ipSet, sizeof(SRpcIpSet)); + memcpy(pContext->pRsp, pMsg, sizeof(SRpcMsg)); + } else { + // for asynchronous API + if (pRpc->ufp && (pContext->ipSet.inUse != pContext->oldInUse || pContext->redirect)) + (*pRpc->ufp)(pContext->ahandle, &pContext->ipSet); // notify the update of ipSet + + (*pRpc->cfp)(pMsg); + } + + // free the request message + rpcFreeCont(pContext->pCont); +} + static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { SRpcInfo *pRpc = pConn->pRpc; @@ -887,10 +930,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { tTrace("%s %p, redirect is received, numOfIps:%d", pRpc->label, pConn, pContext->ipSet.numOfIps); rpcSendReqToServer(pRpc, pContext); } else { - if ( pRpc->ufp && (pContext->ipSet.inUse != pContext->oldInUse || pContext->redirect) ) - (*pRpc->ufp)(pContext->ahandle, &pContext->ipSet); // notify the update of ipSet - (*pRpc->cfp)(&rpcMsg); - rpcFreeCont(pContext->pCont); // free the request msg + rpcNotifyClient(pContext, &rpcMsg); } } } @@ -1059,8 +1099,8 @@ static void rpcProcessConnError(void *param, void *id) { rpcMsg.code = pContext->code; rpcMsg.pCont = NULL; rpcMsg.contLen = 0; - (*(pRpc->cfp))(&rpcMsg); - rpcFreeCont(pContext->pCont); // free the request msg + + rpcNotifyClient(pContext, &rpcMsg); } else { // move to next IP pContext->ipSet.inUse++; diff --git a/src/rpc/test/CMakeLists.txt b/src/rpc/test/CMakeLists.txt index 15780a396c..b519ae7578 100644 --- a/src/rpc/test/CMakeLists.txt +++ b/src/rpc/test/CMakeLists.txt @@ -11,6 +11,10 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) ADD_EXECUTABLE(rclient ${CLIENT_SRC}) TARGET_LINK_LIBRARIES(rclient trpc) + LIST(APPEND SCLIENT_SRC ./rsclient.c) + ADD_EXECUTABLE(rsclient ${SCLIENT_SRC}) + TARGET_LINK_LIBRARIES(rsclient trpc) + LIST(APPEND SERVER_SRC ./rserver.c) ADD_EXECUTABLE(rserver ${SERVER_SRC}) TARGET_LINK_LIBRARIES(rserver trpc) diff --git a/src/rpc/test/rclient.c b/src/rpc/test/rclient.c index 562d0fff96..f000ab91a2 100644 --- a/src/rpc/test/rclient.c +++ b/src/rpc/test/rclient.c @@ -40,7 +40,7 @@ typedef struct { void *pRpc; } SInfo; -void processResponse(SRpcMsg *pMsg) { +static void processResponse(SRpcMsg *pMsg) { SInfo *pInfo = (SInfo *)pMsg->handle; tTrace("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, pMsg->code); @@ -49,16 +49,16 @@ void processResponse(SRpcMsg *pMsg) { sem_post(&pInfo->rspSem); } -void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) { +static void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) { SInfo *pInfo = (SInfo *)handle; tTrace("thread:%d, ip set is changed, index:%d", pInfo->index, pIpSet->inUse); pInfo->ipSet = *pIpSet; } -int tcount = 0; +static int tcount = 0; -void *sendRequest(void *param) { +static void *sendRequest(void *param) { SInfo *pInfo = (SInfo *)param; SRpcMsg rpcMsg; diff --git a/src/rpc/test/rsclient.c b/src/rpc/test/rsclient.c new file mode 100644 index 0000000000..b99387e097 --- /dev/null +++ b/src/rpc/test/rsclient.c @@ -0,0 +1,212 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include "os.h" +#include "tlog.h" +#include "trpc.h" +#include "taoserror.h" +#include +#include + +typedef struct { + int index; + SRpcIpSet ipSet; + int num; + int numOfReqs; + int msgSize; + sem_t rspSem; + sem_t *pOverSem; + pthread_t thread; + void *pRpc; +} SInfo; + +static void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) { + SInfo *pInfo = (SInfo *)handle; + + tTrace("thread:%d, ip set is changed, index:%d", pInfo->index, pIpSet->inUse); + pInfo->ipSet = *pIpSet; +} + +static int tcount = 0; +static int terror = 0; + +static void *sendRequest(void *param) { + SInfo *pInfo = (SInfo *)param; + SRpcMsg rpcMsg, rspMsg; + + tTrace("thread:%d, start to send request", pInfo->index); + + while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) { + pInfo->num++; + rpcMsg.pCont = rpcMallocCont(pInfo->msgSize); + rpcMsg.contLen = pInfo->msgSize; + rpcMsg.handle = pInfo; + rpcMsg.msgType = 1; + tTrace("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); + + rpcSendRecv(pInfo->pRpc, &pInfo->ipSet, &rpcMsg, &rspMsg); + + // handle response + if (rspMsg.code != 0) terror++; + + tTrace("thread:%d, rspLen:%d code:%d", pInfo->index, rspMsg.contLen, rspMsg.code); + + rpcFreeCont(rspMsg.pCont); + + if ( pInfo->num % 20000 == 0 ) + tPrint("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); + } + + tTrace("thread:%d, it is over", pInfo->index); + tcount++; + + return NULL; +} + +int main(int argc, char *argv[]) { + SRpcInit rpcInit; + SRpcIpSet ipSet; + int msgSize = 128; + int numOfReqs = 0; + int appThreads = 1; + char serverIp[40] = "127.0.0.1"; + struct timeval systemTime; + int64_t startTime, endTime; + pthread_attr_t thattr; + + // server info + ipSet.numOfIps = 1; + ipSet.inUse = 0; + ipSet.port = 7000; + ipSet.ip[0] = inet_addr(serverIp); + ipSet.ip[1] = inet_addr("192.168.0.1"); + + // client info + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.localIp = "0.0.0.0"; + rpcInit.localPort = 0; + rpcInit.label = "APP"; + rpcInit.numOfThreads = 1; + // rpcInit.cfp = processResponse; + rpcInit.ufp = processUpdateIpSet; + rpcInit.sessions = 100; + rpcInit.idleTime = tsShellActivityTimer*1000; + rpcInit.user = "michael"; + rpcInit.secret = "mypassword"; + rpcInit.ckey = "key"; + rpcInit.spi = 1; + rpcInit.connType = TAOS_CONN_CLIENT; + + for (int i=1; iindex = i; + pInfo->ipSet = ipSet; + pInfo->numOfReqs = numOfReqs; + pInfo->msgSize = msgSize; + sem_init(&pInfo->rspSem, 0, 0); + pInfo->pRpc = pRpc; + pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo); + pInfo++; + } + + do { + usleep(1); + } while ( tcount < appThreads); + + gettimeofday(&systemTime, NULL); + endTime = systemTime.tv_sec*1000000 + systemTime.tv_usec; + float usedTime = (endTime - startTime)/1000.0; // mseconds + + tPrint("it takes %.3f mseconds to send %d requests to server, error num:%d", usedTime, numOfReqs*appThreads, terror); + tPrint("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0*numOfReqs*appThreads/usedTime, msgSize); + + taosCloseLogger(); + + return 0; +} + + diff --git a/src/rpc/test/rserver.c b/src/rpc/test/rserver.c index d39caed5b0..48ae02a1d5 100644 --- a/src/rpc/test/rserver.c +++ b/src/rpc/test/rserver.c @@ -72,7 +72,7 @@ void processShellMsg() { rpcMsg.pCont = rpcMallocCont(msgSize); rpcMsg.contLen = msgSize; rpcMsg.handle = pRpcMsg->handle; - rpcMsg.code = 1; + rpcMsg.code = 0; rpcSendResponse(&rpcMsg); taosFreeQitem(pRpcMsg); -- GitLab