rclient.c 6.6 KB
Newer Older
S
slguan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "os.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
17
#include "tutil.h"
S
slguan 已提交
18 19
#include "tglobal.h"
#include "rpcLog.h"
S
slguan 已提交
20
#include "trpc.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
21
#include "taoserror.h"
S
slguan 已提交
22

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
23 24
typedef struct {
  int       index;
25
  SRpcEpSet epSet;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
26 27 28 29 30 31 32 33 34
  int       num;
  int       numOfReqs;
  int       msgSize;
  sem_t     rspSem; 
  sem_t    *pOverSem; 
  pthread_t thread;
  void     *pRpc;
} SInfo;

35
/*
 * RPC response callback.
 *
 * pMsg:   the received response. Its ahandle is the SInfo of the thread that
 *         issued the matching request; its pCont is released here.
 * pEpSet: when non-NULL, copied over that thread's endpoint set.
 *
 * The thread's rspSem is posted as the very last step, so nothing in this
 * function touches the SInfo after the sender has been released.
 */
static void processResponse(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
  SInfo *pSender = (SInfo *)pMsg->ahandle;

  tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pSender->index, pMsg->msgType,
         pMsg->contLen, pMsg->code);

  if (pEpSet != NULL) {
    pSender->epSet = *pEpSet;
  }

  rpcFreeCont(pMsg->pCont);
  sem_post(&pSender->rspSem);
}

45
static int tcount = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
46

47
static void *sendRequest(void *param) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
48
  SInfo  *pInfo = (SInfo *)param;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
49
  SRpcMsg rpcMsg = {0}; 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
50
  
51
  tDebug("thread:%d, start to send request", pInfo->index);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
52 53 54

  while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
    pInfo->num++;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
55 56
    rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
    rpcMsg.contLen = pInfo->msgSize;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
57
    rpcMsg.ahandle = pInfo;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
58
    rpcMsg.msgType = 1;
59
    tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
60
    rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
61
    if ( pInfo->num % 20000 == 0 ) 
62
      tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
63 64
    sem_wait(&pInfo->rspSem);
  }
S
slguan 已提交
65

66
  tDebug("thread:%d, it is over", pInfo->index);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
67 68 69 70 71 72 73
  tcount++;

  return NULL;
}

