/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "os.h"
#include "tutil.h"
#include "tglobal.h"
#include "rpcLog.h"
#include "trpc.h"
#include "taoserror.h"

typedef struct {
  int       index;
26
  SRpcEpSet epSet;
27 28 29
  int       num;
  int       numOfReqs;
  int       msgSize;
S
TD-1057  
Shengliang Guan 已提交
30 31
  tsem_t    rspSem; 
  tsem_t   *pOverSem; 
32 33 34 35 36 37 38 39 40 41
  pthread_t thread;
  void     *pRpc;
} SInfo;


/* Number of worker threads that have finished; each sendRequest() call bumps it once just before returning. */
static int tcount;
/* Number of responses that came back with a non-zero code; bumped by sendRequest() and reported by main(). */
static int terror;

static void *sendRequest(void *param) {
  SInfo  *pInfo = (SInfo *)param;
42
  SRpcMsg rpcMsg, rspMsg;
43
  
44 45
  setThreadName("sendSrvReq");
 
46
  tDebug("thread:%d, start to send request", pInfo->index);
47 48 49 50 51 52 53

  while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
    pInfo->num++;
    rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
    rpcMsg.contLen = pInfo->msgSize;
    rpcMsg.handle = pInfo;
    rpcMsg.msgType = 1;
54
    tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
55

56
    rpcSendRecv(pInfo->pRpc, &pInfo->epSet, &rpcMsg, &rspMsg);
57 58 59 60
    
    // handle response
    if (rspMsg.code != 0) terror++;

61
    tDebug("thread:%d, rspLen:%d code:%d", pInfo->index, rspMsg.contLen, rspMsg.code);
62 63 64 65

    rpcFreeCont(rspMsg.pCont);

    if ( pInfo->num % 20000 == 0 ) 
66
      tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
67 68
  }

69
  tDebug("thread:%d, it is over", pInfo->index);
70 71 72 73 74 75 76
  tcount++;

  return NULL;
}

int main(int argc, char *argv[]) {
  SRpcInit  rpcInit;
77
  SRpcEpSet epSet;
78 79 80 81
  int      msgSize = 128;
  int      numOfReqs = 0;
  int      appThreads = 1;
  char     serverIp[40] = "127.0.0.1";
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
82
  char     secret[TSDB_KEY_LEN] = "mypassword";
83 84 85 86 87
  struct   timeval systemTime;
  int64_t  startTime, endTime;
  pthread_attr_t thattr;

  // server info
88 89 90 91 92 93
  epSet.numOfEps = 1;
  epSet.inUse = 0;
  epSet.port[0] = 7000;
  epSet.port[1] = 7000;
  strcpy(epSet.fqdn[0], serverIp);
  strcpy(epSet.fqdn[1], "192.168.0.1");
94 95 96

  // client info
  memset(&rpcInit, 0, sizeof(rpcInit));
J
jtao1735 已提交
97
  //rpcInit.localIp      = "0.0.0.0";
98 99 100 101 102 103
  rpcInit.localPort    = 0;
  rpcInit.label        = "APP";
  rpcInit.numOfThreads = 1;
  rpcInit.sessions     = 100;
  rpcInit.idleTime     = tsShellActivityTimer*1000;
  rpcInit.user         = "michael";
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
104
  rpcInit.secret       = secret;
105 106 107 108 109 110
  rpcInit.ckey         = "key";
  rpcInit.spi          = 1;
  rpcInit.connType     = TAOS_CONN_CLIENT;

  for (int i=1; i<argc; ++i) { 
    if (strcmp(argv[i], "-p")==0 && i < argc-1) {
111
      epSet.port[0] = atoi(argv[++i]);
112
    } else if (strcmp(argv[i], "-i") ==0 && i < argc-1) {
113
      tstrncpy(epSet.fqdn[0], argv[++i], sizeof(epSet.fqdn[0])); 
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
    } else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
      rpcInit.numOfThreads = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
      msgSize = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-s")==0 && i < argc-1) {
      rpcInit.sessions = atoi(argv[++i]); 
    } else if (strcmp(argv[i], "-n")==0 && i < argc-1) {
      numOfReqs = atoi(argv[++i]); 
    } else if (strcmp(argv[i], "-a")==0 && i < argc-1) {
      appThreads = atoi(argv[++i]); 
    } else if (strcmp(argv[i], "-o")==0 && i < argc-1) {
      tsCompressMsgSize = atoi(argv[++i]); 
    } else if (strcmp(argv[i], "-u")==0 && i < argc-1) {
      rpcInit.user = argv[++i];
    } else if (strcmp(argv[i], "-k")==0 && i < argc-1) {
      rpcInit.secret = argv[++i];
    } else if (strcmp(argv[i], "-spi")==0 && i < argc-1) {
      rpcInit.spi = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
      rpcDebugFlag = atoi(argv[++i]);
    } else {
      printf("\nusage: %s [options] \n", argv[0]);
      printf("  [-i ip]: first server IP address, default is:%s\n", serverIp);
137
      printf("  [-p port]: server port number, default is:%d\n", epSet.port[0]);
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
      printf("  [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
      printf("  [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions);
      printf("  [-m msgSize]: message body size, default is:%d\n", msgSize);
      printf("  [-a threads]: number of app threads, default is:%d\n", appThreads);
      printf("  [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
      printf("  [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
      printf("  [-u user]: user name for the connection, default is:%s\n", rpcInit.user);
      printf("  [-k secret]: password for the connection, default is:%s\n", rpcInit.secret);
      printf("  [-spi SPI]: security parameter index, default is:%d\n", rpcInit.spi);
      printf("  [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
      printf("  [-h help]: print out this help\n\n");
      exit(0);
    }
  }

  taosInitLog("client.log", 100000, 10);

  void *pRpc = rpcOpen(&rpcInit);
  if (pRpc == NULL) {
S
slguan 已提交
157
    tError("failed to initialize RPC");
158 159 160
    return -1;
  }

161
  tInfo("client is initialized");
162 163 164 165 166 167 168 169 170 171 172

  gettimeofday(&systemTime, NULL);
  startTime = systemTime.tv_sec*1000000 + systemTime.tv_usec;

  SInfo *pInfo = (SInfo *)calloc(1, sizeof(SInfo)*appThreads);
 
  pthread_attr_init(&thattr);
  pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);

  for (int i=0; i<appThreads; ++i) {
    pInfo->index = i;
173
    pInfo->epSet = epSet;
174 175
    pInfo->numOfReqs = numOfReqs;
    pInfo->msgSize = msgSize;
S
TD-1057  
Shengliang Guan 已提交
176
    tsem_init(&pInfo->rspSem, 0, 0);
177 178 179 180 181 182 183 184 185 186 187 188 189
    pInfo->pRpc = pRpc;
    pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo);
    pInfo++;
  }

  do {
    usleep(1);
  } while ( tcount < appThreads);

  gettimeofday(&systemTime, NULL);
  endTime = systemTime.tv_sec*1000000 + systemTime.tv_usec;  
  float usedTime = (endTime - startTime)/1000.0;  // mseconds

190 191
  tInfo("it takes %.3f mseconds to send %d requests to server, error num:%d", usedTime, numOfReqs*appThreads, terror);
  tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0*numOfReqs*appThreads/usedTime, msgSize);
192

S
slguan 已提交
193
  taosCloseLog();
194 195 196 197 198

  return 0;
}