rclient.c 6.6 KB
Newer Older
S
slguan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "os.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
17
#include "tutil.h"
S
slguan 已提交
18 19
#include "tglobal.h"
#include "rpcLog.h"
S
slguan 已提交
20
#include "trpc.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
21
#include "taoserror.h"
S
slguan 已提交
22

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
23 24
typedef struct {
  int       index;
25
  SRpcEpSet epSet;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
26 27 28
  int       num;
  int       numOfReqs;
  int       msgSize;
S
TD-1057  
Shengliang Guan 已提交
29 30
  tsem_t    rspSem; 
  tsem_t   *pOverSem; 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
31 32 33 34
  pthread_t thread;
  void     *pRpc;
} SInfo;

35
/*
 * Response callback installed as rpcInit.cfp.
 *
 * pMsg->ahandle carries the SInfo of the thread that issued the request.
 * If pEpSet is non-NULL it is copied into that SInfo, the response content is
 * released with rpcFreeCont, and rspSem is posted so the sender thread
 * (blocked in tsem_wait) can continue with its next request.
 */
static void processResponse(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
  SInfo *pSender = (SInfo *)pMsg->ahandle;

  tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pSender->index, pMsg->msgType, pMsg->contLen, pMsg->code);

  // adopt the endpoint set supplied with the response, when there is one
  if (pEpSet) {
    pSender->epSet = *pEpSet;
  }

  rpcFreeCont(pMsg->pCont);

  // wake the sender; this must stay the last access to pSender in the callback
  tsem_post(&pSender->rspSem);
}

45
// Number of sender threads that have finished; incremented at the end of sendRequest.
static int tcount = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
46

47
static void *sendRequest(void *param) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
48
  SInfo  *pInfo = (SInfo *)param;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
49
  SRpcMsg rpcMsg = {0}; 
50 51

  setThreadName("sendCliReq");
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
52
  
53
  tDebug("thread:%d, start to send request", pInfo->index);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
54 55 56

  while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
    pInfo->num++;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
57 58
    rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
    rpcMsg.contLen = pInfo->msgSize;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
59
    rpcMsg.ahandle = pInfo;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
60
    rpcMsg.msgType = 1;
61
    tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
62
    rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
63
    if ( pInfo->num % 20000 == 0 ) 
64
      tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
S
TD-1057  
Shengliang Guan 已提交
65
    tsem_wait(&pInfo->rspSem);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
66
  }
S
slguan 已提交
67

68
  tDebug("thread:%d, it is over", pInfo->index);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
69 70 71 72 73 74 75
  tcount++;

  return NULL;
}

