/* rsclient.c */
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16

17
#include "os.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
18
#include "tutil.h"
S
slguan 已提交
19 20
#include "tglobal.h"
#include "rpcLog.h"
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
#include "trpc.h"
#include "taoserror.h"

/*
 * Per-thread state for one request-sending worker.
 * main() allocates one entry per application thread, fills it in, and passes
 * its address to sendRequest() as the thread argument.
 */
typedef struct {
  int       index;      // worker number assigned by main(); used in log messages
  SRpcIpSet ipSet;      // server address set handed to rpcSendRecv()
  int       num;        // number of requests this worker has issued so far
  int       numOfReqs;  // number of requests to issue; 0 makes the worker loop without limit
  int       msgSize;    // size passed to rpcMallocCont() and used as contLen of each request
  sem_t     rspSem; // initialized by main(); not otherwise referenced in the visible code
  sem_t    *pOverSem; // not referenced in the visible code
  pthread_t thread;     // thread handle filled in by pthread_create()
  void     *pRpc;       // RPC handle returned by rpcOpen(), shared by all workers
} SInfo;


/*
 * Counters shared by every worker thread and read by main().
 * Objects with static storage duration start out as zero.
 */
static int tcount;  /* number of workers that have finished their loop */
static int terror;  /* number of responses whose code was non-zero */

static void *sendRequest(void *param) {
  SInfo  *pInfo = (SInfo *)param;
  SRpcMsg rpcMsg, rspMsg; 
  
44
  tDebug("thread:%d, start to send request", pInfo->index);
45 46 47 48 49 50 51

  while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
    pInfo->num++;
    rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
    rpcMsg.contLen = pInfo->msgSize;
    rpcMsg.handle = pInfo;
    rpcMsg.msgType = 1;
52
    tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
53 54 55 56 57 58

    rpcSendRecv(pInfo->pRpc, &pInfo->ipSet, &rpcMsg, &rspMsg);
    
    // handle response
    if (rspMsg.code != 0) terror++;

59
    tDebug("thread:%d, rspLen:%d code:%d", pInfo->index, rspMsg.contLen, rspMsg.code);
60 61 62 63

    rpcFreeCont(rspMsg.pCont);

    if ( pInfo->num % 20000 == 0 ) 
64
      tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
65 66
  }

67
  tDebug("thread:%d, it is over", pInfo->index);
68 69 70 71 72 73 74 75 76 77 78 79
  tcount++;

  return NULL;
}

