rclient.c 6.6 KB
Newer Older
S
slguan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "os.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
17
#include "tutil.h"
S
slguan 已提交
18 19
#include "tglobal.h"
#include "rpcLog.h"
S
slguan 已提交
20
#include "trpc.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
21
#include "taoserror.h"
S
slguan 已提交
22

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
23 24 25 26 27 28 29 30 31 32 33 34
typedef struct {
  int       index;
  SRpcIpSet ipSet;
  int       num;
  int       numOfReqs;
  int       msgSize;
  sem_t     rspSem; 
  sem_t    *pOverSem; 
  pthread_t thread;
  void     *pRpc;
} SInfo;

J
jtao1735 已提交
35
static void processResponse(SRpcMsg *pMsg, SRpcIpSet *pIpSet) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
36
  SInfo *pInfo = (SInfo *)pMsg->ahandle;
37
  tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, pMsg->code);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
38

J
jtao1735 已提交
39
  if (pIpSet) pInfo->ipSet = *pIpSet;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
40

J
jtao1735 已提交
41
  rpcFreeCont(pMsg->pCont);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
42
  sem_post(&pInfo->rspSem); 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
43 44
}

45
static int tcount = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
46

47
static void *sendRequest(void *param) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
48
  SInfo  *pInfo = (SInfo *)param;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
49
  SRpcMsg rpcMsg = {0}; 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
50
  
51
  tDebug("thread:%d, start to send request", pInfo->index);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
52 53 54

  while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
    pInfo->num++;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
55 56
    rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
    rpcMsg.contLen = pInfo->msgSize;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
57
    rpcMsg.ahandle = pInfo;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
58
    rpcMsg.msgType = 1;
59
    tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
60
    rpcSendRequest(pInfo->pRpc, &pInfo->ipSet, &rpcMsg);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
61
    if ( pInfo->num % 20000 == 0 ) 
62
      tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
63 64
    sem_wait(&pInfo->rspSem);
  }
S
slguan 已提交
65

66
  tDebug("thread:%d, it is over", pInfo->index);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
67 68 69 70 71 72 73 74 75 76 77 78
  tcount++;

  return NULL;
}

int main(int argc, char *argv[]) {
  SRpcInit  rpcInit;
  SRpcIpSet ipSet;
  int      msgSize = 128;
  int      numOfReqs = 0;
  int      appThreads = 1;
  char     serverIp[40] = "127.0.0.1";
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
79
  char     secret[TSDB_KEY_LEN] = "mypassword";
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
80 81 82 83 84 85
  struct   timeval systemTime;
  int64_t  startTime, endTime;
  pthread_attr_t thattr;

  // server info
  ipSet.numOfIps = 1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
86
  ipSet.inUse = 0;
J
jtao1735 已提交
87 88 89 90
  ipSet.port[0] = 7000;
  ipSet.port[1] = 7000;
  strcpy(ipSet.fqdn[0], serverIp);
  strcpy(ipSet.fqdn[1], "192.168.0.1");
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
91 92

  // client info
S
slguan 已提交
93
  memset(&rpcInit, 0, sizeof(rpcInit));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
94 95
  rpcInit.localPort    = 0;
  rpcInit.label        = "APP";
S
slguan 已提交
96
  rpcInit.numOfThreads = 1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
97 98
  rpcInit.cfp          = processResponse;
  rpcInit.sessions     = 100;
99
  rpcInit.idleTime     = tsShellActivityTimer*1000;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
100
  rpcInit.user         = "michael";
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
101
  rpcInit.secret       = secret;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
102
  rpcInit.ckey         = "key";
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
103
  rpcInit.spi          = 1;
104
  rpcInit.connType     = TAOS_CONN_CLIENT;
S
slguan 已提交
105

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
106
  for (int i=1; i<argc; ++i) { 
107
    if (strcmp(argv[i], "-p")==0 && i < argc-1) {
J
jtao1735 已提交
108
      ipSet.port[0] = atoi(argv[++i]);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
109
    } else if (strcmp(argv[i], "-i") ==0 && i < argc-1) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
110
      tstrncpy(ipSet.fqdn[0], argv[++i], sizeof(ipSet.fqdn[0])); 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
111 112 113 114 115 116 117 118 119 120 121 122
    } else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
      rpcInit.numOfThreads = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
      msgSize = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-s")==0 && i < argc-1) {
      rpcInit.sessions = atoi(argv[++i]); 
    } else if (strcmp(argv[i], "-n")==0 && i < argc-1) {
      numOfReqs = atoi(argv[++i]); 
    } else if (strcmp(argv[i], "-a")==0 && i < argc-1) {
      appThreads = atoi(argv[++i]); 
    } else if (strcmp(argv[i], "-o")==0 && i < argc-1) {
      tsCompressMsgSize = atoi(argv[++i]); 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
123 124 125 126 127 128
    } else if (strcmp(argv[i], "-u")==0 && i < argc-1) {
      rpcInit.user = argv[++i];
    } else if (strcmp(argv[i], "-k")==0 && i < argc-1) {
      rpcInit.secret = argv[++i];
    } else if (strcmp(argv[i], "-spi")==0 && i < argc-1) {
      rpcInit.spi = atoi(argv[++i]);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
129
    } else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
