rclient.c 7.1 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
H
Haojun Liao 已提交
15
#include <tdatablock.h>
dengyihao's avatar
dengyihao 已提交
16 17 18
#include "os.h"
#include "taoserror.h"
#include "tglobal.h"
dengyihao's avatar
dengyihao 已提交
19
#include "transLog.h"
dengyihao's avatar
dengyihao 已提交
20 21 22 23
#include "trpc.h"
#include "tutil.h"

typedef struct {
dengyihao's avatar
dengyihao 已提交
24 25 26 27 28 29 30
  int      index;
  SEpSet   epSet;
  int      num;
  int      numOfReqs;
  int      msgSize;
  tsem_t   rspSem;
  tsem_t * pOverSem;
wafwerar's avatar
wafwerar 已提交
31
  TdThread thread;
dengyihao's avatar
dengyihao 已提交
32
  void *   pRpc;
dengyihao's avatar
dengyihao 已提交
33 34
} SInfo;
/**
 * Response callback installed in SRpcInit.cfp.
 *
 * Looks up the sending worker through the message's ahandle, stores the
 * endpoint set passed with the response (if any), frees the response body and
 * posts the worker's rspSem.
 *
 * @param pParent  not used by this callback
 * @param pMsg     response message; pMsg->pCont is released here
 * @param pEpSet   endpoint set delivered with the response, may be NULL
 */
static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SInfo *pSender = (SInfo *)pMsg->info.ahandle;

  if (pEpSet != NULL) {
    pSender->epSet = *pEpSet;
  }

  rpcFreeCont(pMsg->pCont);

  // Let the worker waiting on rspSem continue with its next request.
  tsem_post(&pSender->rspSem);
}

// Number of sender threads that have run to completion; bumped once at the end of sendRequest().
static int tcount = 0;

