/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

/*
 * RPC test client: starts a number of worker threads, each of which sends
 * synchronous requests through rpcSendRecv(), then reports elapsed time and
 * the number of non-zero response codes.
 *
 * NOTE(review): this copy of the file has been through a markup-stripping
 * step. Every `#include <...>` below has lost its header name, and a span in
 * the middle of main() is missing. The code tokens are kept exactly as found;
 * restore both from the upstream file before building.
 */

/* NOTE(review): eight system header names are missing here. */
#include
#include
#include
#include
#include
#include
#include
#include
#include "os.h"
#include "tlog.h"
#include "trpc.h"
#include "taoserror.h"
/* NOTE(review): two more system header names are missing here. */
#include
#include

/*
 * Per-thread state handed to sendRequest() through pthread_create() and to
 * the RPC layer through SRpcMsg.handle.
 */
typedef struct {
  int       index;      /* identifier printed as "thread:%d" in log lines */
  SRpcIpSet ipSet;      /* server address set; overwritten by processUpdateIpSet() */
  int       num;        /* requests sent so far by this thread */
  int       numOfReqs;  /* request limit; 0 makes the send loop run without a limit */
  int       msgSize;    /* size passed to rpcMallocCont() and used as contLen */
  sem_t     rspSem;     /* initialised in main(); not otherwise used in the visible code */
  sem_t    *pOverSem;   /* not referenced in the visible code */
  pthread_t thread;     /* thread id written by pthread_create() */
  void     *pRpc;       /* RPC handle passed to rpcSendRecv() */
} SInfo;

/*
 * Callback installed as SRpcInit.ufp in main().
 *
 * handle: treated as the SInfo pointer that sendRequest() stored in
 *         SRpcMsg.handle.
 * pIpSet: new IP set; copied by value into the thread's SInfo.
 */
static void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) {
  SInfo *pInfo = (SInfo *)handle;

  tTrace("thread:%d, ip set is changed, index:%d", pInfo->index, pIpSet->inUse);
  pInfo->ipSet = *pIpSet;
}

/*
 * tcount: number of sendRequest() threads that have finished; main() polls it.
 * terror: number of responses whose code was non-zero.
 *
 * NOTE(review): both are plain ints incremented from worker threads without a
 * lock or atomic operation, and tcount is read by main() in a polling loop.
 * With more than one worker thread the increments can race.
 */
static int tcount = 0;
static int terror = 0;

/*
 * Thread entry point: sends requests until pInfo->num reaches
 * pInfo->numOfReqs (never, when numOfReqs is 0).
 *
 * param: the thread's SInfo.
 * Returns NULL after incrementing tcount.
 */
static void *sendRequest(void *param) {
  SInfo  *pInfo = (SInfo *)param;
  SRpcMsg rpcMsg, rspMsg;

  tTrace("thread:%d, start to send request", pInfo->index);

  while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
    pInfo->num++;
    /* NOTE(review): pCont is never freed here; presumably rpcSendRecv() takes
     * ownership of the request buffer - confirm against trpc.h. */
    rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
    rpcMsg.contLen = pInfo->msgSize;
    rpcMsg.handle = pInfo;
    rpcMsg.msgType = 1;
    tTrace("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);

    /* NOTE(review): rspMsg is uninitialised before this call; the code below
     * relies on rpcSendRecv() filling in code, contLen and pCont. */
    rpcSendRecv(pInfo->pRpc, &pInfo->ipSet, &rpcMsg, &rspMsg);

    // handle response
    if (rspMsg.code != 0) terror++;

    tTrace("thread:%d, rspLen:%d code:%d", pInfo->index, rspMsg.contLen, rspMsg.code);

    rpcFreeCont(rspMsg.pCont);

    /* progress line every 20000 requests */
    if ( pInfo->num % 20000 == 0 )
      tPrint("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
  }

  tTrace("thread:%d, it is over", pInfo->index);
  tcount++;  /* tells main() that this worker is done */

  return NULL;
}

/*
 * Program entry: fills in the server IP set and the RPC client settings,
 * starts the worker threads, waits for all of them to finish and prints
 * timing figures.
 */
int main(int argc, char *argv[]) {
  SRpcInit  rpcInit;
  SRpcIpSet ipSet;
  int       msgSize = 128;
  /* NOTE(review): with this default of 0, sendRequest() never leaves its loop
   * and the reported request total is 0. */
  int       numOfReqs = 0;
  int       appThreads = 1;
  char      serverIp[40] = "127.0.0.1";
  struct timeval systemTime;
  int64_t   startTime, endTime;
  pthread_attr_t thattr;

  // server info
  ipSet.numOfIps = 1;
  ipSet.inUse = 0;
  ipSet.port = 7000;
  ipSet.ip[0] = inet_addr(serverIp);
  /* a second address is stored although numOfIps is 1 */
  ipSet.ip[1] = inet_addr("192.168.0.1");

  // client info
  memset(&rpcInit, 0, sizeof(rpcInit));
  rpcInit.localIp = "0.0.0.0";
  rpcInit.localPort = 0;
  rpcInit.label = "APP";
  rpcInit.numOfThreads = 1;
  // rpcInit.cfp = processResponse;
  rpcInit.ufp = processUpdateIpSet;
  rpcInit.sessions = 100;
  rpcInit.idleTime = tsShellActivityTimer*1000;
  rpcInit.user = "michael";
  rpcInit.secret = "mypassword";
  rpcInit.ckey = "key";
  rpcInit.spi = 1;
  rpcInit.connType = TAOS_CONN_CLIENT;

  /*
   * NOTE(review): the text between `for (int i=1; i` and `index = i;` is
   * missing from this copy. The lost span must have declared and set up pRpc,
   * pInfo and thattr, assigned startTime, and opened the loop that the `}`
   * after `pInfo++` closes, because the code below uses all of them. Restore
   * it from the upstream file; do not try to build this fragment as is.
   */
  for (int i=1; iindex = i;
    /* per-thread setup: copy the shared settings into this thread's SInfo */
    pInfo->ipSet = ipSet;
    pInfo->numOfReqs = numOfReqs;
    pInfo->msgSize = msgSize;
    sem_init(&pInfo->rspSem, 0, 0);
    pInfo->pRpc = pRpc;
    pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo);
    pInfo++;  /* move on to the next SInfo slot */
  }

  /* wait until every worker has incremented tcount, polling with usleep(1) */
  do {
    usleep(1);
  } while ( tcount < appThreads);

  gettimeofday(&systemTime, NULL);
  endTime = systemTime.tv_sec*1000000 + systemTime.tv_usec;  /* microseconds */
  /* NOTE(review): startTime has no visible assignment (see the note above),
   * and the result is narrowed to float. */
  float usedTime = (endTime - startTime)/1000.0;  // mseconds

  tPrint("it takes %.3f mseconds to send %d requests to server, error num:%d", usedTime, numOfReqs*appThreads, terror);
  tPrint("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0*numOfReqs*appThreads/usedTime, msgSize);

  taosCloseLogger();

  return 0;
}