int main(int argc, char *argv[]) {
  SRpcInit  rpcInit;
74
  SRpcEpSet epSet;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
75 76 77 78
  int      msgSize = 128;
  int      numOfReqs = 0;
  int      appThreads = 1;
  char     serverIp[40] = "127.0.0.1";
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
79
  char     secret[TSDB_KEY_LEN] = "mypassword";
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
80 81 82 83 84
  struct   timeval systemTime;
  int64_t  startTime, endTime;
  pthread_attr_t thattr;

  // server info
85 86 87 88 89 90
  epSet.numOfEps = 1;
  epSet.inUse = 0;
  epSet.port[0] = 7000;
  epSet.port[1] = 7000;
  strcpy(epSet.fqdn[0], serverIp);
  strcpy(epSet.fqdn[1], "192.168.0.1");
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
91 92

  // client info
S
slguan 已提交
93
  memset(&rpcInit, 0, sizeof(rpcInit));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
94 95
  rpcInit.localPort    = 0;
  rpcInit.label        = "APP";
S
slguan 已提交
96
  rpcInit.numOfThreads = 1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
97 98
  rpcInit.cfp          = processResponse;
  rpcInit.sessions     = 100;
99
  rpcInit.idleTime     = tsShellActivityTimer*1000;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
100
  rpcInit.user         = "michael";
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
101
  rpcInit.secret       = secret;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
102
  rpcInit.ckey         = "key";
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
103
  rpcInit.spi          = 1;
104
  rpcInit.connType     = TAOS_CONN_CLIENT;
S
slguan 已提交
105

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
106
  for (int i=1; i<argc; ++i) { 
107
    if (strcmp(argv[i], "-p")==0 && i < argc-1) {
108
      epSet.port[0] = atoi(argv[++i]);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
109
    } else if (strcmp(argv[i], "-i") ==0 && i < argc-1) {
110
      tstrncpy(epSet.fqdn[0], argv[++i], sizeof(epSet.fqdn[0])); 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
111 112 113 114 115 116 117 118 119 120 121 122
    } else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
      rpcInit.numOfThreads = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
      msgSize = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-s")==0 && i < argc-1) {
      rpcInit.sessions = atoi(argv[++i]); 
    } else if (strcmp(argv[i], "-n")==0 && i < argc-1) {
      numOfReqs = atoi(argv[++i]); 
    } else if (strcmp(argv[i], "-a")==0 && i < argc-1) {
      appThreads = atoi(argv[++i]); 
    } else if (strcmp(argv[i], "-o")==0 && i < argc-1) {
      tsCompressMsgSize = atoi(argv[++i]); 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
123 124 125 126 127 128
    } else if (strcmp(argv[i], "-u")==0 && i < argc-1) {
      rpcInit.user = argv[++i];
    } else if (strcmp(argv[i], "-k")==0 && i < argc-1) {
      rpcInit.secret = argv[++i];
    } else if (strcmp(argv[i], "-spi")==0 && i < argc-1) {
      rpcInit.spi = atoi(argv[++i]);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
129
    } else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
130
      rpcDebugFlag = atoi(argv[++i]);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
131 132 133
    } else {
      printf("\nusage: %s [options] \n", argv[0]);
      printf("  [-i ip]: first server IP address, default is:%s\n", serverIp);
134
      printf("  [-p port]: server port number, default is:%d\n", epSet.port[0]);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
135 136 137 138 139 140
      printf("  [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
      printf("  [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions);
      printf("  [-m msgSize]: message body size, default is:%d\n", msgSize);
      printf("  [-a threads]: number of app threads, default is:%d\n", appThreads);
      printf("  [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
      printf("  [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
141 142 143
      printf("  [-u user]: user name for the connection, default is:%s\n", rpcInit.user);
      printf("  [-k secret]: password for the connection, default is:%s\n", rpcInit.secret);
      printf("  [-spi SPI]: security parameter index, default is:%d\n", rpcInit.spi);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
144 145 146 147 148 149 150 151
      printf("  [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
      printf("  [-h help]: print out this help\n\n");
      exit(0);
    }
  }

  taosInitLog("client.log", 100000, 10);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
152 153
  void *pRpc = rpcOpen(&rpcInit);
  if (pRpc == NULL) {
S
slguan 已提交
154
    tError("failed to initialize RPC");
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
155
    return -1;
S
slguan 已提交
156 157
  }

158
  tInfo("client is initialized");
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
159
  tInfo("threads:%d msgSize:%d requests:%d", appThreads, msgSize, numOfReqs);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
160 161 162 163 164 165 166 167 168 169 170

  gettimeofday(&systemTime, NULL);
  startTime = systemTime.tv_sec*1000000 + systemTime.tv_usec;

  SInfo *pInfo = (SInfo *)calloc(1, sizeof(SInfo)*appThreads);
 
  pthread_attr_init(&thattr);
  pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);

  for (int i=0; i<appThreads; ++i) {
    pInfo->index = i;
171
    pInfo->epSet = epSet;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
172 173 174 175 176 177 178 179 180 181 182 183 184 185
    pInfo->numOfReqs = numOfReqs;
    pInfo->msgSize = msgSize;
    sem_init(&pInfo->rspSem, 0, 0);
    pInfo->pRpc = pRpc;
    pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo);
    pInfo++;
  }

  do {
    usleep(1);
  } while ( tcount < appThreads);

  gettimeofday(&systemTime, NULL);
  endTime = systemTime.tv_sec*1000000 + systemTime.tv_usec;  
S
TD-1057  
Shengliang Guan 已提交
186
  float usedTime = (endTime - startTime)/1000.0f;  // mseconds
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
187

188 189
  tInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs*appThreads);
  tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0*numOfReqs*appThreads/usedTime, msgSize);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
190

J
Jeff Tao 已提交
191 192
  getchar();

S
slguan 已提交
193
  taosCloseLog();
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
194

S
slguan 已提交
195 196 197 198
  return 0;
}