rclient.c 7.2 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#include <pthread.h>
#include <stdlib.h>
#include <sys/time.h>

#include <tep.h>
#include "os.h"
#include "rpcLog.h"
#include "taoserror.h"
#include "tglobal.h"
#include "trpc.h"
#include "tutil.h"

// Per-sender-thread state. One instance is handed to each sendRequest() thread and is also
// attached to every request as its `ahandle`, so processResponse() can find it again.
typedef struct {
  int       index;      // thread number assigned in main(); used in log messages
  SEpSet    epSet;      // endpoint set requests are sent to; overwritten when a response carries a new one
  int       num;        // number of requests issued so far by this thread
  int       numOfReqs;  // number of requests to issue; 0 makes the send loop run without limit
  int       msgSize;    // size in bytes of each request body
  tsem_t    rspSem;     // posted by processResponse(), waited on by sendRequest() after each send
  tsem_t *  pOverSem;   // not referenced by the code visible in this file
  pthread_t thread;     // handle of the sender thread
  void *    pRpc;       // RPC handle returned by rpcOpen(), shared by all sender threads
} SInfo;
/*
 * Response callback registered as rpcInit.cfp.
 *
 * pParent is not used. pMsg->ahandle is the SInfo of the thread that issued the request.
 * When pEpSet is non-NULL it replaces that thread's endpoint set. The response body is
 * released here and the waiting sender thread is woken through its semaphore.
 */
static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SInfo *pSender = (SInfo *)pMsg->ahandle;

  tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pSender->index, pMsg->msgType, pMsg->contLen,
         pMsg->code);

  // Adopt the endpoint set delivered with the response, when there is one.
  if (pEpSet != NULL) {
    pSender->epSet = *pEpSet;
  }

  rpcFreeCont(pMsg->pCont);

  // Unblock the sender, which waits on rspSem after every request.
  tsem_post(&pSender->rspSem);
}

// Count of sender threads that have run to completion; bumped once at the end of sendRequest().
static int tcount = 0;

static void *sendRequest(void *param) {
  SInfo * pInfo = (SInfo *)param;
  SRpcMsg rpcMsg = {0};

  tDebug("thread:%d, start to send request", pInfo->index);

dengyihao's avatar
dengyihao 已提交
56 57 58 59 60 61
  tDebug("thread:%d, reqs: %d", pInfo->index, pInfo->numOfReqs);
  int u100 = 0;
  int u500 = 0;
  int u1000 = 0;
  int u10000 = 0;

dengyihao's avatar
dengyihao 已提交
62 63 64 65 66 67
  while (pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
    pInfo->num++;
    rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
    rpcMsg.contLen = pInfo->msgSize;
    rpcMsg.ahandle = pInfo;
    rpcMsg.msgType = 1;
dengyihao's avatar
dengyihao 已提交
68
    // tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
dengyihao's avatar
dengyihao 已提交
69
    int64_t start = taosGetTimestampUs();
dengyihao's avatar
dengyihao 已提交
70 71
    rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
    if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
dengyihao's avatar
dengyihao 已提交
72
    // tsem_wait(&pInfo->rspSem);
dengyihao's avatar
dengyihao 已提交
73
    tsem_wait(&pInfo->rspSem);
dengyihao's avatar
dengyihao 已提交
74 75 76 77 78 79 80 81 82 83 84
    int64_t end = taosGetTimestampUs() - start;
    if (end <= 100) {
      u100++;
    } else if (end > 100 && end <= 500) {
      u500++;
    } else if (end > 500 && end < 1000) {
      u1000++;
    } else {
      u10000++;
    }

85
    tDebug("recv response succefully");
dengyihao's avatar
dengyihao 已提交
86

87
    // usleep(100000000);
dengyihao's avatar
dengyihao 已提交
88 89
  }

dengyihao's avatar
dengyihao 已提交
90
  tError("send and recv sum: %d, %d, %d, %d", u100, u500, u1000, u10000);
dengyihao's avatar
dengyihao 已提交
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
  tDebug("thread:%d, it is over", pInfo->index);
  tcount++;

  return NULL;
}