int main(int argc, char *argv[]) {
  SRpcInit  rpcInit;
76
  SRpcEpSet epSet;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
77 78 79 80
  int      msgSize = 128;
  int      numOfReqs = 0;
  int      appThreads = 1;
  char     serverIp[40] = "127.0.0.1";
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
81
  char     secret[TSDB_KEY_LEN] = "mypassword";
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
82 83 84 85 86
  struct   timeval systemTime;
  int64_t  startTime, endTime;
  pthread_attr_t thattr;

  // server info
87 88 89 90 91 92
  epSet.numOfEps = 1;
  epSet.inUse = 0;
  epSet.port[0] = 7000;
  epSet.port[1] = 7000;
  strcpy(epSet.fqdn[0], serverIp);
  strcpy(epSet.fqdn[1], "192.168.0.1");
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
93 94

  // client info
S
slguan 已提交
95
  memset(&rpcInit, 0, sizeof(rpcInit));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
96 97
  rpcInit.localPort    = 0;
  rpcInit.label        = "APP";
S
slguan 已提交
98
  rpcInit.numOfThreads = 1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
99 100
  rpcInit.cfp          = processResponse;
  rpcInit.sessions     = 100;
101
  rpcInit.idleTime     = tsShellActivityTimer*1000;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
102
  rpcInit.user         = "michael";
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
103
  rpcInit.secret       = secret;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
104
  rpcInit.ckey         = "key";
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
105
  rpcInit.spi          = 1;
106
  rpcInit.connType     = TAOS_CONN_CLIENT;
S
slguan 已提交
107

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
108
  for (int i=1; i<argc; ++i) { 
109
    if (strcmp(argv[i], "-p")==0 && i < argc-1) {
110
      epSet.port[0] = atoi(argv[++i]);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
111
    } else if (strcmp(argv[i], "-i") ==0 && i < argc-1) {
112
      tstrncpy(epSet.fqdn[0], argv[++i], sizeof(epSet.fqdn[0])); 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
113 114 115 116 117 118 119 120 121 122 123 124
    } else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
      rpcInit.numOfThreads = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
      msgSize = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-s")==0 && i < argc-1) {
      rpcInit.sessions = atoi(argv[++i]); 
    } else if (strcmp(argv[i], "-n")==0 && i < argc-1) {
      numOfReqs = atoi(argv[++i]); 
    } else if (strcmp(argv[i], "-a")==0 && i < argc-1) {
      appThreads = atoi(argv[++i]); 
    } else if (strcmp(argv[i], "-o")==0 && i < argc-1) {
      tsCompressMsgSize = atoi(argv[++i]); 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
125 126 127 128 129 130
    } else if (strcmp(argv[i], "-u")==0 && i < argc-1) {
      rpcInit.user = argv[++i];
    } else if (strcmp(argv[i], "-k")==0 && i < argc-1) {
      rpcInit.secret = argv[++i];
    } else if (strcmp(argv[i], "-spi")==0 && i < argc-1) {
      rpcInit.spi = atoi(argv[++i]);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
131
    } else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
132
      rpcDebugFlag = atoi(argv[++i]);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
133 134 135
    } else {
      printf("\nusage: %s [options] \n", argv[0]);
      printf("  [-i ip]: first server IP address, default is:%s\n", serverIp);
136
      printf("  [-p port]: server port number, default is:%d\n", epSet.port[0]);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
137 138 139 140 141 142
      printf("  [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
      printf("  [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions);
      printf("  [-m msgSize]: message body size, default is:%d\n", msgSize);
      printf("  [-a threads]: number of app threads, default is:%d\n", appThreads);
      printf("  [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
      printf("  [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
143 144 145
      printf("  [-u user]: user name for the connection, default is:%s\n", rpcInit.user);
      printf("  [-k secret]: password for the connection, default is:%s\n", rpcInit.secret);
      printf("  [-spi SPI]: security parameter index, default is:%d\n", rpcInit.spi);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
146 147 148 149 150 151 152 153
      printf("  [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
      printf("  [-h help]: print out this help\n\n");
      exit(0);
    }
  }

  taosInitLog("client.log", 100000, 10);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
154 155
  void *pRpc = rpcOpen(&rpcInit);
  if (pRpc == NULL) {
S
slguan 已提交
156
    tError("failed to initialize RPC");
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
157
    return -1;
S
slguan 已提交
158 159
  }

160
  tInfo("client is initialized");
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
161
  tInfo("threads:%d msgSize:%d requests:%d", appThreads, msgSize, numOfReqs);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
162 163 164 165 166 167 168 169 170 171 172

  gettimeofday(&systemTime, NULL);
  startTime = systemTime.tv_sec*1000000 + systemTime.tv_usec;

  SInfo *pInfo = (SInfo *)calloc(1, sizeof(SInfo)*appThreads);
 
  pthread_attr_init(&thattr);
  pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);

  for (int i=0; i<appThreads; ++i) {
    pInfo->index = i;
173
    pInfo->epSet = epSet;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
174 175
    pInfo->numOfReqs = numOfReqs;
    pInfo->msgSize = msgSize;
S
TD-1057  
Shengliang Guan 已提交
176
    tsem_init(&pInfo->rspSem, 0, 0);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
177 178 179 180 181 182 183 184 185 186 187
    pInfo->pRpc = pRpc;
    pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo);
    pInfo++;
  }

  do {
    usleep(1);
  } while ( tcount < appThreads);

  gettimeofday(&systemTime, NULL);
  endTime = systemTime.tv_sec*1000000 + systemTime.tv_usec;  
S
TD-1057  
Shengliang Guan 已提交
188
  float usedTime = (endTime - startTime)/1000.0f;  // mseconds
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
189

190 191
  tInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs*appThreads);
  tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0*numOfReqs*appThreads/usedTime, msgSize);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
192

Y
yihaoDeng 已提交
193 194
  int ch = getchar();
  UNUSED(ch); 
J
Jeff Tao 已提交
195

S
slguan 已提交
196
  taosCloseLog();
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
197

S
slguan 已提交
198 199 200 201
  return 0;
}