未验证 提交 22635992 编写于 作者: dengyihao's avatar dengyihao 提交者: GitHub

Merge pull request #15534 from taosdata/opt/bench

fix: add bench to rpc
add_executable(transportTest "") add_executable(transportTest "")
add_executable(transUT "") add_executable(transUT "")
add_executable(pushServer "") add_executable(svrBench "")
add_executable(cliBench "")
target_sources(transUT target_sources(transUT
PRIVATE PRIVATE
...@@ -12,9 +13,13 @@ target_sources(transportTest ...@@ -12,9 +13,13 @@ target_sources(transportTest
"transportTests.cpp" "transportTests.cpp"
) )
target_sources(pushServer target_sources(svrBench
PRIVATE PRIVATE
"pushServer.c" "svrBench.c"
)
target_sources(cliBench
PRIVATE
"cliBench.c"
) )
target_include_directories(transportTest target_include_directories(transportTest
...@@ -45,13 +50,37 @@ target_include_directories(transUT ...@@ -45,13 +50,37 @@ target_include_directories(transUT
"${CMAKE_CURRENT_SOURCE_DIR}/../inc" "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
) )
target_include_directories(pushServer target_include_directories(svrBench
PUBLIC
"${TD_SOURCE_DIR}/include/libs/transport"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(svrBench
PUBLIC
"${TD_SOURCE_DIR}/include/libs/transport"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries (svrBench
os
util
common
gtest_main
transport
)
target_include_directories(cliBench
PUBLIC
"${TD_SOURCE_DIR}/include/libs/transport"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(cliBench
PUBLIC PUBLIC
"${TD_SOURCE_DIR}/include/libs/transport" "${TD_SOURCE_DIR}/include/libs/transport"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc" "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
) )
target_link_libraries (pushServer target_link_libraries (cliBench
os os
util util
common common
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "taoserror.h"
#include "tglobal.h"
#include "transLog.h"
#include "trpc.h"
#include "tutil.h"
typedef struct {
int index;
SEpSet epSet;
int num;
int numOfReqs;
int msgSize;
tsem_t rspSem;
tsem_t *pOverSem;
TdThread thread;
void *pRpc;
} SInfo;
static void processResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SInfo *pInfo = (SInfo *)pMsg->info.ahandle;
tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen,
pMsg->code);
if (pEpSet) pInfo->epSet = *pEpSet;
rpcFreeCont(pMsg->pCont);
tsem_post(&pInfo->rspSem);
}
static int tcount = 0;
static void *sendRequest(void *param) {
SInfo *pInfo = (SInfo *)param;
SRpcMsg rpcMsg = {0};
tDebug("thread:%d, start to send request", pInfo->index);
while (pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
pInfo->num++;
rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
rpcMsg.contLen = pInfo->msgSize;
rpcMsg.info.ahandle = pInfo;
rpcMsg.msgType = 1;
tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
tsem_wait(&pInfo->rspSem);
}
tDebug("thread:%d, it is over", pInfo->index);
tcount++;
return NULL;
}
int main(int argc, char *argv[]) {
SRpcInit rpcInit;
SEpSet epSet;
int msgSize = 128;
int numOfReqs = 0;
int appThreads = 1;
char serverIp[40] = "127.0.0.1";
struct timeval systemTime;
int64_t startTime, endTime;
// server info
epSet.numOfEps = 1;
epSet.inUse = 0;
epSet.eps[0].port = 7000;
epSet.eps[1].port = 7000;
strcpy(epSet.eps[0].fqdn, serverIp);
strcpy(epSet.eps[1].fqdn, "192.168.0.1");
// client info
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 0;
rpcInit.label = "APP";
rpcInit.numOfThreads = 1;
rpcInit.cfp = processResponse;
rpcInit.sessions = 100;
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.user = "michael";
rpcInit.connType = TAOS_CONN_CLIENT;
rpcDebugFlag = 131;
for (int i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
} else if (strcmp(argv[i], "-i") == 0 && i < argc - 1) {
} else if (strcmp(argv[i], "-t") == 0 && i < argc - 1) {
rpcInit.numOfThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-m") == 0 && i < argc - 1) {
msgSize = atoi(argv[++i]);
} else if (strcmp(argv[i], "-s") == 0 && i < argc - 1) {
rpcInit.sessions = atoi(argv[++i]);
} else if (strcmp(argv[i], "-n") == 0 && i < argc - 1) {
numOfReqs = atoi(argv[++i]);
} else if (strcmp(argv[i], "-a") == 0 && i < argc - 1) {
appThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-o") == 0 && i < argc - 1) {
tsCompressMsgSize = atoi(argv[++i]);
} else if (strcmp(argv[i], "-u") == 0 && i < argc - 1) {
} else if (strcmp(argv[i], "-k") == 0 && i < argc - 1) {
} else if (strcmp(argv[i], "-spi") == 0 && i < argc - 1) {
} else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) {
rpcDebugFlag = atoi(argv[++i]);
} else {
printf("\nusage: %s [options] \n", argv[0]);
printf(" [-i ip]: first server IP address, default is:%s\n", serverIp);
printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
printf(" [-m msgSize]: message body size, default is:%d\n", msgSize);
printf(" [-a threads]: number of app threads, default is:%d\n", appThreads);
printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
printf(" [-u user]: user name for the connection, default is:%s\n", rpcInit.user);
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
printf(" [-h help]: print out this help\n\n");
exit(0);
}
}
taosInitLog("client.log", 100000);
void *pRpc = rpcOpen(&rpcInit);
if (pRpc == NULL) {
tError("failed to initialize RPC");
return -1;
}
tInfo("client is initialized");
tInfo("threads:%d msgSize:%d requests:%d", appThreads, msgSize, numOfReqs);
int64_t now = taosGetTimestampUs();
SInfo *pInfo = (SInfo *)taosMemoryCalloc(1, sizeof(SInfo) * appThreads);
SInfo *p = pInfo;
for (int i = 0; i < appThreads; ++i) {
pInfo->index = i;
pInfo->epSet = epSet;
pInfo->numOfReqs = numOfReqs;
pInfo->msgSize = msgSize;
tsem_init(&pInfo->rspSem, 0, 0);
pInfo->pRpc = pRpc;
taosThreadCreate(&pInfo->thread, NULL, sendRequest, pInfo);
pInfo++;
}
do {
taosUsleep(1);
} while (tcount < appThreads);
float usedTime = (taosGetTimestampUs() - now) / 1000.0f;
tInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs * appThreads);
tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, msgSize);
for (int i = 0; i < appThreads; i++) {
SInfo *pInfo = p;
taosThreadJoin(pInfo->thread, NULL);
p++;
}
int ch = getchar();
UNUSED(ch);
taosCloseLog();
return 0;
}
...@@ -24,12 +24,12 @@ int msgSize = 128; ...@@ -24,12 +24,12 @@ int msgSize = 128;
int commit = 0; int commit = 0;
TdFilePtr pDataFile = NULL; TdFilePtr pDataFile = NULL;
STaosQueue *qhandle = NULL; STaosQueue *qhandle = NULL;
STaosQset * qset = NULL; STaosQset *qset = NULL;
void processShellMsg() { void processShellMsg() {
static int num = 0; static int num = 0;
STaosQall *qall; STaosQall *qall;
SRpcMsg * pRpcMsg, rpcMsg; SRpcMsg *pRpcMsg, rpcMsg;
int type; int type;
SQueueInfo qinfo = {0}; SQueueInfo qinfo = {0};
...@@ -77,7 +77,6 @@ void processShellMsg() { ...@@ -77,7 +77,6 @@ void processShellMsg() {
taosFreeQitem(pRpcMsg); taosFreeQitem(pRpcMsg);
{ {
// taosSsleep(1);
SRpcMsg nRpcMsg = {0}; SRpcMsg nRpcMsg = {0};
nRpcMsg.pCont = rpcMallocCont(msgSize); nRpcMsg.pCont = rpcMallocCont(msgSize);
nRpcMsg.contLen = msgSize; nRpcMsg.contLen = msgSize;
...@@ -93,26 +92,6 @@ void processShellMsg() { ...@@ -93,26 +92,6 @@ void processShellMsg() {
taosFreeQall(qall); taosFreeQall(qall);
} }
int retrieveAuthInfo(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey) {
// app shall retrieve the auth info based on meterID from DB or a data file
// demo code here only for simple demo
int ret = 0;
if (strcmp(meterId, "michael") == 0) {
*spi = 1;
*encrypt = 0;
strcpy(secret, "mypassword");
strcpy(ckey, "key");
} else if (strcmp(meterId, "jeff") == 0) {
*spi = 0;
*encrypt = 0;
} else {
ret = -1; // user not there
}
return ret;
}
void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SRpcMsg *pTemp; SRpcMsg *pTemp;
...@@ -131,11 +110,12 @@ int main(int argc, char *argv[]) { ...@@ -131,11 +110,12 @@ int main(int argc, char *argv[]) {
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 7000; rpcInit.localPort = 7000;
memcpy(rpcInit.localFqdn, "localhost", strlen("localhost"));
rpcInit.label = "SER"; rpcInit.label = "SER";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
rpcInit.cfp = processRequestMsg; rpcInit.cfp = processRequestMsg;
rpcInit.sessions = 1000;
rpcInit.idleTime = 2 * 1500; rpcInit.idleTime = 2 * 1500;
rpcDebugFlag = 131;
for (int i = 1; i < argc; ++i) { for (int i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) { if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
...@@ -170,7 +150,7 @@ int main(int argc, char *argv[]) { ...@@ -170,7 +150,7 @@ int main(int argc, char *argv[]) {
tsAsyncLog = 0; tsAsyncLog = 0;
rpcInit.connType = TAOS_CONN_SERVER; rpcInit.connType = TAOS_CONN_SERVER;
taosInitLog("server.log", 10); taosInitLog("server.log", 100000);
void *pRpc = rpcOpen(&rpcInit); void *pRpc = rpcOpen(&rpcInit);
if (pRpc == NULL) { if (pRpc == NULL) {
......
...@@ -16,8 +16,8 @@ ...@@ -16,8 +16,8 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "tlog.h" #include "tlog.h"
#include "os.h" #include "os.h"
#include "tutil.h"
#include "tconfig.h" #include "tconfig.h"
#include "tutil.h"
#define LOG_MAX_LINE_SIZE (1024) #define LOG_MAX_LINE_SIZE (1024)
#define LOG_MAX_LINE_BUFFER_SIZE (LOG_MAX_LINE_SIZE + 3) #define LOG_MAX_LINE_BUFFER_SIZE (LOG_MAX_LINE_SIZE + 3)
...@@ -40,7 +40,7 @@ ...@@ -40,7 +40,7 @@
#define LOG_BUF_MUTEX(x) ((x)->buffMutex) #define LOG_BUF_MUTEX(x) ((x)->buffMutex)
typedef struct { typedef struct {
char * buffer; char *buffer;
int32_t buffStart; int32_t buffStart;
int32_t buffEnd; int32_t buffEnd;
int32_t buffSize; int32_t buffSize;
...@@ -59,7 +59,7 @@ typedef struct { ...@@ -59,7 +59,7 @@ typedef struct {
int32_t openInProgress; int32_t openInProgress;
pid_t pid; pid_t pid;
char logName[LOG_FILE_NAME_LEN]; char logName[LOG_FILE_NAME_LEN];
SLogBuff * logHandle; SLogBuff *logHandle;
TdThreadMutex logMutex; TdThreadMutex logMutex;
} SLogObj; } SLogObj;
...@@ -106,7 +106,7 @@ int64_t dbgSmallWN = 0; ...@@ -106,7 +106,7 @@ int64_t dbgSmallWN = 0;
int64_t dbgBigWN = 0; int64_t dbgBigWN = 0;
int64_t dbgWSize = 0; int64_t dbgWSize = 0;
static void * taosAsyncOutputLog(void *param); static void *taosAsyncOutputLog(void *param);
static int32_t taosPushLogBuffer(SLogBuff *pLogBuf, const char *msg, int32_t msgLen); static int32_t taosPushLogBuffer(SLogBuff *pLogBuf, const char *msg, int32_t msgLen);
static SLogBuff *taosLogBuffNew(int32_t bufSize); static SLogBuff *taosLogBuffNew(int32_t bufSize);
static void taosCloseLogByFd(TdFilePtr pFile); static void taosCloseLogByFd(TdFilePtr pFile);
...@@ -128,7 +128,11 @@ int32_t taosInitLog(const char *logName, int32_t maxFiles) { ...@@ -128,7 +128,11 @@ int32_t taosInitLog(const char *logName, int32_t maxFiles) {
osUpdate(); osUpdate();
char fullName[PATH_MAX] = {0}; char fullName[PATH_MAX] = {0};
if (strlen(tsLogDir) != 0) {
snprintf(fullName, PATH_MAX, "%s" TD_DIRSEP "%s", tsLogDir, logName); snprintf(fullName, PATH_MAX, "%s" TD_DIRSEP "%s", tsLogDir, logName);
} else {
snprintf(fullName, PATH_MAX, "%s", logName);
}
tsLogObj.logHandle = taosLogBuffNew(LOG_DEFAULT_BUF_SIZE); tsLogObj.logHandle = taosLogBuffNew(LOG_DEFAULT_BUF_SIZE);
if (tsLogObj.logHandle == NULL) return -1; if (tsLogObj.logHandle == NULL) return -1;
...@@ -704,7 +708,7 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) { ...@@ -704,7 +708,7 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) {
int32_t compressSize = 163840; int32_t compressSize = 163840;
int32_t ret = 0; int32_t ret = 0;
int32_t len = 0; int32_t len = 0;
char * data = taosMemoryMalloc(compressSize); char *data = taosMemoryMalloc(compressSize);
// gzFile dstFp = NULL; // gzFile dstFp = NULL;
// srcFp = fopen(srcFileName, "r"); // srcFp = fopen(srcFileName, "r");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册