int main(int argc, char *argv[]) {
  SRpcInit       rpcInit;
  SEpSet         epSet;
  int            msgSize = 128;
  int            numOfReqs = 0;
  int            appThreads = 1;
  char           serverIp[40] = "127.0.0.1";
  char           secret[20] = "mypassword";
  struct timeval systemTime;
  int64_t        startTime, endTime;
  pthread_attr_t thattr;

  // server info
  epSet.inUse = 0;
H
Haojun Liao 已提交
111 112
  addEpIntoEpSet(&epSet, serverIp, 7000);
  addEpIntoEpSet(&epSet, "192.168.0.1", 7000);
dengyihao's avatar
dengyihao 已提交
113 114 115 116 117 118 119 120

  // client info
  memset(&rpcInit, 0, sizeof(rpcInit));
  rpcInit.localPort = 0;
  rpcInit.label = "APP";
  rpcInit.numOfThreads = 1;
  rpcInit.cfp = processResponse;
  rpcInit.sessions = 100;
121
  rpcInit.idleTime = 100;
dengyihao's avatar
dengyihao 已提交
122 123 124 125 126 127 128 129
  rpcInit.user = "michael";
  rpcInit.secret = secret;
  rpcInit.ckey = "key";
  rpcInit.spi = 1;
  rpcInit.connType = TAOS_CONN_CLIENT;

  for (int i = 1; i < argc; ++i) {
    if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
H
Haojun Liao 已提交
130
      epSet.eps[0].port = atoi(argv[++i]);
dengyihao's avatar
dengyihao 已提交
131
    } else if (strcmp(argv[i], "-i") == 0 && i < argc - 1) {
H
Haojun Liao 已提交
132
      tstrncpy(epSet.eps[0].fqdn, argv[++i], sizeof(epSet.eps[0].fqdn));
dengyihao's avatar
dengyihao 已提交
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
    } else if (strcmp(argv[i], "-t") == 0 && i < argc - 1) {
      rpcInit.numOfThreads = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-m") == 0 && i < argc - 1) {
      msgSize = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-s") == 0 && i < argc - 1) {
      rpcInit.sessions = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-n") == 0 && i < argc - 1) {
      numOfReqs = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-a") == 0 && i < argc - 1) {
      appThreads = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-o") == 0 && i < argc - 1) {
      tsCompressMsgSize = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-u") == 0 && i < argc - 1) {
      rpcInit.user = argv[++i];
    } else if (strcmp(argv[i], "-k") == 0 && i < argc - 1) {
      rpcInit.secret = argv[++i];
    } else if (strcmp(argv[i], "-spi") == 0 && i < argc - 1) {
      rpcInit.spi = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) {
      rpcDebugFlag = atoi(argv[++i]);
    } else {
      printf("\nusage: %s [options] \n", argv[0]);
      printf("  [-i ip]: first server IP address, default is:%s\n", serverIp);
H
Haojun Liao 已提交
156
      printf("  [-p port]: server port number, default is:%d\n", epSet.eps[0].port);
dengyihao's avatar
dengyihao 已提交
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182
      printf("  [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
      printf("  [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions);
      printf("  [-m msgSize]: message body size, default is:%d\n", msgSize);
      printf("  [-a threads]: number of app threads, default is:%d\n", appThreads);
      printf("  [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
      printf("  [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
      printf("  [-u user]: user name for the connection, default is:%s\n", rpcInit.user);
      printf("  [-k secret]: password for the connection, default is:%s\n", rpcInit.secret);
      printf("  [-spi SPI]: security parameter index, default is:%d\n", rpcInit.spi);
      printf("  [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
      printf("  [-h help]: print out this help\n\n");
      exit(0);
    }
  }

  taosInitLog("client.log", 100000, 10);

  void *pRpc = rpcOpen(&rpcInit);
  if (pRpc == NULL) {
    tError("failed to initialize RPC");
    return -1;
  }

  tInfo("client is initialized");
  tInfo("threads:%d msgSize:%d requests:%d", appThreads, msgSize, numOfReqs);

dengyihao's avatar
dengyihao 已提交
183 184
  gettimeofday(&systemTime, NULL);
  startTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
dengyihao's avatar
dengyihao 已提交
185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205

  SInfo *pInfo = (SInfo *)calloc(1, sizeof(SInfo) * appThreads);

  pthread_attr_init(&thattr);
  pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);

  for (int i = 0; i < appThreads; ++i) {
    pInfo->index = i;
    pInfo->epSet = epSet;
    pInfo->numOfReqs = numOfReqs;
    pInfo->msgSize = msgSize;
    tsem_init(&pInfo->rspSem, 0, 0);
    pInfo->pRpc = pRpc;
    pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo);
    pInfo++;
  }

  do {
    usleep(1);
  } while (tcount < appThreads);

dengyihao's avatar
dengyihao 已提交
206 207 208
  gettimeofday(&systemTime, NULL);
  endTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
  float usedTime = (endTime - startTime) / 1000.0f;  // mseconds
dengyihao's avatar
dengyihao 已提交
209

dengyihao's avatar
dengyihao 已提交
210 211
  tInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs * appThreads);
  tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, msgSize);
dengyihao's avatar
dengyihao 已提交
212 213 214 215 216 217 218 219

  int ch = getchar();
  UNUSED(ch);

  taosCloseLog();

  return 0;
}