static void *sendRequest(void *param) {
  SInfo * pInfo = (SInfo *)param;
  SRpcMsg rpcMsg = {0};

dengyihao's avatar
dengyihao 已提交
52
  tError("thread:%d, start to send request", pInfo->index);
dengyihao's avatar
dengyihao 已提交
53

dengyihao's avatar
dengyihao 已提交
54
  tError("thread:%d, reqs: %d", pInfo->index, pInfo->numOfReqs);
dengyihao's avatar
dengyihao 已提交
55 56 57 58 59
  int u100 = 0;
  int u500 = 0;
  int u1000 = 0;
  int u10000 = 0;

dengyihao's avatar
dengyihao 已提交
60 61 62 63
  while (pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
    pInfo->num++;
    rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
    rpcMsg.contLen = pInfo->msgSize;
S
Shengliang Guan 已提交
64
    rpcMsg.info.ahandle = pInfo;
dengyihao's avatar
dengyihao 已提交
65
    rpcMsg.msgType = 1;
dengyihao's avatar
dengyihao 已提交
66
    // tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
dengyihao's avatar
dengyihao 已提交
67
    int64_t start = taosGetTimestampUs();
dengyihao's avatar
dengyihao 已提交
68
    rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
dengyihao's avatar
dengyihao 已提交
69
    if (pInfo->num % 20000 == 0) tError("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
dengyihao's avatar
dengyihao 已提交
70
    // tsem_wait(&pInfo->rspSem);
dengyihao's avatar
dengyihao 已提交
71
    tsem_wait(&pInfo->rspSem);
dengyihao's avatar
dengyihao 已提交
72 73 74 75 76 77 78 79 80 81 82
    int64_t end = taosGetTimestampUs() - start;
    if (end <= 100) {
      u100++;
    } else if (end > 100 && end <= 500) {
      u500++;
    } else if (end > 500 && end < 1000) {
      u1000++;
    } else {
      u10000++;
    }

83
    tDebug("recv response succefully");
dengyihao's avatar
dengyihao 已提交
84

wafwerar's avatar
wafwerar 已提交
85
    // taosSsleep(100);
dengyihao's avatar
dengyihao 已提交
86 87
  }

dengyihao's avatar
dengyihao 已提交
88
  tError("send and recv sum: %d, %d, %d, %d", u100, u500, u1000, u10000);
dengyihao's avatar
dengyihao 已提交
89
  tError("thread:%d, it is over", pInfo->index);
dengyihao's avatar
dengyihao 已提交
90 91 92 93 94 95 96
  tcount++;

  return NULL;
}

int main(int argc, char *argv[]) {
  SRpcInit       rpcInit;
dengyihao's avatar
dengyihao 已提交
97
  SEpSet         epSet = {0};
dengyihao's avatar
dengyihao 已提交
98 99 100 101 102 103 104
  int            msgSize = 128;
  int            numOfReqs = 0;
  int            appThreads = 1;
  char           serverIp[40] = "127.0.0.1";
  char           secret[20] = "mypassword";
  struct timeval systemTime;
  int64_t        startTime, endTime;
dengyihao's avatar
dengyihao 已提交
105
  TdThreadAttr   thattr;
dengyihao's avatar
dengyihao 已提交
106 107 108

  // server info
  epSet.inUse = 0;
H
Haojun Liao 已提交
109 110
  addEpIntoEpSet(&epSet, serverIp, 7000);
  addEpIntoEpSet(&epSet, "192.168.0.1", 7000);
dengyihao's avatar
dengyihao 已提交
111 112 113 114 115 116 117 118

  // client info
  memset(&rpcInit, 0, sizeof(rpcInit));
  rpcInit.localPort = 0;
  rpcInit.label = "APP";
  rpcInit.numOfThreads = 1;
  rpcInit.cfp = processResponse;
  rpcInit.sessions = 100;
119
  rpcInit.idleTime = 100;
dengyihao's avatar
dengyihao 已提交
120 121
  rpcInit.user = "michael";
  rpcInit.connType = TAOS_CONN_CLIENT;
dengyihao's avatar
dengyihao 已提交
122
  rpcDebugFlag = 131;
dengyihao's avatar
dengyihao 已提交
123 124 125

  for (int i = 1; i < argc; ++i) {
    if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
H
Haojun Liao 已提交
126
      epSet.eps[0].port = atoi(argv[++i]);
dengyihao's avatar
dengyihao 已提交
127
    } else if (strcmp(argv[i], "-i") == 0 && i < argc - 1) {
H
Haojun Liao 已提交
128
      tstrncpy(epSet.eps[0].fqdn, argv[++i], sizeof(epSet.eps[0].fqdn));
dengyihao's avatar
dengyihao 已提交
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
    } else if (strcmp(argv[i], "-t") == 0 && i < argc - 1) {
      rpcInit.numOfThreads = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-m") == 0 && i < argc - 1) {
      msgSize = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-s") == 0 && i < argc - 1) {
      rpcInit.sessions = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-n") == 0 && i < argc - 1) {
      numOfReqs = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-a") == 0 && i < argc - 1) {
      appThreads = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-o") == 0 && i < argc - 1) {
      tsCompressMsgSize = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-u") == 0 && i < argc - 1) {
      rpcInit.user = argv[++i];
    } else if (strcmp(argv[i], "-k") == 0 && i < argc - 1) {
    } else if (strcmp(argv[i], "-spi") == 0 && i < argc - 1) {
    } else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) {
      rpcDebugFlag = atoi(argv[++i]);
    } else {
      printf("\nusage: %s [options] \n", argv[0]);
      printf("  [-i ip]: first server IP address, default is:%s\n", serverIp);
H
Haojun Liao 已提交
150
      printf("  [-p port]: server port number, default is:%d\n", epSet.eps[0].port);
dengyihao's avatar
dengyihao 已提交
151 152 153 154 155 156 157 158 159 160 161 162 163
      printf("  [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
      printf("  [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions);
      printf("  [-m msgSize]: message body size, default is:%d\n", msgSize);
      printf("  [-a threads]: number of app threads, default is:%d\n", appThreads);
      printf("  [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
      printf("  [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
      printf("  [-u user]: user name for the connection, default is:%s\n", rpcInit.user);
      printf("  [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
      printf("  [-h help]: print out this help\n\n");
      exit(0);
    }
  }

164
  const char *path = TD_TMP_DIR_PATH "transport/client";
dengyihao's avatar
dengyihao 已提交
165 166 167
  taosRemoveDir(path);
  taosMkDir(path);
  tstrncpy(tsLogDir, path, PATH_MAX);
S
Shengliang Guan 已提交
168
  taosInitLog("client.log", 10);
dengyihao's avatar
dengyihao 已提交
169 170 171 172 173 174 175

  void *pRpc = rpcOpen(&rpcInit);
  if (pRpc == NULL) {
    tError("failed to initialize RPC");
    return -1;
  }

dengyihao's avatar
dengyihao 已提交
176 177
  tError("client is initialized");
  tError("threads:%d msgSize:%d requests:%d", appThreads, msgSize, numOfReqs);
dengyihao's avatar
dengyihao 已提交
178

wafwerar's avatar
wafwerar 已提交
179
  taosGetTimeOfDay(&systemTime);
dengyihao's avatar
dengyihao 已提交
180
  startTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
dengyihao's avatar
dengyihao 已提交
181

wafwerar's avatar
wafwerar 已提交
182
  SInfo *pInfo = (SInfo *)taosMemoryCalloc(1, sizeof(SInfo) * appThreads);
dengyihao's avatar
dengyihao 已提交
183

wafwerar's avatar
wafwerar 已提交
184 185
  taosThreadAttrInit(&thattr);
  taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE);
dengyihao's avatar
dengyihao 已提交
186 187 188 189 190 191 192 193

  for (int i = 0; i < appThreads; ++i) {
    pInfo->index = i;
    pInfo->epSet = epSet;
    pInfo->numOfReqs = numOfReqs;
    pInfo->msgSize = msgSize;
    tsem_init(&pInfo->rspSem, 0, 0);
    pInfo->pRpc = pRpc;
wafwerar's avatar
wafwerar 已提交
194
    taosThreadCreate(&pInfo->thread, &thattr, sendRequest, pInfo);
dengyihao's avatar
dengyihao 已提交
195 196 197 198
    pInfo++;
  }

  do {
wafwerar's avatar
wafwerar 已提交
199
    taosUsleep(1);
dengyihao's avatar
dengyihao 已提交
200 201
  } while (tcount < appThreads);

wafwerar's avatar
wafwerar 已提交
202
  taosGetTimeOfDay(&systemTime);
dengyihao's avatar
dengyihao 已提交
203 204
  endTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
  float usedTime = (endTime - startTime) / 1000.0f;  // mseconds
dengyihao's avatar
dengyihao 已提交
205

dengyihao's avatar
dengyihao 已提交
206 207 208
  tError("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs * appThreads);
  tError("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime,
         msgSize);
dengyihao's avatar
dengyihao 已提交
209 210 211 212 213 214 215 216

  int ch = getchar();
  UNUSED(ch);

  taosCloseLog();

  return 0;
}