int main(int argc, char *argv[]) {
  SRpcInit  rpcInit;
  SRpcIpSet ipSet;
  int      msgSize = 128;
  int      numOfReqs = 0;
  int      appThreads = 1;
  char     serverIp[40] = "127.0.0.1";
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
80
  char     secret[TSDB_KEY_LEN] = "mypassword";
81 82 83 84 85 86 87
  struct   timeval systemTime;
  int64_t  startTime, endTime;
  pthread_attr_t thattr;

  // server info
  ipSet.numOfIps = 1;
  ipSet.inUse = 0;
J
jtao1735 已提交
88 89 90 91
  ipSet.port[0] = 7000;
  ipSet.port[1] = 7000;
  strcpy(ipSet.fqdn[0], serverIp);
  strcpy(ipSet.fqdn[1], "192.168.0.1");
92 93 94

  // client info
  memset(&rpcInit, 0, sizeof(rpcInit));
J
jtao1735 已提交
95
  //rpcInit.localIp      = "0.0.0.0";
96 97 98 99 100 101
  rpcInit.localPort    = 0;
  rpcInit.label        = "APP";
  rpcInit.numOfThreads = 1;
  rpcInit.sessions     = 100;
  rpcInit.idleTime     = tsShellActivityTimer*1000;
  rpcInit.user         = "michael";
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
102
  rpcInit.secret       = secret;
103 104 105 106 107 108
  rpcInit.ckey         = "key";
  rpcInit.spi          = 1;
  rpcInit.connType     = TAOS_CONN_CLIENT;

  for (int i=1; i<argc; ++i) { 
    if (strcmp(argv[i], "-p")==0 && i < argc-1) {
J
jtao1735 已提交
109
      ipSet.port[0] = atoi(argv[++i]);
110
    } else if (strcmp(argv[i], "-i") ==0 && i < argc-1) {
H
Hongze Cheng 已提交
111
      tstrncpy(ipSet.fqdn[0], argv[++i], sizeof(ipSet.fqdn[0])); 
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
    } else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
      rpcInit.numOfThreads = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
      msgSize = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-s")==0 && i < argc-1) {
      rpcInit.sessions = atoi(argv[++i]); 
    } else if (strcmp(argv[i], "-n")==0 && i < argc-1) {
      numOfReqs = atoi(argv[++i]); 
    } else if (strcmp(argv[i], "-a")==0 && i < argc-1) {
      appThreads = atoi(argv[++i]); 
    } else if (strcmp(argv[i], "-o")==0 && i < argc-1) {
      tsCompressMsgSize = atoi(argv[++i]); 
    } else if (strcmp(argv[i], "-u")==0 && i < argc-1) {
      rpcInit.user = argv[++i];
    } else if (strcmp(argv[i], "-k")==0 && i < argc-1) {
      rpcInit.secret = argv[++i];
    } else if (strcmp(argv[i], "-spi")==0 && i < argc-1) {
      rpcInit.spi = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
      rpcDebugFlag = atoi(argv[++i]);
    } else {
      printf("\nusage: %s [options] \n", argv[0]);
      printf("  [-i ip]: first server IP address, default is:%s\n", serverIp);
J
jtao1735 已提交
135
      printf("  [-p port]: server port number, default is:%d\n", ipSet.port[0]);
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
      printf("  [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
      printf("  [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions);
      printf("  [-m msgSize]: message body size, default is:%d\n", msgSize);
      printf("  [-a threads]: number of app threads, default is:%d\n", appThreads);
      printf("  [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
      printf("  [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
      printf("  [-u user]: user name for the connection, default is:%s\n", rpcInit.user);
      printf("  [-k secret]: password for the connection, default is:%s\n", rpcInit.secret);
      printf("  [-spi SPI]: security parameter index, default is:%d\n", rpcInit.spi);
      printf("  [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
      printf("  [-h help]: print out this help\n\n");
      exit(0);
    }
  }

  taosInitLog("client.log", 100000, 10);

  void *pRpc = rpcOpen(&rpcInit);
  if (pRpc == NULL) {
S
slguan 已提交
155
    tError("failed to initialize RPC");
156 157 158
    return -1;
  }

159
  tInfo("client is initialized");
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187

  gettimeofday(&systemTime, NULL);
  startTime = systemTime.tv_sec*1000000 + systemTime.tv_usec;

  SInfo *pInfo = (SInfo *)calloc(1, sizeof(SInfo)*appThreads);
 
  pthread_attr_init(&thattr);
  pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);

  for (int i=0; i<appThreads; ++i) {
    pInfo->index = i;
    pInfo->ipSet = ipSet;
    pInfo->numOfReqs = numOfReqs;
    pInfo->msgSize = msgSize;
    sem_init(&pInfo->rspSem, 0, 0);
    pInfo->pRpc = pRpc;
    pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo);
    pInfo++;
  }

  do {
    usleep(1);
  } while ( tcount < appThreads);

  gettimeofday(&systemTime, NULL);
  endTime = systemTime.tv_sec*1000000 + systemTime.tv_usec;  
  float usedTime = (endTime - startTime)/1000.0;  // mseconds

188 189
  tInfo("it takes %.3f mseconds to send %d requests to server, error num:%d", usedTime, numOfReqs*appThreads, terror);
  tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0*numOfReqs*appThreads/usedTime, msgSize);
190

S
slguan 已提交
191
  taosCloseLog();
192 193 194 195 196

  return 0;
}