提交 92fa341f 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

add the code for synchronous client API

上级 5e5f32b0
......@@ -85,6 +85,7 @@ void rpcSendRequest(void *thandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg);
void rpcSendResponse(SRpcMsg *pMsg);
void rpcSendRedirectRsp(void *pConn, SRpcIpSet *pIpSet);
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pOut, SRpcMsg *pRsp);
#ifdef __cplusplus
}
......
......@@ -82,6 +82,9 @@ typedef struct {
int8_t oldInUse; // server IP inUse passed by app
int8_t redirect; // flag to indicate redirect
int8_t connType; // connection type
SRpcMsg *pRsp; // for synchronous API
tsem_t *pSem; // for synchronous API
SRpcIpSet *pSet; // for synchronous API
char msg[0]; // RpcHead starts from here
} SRpcReqContext;
......@@ -454,6 +457,26 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
return 0;
}
void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
SRpcReqContext *pContext;
pContext = (SRpcReqContext *) (pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext));
memset(pRsp, 0, sizeof(SRpcMsg));
tsem_t sem;
tsem_init(&sem, 0, 0);
pContext->pSem = &sem;
pContext->pRsp = pRsp;
pContext->pSet = pIpSet;
rpcSendRequest(shandle, pIpSet, pMsg);
tsem_wait(&sem);
tsem_destroy(&sem);
return;
}
static void rpcFreeMsg(void *msg) {
if ( msg ) {
char *temp = (char *)msg - sizeof(SRpcReqContext);
......@@ -855,6 +878,26 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
return pConn;
}
static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
SRpcInfo *pRpc = pContext->pRpc;
if (pContext->pRsp) {
// for synchronous API
tsem_post(pContext->pSem);
memcpy(pContext->pSet, &pContext->ipSet, sizeof(SRpcIpSet));
memcpy(pContext->pRsp, pMsg, sizeof(SRpcMsg));
} else {
// for asynchronous API
if (pRpc->ufp && (pContext->ipSet.inUse != pContext->oldInUse || pContext->redirect))
(*pRpc->ufp)(pContext->ahandle, &pContext->ipSet); // notify the update of ipSet
(*pRpc->cfp)(pMsg);
}
// free the request message
rpcFreeCont(pContext->pCont);
}
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
SRpcInfo *pRpc = pConn->pRpc;
......@@ -887,10 +930,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
tTrace("%s %p, redirect is received, numOfIps:%d", pRpc->label, pConn, pContext->ipSet.numOfIps);
rpcSendReqToServer(pRpc, pContext);
} else {
if ( pRpc->ufp && (pContext->ipSet.inUse != pContext->oldInUse || pContext->redirect) )
(*pRpc->ufp)(pContext->ahandle, &pContext->ipSet); // notify the update of ipSet
(*pRpc->cfp)(&rpcMsg);
rpcFreeCont(pContext->pCont); // free the request msg
rpcNotifyClient(pContext, &rpcMsg);
}
}
}
......@@ -1059,8 +1099,8 @@ static void rpcProcessConnError(void *param, void *id) {
rpcMsg.code = pContext->code;
rpcMsg.pCont = NULL;
rpcMsg.contLen = 0;
(*(pRpc->cfp))(&rpcMsg);
rpcFreeCont(pContext->pCont); // free the request msg
rpcNotifyClient(pContext, &rpcMsg);
} else {
// move to next IP
pContext->ipSet.inUse++;
......
......@@ -11,6 +11,10 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
ADD_EXECUTABLE(rclient ${CLIENT_SRC})
TARGET_LINK_LIBRARIES(rclient trpc)
LIST(APPEND SCLIENT_SRC ./rsclient.c)
ADD_EXECUTABLE(rsclient ${SCLIENT_SRC})
TARGET_LINK_LIBRARIES(rsclient trpc)
LIST(APPEND SERVER_SRC ./rserver.c)
ADD_EXECUTABLE(rserver ${SERVER_SRC})
TARGET_LINK_LIBRARIES(rserver trpc)
......
......@@ -40,7 +40,7 @@ typedef struct {
void *pRpc;
} SInfo;
void processResponse(SRpcMsg *pMsg) {
static void processResponse(SRpcMsg *pMsg) {
SInfo *pInfo = (SInfo *)pMsg->handle;
tTrace("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, pMsg->code);
......@@ -49,16 +49,16 @@ void processResponse(SRpcMsg *pMsg) {
sem_post(&pInfo->rspSem);
}
void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) {
static void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) {
SInfo *pInfo = (SInfo *)handle;
tTrace("thread:%d, ip set is changed, index:%d", pInfo->index, pIpSet->inUse);
pInfo->ipSet = *pIpSet;
}
int tcount = 0;
static int tcount = 0;
void *sendRequest(void *param) {
static void *sendRequest(void *param) {
SInfo *pInfo = (SInfo *)param;
SRpcMsg rpcMsg;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <pthread.h>
#include <errno.h>
#include <signal.h>
#include <semaphore.h>
#include "os.h"
#include "tlog.h"
#include "trpc.h"
#include "taoserror.h"
#include <stdint.h>
#include <unistd.h>
typedef struct {
int index;
SRpcIpSet ipSet;
int num;
int numOfReqs;
int msgSize;
sem_t rspSem;
sem_t *pOverSem;
pthread_t thread;
void *pRpc;
} SInfo;
static void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) {
SInfo *pInfo = (SInfo *)handle;
tTrace("thread:%d, ip set is changed, index:%d", pInfo->index, pIpSet->inUse);
pInfo->ipSet = *pIpSet;
}
static int tcount = 0;
static int terror = 0;
static void *sendRequest(void *param) {
SInfo *pInfo = (SInfo *)param;
SRpcMsg rpcMsg, rspMsg;
tTrace("thread:%d, start to send request", pInfo->index);
while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
pInfo->num++;
rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
rpcMsg.contLen = pInfo->msgSize;
rpcMsg.handle = pInfo;
rpcMsg.msgType = 1;
tTrace("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
rpcSendRecv(pInfo->pRpc, &pInfo->ipSet, &rpcMsg, &rspMsg);
// handle response
if (rspMsg.code != 0) terror++;
tTrace("thread:%d, rspLen:%d code:%d", pInfo->index, rspMsg.contLen, rspMsg.code);
rpcFreeCont(rspMsg.pCont);
if ( pInfo->num % 20000 == 0 )
tPrint("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
}
tTrace("thread:%d, it is over", pInfo->index);
tcount++;
return NULL;
}
int main(int argc, char *argv[]) {
SRpcInit rpcInit;
SRpcIpSet ipSet;
int msgSize = 128;
int numOfReqs = 0;
int appThreads = 1;
char serverIp[40] = "127.0.0.1";
struct timeval systemTime;
int64_t startTime, endTime;
pthread_attr_t thattr;
// server info
ipSet.numOfIps = 1;
ipSet.inUse = 0;
ipSet.port = 7000;
ipSet.ip[0] = inet_addr(serverIp);
ipSet.ip[1] = inet_addr("192.168.0.1");
// client info
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = "0.0.0.0";
rpcInit.localPort = 0;
rpcInit.label = "APP";
rpcInit.numOfThreads = 1;
// rpcInit.cfp = processResponse;
rpcInit.ufp = processUpdateIpSet;
rpcInit.sessions = 100;
rpcInit.idleTime = tsShellActivityTimer*1000;
rpcInit.user = "michael";
rpcInit.secret = "mypassword";
rpcInit.ckey = "key";
rpcInit.spi = 1;
rpcInit.connType = TAOS_CONN_CLIENT;
for (int i=1; i<argc; ++i) {
if (strcmp(argv[i], "-p")==0 && i < argc-1) {
ipSet.port = atoi(argv[++i]);
} else if (strcmp(argv[i], "-i") ==0 && i < argc-1) {
ipSet.ip[0] = inet_addr(argv[++i]);
} else if (strcmp(argv[i], "-l")==0 && i < argc-1) {
strcpy(rpcInit.localIp, argv[++i]);
} else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
rpcInit.numOfThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
msgSize = atoi(argv[++i]);
} else if (strcmp(argv[i], "-s")==0 && i < argc-1) {
rpcInit.sessions = atoi(argv[++i]);
} else if (strcmp(argv[i], "-n")==0 && i < argc-1) {
numOfReqs = atoi(argv[++i]);
} else if (strcmp(argv[i], "-a")==0 && i < argc-1) {
appThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-o")==0 && i < argc-1) {
tsCompressMsgSize = atoi(argv[++i]);
} else if (strcmp(argv[i], "-u")==0 && i < argc-1) {
rpcInit.user = argv[++i];
} else if (strcmp(argv[i], "-k")==0 && i < argc-1) {
rpcInit.secret = argv[++i];
} else if (strcmp(argv[i], "-spi")==0 && i < argc-1) {
rpcInit.spi = atoi(argv[++i]);
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
rpcDebugFlag = atoi(argv[++i]);
} else {
printf("\nusage: %s [options] \n", argv[0]);
printf(" [-i ip]: first server IP address, default is:%s\n", serverIp);
printf(" [-p port]: server port number, default is:%d\n", ipSet.port);
printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
printf(" [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions);
printf(" [-l localIp]: local IP address, default is:%s\n", rpcInit.localIp);
printf(" [-m msgSize]: message body size, default is:%d\n", msgSize);
printf(" [-a threads]: number of app threads, default is:%d\n", appThreads);
printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
printf(" [-u user]: user name for the connection, default is:%s\n", rpcInit.user);
printf(" [-k secret]: password for the connection, default is:%s\n", rpcInit.secret);
printf(" [-spi SPI]: security parameter index, default is:%d\n", rpcInit.spi);
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
printf(" [-h help]: print out this help\n\n");
exit(0);
}
}
taosInitLog("client.log", 100000, 10);
void *pRpc = rpcOpen(&rpcInit);
if (pRpc == NULL) {
dError("failed to initialize RPC");
return -1;
}
tPrint("client is initialized");
gettimeofday(&systemTime, NULL);
startTime = systemTime.tv_sec*1000000 + systemTime.tv_usec;
SInfo *pInfo = (SInfo *)calloc(1, sizeof(SInfo)*appThreads);
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
for (int i=0; i<appThreads; ++i) {
pInfo->index = i;
pInfo->ipSet = ipSet;
pInfo->numOfReqs = numOfReqs;
pInfo->msgSize = msgSize;
sem_init(&pInfo->rspSem, 0, 0);
pInfo->pRpc = pRpc;
pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo);
pInfo++;
}
do {
usleep(1);
} while ( tcount < appThreads);
gettimeofday(&systemTime, NULL);
endTime = systemTime.tv_sec*1000000 + systemTime.tv_usec;
float usedTime = (endTime - startTime)/1000.0; // mseconds
tPrint("it takes %.3f mseconds to send %d requests to server, error num:%d", usedTime, numOfReqs*appThreads, terror);
tPrint("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0*numOfReqs*appThreads/usedTime, msgSize);
taosCloseLogger();
return 0;
}
......@@ -72,7 +72,7 @@ void processShellMsg() {
rpcMsg.pCont = rpcMallocCont(msgSize);
rpcMsg.contLen = msgSize;
rpcMsg.handle = pRpcMsg->handle;
rpcMsg.code = 1;
rpcMsg.code = 0;
rpcSendResponse(&rpcMsg);
taosFreeQitem(pRpcMsg);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册