130
      rpcDebugFlag = atoi(argv[++i]);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
131 132 133
    } else {
      printf("\nusage: %s [options] \n", argv[0]);
      printf("  [-i ip]: first server IP address, default is:%s\n", serverIp);
J
jtao1735 已提交
134
      printf("  [-p port]: server port number, default is:%d\n", ipSet.port[0]);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
135 136 137 138 139 140
      printf("  [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
      printf("  [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions);
      printf("  [-m msgSize]: message body size, default is:%d\n", msgSize);
      printf("  [-a threads]: number of app threads, default is:%d\n", appThreads);
      printf("  [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
      printf("  [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
141 142 143
      printf("  [-u user]: user name for the connection, default is:%s\n", rpcInit.user);
      printf("  [-k secret]: password for the connection, default is:%s\n", rpcInit.secret);
      printf("  [-spi SPI]: security parameter index, default is:%d\n", rpcInit.spi);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
144 145 146 147 148 149 150 151
      printf("  [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
      printf("  [-h help]: print out this help\n\n");
      exit(0);
    }
  }

  taosInitLog("client.log", 100000, 10);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
152 153
  void *pRpc = rpcOpen(&rpcInit);
  if (pRpc == NULL) {
S
slguan 已提交
154
    tError("failed to initialize RPC");
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
155
    return -1;
S
slguan 已提交
156 157
  }

158
  tInfo("client is initialized");
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
159
  tInfo("threads:%d msgSize:%d requests:%d", appThreads, msgSize, numOfReqs);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186

  gettimeofday(&systemTime, NULL);
  startTime = systemTime.tv_sec*1000000 + systemTime.tv_usec;

  SInfo *pInfo = (SInfo *)calloc(1, sizeof(SInfo)*appThreads);
 
  pthread_attr_init(&thattr);
  pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);

  for (int i=0; i<appThreads; ++i) {
    pInfo->index = i;
    pInfo->ipSet = ipSet;
    pInfo->numOfReqs = numOfReqs;
    pInfo->msgSize = msgSize;
    sem_init(&pInfo->rspSem, 0, 0);
    pInfo->pRpc = pRpc;
    pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo);
    pInfo++;
  }

  do {
    usleep(1);
  } while ( tcount < appThreads);

  gettimeofday(&systemTime, NULL);
  endTime = systemTime.tv_sec*1000000 + systemTime.tv_usec;  
  float usedTime = (endTime - startTime)/1000.0;  // mseconds
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
187

188 189
  tInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs*appThreads);
  tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0*numOfReqs*appThreads/usedTime, msgSize);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
190

J
Jeff Tao 已提交
191 192
  getchar();

S
slguan 已提交
193
  taosCloseLog();
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
194

S
slguan 已提交
195 196 197 198
  return 0;
}